diff options
| author | Michael Bebenita <[email protected]> | 2010-07-19 14:05:18 -0700 |
|---|---|---|
| committer | Michael Bebenita <[email protected]> | 2010-07-19 14:05:18 -0700 |
| commit | 00d1465d13980fc3acf650f182ee0723fbda0e06 (patch) | |
| tree | a73cf5f0f20c0bee6722b33d975eb930919fefdf /src/rt/rust_dom.cpp | |
| parent | Add a test for an obvious-seeming (but not actually legal) kind of cast attem... (diff) | |
| download | rust-00d1465d13980fc3acf650f182ee0723fbda0e06.tar.xz rust-00d1465d13980fc3acf650f182ee0723fbda0e06.zip | |
Added a message passing system based on lock free queues for inter-thread communication. Channels now buffer on the sending side, and no longer require blocking when sending. Lots of other refactoring and bug fixes.
Diffstat (limited to 'src/rt/rust_dom.cpp')
| -rw-r--r-- | src/rt/rust_dom.cpp | 175 |
1 files changed, 162 insertions, 13 deletions
diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index 3b5e23b2..39124491 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -4,6 +4,24 @@ template class ptr_vec<rust_task>; +rust_message::rust_message(rust_dom *dom) : dom(dom) { + +} + +void rust_message::process() { + +} + +kill_task_message::kill_task_message(rust_dom *dom, rust_task *task) : + rust_message(dom), _task(task) { + +} + +void kill_task_message::process() { + _task->ref_count--; + _task->kill(); +} + rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate) : interrupt_flag(0), root_crate(root_crate), @@ -81,13 +99,25 @@ rust_dom::activate(rust_task *task) { } void +rust_dom::log(rust_task *task, uint32_t type_bits, char const *fmt, ...) { + char buf[256]; + if (_log.is_tracing(type_bits)) { + va_list args; + va_start(args, fmt); + vsnprintf(buf, sizeof(buf), fmt, args); + _log.trace_ln(task, type_bits, buf); + va_end(args); + } +} + +void rust_dom::log(uint32_t type_bits, char const *fmt, ...) { char buf[256]; if (_log.is_tracing(type_bits)) { va_list args; va_start(args, fmt); vsnprintf(buf, sizeof(buf), fmt, args); - _log.trace_ln(type_bits, buf); + _log.trace_ln(NULL, type_bits, buf); va_end(args); } } @@ -189,7 +219,7 @@ rust_dom::remove_task_from_state_vec(ptr_vec<rust_task> *v, rust_task *task) "removing task 0x%" PRIxPTR " in state '%s' from vec 0x%" PRIxPTR, (uintptr_t)task, state_vec_name(v), (uintptr_t)v); I(this, (*v)[task->idx] == task); - v->swapdel(task); + v->swap_delete(task); } const char * @@ -203,25 +233,67 @@ rust_dom::state_vec_name(ptr_vec<rust_task> *v) return "dead"; } +/** + * Delete any dead tasks. + */ void -rust_dom::reap_dead_tasks() -{ +rust_dom::reap_dead_tasks() { for (size_t i = 0; i < dead_tasks.length(); ) { - rust_task *t = dead_tasks[i]; - if (t == root_task || t->refcnt == 0) { - I(this, !t->waiting_tasks.length()); - dead_tasks.swapdel(t); + rust_task *task = dead_tasks[i]; +// log(rust_log::TASK, "dead task 0x%" PRIxPTR " with ref_count: %d", +// task, task->ref_count); + if (task->ref_count == 0) { + I(this, !task->waiting_tasks.length()); + dead_tasks.swap_delete(task); log(rust_log::TASK, - "deleting unreferenced dead task 0x%" PRIxPTR, t); - delete t; + "deleting unreferenced dead task 0x%" PRIxPTR, task); + delete task; continue; } ++i; } } + +/** + * Enqueues a message in this domain's incoming message queue. It's the + * responsibility of the receiver to free the message once it's processed. + */ +void rust_dom::send_message(rust_message *message) { + log(rust_log::COMM, "enqueueing message 0x%" PRIxPTR + " in queue 0x%" PRIxPTR, + message, + &_incoming_message_queue); + _incoming_message_queue.enqueue(message); + _incoming_message_pending.signal(); +} + +/** + * Drains and processes incoming pending messages. + */ +void rust_dom::drain_incoming_message_queue() { + rust_message *message; + while ((message = (rust_message *) _incoming_message_queue.dequeue())) { + log(rust_log::COMM, "read 0x%" PRIxPTR + " from queue 0x%" PRIxPTR, + message, + &_incoming_message_queue); + log(rust_log::COMM, "processing incoming message 0x%" PRIxPTR, + message); + message->process(); + delete message; + } +} + +/** + * Schedules a running task for execution. Only running tasks can be + * activated. Blocked tasks have to be unblocked before they can be + * activated. + * + * Returns NULL if no tasks can be scheduled. + */ rust_task * -rust_dom::sched() +rust_dom::schedule_task() { I(this, this); // FIXME: in the face of failing tasks, this is not always right. @@ -231,11 +303,88 @@ rust_dom::sched() i %= running_tasks.length(); return (rust_task *)running_tasks[i]; } - log(rust_log::DOM|rust_log::TASK, - "no schedulable tasks"); + // log(rust_log::DOM|rust_log::TASK, "no schedulable tasks"); return NULL; } +/** + * Starts the main scheduler loop which performs task scheduling for this + * domain. + * + * Returns once no more tasks can be scheduled. + */ +int +rust_dom::start_main_loop() +{ + // Make sure someone is watching, to pull us out of infinite loops. + rust_timer timer(this); + + log(rust_log::DOM, "running main-loop on domain 0x%" PRIxPTR, this); + logptr("exit-task glue", root_crate->get_exit_task_glue()); + + while (n_live_tasks() > 0) { + rust_task *scheduled_task = schedule_task(); + + // If we cannot schedule a task because all other live tasks + // are blocked, wait on a condition variable which is signaled + // if progress is made in other domains. + + if (scheduled_task == NULL) { + log(rust_log::TASK, + "all tasks are blocked, waiting for progress ..."); + _progress.wait(); + continue; + } + + I(this, scheduled_task->running()); + + log(rust_log::TASK, + "activating task 0x%" PRIxPTR ", sp=x%" PRIxPTR, + (uintptr_t)scheduled_task, scheduled_task->rust_sp); + + interrupt_flag = 0; + + activate(scheduled_task); + + log(rust_log::TASK, + "returned from task 0x%" PRIxPTR + " in state '%s', sp=0x%" PRIxPTR, + (uintptr_t)scheduled_task, + state_vec_name(scheduled_task->state), + scheduled_task->rust_sp); + + I(this, scheduled_task->rust_sp >= + (uintptr_t) &scheduled_task->stk->data[0]); + I(this, scheduled_task->rust_sp < scheduled_task->stk->limit); + + drain_incoming_message_queue(); + + reap_dead_tasks(); + } + + log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ..."); + + while (dead_tasks.length() > 0) { + log(rust_log::DOM, + "waiting for %d dead tasks to become dereferenced ...", + dead_tasks.length()); + + log(rust_log::DOM, + "waiting for %" PRIxPTR, dead_tasks[0]); + + if (_incoming_message_queue.is_empty()) { + _incoming_message_pending.wait(); + } else { + drain_incoming_message_queue(); + } + reap_dead_tasks(); + } + + log(rust_log::DOM, "finished main-loop (dom.rval = %d)", rval); + return rval; +} + + rust_crate_cache * rust_dom::get_cache(rust_crate const *crate) { log(rust_log::CACHE, |