aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Bebenita <[email protected]>2010-07-28 16:24:50 -0700
committerGraydon Hoare <[email protected]>2010-07-28 20:30:29 -0700
commit4ff8e15128f90d4e9e57897c48280c6f82bb8343 (patch)
tree86c3808e729b4f596c3c23e228738d3d25b108bc /src
parentRename rust_proxy_delegate to maybe_proxy, flesh out logic in it. Add strong-... (diff)
downloadrust-4ff8e15128f90d4e9e57897c48280c6f82bb8343.tar.xz
rust-4ff8e15128f90d4e9e57897c48280c6f82bb8343.zip
Move notification-messages out into their own file and unify into notify_message, make them use proxies, cache task proxies in dom.
Diffstat (limited to 'src')
-rw-r--r--src/Makefile2
-rw-r--r--src/rt/rust_dom.cpp68
-rw-r--r--src/rt/rust_dom.h22
-rw-r--r--src/rt/rust_message.cpp75
-rw-r--r--src/rt/rust_message.h72
-rw-r--r--src/rt/rust_task.cpp18
-rw-r--r--src/rt/rust_task.h8
-rw-r--r--src/rt/rust_upcall.cpp46
8 files changed, 246 insertions, 65 deletions
diff --git a/src/Makefile b/src/Makefile
index e923af02..9bd6ed3c 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -259,6 +259,7 @@ RUNTIME_CS := rt/sync/spin_lock.cpp \
rt/rust_chan.cpp \
rt/rust_upcall.cpp \
rt/rust_log.cpp \
+ rt/rust_message.cpp \
rt/rust_timer.cpp \
rt/circular_buffer.cpp \
rt/isaac/randport.cpp
@@ -272,6 +273,7 @@ RUNTIME_HDR := rt/globals.h \
rt/rust_dom.h \
rt/rust_task.h \
rt/rust_proxy.h \
+ rt/rust_message.h \
rt/circular_buffer.h
RUNTIME_INCS := -Irt/isaac -Irt/uthash
diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp
index 17cb1498..4d1917f4 100644
--- a/src/rt/rust_dom.cpp
+++ b/src/rt/rust_dom.cpp
@@ -4,23 +4,6 @@
template class ptr_vec<rust_task>;
-rust_message::rust_message(rust_dom *dom) : dom(dom) {
-
-}
-
-void rust_message::process() {
-
-}
-
-kill_task_message::kill_task_message(rust_dom *dom, rust_task *task) :
- rust_message(dom), _task(task) {
-
-}
-
-void kill_task_message::process() {
- _task->ref_count--;
- _task->kill();
-}
rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate) :
interrupt_flag(0),
@@ -54,7 +37,23 @@ del_all_tasks(rust_dom *dom, ptr_vec<rust_task> *v) {
}
}
+void
+rust_dom::delete_proxies() {
+ rust_task *task;
+ rust_proxy<rust_task> *task_proxy;
+ while (_task_proxies.pop(&task, &task_proxy)) {
+ log(rust_log::TASK, "deleting proxy %" PRIxPTR
+ " in dom %" PRIxPTR, task_proxy, task_proxy->dom);
+ delete task_proxy;
+ }
+}
+
rust_dom::~rust_dom() {
+ log(rust_log::MEM | rust_log::DOM,
+ "~rust_dom 0x%" PRIxPTR, (uintptr_t)this);
+
+ log(rust_log::TASK, "deleting all proxies");
+ delete_proxies();
log(rust_log::TASK, "deleting all running tasks");
del_all_tasks(this, &running_tasks);
log(rust_log::TASK, "deleting all blocked tasks");
@@ -126,8 +125,8 @@ void *
rust_dom::malloc(size_t sz) {
void *p = srv->malloc(sz);
I(this, p);
- log(rust_log::MEM, "rust_dom::malloc(%d) -> 0x%" PRIxPTR,
- sz, p);
+ log(rust_log::MEM, "0x%" PRIxPTR " rust_dom::malloc(%d) -> 0x%" PRIxPTR,
+ (uintptr_t) this, sz, p);
return p;
}
@@ -219,6 +218,8 @@ rust_dom::reap_dead_tasks() {
rust_task *task = dead_tasks[i];
if (task->ref_count == 0) {
I(this, !task->waiting_tasks.length());
+ I(this, task->tasks_waiting_to_join.is_empty());
+
dead_tasks.swap_delete(task);
log(rust_log::TASK,
"deleting unreferenced dead task 0x%" PRIxPTR, task);
@@ -229,18 +230,20 @@ rust_dom::reap_dead_tasks() {
}
}
-
/**
* Enqueues a message in this domain's incoming message queue. It's the
* responsibility of the receiver to free the message once it's processed.
*/
void rust_dom::send_message(rust_message *message) {
- log(rust_log::COMM, "enqueueing message 0x%" PRIxPTR
+ log(rust_log::COMM, "==> enqueueing \"%s\" 0x%" PRIxPTR
" in queue 0x%" PRIxPTR,
+ message->label,
message,
&_incoming_message_queue);
+ A(this, message->dom == this, "Message owned by non-local domain.");
_incoming_message_queue.enqueue(message);
_incoming_message_pending.signal();
+ _progress.signal();
}
/**
@@ -249,17 +252,24 @@ void rust_dom::send_message(rust_message *message) {
void rust_dom::drain_incoming_message_queue() {
rust_message *message;
while ((message = (rust_message *) _incoming_message_queue.dequeue())) {
- log(rust_log::COMM, "read 0x%" PRIxPTR
- " from queue 0x%" PRIxPTR,
- message,
- &_incoming_message_queue);
- log(rust_log::COMM, "processing incoming message 0x%" PRIxPTR,
- message);
+ log(rust_log::COMM, "<== processing incoming message \"%s\" 0x%"
+ PRIxPTR, message->label, message);
message->process();
delete message;
}
}
+rust_proxy<rust_task> *
+rust_dom::get_task_proxy(rust_task *task) {
+ rust_proxy<rust_task> *proxy = NULL;
+ if (_task_proxies.get(task, &proxy)) {
+ return proxy;
+ }
+ log(rust_log::COMM, "no proxy for 0x%" PRIxPTR, task);
+ proxy = new (this) rust_proxy<rust_task> (this, task, false);
+ _task_proxies.put(task, proxy);
+ return proxy;
+}
/**
* Schedules a running task for execution. Only running tasks can be
* activated. Blocked tasks have to be unblocked before they can be
@@ -324,6 +334,8 @@ rust_dom::start_main_loop()
logptr("exit-task glue", root_crate->get_exit_task_glue());
while (n_live_tasks() > 0) {
+ drain_incoming_message_queue();
+
rust_task *scheduled_task = schedule_task();
// If we cannot schedule a task because all other live tasks
@@ -362,8 +374,6 @@ rust_dom::start_main_loop()
(uintptr_t) &scheduled_task->stk->data[0]);
I(this, scheduled_task->rust_sp < scheduled_task->stk->limit);
- drain_incoming_message_queue();
-
reap_dead_tasks();
}
diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h
index 8247cbbd..2f162972 100644
--- a/src/rt/rust_dom.h
+++ b/src/rt/rust_dom.h
@@ -6,22 +6,10 @@
#define RUST_DOM_H
#include "sync/lock_free_queue.h"
+#include "util/hash_map.h"
-class rust_message : public lock_free_queue_node,
- public dom_owned<rust_message> {
-public:
- rust_dom *dom;
- rust_message(rust_dom *dom);
- virtual ~rust_message() {}
- virtual void process();
-};
-
-class kill_task_message : public rust_message {
- rust_task *_task;
-public:
- kill_task_message(rust_dom *dom, rust_task *task);
- void process();
-};
+#include "rust_proxy.h"
+#include "rust_message.h"
struct rust_dom
{
@@ -48,6 +36,8 @@ struct rust_dom
condition_variable _progress;
+ hash_map<rust_task *, rust_proxy<rust_task> *> _task_proxies;
+
// Incoming messages from other domains.
condition_variable _incoming_message_pending;
lock_free_queue _incoming_message_queue;
@@ -74,6 +64,8 @@ struct rust_dom
void send_message(rust_message *message);
void drain_incoming_message_queue();
+ rust_proxy<rust_task> *get_task_proxy(rust_task *task);
+ void delete_proxies();
#ifdef __WIN32__
void win32_require(LPCTSTR fn, BOOL ok);
diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp
new file mode 100644
index 00000000..22f8ffe5
--- /dev/null
+++ b/src/rt/rust_message.cpp
@@ -0,0 +1,75 @@
+#include "rust_internal.h"
+#include "rust_message.h"
+
+rust_message::
+rust_message(const char* label, rust_task *source, rust_task *target) :
+ dom(target->dom), label(label),
+ _source(source),
+ _target(target) {
+}
+
+rust_message::~rust_message() {
+}
+
+void rust_message::process() {
+ I(dom, false);
+}
+
+rust_proxy<rust_task> *
+rust_message::get_source_proxy() {
+ return dom->get_task_proxy(_source);
+}
+
+notify_message::
+notify_message(notification_type type, const char* label,
+ rust_task *source,
+ rust_task *target) :
+ rust_message(label, source, target), type(type) {
+}
+
+/**
+ * Sends a message to the target task via a proxy. The message is allocated
+ * in the target task domain along with a proxy which points back to the
+ * source task.
+ */
+void notify_message::
+send(notification_type type, const char* label, rust_task *source,
+ rust_proxy<rust_task> *target) {
+ rust_task *target_task = target->delegate();
+ rust_dom *target_domain = target_task->dom;
+ notify_message *message = new (target_domain)
+ notify_message(type, label, source, target_task);
+ target_domain->send_message(message);
+}
+
+void notify_message::process() {
+ rust_task *task = _target;
+ switch (type) {
+ case KILL:
+ task->ref_count--;
+ task->kill();
+ break;
+ case JOIN: {
+ if (task->dead() == false) {
+ task->tasks_waiting_to_join.append(get_source_proxy());
+ } else {
+ send(WAKEUP, "wakeup", task, get_source_proxy());
+ }
+ break;
+ }
+ case WAKEUP:
+ task->wakeup(get_source_proxy()->delegate());
+ break;
+ }
+}
+
+//
+// Local Variables:
+// mode: C++
+// fill-column: 78;
+// indent-tabs-mode: nil
+// c-basic-offset: 4
+// buffer-file-coding-system: utf-8-unix
+// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
+// End:
+//
diff --git a/src/rt/rust_message.h b/src/rt/rust_message.h
new file mode 100644
index 00000000..f0c6bc0a
--- /dev/null
+++ b/src/rt/rust_message.h
@@ -0,0 +1,72 @@
+#ifndef RUST_MESSAGE_H
+#define RUST_MESSAGE_H
+
+/**
+ * Rust messages are used for inter-thread communication. They are enqueued
+ * and allocated in the target domain.
+ */
+
+/**
+ * Abstract base class for all message types.
+ */
+class rust_message : public lock_free_queue_node,
+ public dom_owned<rust_message> {
+public:
+ rust_dom *dom;
+ const char* label;
+private:
+ rust_task *_source;
+protected:
+ rust_task *_target;
+public:
+ rust_message(const char* label, rust_task *source, rust_task *target);
+ virtual ~rust_message();
+
+ /**
+ * We can only access the source task through a proxy, so create one
+ * on demand if we need it.
+ */
+ rust_proxy<rust_task> *get_source_proxy();
+
+ /**
+ * Processes the message in the target domain thread.
+ */
+ virtual void process();
+};
+
+/**
+ * Notify messages are simple argument-less messages.
+ */
+class notify_message : public rust_message {
+public:
+ enum notification_type {
+ KILL, JOIN, WAKEUP
+ };
+
+ const notification_type type;
+
+ notify_message(notification_type type, const char* label,
+ rust_task *source, rust_task *target);
+
+ void process();
+
+ /**
+ * This code executes in the sending domain's thread.
+ */
+ static void
+ send(notification_type type, const char* label, rust_task *source,
+ rust_proxy<rust_task> *target);
+};
+
+//
+// Local Variables:
+// mode: C++
+// fill-column: 78;
+// indent-tabs-mode: nil
+// c-basic-offset: 4
+// buffer-file-coding-system: utf-8-unix
+// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
+// End:
+//
+
+#endif /* RUST_MESSAGE_H */
diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp
index 189824ac..b3caac25 100644
--- a/src/rt/rust_task.cpp
+++ b/src/rt/rust_task.cpp
@@ -385,6 +385,24 @@ rust_task::notify_waiting_tasks()
}
}
+void
+rust_task::notify_tasks_waiting_to_join() {
+ while (tasks_waiting_to_join.is_empty() == false) {
+ log(rust_log::ALL, "notify_tasks_waiting_to_join: %d",
+ tasks_waiting_to_join.size());
+ maybe_proxy<rust_task> *waiting_task = tasks_waiting_to_join.pop();
+ if (waiting_task->is_proxy()) {
+ notify_message::send(notify_message::WAKEUP, "wakeup",
+ this, waiting_task->as_proxy());
+ } else {
+ rust_task *task = waiting_task->delegate();
+ if (task->dead() == false) {
+ task->wakeup(this);
+ }
+ }
+ }
+}
+
uintptr_t
rust_task::get_fp() {
// sp in any suspended task points to the last callee-saved reg on
diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h
index 0c723a9d..34553b6c 100644
--- a/src/rt/rust_task.h
+++ b/src/rt/rust_task.h
@@ -4,6 +4,10 @@
#ifndef RUST_TASK_H
#define RUST_TASK_H
+
+#include "util/array_list.h"
+
+
struct
rust_task : public maybe_proxy<rust_task>,
public dom_owned<rust_task>
@@ -35,6 +39,9 @@ rust_task : public maybe_proxy<rust_task>,
// that location before waking us up.
uintptr_t* rendezvous_ptr;
+ // List of tasks waiting for this task to finish.
+ array_list<maybe_proxy<rust_task> *> tasks_waiting_to_join;
+
rust_alarm alarm;
rust_task(rust_dom *dom,
@@ -95,6 +102,7 @@ rust_task : public maybe_proxy<rust_task>,
// Notify tasks waiting for us that we are about to die.
void notify_waiting_tasks();
+ void notify_tasks_waiting_to_join();
uintptr_t get_fp();
uintptr_t get_previous_fp(uintptr_t fp);
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index e4604049..42203bec 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -122,19 +122,25 @@ extern "C" CDECL void upcall_yield(rust_task *task) {
}
extern "C" CDECL void
-upcall_join(rust_task *task, maybe_proxy<rust_task> *proxy) {
+upcall_join(rust_task *task, maybe_proxy<rust_task> *target) {
LOG_UPCALL_ENTRY(task);
task->log(rust_log::UPCALL | rust_log::COMM,
- "join proxy 0x%" PRIxPTR " -> task = 0x%" PRIxPTR,
- proxy, proxy->delegate());
-
- rust_task *other = proxy->delegate();
-
- // If the other task is already dying, we don't have to wait for it.
- if (!other->dead()) {
- other->waiting_tasks.push(&task->alarm);
- task->block(other);
+ "target: 0x%" PRIxPTR ", task: 0x%" PRIxPTR,
+ target, target->delegate());
+
+ rust_task *target_task = target->delegate();
+ if (target->is_proxy()) {
+ notify_message::
+ send(notify_message::JOIN, "join", task, target->as_proxy());
+ task->block(target_task);
task->yield(2);
+ } else {
+ // If the other task is already dying, we don't have to wait for it.
+ if (target_task->dead() == false) {
+ target_task->tasks_waiting_to_join.push(task);
+ task->block(target_task);
+ task->yield(2);
+ }
}
}
@@ -194,22 +200,20 @@ extern "C" CDECL void upcall_fail(rust_task *task, char const *expr,
* Called whenever a task's ref count drops to zero.
*/
extern "C" CDECL void
-upcall_kill(rust_task *task, maybe_proxy<rust_task> *target_proxy) {
+upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) {
LOG_UPCALL_ENTRY(task);
- rust_task *target_task = target_proxy->delegate();
- if (target_proxy != target_task) {
- task->dom->free(target_proxy);
- }
+ rust_task *target_task = target->delegate();
+
task->log(rust_log::UPCALL | rust_log::TASK,
"kill task 0x%" PRIxPTR ", ref count %d",
target_task,
target_task->ref_count);
- if (requires_message_passing(task, target_task)) {
- rust_dom *target_domain = target_task->dom;
- target_domain->send_message(
- new (target_domain)
- kill_task_message(target_domain, target_task));
+ if (target->is_proxy()) {
+ notify_message::
+ send(notify_message::KILL, "kill", task, target->as_proxy());
+ // The proxy ref_count dropped to zero, delete it here.
+ delete target->as_proxy();
} else {
target_task->kill();
}
@@ -224,7 +228,7 @@ upcall_exit(rust_task *task) {
task->log(rust_log::UPCALL | rust_log::TASK,
"task ref_count: %d", task->ref_count);
task->die();
- task->notify_waiting_tasks();
+ task->notify_tasks_waiting_to_join();
task->yield(1);
}