diff options
| author | Michael Bebenita <[email protected]> | 2010-09-07 18:39:07 -0700 |
|---|---|---|
| committer | Michael Bebenita <[email protected]> | 2010-09-07 18:44:12 -0700 |
| commit | de611a309006f0976bc9a579eb1087e7a89f79a7 (patch) | |
| tree | cd30b33ab1986c0cc84e0fc0743593bd99b0caaa /src/rt/rust_upcall.cpp | |
| parent | Started work on a framework for writing runtime tests, added some simple test... (diff) | |
| download | rust-de611a309006f0976bc9a579eb1087e7a89f79a7.tar.xz rust-de611a309006f0976bc9a579eb1087e7a89f79a7.zip | |
Lots of design changes around proxies and message passing. Made it so that domains can only talk to other domains via handles, and with the help of the rust_kernel.
Diffstat (limited to 'src/rt/rust_upcall.cpp')
| -rw-r--r-- | src/rt/rust_upcall.cpp | 212 |
1 files changed, 100 insertions, 112 deletions
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 0e3961bc..4b5fa1d6 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -23,20 +23,6 @@ (task)->dom->get_log().indent(); #endif -void -log_task_state(rust_task *task, maybe_proxy<rust_task> *target) { - rust_task *delegate = target->delegate(); - if (target->is_proxy()) { - task->log(rust_log::TASK, - "remote task: 0x%" PRIxPTR ", ref_count: %d state: %s", - delegate, delegate->ref_count, delegate->state_str()); - } else { - task->log(rust_log::TASK, - "local task: 0x%" PRIxPTR ", ref_count: %d state: %s", - delegate, delegate->ref_count, delegate->state_str()); - } -} - extern "C" CDECL char const * str_buf(rust_task *task, rust_str *s); @@ -104,7 +90,7 @@ upcall_new_chan(rust_task *task, rust_port *port) { "task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")", (uintptr_t) task, task->name, port); I(dom, port); - return new (dom) rust_chan(task, port); + return new (dom) rust_chan(task, port, port->unit_sz); } /** @@ -135,43 +121,55 @@ void upcall_del_chan(rust_task *task, rust_chan *chan) { "Channel's ref count should be zero."); if (chan->is_associated()) { - // We're trying to delete a channel that another task may be reading - // from. We have two options: - // - // 1. We can flush the channel by blocking in upcall_flush_chan() - // and resuming only when the channel is flushed. The problem - // here is that we can get ourselves in a deadlock if the parent - // task tries to join us. - // - // 2. We can leave the channel in a "dormnat" state by not freeing - // it and letting the receiver task delete it for us instead. - if (chan->buffer.is_empty() == false) { - return; + if (chan->port->is_proxy()) { + // Here is a good place to delete the port proxy we allocated + // in upcall_clone_chan. + rust_proxy<rust_port> *proxy = chan->port->as_proxy(); + chan->disassociate(); + delete proxy; + } else { + // We're trying to delete a channel that another task may be + // reading from. We have two options: + // + // 1. We can flush the channel by blocking in upcall_flush_chan() + // and resuming only when the channel is flushed. The problem + // here is that we can get ourselves in a deadlock if the + // parent task tries to join us. + // + // 2. We can leave the channel in a "dormnat" state by not freeing + // it and letting the receiver task delete it for us instead. + if (chan->buffer.is_empty() == false) { + return; + } + chan->disassociate(); } - chan->disassociate(); } delete chan; } /** * Clones a channel and stores it in the spawnee's domain. Each spawned task - * has it's own copy of the channel. + * has its own copy of the channel. */ extern "C" CDECL rust_chan * -upcall_clone_chan(rust_task *task, - maybe_proxy<rust_task> *target, +upcall_clone_chan(rust_task *task, maybe_proxy<rust_task> *target, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - task->log(rust_log::UPCALL | rust_log::COMM, - "target: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR, - target, chan); - rust_task *target_task = target->delegate(); + size_t unit_sz = chan->buffer.unit_sz; maybe_proxy<rust_port> *port = chan->port; - if (target->is_proxy()) { - port = target_task->dom->get_port_proxy_synchronized( - chan->port->as_delegate()); + rust_task *target_task = NULL; + if (target->is_proxy() == false) { + port = chan->port; + target_task = target->referent(); + } else { + rust_handle<rust_port> *handle = + task->dom->kernel->get_port_handle(port->as_referent()); + maybe_proxy<rust_port> *proxy = new rust_proxy<rust_port> (handle); + task->log(rust_log::MEM, "new proxy: " PTR, proxy); + port = proxy; + target_task = target->as_proxy()->handle()->referent(); } - return new (target_task->dom) rust_chan(target_task, port); + return new (target_task->dom) rust_chan(target_task, port, unit_sz); } extern "C" CDECL void @@ -193,17 +191,15 @@ upcall_sleep(rust_task *task, size_t time_in_us) { extern "C" CDECL void upcall_join(rust_task *task, maybe_proxy<rust_task> *target) { LOG_UPCALL_ENTRY(task); - rust_task *target_task = target->delegate(); - task->log(rust_log::UPCALL | rust_log::COMM, - "target: 0x%" PRIxPTR ", task: %s @0x%" PRIxPTR, - target, target_task->name, target_task); if (target->is_proxy()) { - notify_message:: - send(notify_message::JOIN, "join", task, target->as_proxy()); - task->block(target_task, "joining remote task"); + rust_handle<rust_task> *task_handle = target->as_proxy()->handle(); + notify_message::send(notify_message::JOIN, "join", + task->get_handle(), task_handle); + task->block(task_handle, "joining remote task"); task->yield(2); } else { + rust_task *target_task = target->referent(); // If the other task is already dying, we don't have to wait for it. if (target_task->dead() == false) { target_task->tasks_waiting_to_join.push(task); @@ -221,10 +217,6 @@ upcall_join(rust_task *task, maybe_proxy<rust_task> *target) { extern "C" CDECL void upcall_send(rust_task *task, rust_chan *chan, void *sptr) { LOG_UPCALL_ENTRY(task); - task->log(rust_log::UPCALL | rust_log::COMM, - "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d", - (uintptr_t) chan, (uintptr_t) sptr, - chan->port->delegate()->unit_sz); chan->send(sptr); task->log(rust_log::COMM, "=== sent data ===>"); } @@ -269,21 +261,14 @@ upcall_fail(rust_task *task, extern "C" CDECL void upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) { LOG_UPCALL_ENTRY(task); - log_task_state(task, target); - rust_task *target_task = target->delegate(); - - task->log(rust_log::UPCALL | rust_log::TASK, - "kill task %s @0x%" PRIxPTR ", ref count %d", - target_task->name, target_task, - target_task->ref_count); - if (target->is_proxy()) { notify_message:: - send(notify_message::KILL, "kill", task, target->as_proxy()); + send(notify_message::KILL, "kill", task->get_handle(), + target->as_proxy()->handle()); // The proxy ref_count dropped to zero, delete it here. delete target->as_proxy(); } else { - target_task->kill(); + target->referent()->kill(); } } @@ -554,25 +539,6 @@ upcall_get_type_desc(rust_task *task, return td; } -#if defined(__WIN32__) -static DWORD WINAPI rust_thread_start(void *ptr) -#elif defined(__GNUC__) -static void *rust_thread_start(void *ptr) -#else -#error "Platform not supported" -#endif -{ - // We were handed the domain we are supposed to run. - rust_dom *dom = (rust_dom *) ptr; - - // Start a new rust main loop for this thread. - dom->start_main_loop(); - rust_srv *srv = dom->srv; - srv->kernel->deregister_domain(dom); - delete dom; - return 0; -} - extern "C" CDECL rust_task * upcall_new_task(rust_task *spawner, const char *name) { LOG_UPCALL_ENTRY(spawner); @@ -604,54 +570,76 @@ upcall_start_task(rust_task *spawner, return task; } +/** + * Called whenever a new domain is created. + */ extern "C" CDECL maybe_proxy<rust_task> * upcall_new_thread(rust_task *task, const char *name) { LOG_UPCALL_ENTRY(task); - - rust_dom *old_dom = task->dom; - rust_dom *new_dom = new rust_dom(old_dom->srv, - old_dom->root_crate, - name); - old_dom->srv->kernel->register_domain(new_dom); + rust_dom *parent_dom = task->dom; + rust_kernel *kernel = parent_dom->kernel; + rust_handle<rust_dom> *child_dom_handle = + kernel->create_domain(parent_dom->root_crate, name); + rust_handle<rust_task> *child_task_handle = + kernel->get_task_handle(child_dom_handle->referent()->root_task); task->log(rust_log::UPCALL | rust_log::MEM, - "upcall new_thread(%s) = dom 0x%" PRIxPTR " task 0x%" PRIxPTR, - name, new_dom, new_dom->root_task); - rust_proxy<rust_task> *proxy = - new (old_dom) rust_proxy<rust_task>(old_dom, - new_dom->root_task, true); - task->log(rust_log::UPCALL | rust_log::MEM, - "new proxy = 0x%" PRIxPTR " -> task = 0x%" PRIxPTR, - proxy, proxy->delegate()); - return proxy; + "child name: %s, child_dom_handle: " PTR + ", child_task_handle: " PTR, + name, child_dom_handle, child_task_handle); + rust_proxy<rust_task> *child_task_proxy = + new rust_proxy<rust_task> (child_task_handle); + return child_task_proxy; +} + +#if defined(__WIN32__) +static DWORD WINAPI rust_thread_start(void *ptr) +#elif defined(__GNUC__) +static void *rust_thread_start(void *ptr) +#else +#error "Platform not supported" +#endif +{ + // We were handed the domain we are supposed to run. + rust_dom *dom = (rust_dom *) ptr; + + // Start a new rust main loop for this thread. + dom->start_main_loop(); + + // Destroy the domain. + dom->kernel->destroy_domain(dom); + + return 0; } +/** + * Called after a new domain is created. Here we create a new thread and + * and start the domain main loop. + */ extern "C" CDECL maybe_proxy<rust_task> * -upcall_start_thread(rust_task *spawner, - maybe_proxy<rust_task> *root_task_proxy, +upcall_start_thread(rust_task *task, + rust_proxy<rust_task> *child_task_proxy, uintptr_t exit_task_glue, uintptr_t spawnee_fn, size_t callsz) { - LOG_UPCALL_ENTRY(spawner); - - rust_dom *dom = spawner->dom; - rust_task *root_task = root_task_proxy->delegate(); - dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, - "upcall start_thread(exit_task_glue 0x%" PRIxPTR - ", spawnee 0x%" PRIxPTR - ", callsz %" PRIdPTR ")", exit_task_glue, spawnee_fn, callsz); - root_task->start(exit_task_glue, spawnee_fn, spawner->rust_sp, callsz); - + LOG_UPCALL_ENTRY(task); + rust_dom *parenet_dom = task->dom; + rust_handle<rust_task> *child_task_handle = child_task_proxy->handle(); + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, + "exit_task_glue: " PTR ", spawnee_fn " PTR + ", callsz %" PRIdPTR ")", exit_task_glue, spawnee_fn, callsz); + rust_task *child_task = child_task_handle->referent(); + child_task->start(exit_task_glue, spawnee_fn, task->rust_sp, callsz); #if defined(__WIN32__) HANDLE thread; - thread = CreateThread(NULL, 0, rust_thread_start, root_task->dom, - 0, NULL); - dom->win32_require("CreateThread", thread != NULL); + thread = CreateThread(NULL, 0, rust_thread_start, child_task->dom, 0, + NULL); + parenet_dom->win32_require("CreateThread", thread != NULL); #else pthread_t thread; - pthread_create(&thread, &dom->attr, rust_thread_start, - (void *) root_task->dom); + pthread_create(&thread, &parenet_dom->attr, rust_thread_start, + (void *) child_task->dom); #endif - return root_task_proxy; + return child_task_proxy; } // |