aboutsummaryrefslogtreecommitdiff
path: root/src/rt/rust_dom.cpp
diff options
context:
space:
mode:
authorMichael Bebenita <[email protected]>2010-07-19 14:05:18 -0700
committerMichael Bebenita <[email protected]>2010-07-19 14:05:18 -0700
commit00d1465d13980fc3acf650f182ee0723fbda0e06 (patch)
treea73cf5f0f20c0bee6722b33d975eb930919fefdf /src/rt/rust_dom.cpp
parentAdd a test for an obvious-seeming (but not actually legal) kind of cast attem... (diff)
downloadrust-00d1465d13980fc3acf650f182ee0723fbda0e06.tar.xz
rust-00d1465d13980fc3acf650f182ee0723fbda0e06.zip
Added a message passing system based on lock free queues for inter-thread communication. Channels now buffer on the sending side, and no longer require blocking when sending. Lots of other refactoring and bug fixes.
Diffstat (limited to 'src/rt/rust_dom.cpp')
-rw-r--r--src/rt/rust_dom.cpp175
1 files changed, 162 insertions, 13 deletions
diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp
index 3b5e23b2..39124491 100644
--- a/src/rt/rust_dom.cpp
+++ b/src/rt/rust_dom.cpp
@@ -4,6 +4,24 @@
template class ptr_vec<rust_task>;
+rust_message::rust_message(rust_dom *dom) : dom(dom) {
+
+}
+
+void rust_message::process() {
+
+}
+
+kill_task_message::kill_task_message(rust_dom *dom, rust_task *task) :
+ rust_message(dom), _task(task) {
+
+}
+
+void kill_task_message::process() {
+ _task->ref_count--;
+ _task->kill();
+}
+
rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate) :
interrupt_flag(0),
root_crate(root_crate),
@@ -81,13 +99,25 @@ rust_dom::activate(rust_task *task) {
}
void
+rust_dom::log(rust_task *task, uint32_t type_bits, char const *fmt, ...) {
+ char buf[256];
+ if (_log.is_tracing(type_bits)) {
+ va_list args;
+ va_start(args, fmt);
+ vsnprintf(buf, sizeof(buf), fmt, args);
+ _log.trace_ln(task, type_bits, buf);
+ va_end(args);
+ }
+}
+
+void
rust_dom::log(uint32_t type_bits, char const *fmt, ...) {
char buf[256];
if (_log.is_tracing(type_bits)) {
va_list args;
va_start(args, fmt);
vsnprintf(buf, sizeof(buf), fmt, args);
- _log.trace_ln(type_bits, buf);
+ _log.trace_ln(NULL, type_bits, buf);
va_end(args);
}
}
@@ -189,7 +219,7 @@ rust_dom::remove_task_from_state_vec(ptr_vec<rust_task> *v, rust_task *task)
"removing task 0x%" PRIxPTR " in state '%s' from vec 0x%" PRIxPTR,
(uintptr_t)task, state_vec_name(v), (uintptr_t)v);
I(this, (*v)[task->idx] == task);
- v->swapdel(task);
+ v->swap_delete(task);
}
const char *
@@ -203,25 +233,67 @@ rust_dom::state_vec_name(ptr_vec<rust_task> *v)
return "dead";
}
+/**
+ * Delete any dead tasks.
+ */
void
-rust_dom::reap_dead_tasks()
-{
+rust_dom::reap_dead_tasks() {
for (size_t i = 0; i < dead_tasks.length(); ) {
- rust_task *t = dead_tasks[i];
- if (t == root_task || t->refcnt == 0) {
- I(this, !t->waiting_tasks.length());
- dead_tasks.swapdel(t);
+ rust_task *task = dead_tasks[i];
+// log(rust_log::TASK, "dead task 0x%" PRIxPTR " with ref_count: %d",
+// task, task->ref_count);
+ if (task->ref_count == 0) {
+ I(this, !task->waiting_tasks.length());
+ dead_tasks.swap_delete(task);
log(rust_log::TASK,
- "deleting unreferenced dead task 0x%" PRIxPTR, t);
- delete t;
+ "deleting unreferenced dead task 0x%" PRIxPTR, task);
+ delete task;
continue;
}
++i;
}
}
+
+/**
+ * Enqueues a message in this domain's incoming message queue. It's the
+ * responsibility of the receiver to free the message once it's processed.
+ */
+void rust_dom::send_message(rust_message *message) {
+ log(rust_log::COMM, "enqueueing message 0x%" PRIxPTR
+ " in queue 0x%" PRIxPTR,
+ message,
+ &_incoming_message_queue);
+ _incoming_message_queue.enqueue(message);
+ _incoming_message_pending.signal();
+}
+
+/**
+ * Drains and processes incoming pending messages.
+ */
+void rust_dom::drain_incoming_message_queue() {
+ rust_message *message;
+ while ((message = (rust_message *) _incoming_message_queue.dequeue())) {
+ log(rust_log::COMM, "read 0x%" PRIxPTR
+ " from queue 0x%" PRIxPTR,
+ message,
+ &_incoming_message_queue);
+ log(rust_log::COMM, "processing incoming message 0x%" PRIxPTR,
+ message);
+ message->process();
+ delete message;
+ }
+}
+
+/**
+ * Schedules a running task for execution. Only running tasks can be
+ * activated. Blocked tasks have to be unblocked before they can be
+ * activated.
+ *
+ * Returns NULL if no tasks can be scheduled.
+ */
rust_task *
-rust_dom::sched()
+rust_dom::schedule_task()
{
I(this, this);
// FIXME: in the face of failing tasks, this is not always right.
@@ -231,11 +303,88 @@ rust_dom::sched()
i %= running_tasks.length();
return (rust_task *)running_tasks[i];
}
- log(rust_log::DOM|rust_log::TASK,
- "no schedulable tasks");
+ // log(rust_log::DOM|rust_log::TASK, "no schedulable tasks");
return NULL;
}
+/**
+ * Starts the main scheduler loop which performs task scheduling for this
+ * domain.
+ *
+ * Returns once no more tasks can be scheduled.
+ */
+int
+rust_dom::start_main_loop()
+{
+ // Make sure someone is watching, to pull us out of infinite loops.
+ rust_timer timer(this);
+
+ log(rust_log::DOM, "running main-loop on domain 0x%" PRIxPTR, this);
+ logptr("exit-task glue", root_crate->get_exit_task_glue());
+
+ while (n_live_tasks() > 0) {
+ rust_task *scheduled_task = schedule_task();
+
+ // If we cannot schedule a task because all other live tasks
+ // are blocked, wait on a condition variable which is signaled
+ // if progress is made in other domains.
+
+ if (scheduled_task == NULL) {
+ log(rust_log::TASK,
+ "all tasks are blocked, waiting for progress ...");
+ _progress.wait();
+ continue;
+ }
+
+ I(this, scheduled_task->running());
+
+ log(rust_log::TASK,
+ "activating task 0x%" PRIxPTR ", sp=x%" PRIxPTR,
+ (uintptr_t)scheduled_task, scheduled_task->rust_sp);
+
+ interrupt_flag = 0;
+
+ activate(scheduled_task);
+
+ log(rust_log::TASK,
+ "returned from task 0x%" PRIxPTR
+ " in state '%s', sp=0x%" PRIxPTR,
+ (uintptr_t)scheduled_task,
+ state_vec_name(scheduled_task->state),
+ scheduled_task->rust_sp);
+
+ I(this, scheduled_task->rust_sp >=
+ (uintptr_t) &scheduled_task->stk->data[0]);
+ I(this, scheduled_task->rust_sp < scheduled_task->stk->limit);
+
+ drain_incoming_message_queue();
+
+ reap_dead_tasks();
+ }
+
+ log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ...");
+
+ while (dead_tasks.length() > 0) {
+ log(rust_log::DOM,
+ "waiting for %d dead tasks to become dereferenced ...",
+ dead_tasks.length());
+
+ log(rust_log::DOM,
+ "waiting for %" PRIxPTR, dead_tasks[0]);
+
+ if (_incoming_message_queue.is_empty()) {
+ _incoming_message_pending.wait();
+ } else {
+ drain_incoming_message_queue();
+ }
+ reap_dead_tasks();
+ }
+
+ log(rust_log::DOM, "finished main-loop (dom.rval = %d)", rval);
+ return rval;
+}
+
+
rust_crate_cache *
rust_dom::get_cache(rust_crate const *crate) {
log(rust_log::CACHE,