diff options
| author | Michael Bebenita <[email protected]> | 2010-07-19 14:05:18 -0700 |
|---|---|---|
| committer | Michael Bebenita <[email protected]> | 2010-07-19 14:05:18 -0700 |
| commit | 00d1465d13980fc3acf650f182ee0723fbda0e06 (patch) | |
| tree | a73cf5f0f20c0bee6722b33d975eb930919fefdf /src | |
| parent | Add a test for an obvious-seeming (but not actually legal) kind of cast attem... (diff) | |
| download | rust-00d1465d13980fc3acf650f182ee0723fbda0e06.tar.xz rust-00d1465d13980fc3acf650f182ee0723fbda0e06.zip | |
Added a message passing system based on lock free queues for inter-thread communication. Channels now buffer on the sending side, and no longer require blocking when sending. Lots of other refactoring and bug fixes.
Diffstat (limited to 'src')
35 files changed, 1494 insertions, 834 deletions
diff --git a/src/Makefile b/src/Makefile index d94e3e25..4449418e 100644 --- a/src/Makefile +++ b/src/Makefile @@ -244,7 +244,10 @@ BOOT_CMXS := $(BOOT_MLS:.ml=.cmx) BOOT_OBJS := $(BOOT_MLS:.ml=.o) BOOT_CMIS := $(BOOT_MLS:.ml=.cmi) -RUNTIME_CS := rt/rust.cpp \ +RUNTIME_CS := rt/sync/spin_lock.cpp \ + rt/sync/lock_free_queue.cpp \ + rt/sync/condition_variable.cpp \ + rt/rust.cpp \ rt/rust_builtin.cpp \ rt/rust_crate.cpp \ rt/rust_crate_cache.cpp \ @@ -256,12 +259,19 @@ RUNTIME_CS := rt/rust.cpp \ rt/rust_upcall.cpp \ rt/rust_log.cpp \ rt/rust_timer.cpp \ + rt/circular_buffer.cpp \ rt/isaac/randport.cpp -RUNTIME_HDR := rt/rust.h \ +RUNTIME_HDR := rt/globals.h \ + rt/rust.h \ rt/rust_dwarf.h \ rt/rust_internal.h \ - rt/rust_util.h + rt/rust_util.h \ + rt/rust_chan.h \ + rt/rust_dom.h \ + rt/rust_task.h \ + rt/rust_proxy.h \ + rt/circular_buffer.h RUNTIME_INCS := -Irt/isaac -Irt/uthash RUNTIME_OBJS := $(RUNTIME_CS:.cpp=$(CFG_OBJ_SUFFIX)) @@ -363,6 +373,8 @@ TEST_XFAILS_X86 := $(MUT_BOX_XFAILS) \ test/run-pass/task-comm.rs \ test/run-pass/vec-alloc-append.rs \ test/run-pass/vec-slice.rs \ + test/run-pass/task-comm-3.rs \ + test/run-pass/task-comm-4.rs \ test/compile-fail/bad-recv.rs \ test/compile-fail/bad-send.rs \ test/compile-fail/infinite-tag-type-recursion.rs \ @@ -452,6 +464,11 @@ TEST_XFAILS_LLVM := $(addprefix test/run-pass/, \ tail-cps.rs \ tail-direct.rs \ task-comm.rs \ + task-comm-0.rs \ + task-comm-1.rs \ + task-comm-2.rs \ + task-comm-3.rs \ + task-comm-4.rs \ threads.rs \ tup.rs \ type-sizes.rs \ diff --git a/src/boot/be/abi.ml b/src/boot/be/abi.ml index c3e72293..e4bb3c9c 100644 --- a/src/boot/be/abi.ml +++ b/src/boot/be/abi.ml @@ -13,7 +13,7 @@ let rc_base_field_refcnt = 0;; let task_field_refcnt = rc_base_field_refcnt;; -let task_field_stk = task_field_refcnt + 1;; +let task_field_stk = task_field_refcnt + 2;; let task_field_runtime_sp = task_field_stk + 1;; let task_field_rust_sp = task_field_runtime_sp + 1;; let task_field_gc_alloc_chain = task_field_rust_sp + 1;; diff --git a/src/rt/circular_buffer.cpp b/src/rt/circular_buffer.cpp new file mode 100644 index 00000000..0e1979c1 --- /dev/null +++ b/src/rt/circular_buffer.cpp @@ -0,0 +1,118 @@ +/* + * A simple resizable circular buffer. + */ + +#include "rust_internal.h" + +circular_buffer::circular_buffer(rust_dom *dom, size_t unit_sz) : + dom(dom), + _buffer_sz(INITIAL_CIRCULAR_BUFFFER_SIZE_IN_UNITS * unit_sz), + unit_sz(unit_sz), + _next(0), + _unread(0), + _buffer((uint8_t *)dom->calloc(_buffer_sz)) { + + A(dom, unit_sz, "Unit size must be larger than zero."); + + dom->log(rust_log::MEM | rust_log::COMM, + "new circular_buffer(buffer_sz=%d, unread=%d)" + "-> circular_buffer=0x%" PRIxPTR, + _buffer_sz, _unread, this); + + A(dom, _buffer, "Failed to allocate buffer."); +} + +circular_buffer::~circular_buffer() { + dom->log(rust_log::MEM | rust_log::COMM, + "~circular_buffer 0x%" PRIxPTR, + this); + I(dom, _buffer); + // I(dom, unread == 0); + dom->free(_buffer); +} + +/** + * Copies the unread data from this buffer to the "dst" address. + */ +void +circular_buffer::transfer(void *dst) { + I(dom, dst); + uint8_t *ptr = (uint8_t *) dst; + for (size_t i = 0; i < _unread; i += unit_sz) { + memcpy(&ptr[i], &_buffer[_next + i % _buffer_sz], unit_sz); + } +} + +/** + * Copies the data at the "src" address into this buffer. The buffer is + * grown if it isn't large enough. + */ +void +circular_buffer::enqueue(void *src) { + I(dom, src); + I(dom, _unread <= _buffer_sz); + + // Grow if necessary. + if (_unread == _buffer_sz) { + I(dom, _buffer_sz <= MAX_CIRCULAR_BUFFFER_SIZE); + void *tmp = dom->malloc(_buffer_sz << 1); + transfer(tmp); + _buffer_sz <<= 1; + dom->free(_buffer); + _buffer = (uint8_t *)tmp; + } + + dom->log(rust_log::MEM | rust_log::COMM, + "circular_buffer enqueue " + "unread: %d, buffer_sz: %d, unit_sz: %d", + _unread, _buffer_sz, unit_sz); + + I(dom, _unread < _buffer_sz); + I(dom, _unread + unit_sz <= _buffer_sz); + + // Copy data + size_t i = (_next + _unread) % _buffer_sz; + memcpy(&_buffer[i], src, unit_sz); + _unread += unit_sz; + + dom->log(rust_log::MEM | rust_log::COMM, + "circular_buffer pushed data at index: %d", i); +} + +/** + * Copies data from this buffer to the "dst" address. The buffer is + * shrunk if possible. + */ +void +circular_buffer::dequeue(void *dst) { + I(dom, dst); + I(dom, unit_sz > 0); + I(dom, _unread >= unit_sz); + I(dom, _unread <= _buffer_sz); + I(dom, _buffer); + size_t i = _next; + memcpy(dst, &_buffer[i], unit_sz); + dom->log(rust_log::MEM | rust_log::COMM, + "shifted data from index %d", i); + _unread -= unit_sz; + _next += unit_sz; + I(dom, _next <= _buffer_sz); + if (_next == _buffer_sz) { + _next = 0; + } + + // Shrink if possible. + if (_buffer_sz >= INITIAL_CIRCULAR_BUFFFER_SIZE_IN_UNITS * unit_sz && + _unread <= _buffer_sz / 4) { + void *tmp = dom->malloc(_buffer_sz / 2); + transfer(tmp); + _buffer_sz >>= 1; + dom->free(_buffer); + _buffer = (uint8_t *)tmp; + } +} + +bool +circular_buffer::is_empty() { + return _unread == 0; +} diff --git a/src/rt/circular_buffer.h b/src/rt/circular_buffer.h new file mode 100644 index 00000000..c0c0da5e --- /dev/null +++ b/src/rt/circular_buffer.h @@ -0,0 +1,30 @@ +/* + * + */ + +#ifndef CIRCULAR_BUFFER_H +#define CIRCULAR_BUFFER_H + +class +circular_buffer : public dom_owned<circular_buffer> { + static const size_t INITIAL_CIRCULAR_BUFFFER_SIZE_IN_UNITS = 8; + static const size_t MAX_CIRCULAR_BUFFFER_SIZE = 1 << 24; + +public: + rust_dom *dom; + circular_buffer(rust_dom *dom, size_t unit_sz); + ~circular_buffer(); + void transfer(void *dst); + void enqueue(void *src); + void dequeue(void *dst); + bool is_empty(); + +private: + size_t _buffer_sz; + size_t unit_sz; + size_t _next; + size_t _unread; + uint8_t *_buffer; +}; + +#endif /* CIRCULAR_BUFFER_H */ diff --git a/src/rt/globals.h b/src/rt/globals.h new file mode 100644 index 00000000..f8025a40 --- /dev/null +++ b/src/rt/globals.h @@ -0,0 +1,33 @@ +#ifndef GLOBALS_H +#define GLOBALS_H + +#define __STDC_LIMIT_MACROS 1 +#define __STDC_CONSTANT_MACROS 1 +#define __STDC_FORMAT_MACROS 1 + +#include <stdlib.h> +#include <stdint.h> +#include <inttypes.h> + +#include <stdio.h> +#include <string.h> + +#if defined(__WIN32__) +extern "C" { +#include <windows.h> +#include <tchar.h> +#include <wincrypt.h> +} +#elif defined(__GNUC__) +#include <unistd.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <dlfcn.h> +#include <pthread.h> +#include <errno.h> +#else +#error "Platform not supported." +#endif + +#endif /* GLOBALS_H */ diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index 235eb8d0..00e709c9 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -1,7 +1,6 @@ #include "rust_internal.h" #include "util/array_list.h" - // #define TRACK_ALLOCATIONS // For debugging, keeps track of live allocations, so you can find out // exactly what leaked. @@ -100,52 +99,6 @@ rust_srv::clone() return new rust_srv(); } - -int -rust_main_loop(rust_dom *dom) -{ - // Make sure someone is watching, to pull us out of infinite loops. - rust_timer timer(*dom); - - int rval; - rust_task *task; - - dom->log(rust_log::DOM, - "running main-loop on domain 0x%" PRIxPTR, dom); - dom->logptr("exit-task glue", - dom->root_crate->get_exit_task_glue()); - - while ((task = dom->sched()) != NULL) { - I(dom, task->running()); - - dom->log(rust_log::TASK, - "activating task 0x%" PRIxPTR ", sp=0x%" PRIxPTR, - (uintptr_t)task, task->rust_sp); - - dom->interrupt_flag = 0; - - dom->activate(task); - - dom->log(rust_log::TASK, - "returned from task 0x%" PRIxPTR - " in state '%s', sp=0x%" PRIxPTR, - (uintptr_t)task, - dom->state_vec_name(task->state), - task->rust_sp); - - I(dom, task->rust_sp >= (uintptr_t) &task->stk->data[0]); - I(dom, task->rust_sp < task->stk->limit); - - dom->reap_dead_tasks(); - } - - dom->log(rust_log::DOM, "finished main-loop (dom.rval = %d)", dom->rval); - rval = dom->rval; - - return rval; -} - - struct command_line_args { @@ -243,7 +196,7 @@ rust_start(uintptr_t main_fn, rust_crate const *crate, int argc, char **argv) (uintptr_t)&main_args, sizeof(main_args)); - ret = rust_main_loop(&dom); + ret = dom.start_main_loop(); } #if !defined(__WIN32__) diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 339452c5..0b492de8 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -19,7 +19,7 @@ str_alloc(rust_task *task, size_t n_bytes) extern "C" CDECL rust_str* last_os_error(rust_task *task) { rust_dom *dom = task->dom; - dom->log(rust_log::TASK, "last_os_error()"); + task->log(rust_log::TASK, "last_os_error()"); #if defined(__WIN32__) LPTSTR buf; @@ -95,7 +95,7 @@ extern "C" CDECL rust_vec* vec_alloc(rust_task *task, type_desc *t, size_t n_elts) { rust_dom *dom = task->dom; - dom->log(rust_log::MEM, + task->log(rust_log::MEM, "vec_alloc %" PRIdPTR " elements of size %" PRIdPTR, n_elts, t->size); size_t fill = n_elts * t->size; diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 38f93a7d..6aa9121a 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -1,29 +1,29 @@ - #include "rust_internal.h" #include "rust_chan.h" rust_chan::rust_chan(rust_task *task, rust_port *port) : - task(task), - port(port), - buffer(task->dom, port->unit_sz), - token(this) -{ - if (port) + task(task), port(port), buffer(task->dom, port->unit_sz), token(this) { + + if (port) { port->chans.push(this); + ref(); + } + + task->log(rust_log::MEM | rust_log::COMM, + "new rust_chan(task=0x%" PRIxPTR + ", port=0x%" PRIxPTR ") -> chan=0x%" + PRIxPTR, (uintptr_t) task, (uintptr_t) port, (uintptr_t) this); } -rust_chan::~rust_chan() -{ +rust_chan::~rust_chan() { if (port) { if (token.pending()) token.withdraw(); - port->chans.swapdel(this); + port->chans.swap_delete(this); } } -void -rust_chan::disassociate() -{ +void rust_chan::disassociate() { I(task->dom, port); if (token.pending()) @@ -31,4 +31,32 @@ rust_chan::disassociate() // Delete reference to the port/ port = NULL; + + deref(); +} + +/** + * Attempt to transmit channel data to the associated port. + */ +int rust_chan::transmit() { + rust_dom *dom = task->dom; + + // TODO: Figure out how and why the port would become null. + if (port == NULL) { + dom->log(rust_log::COMM, "invalid port, transmission incomplete"); + return ERROR; + } + + if (buffer.is_empty()) { + dom->log(rust_log::COMM, "buffer is empty, transmission incomplete"); + return ERROR; + } + + if(port->task->blocked_on(port)) { + buffer.dequeue(port->task->rendezvous_ptr); + port->task->wakeup(port); + } + + return 0; + } diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index a56ba0ca..3e32d838 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -9,7 +9,7 @@ public: rust_task *task; rust_port *port; - circ_buf buffer; + circular_buffer buffer; size_t idx; // Index into port->chans. // Token belonging to this chan, it will be placed into a port's @@ -17,6 +17,8 @@ public: rust_token token; void disassociate(); + + int transmit(); }; #endif /* RUST_CHAN_H */ diff --git a/src/rt/rust_comm.cpp b/src/rt/rust_comm.cpp index 58b9ef4c..37929b8b 100644 --- a/src/rt/rust_comm.cpp +++ b/src/rt/rust_comm.cpp @@ -10,109 +10,6 @@ rust_alarm::rust_alarm(rust_task *receiver) : { } - -// Circular buffers. - -circ_buf::circ_buf(rust_dom *dom, size_t unit_sz) : - dom(dom), - alloc(INIT_CIRC_BUF_UNITS * unit_sz), - unit_sz(unit_sz), - next(0), - unread(0), - data((uint8_t *)dom->calloc(alloc)) -{ - I(dom, unit_sz); - dom->log(rust_log::MEM|rust_log::COMM, - "new circ_buf(alloc=%d, unread=%d) -> circ_buf=0x%" PRIxPTR, - alloc, unread, this); - I(dom, data); -} - -circ_buf::~circ_buf() -{ - dom->log(rust_log::MEM|rust_log::COMM, - "~circ_buf 0x%" PRIxPTR, - this); - I(dom, data); - // I(dom, unread == 0); - dom->free(data); -} - -void -circ_buf::transfer(void *dst) -{ - size_t i; - uint8_t *d = (uint8_t *)dst; - I(dom, dst); - for (i = 0; i < unread; i += unit_sz) - memcpy(&d[i], &data[next + i % alloc], unit_sz); -} - -void -circ_buf::push(void *src) -{ - size_t i; - void *tmp; - - I(dom, src); - I(dom, unread <= alloc); - - /* Grow if necessary. */ - if (unread == alloc) { - I(dom, alloc <= MAX_CIRC_BUF_SIZE); - tmp = dom->malloc(alloc << 1); - transfer(tmp); - alloc <<= 1; - dom->free(data); - data = (uint8_t *)tmp; - } - - dom->log(rust_log::MEM|rust_log::COMM, - "circ buf push, unread=%d, alloc=%d, unit_sz=%d", - unread, alloc, unit_sz); - - I(dom, unread < alloc); - I(dom, unread + unit_sz <= alloc); - - i = (next + unread) % alloc; - memcpy(&data[i], src, unit_sz); - - dom->log(rust_log::MEM|rust_log::COMM, "pushed data at index %d", i); - unread += unit_sz; -} - -void -circ_buf::shift(void *dst) -{ - size_t i; - void *tmp; - - I(dom, dst); - I(dom, unit_sz > 0); - I(dom, unread >= unit_sz); - I(dom, unread <= alloc); - I(dom, data); - i = next; - memcpy(dst, &data[i], unit_sz); - dom->log(rust_log::MEM|rust_log::COMM, "shifted data from index %d", i); - unread -= unit_sz; - next += unit_sz; - I(dom, next <= alloc); - if (next == alloc) - next = 0; - - /* Shrink if necessary. */ - if (alloc >= INIT_CIRC_BUF_UNITS * unit_sz && - unread <= alloc / 4) { - tmp = dom->malloc(alloc / 2); - transfer(tmp); - alloc >>= 1; - dom->free(data); - data = (uint8_t *)tmp; - } -} - - // Ports. rust_port::rust_port(rust_task *task, size_t unit_sz) : @@ -121,18 +18,16 @@ rust_port::rust_port(rust_task *task, size_t unit_sz) : writers(task->dom), chans(task->dom) { - rust_dom *dom = task->dom; - dom->log(rust_log::MEM|rust_log::COMM, - "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%" - PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this); + task->log(rust_log::MEM|rust_log::COMM, + "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%" + PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this); } rust_port::~rust_port() { - rust_dom *dom = task->dom; - dom->log(rust_log::COMM|rust_log::MEM, - "~rust_port 0x%" PRIxPTR, - (uintptr_t)this); + task->log(rust_log::COMM|rust_log::MEM, + "~rust_port 0x%" PRIxPTR, + (uintptr_t)this); while (chans.length() > 0) chans.pop()->disassociate(); } @@ -182,7 +77,7 @@ rust_token::withdraw() if (task->blocked()) task->wakeup(this); // must be blocked on us (or dead) - port->writers.swapdel(this); + port->writers.swap_delete(this); submitted = false; } diff --git a/src/rt/rust_crate_cache.cpp b/src/rt/rust_crate_cache.cpp index 2db0eb45..650e3bb1 100644 --- a/src/rt/rust_crate_cache.cpp +++ b/src/rt/rust_crate_cache.cpp @@ -251,7 +251,7 @@ rust_crate_cache::flush() { if (s) { dom->log(rust_log::CACHE, "rust_crate_cache::flush() deref rust_sym %" - PRIdPTR " (rc=%" PRIdPTR ")", i, s->refcnt); + PRIdPTR " (rc=%" PRIdPTR ")", i, s->ref_count); s->deref(); } rust_syms[i] = NULL; @@ -262,7 +262,7 @@ rust_crate_cache::flush() { if (s) { dom->log(rust_log::CACHE, "rust_crate_cache::flush() deref c_sym %" - PRIdPTR " (rc=%" PRIdPTR ")", i, s->refcnt); + PRIdPTR " (rc=%" PRIdPTR ")", i, s->ref_count); s->deref(); } c_syms[i] = NULL; @@ -272,7 +272,7 @@ rust_crate_cache::flush() { lib *l = libs[i]; if (l) { dom->log(rust_log::CACHE, "rust_crate_cache::flush() deref lib %" - PRIdPTR " (rc=%" PRIdPTR ")", i, l->refcnt); + PRIdPTR " (rc=%" PRIdPTR ")", i, l->ref_count); l->deref(); } libs[i] = NULL; diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index 3b5e23b2..39124491 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -4,6 +4,24 @@ template class ptr_vec<rust_task>; +rust_message::rust_message(rust_dom *dom) : dom(dom) { + +} + +void rust_message::process() { + +} + +kill_task_message::kill_task_message(rust_dom *dom, rust_task *task) : + rust_message(dom), _task(task) { + +} + +void kill_task_message::process() { + _task->ref_count--; + _task->kill(); +} + rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate) : interrupt_flag(0), root_crate(root_crate), @@ -81,13 +99,25 @@ rust_dom::activate(rust_task *task) { } void +rust_dom::log(rust_task *task, uint32_t type_bits, char const *fmt, ...) { + char buf[256]; + if (_log.is_tracing(type_bits)) { + va_list args; + va_start(args, fmt); + vsnprintf(buf, sizeof(buf), fmt, args); + _log.trace_ln(task, type_bits, buf); + va_end(args); + } +} + +void rust_dom::log(uint32_t type_bits, char const *fmt, ...) { char buf[256]; if (_log.is_tracing(type_bits)) { va_list args; va_start(args, fmt); vsnprintf(buf, sizeof(buf), fmt, args); - _log.trace_ln(type_bits, buf); + _log.trace_ln(NULL, type_bits, buf); va_end(args); } } @@ -189,7 +219,7 @@ rust_dom::remove_task_from_state_vec(ptr_vec<rust_task> *v, rust_task *task) "removing task 0x%" PRIxPTR " in state '%s' from vec 0x%" PRIxPTR, (uintptr_t)task, state_vec_name(v), (uintptr_t)v); I(this, (*v)[task->idx] == task); - v->swapdel(task); + v->swap_delete(task); } const char * @@ -203,25 +233,67 @@ rust_dom::state_vec_name(ptr_vec<rust_task> *v) return "dead"; } +/** + * Delete any dead tasks. + */ void -rust_dom::reap_dead_tasks() -{ +rust_dom::reap_dead_tasks() { for (size_t i = 0; i < dead_tasks.length(); ) { - rust_task *t = dead_tasks[i]; - if (t == root_task || t->refcnt == 0) { - I(this, !t->waiting_tasks.length()); - dead_tasks.swapdel(t); + rust_task *task = dead_tasks[i]; +// log(rust_log::TASK, "dead task 0x%" PRIxPTR " with ref_count: %d", +// task, task->ref_count); + if (task->ref_count == 0) { + I(this, !task->waiting_tasks.length()); + dead_tasks.swap_delete(task); log(rust_log::TASK, - "deleting unreferenced dead task 0x%" PRIxPTR, t); - delete t; + "deleting unreferenced dead task 0x%" PRIxPTR, task); + delete task; continue; } ++i; } } + +/** + * Enqueues a message in this domain's incoming message queue. It's the + * responsibility of the receiver to free the message once it's processed. + */ +void rust_dom::send_message(rust_message *message) { + log(rust_log::COMM, "enqueueing message 0x%" PRIxPTR + " in queue 0x%" PRIxPTR, + message, + &_incoming_message_queue); + _incoming_message_queue.enqueue(message); + _incoming_message_pending.signal(); +} + +/** + * Drains and processes incoming pending messages. + */ +void rust_dom::drain_incoming_message_queue() { + rust_message *message; + while ((message = (rust_message *) _incoming_message_queue.dequeue())) { + log(rust_log::COMM, "read 0x%" PRIxPTR + " from queue 0x%" PRIxPTR, + message, + &_incoming_message_queue); + log(rust_log::COMM, "processing incoming message 0x%" PRIxPTR, + message); + message->process(); + delete message; + } +} + +/** + * Schedules a running task for execution. Only running tasks can be + * activated. Blocked tasks have to be unblocked before they can be + * activated. + * + * Returns NULL if no tasks can be scheduled. + */ rust_task * -rust_dom::sched() +rust_dom::schedule_task() { I(this, this); // FIXME: in the face of failing tasks, this is not always right. @@ -231,11 +303,88 @@ rust_dom::sched() i %= running_tasks.length(); return (rust_task *)running_tasks[i]; } - log(rust_log::DOM|rust_log::TASK, - "no schedulable tasks"); + // log(rust_log::DOM|rust_log::TASK, "no schedulable tasks"); return NULL; } +/** + * Starts the main scheduler loop which performs task scheduling for this + * domain. + * + * Returns once no more tasks can be scheduled. + */ +int +rust_dom::start_main_loop() +{ + // Make sure someone is watching, to pull us out of infinite loops. + rust_timer timer(this); + + log(rust_log::DOM, "running main-loop on domain 0x%" PRIxPTR, this); + logptr("exit-task glue", root_crate->get_exit_task_glue()); + + while (n_live_tasks() > 0) { + rust_task *scheduled_task = schedule_task(); + + // If we cannot schedule a task because all other live tasks + // are blocked, wait on a condition variable which is signaled + // if progress is made in other domains. + + if (scheduled_task == NULL) { + log(rust_log::TASK, + "all tasks are blocked, waiting for progress ..."); + _progress.wait(); + continue; + } + + I(this, scheduled_task->running()); + + log(rust_log::TASK, + "activating task 0x%" PRIxPTR ", sp=x%" PRIxPTR, + (uintptr_t)scheduled_task, scheduled_task->rust_sp); + + interrupt_flag = 0; + + activate(scheduled_task); + + log(rust_log::TASK, + "returned from task 0x%" PRIxPTR + " in state '%s', sp=0x%" PRIxPTR, + (uintptr_t)scheduled_task, + state_vec_name(scheduled_task->state), + scheduled_task->rust_sp); + + I(this, scheduled_task->rust_sp >= + (uintptr_t) &scheduled_task->stk->data[0]); + I(this, scheduled_task->rust_sp < scheduled_task->stk->limit); + + drain_incoming_message_queue(); + + reap_dead_tasks(); + } + + log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ..."); + + while (dead_tasks.length() > 0) { + log(rust_log::DOM, + "waiting for %d dead tasks to become dereferenced ...", + dead_tasks.length()); + + log(rust_log::DOM, + "waiting for %" PRIxPTR, dead_tasks[0]); + + if (_incoming_message_queue.is_empty()) { + _incoming_message_pending.wait(); + } else { + drain_incoming_message_queue(); + } + reap_dead_tasks(); + } + + log(rust_log::DOM, "finished main-loop (dom.rval = %d)", rval); + return rval; +} + + rust_crate_cache * rust_dom::get_cache(rust_crate const *crate) { log(rust_log::CACHE, diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h new file mode 100644 index 00000000..38d04dbf --- /dev/null +++ b/src/rt/rust_dom.h @@ -0,0 +1,92 @@ +/* + * rust_dom.h + */ + +#ifndef RUST_DOM_H +#define RUST_DOM_H + +#include "sync/lock_free_queue.h" + +class rust_message : public lock_free_queue_node, + public dom_owned<rust_message> { +public: + rust_dom *dom; + rust_message(rust_dom *dom); + virtual void process(); +}; + +class kill_task_message : public rust_message { + rust_task *_task; +public: + kill_task_message(rust_dom *dom, rust_task *task); + void process(); +}; + +struct rust_dom +{ + // Fields known to the compiler: + uintptr_t interrupt_flag; + + // Fields known only by the runtime: + + // NB: the root crate must remain in memory until the root of the + // tree of domains exits. All domains within this tree have a + // copy of this root_crate value and use it for finding utility + // glue. + rust_crate const *root_crate; + rust_log _log; + rust_srv *srv; + ptr_vec<rust_task> running_tasks; + ptr_vec<rust_task> blocked_tasks; + ptr_vec<rust_task> dead_tasks; + ptr_vec<rust_crate_cache> caches; + randctx rctx; + rust_task *root_task; + rust_task *curr_task; + int rval; + + condition_variable _progress; + + // Incoming messages from other domains. + condition_variable _incoming_message_pending; + lock_free_queue _incoming_message_queue; + +#ifndef __WIN32__ + pthread_attr_t attr; +#endif + + rust_dom(rust_srv *srv, rust_crate const *root_crate); + ~rust_dom(); + + void activate(rust_task *task); + void log(rust_task *task, uint32_t logbit, char const *fmt, ...); + void log(uint32_t logbit, char const *fmt, ...); + rust_log & get_log(); + void logptr(char const *msg, uintptr_t ptrval); + template<typename T> + void logptr(char const *msg, T* ptrval); + void fail(); + void *malloc(size_t sz); + void *calloc(size_t sz); + void *realloc(void *data, size_t sz); + void free(void *p); + + void send_message(rust_message *message); + void drain_incoming_message_queue(); + +#ifdef __WIN32__ + void win32_require(LPCTSTR fn, BOOL ok); +#endif + + rust_crate_cache *get_cache(rust_crate const *crate); + size_t n_live_tasks(); + void add_task_to_state_vec(ptr_vec<rust_task> *v, rust_task *task); + void remove_task_from_state_vec(ptr_vec<rust_task> *v, rust_task *task); + const char *state_vec_name(ptr_vec<rust_task> *v); + + void reap_dead_tasks(); + rust_task *schedule_task(); + int start_main_loop(); +}; + +#endif /* RUST_DOM_H */ diff --git a/src/rt/rust_internal.h b/src/rt/rust_internal.h index f877cefc..d962e894 100644 --- a/src/rt/rust_internal.h +++ b/src/rt/rust_internal.h @@ -5,6 +5,8 @@ #define __STDC_CONSTANT_MACROS 1 #define __STDC_FORMAT_MACROS 1 +#define ERROR 0 + #include <stdlib.h> #include <stdint.h> #include <inttypes.h> @@ -36,13 +38,18 @@ extern "C" { #error "Platform not supported." #endif +#include "sync/condition_variable.h" + #ifndef __i386__ #error "Target CPU not supported." #endif -#define I(dom, e) ((e) ? (void)0 : \ +#define I(dom, e) ((e) ? (void)0 : \ (dom)->srv->fatal(#e, __FILE__, __LINE__)) +#define A(dom, e, s) ((e) ? (void)0 : \ + (dom)->srv->fatal(#e " : " #s, __FILE__, __LINE__)) + struct rust_task; struct rust_port; class rust_chan; @@ -50,7 +57,7 @@ struct rust_token; struct rust_dom; class rust_crate; class rust_crate_cache; -class lockfree_queue; +// class lockfree_queue; struct stk_seg; struct type_desc; @@ -66,14 +73,14 @@ template <typename T> struct rc_base { - size_t refcnt; + size_t ref_count; void ref() { - ++refcnt; + ++ref_count; } void deref() { - if (--refcnt == 0) { + if (--ref_count == 0) { delete (T*)this; } } @@ -122,71 +129,29 @@ public: return fill; } + bool is_empty() { + return fill == 0; + } + T *& operator[](size_t offset); void push(T *p); T *pop(); void trim(size_t fill); - void swapdel(T* p); + void swap_delete(T* p); }; -struct -rust_dom -{ - // Fields known to the compiler: - uintptr_t interrupt_flag; - - // Fields known only by the runtime: - - // NB: the root crate must remain in memory until the root of the - // tree of domains exits. All domains within this tree have a - // copy of this root_crate value and use it for finding utility - // glue. - rust_crate const *root_crate; - rust_log _log; - rust_srv *srv; - // uint32_t logbits; - ptr_vec<rust_task> running_tasks; - ptr_vec<rust_task> blocked_tasks; - ptr_vec<rust_task> dead_tasks; - ptr_vec<rust_crate_cache> caches; - randctx rctx; - rust_task *root_task; - rust_task *curr_task; - int rval; - lockfree_queue *incoming; // incoming messages from other threads - -#ifndef __WIN32__ - pthread_attr_t attr; -#endif - - rust_dom(rust_srv *srv, rust_crate const *root_crate); - ~rust_dom(); - - void activate(rust_task *task); - void log(uint32_t logbit, char const *fmt, ...); - rust_log & get_log(); - void logptr(char const *msg, uintptr_t ptrval); - template<typename T> - void logptr(char const *msg, T* ptrval); - void fail(); - void *malloc(size_t sz); - void *calloc(size_t sz); - void *realloc(void *data, size_t sz); - void free(void *p); - -#ifdef __WIN32__ - void win32_require(LPCTSTR fn, BOOL ok); -#endif +#include "rust_dom.h" - rust_crate_cache *get_cache(rust_crate const *crate); - size_t n_live_tasks(); - void add_task_to_state_vec(ptr_vec<rust_task> *v, rust_task *task); - void remove_task_from_state_vec(ptr_vec<rust_task> *v, rust_task *task); - const char *state_vec_name(ptr_vec<rust_task> *v); +template <typename T> inline T +check_null(rust_dom *dom, T value, char const *expr, + char const *file, size_t line) { + if (value == NULL) { + dom->srv->fatal(expr, file, line); + } + return value; +} - void reap_dead_tasks(); - rust_task *sched(); -}; +#define CHECK_NULL(dom, e) (check_null(dom, e, #e, __FILE__, __LINE__)) inline void *operator new(size_t sz, void *mem) { return mem; @@ -217,7 +182,7 @@ rust_timer // For now it's just the most basic "thread that can interrupt // its associated domain-thread" device, so that we have // *some* form of task-preemption. - rust_dom &dom; + rust_dom *dom; uintptr_t exit_flag; #if defined(__WIN32__) @@ -227,7 +192,7 @@ rust_timer pthread_t thread; #endif - rust_timer(rust_dom &dom); + rust_timer(rust_dom *dom); ~rust_timer(); }; @@ -608,94 +573,8 @@ struct gc_alloc { } }; -struct -rust_task : public rc_base<rust_task>, - public dom_owned<rust_task>, - public rust_cond -{ - // Fields known to the compiler. - stk_seg *stk; - uintptr_t runtime_sp; // Runtime sp while task running. - uintptr_t rust_sp; // Saved sp when not running. - gc_alloc *gc_alloc_chain; // Linked list of GC allocations. - rust_dom *dom; - rust_crate_cache *cache; - - // Fields known only to the runtime. - ptr_vec<rust_task> *state; - rust_cond *cond; - uintptr_t* dptr; // Rendezvous pointer for send/recv. - rust_task *supervisor; // Parent-link for failure propagation. - size_t idx; - size_t gc_alloc_thresh; - size_t gc_alloc_accum; - - // Wait queue for tasks waiting for this task. - rust_wait_queue waiting_tasks; - rust_alarm alarm; - - rust_task(rust_dom *dom, - rust_task *spawner); - ~rust_task(); - - void start(uintptr_t exit_task_glue, - uintptr_t spawnee_fn, - uintptr_t args, - size_t callsz); - void grow(size_t n_frame_bytes); - bool running(); - bool blocked(); - bool blocked_on(rust_cond *cond); - bool dead(); - - void link_gc(gc_alloc *gcm); - void unlink_gc(gc_alloc *gcm); - void *malloc(size_t sz, type_desc *td=0); - void *realloc(void *data, size_t sz, bool gc_mem=false); - void free(void *p, bool gc_mem=false); - - const char *state_str(); - void transition(ptr_vec<rust_task> *svec, ptr_vec<rust_task> *dvec); - - void block(rust_cond *on); - void wakeup(rust_cond *from); - void die(); - void unblock(); - - void check_active() { I(dom, dom->curr_task == this); } - void check_suspended() { I(dom, dom->curr_task != this); } - - // Swap in some glue code to run when we have returned to the - // task's context (assuming we're the active task). - void run_after_return(size_t nargs, uintptr_t glue); - - // Swap in some glue code to run when we're next activated - // (assuming we're the suspended task). - void run_on_resume(uintptr_t glue); - - // Save callee-saved registers and return to the main loop. - void yield(size_t nargs); - - // Fail this task (assuming caller-on-stack is different task). - void kill(); - - // Fail self, assuming caller-on-stack is this task. - void fail(size_t nargs); - - // Run the gc glue on the task stack. - void gc(size_t nargs); - - // Disconnect from our supervisor. - void unsupervise(); - - // Notify tasks waiting for us that we are about to die. - void notify_waiting_tasks(); - - uintptr_t get_fp(); - uintptr_t get_previous_fp(uintptr_t fp); - frame_glue_fns *get_frame_glue_fns(uintptr_t fp); - rust_crate_cache * get_crate_cache(rust_crate const *curr_crate); -}; +#include "rust_proxy.h" +#include "rust_task.h" struct rust_port : public rc_base<rust_port>, public task_owned<rust_port>, @@ -722,31 +601,29 @@ struct rust_token : public rust_cond { void withdraw(); }; +#include "circular_buffer.h" -struct circ_buf : public dom_owned<circ_buf> { - static const size_t INIT_CIRC_BUF_UNITS = 8; - static const size_t MAX_CIRC_BUF_SIZE = 1 << 24; - - rust_dom *dom; - size_t alloc; - size_t unit_sz; - size_t next; - size_t unread; - uint8_t *data; - - circ_buf(rust_dom *dom, size_t unit_sz); - ~circ_buf(); - - void transfer(void *dst); - void push(void *src); - void shift(void *dst); -}; +//struct circ_buf : public dom_owned<circ_buf> { +// static const size_t INIT_CIRC_BUF_UNITS = 8; +// static const size_t MAX_CIRC_BUF_SIZE = 1 << 24; +// +// rust_dom *dom; +// size_t alloc; +// size_t unit_sz; +// size_t next; +// size_t unread; +// uint8_t *data; +// +// circ_buf(rust_dom *dom, size_t unit_sz); +// ~circ_buf(); +// +// void transfer(void *dst); +// void push(void *src); +// void shift(void *dst); +//}; #include "rust_chan.h" -int -rust_main_loop(rust_dom *dom); - // // Local Variables: // mode: C++ diff --git a/src/rt/rust_log.cpp b/src/rt/rust_log.cpp index 5cdf315c..b67876eb 100644 --- a/src/rt/rust_log.cpp +++ b/src/rt/rust_log.cpp @@ -4,9 +4,13 @@ */ #include "rust_internal.h" +#include "sync/spin_lock.h" +#include "util/array_list.h" +#include <stdarg.h> -static uint32_t read_type_bit_mask() { - uint32_t bits = rust_log::ULOG | rust_log::ERR; +static uint32_t +read_type_bit_mask() { + uint32_t bits = rust_log::ULOG | rust_log::ERR | rust_log::ALL; char *env_str = getenv("RUST_LOG"); if (env_str) { bits = 0; @@ -27,92 +31,167 @@ static uint32_t read_type_bit_mask() { return bits; } -rust_log::ansi_color rust_log::get_type_color(log_type type) { - switch (type) { - case ERR: - return rust_log::RED; - case UPCALL: - return rust_log::GREEN; - case COMM: - return rust_log::MAGENTA; - case DOM: - case TASK: - return rust_log::LIGHTTEAL; - case MEM: - return rust_log::YELLOW; - default: - return rust_log::WHITE; - } +rust_log::ansi_color +get_type_color(rust_log::log_type type) { + rust_log::ansi_color color = rust_log::WHITE; + if (type & rust_log::ERR) + color = rust_log::RED; + if (type & rust_log::MEM) + color = rust_log::YELLOW; + if (type & rust_log::UPCALL) + color = rust_log::GREEN; + if (type & rust_log::COMM) + color = rust_log::MAGENTA; + if (type & rust_log::DOM) + color = rust_log::LIGHTTEAL; + if (type & rust_log::TASK) + color = rust_log::LIGHTTEAL; + return color; } -static const char * _foreground_colors[] = { "[30m", "[1;30m", "[37m", - "[31m", "[1;31m", "[32m", - "[1;32m", "[33m", "[33m", - "[34m", "[1;34m", "[35m", - "[1;35m", "[36m", "[1;36m" }; +static const char * _foreground_colors[] = { "[37m", + "[31m", "[1;31m", + "[32m", "[1;32m", + "[33m", "[1;33m", + "[31m", "[1;31m", + "[35m", "[1;35m", + "[36m", "[1;36m" }; + +/** + * Synchronizes access to the underlying logging mechanism. + */ +static spin_lock _log_lock; + rust_log::rust_log(rust_srv *srv, rust_dom *dom) : - _srv(srv), _dom(dom), _type_bit_mask(read_type_bit_mask()), - _use_colors(getenv("RUST_COLOR_LOG")), _indent(0) { + _srv(srv), + _dom(dom), + _type_bit_mask(read_type_bit_mask()), + _use_colors(getenv("RUST_COLOR_LOG")), + _indent(0) { } rust_log::~rust_log() { } -void rust_log::trace_ln(char *message) { - char buffer[512]; - if (_use_colors) { - snprintf(buffer, sizeof(buffer), "\x1b%s0x%08" PRIxPTR "\x1b[0m: ", - _foreground_colors[1 + ((uintptr_t) _dom % 2687 % (LIGHTTEAL - - 1))], (uintptr_t) _dom); - } else { - snprintf(buffer, sizeof(buffer), "0x%08" PRIxPTR ": ", - (uintptr_t) _dom); +const uint16_t +hash(uintptr_t ptr) { + // Robert Jenkins' 32 bit integer hash function + ptr = (ptr + 0x7ed55d16) + (ptr << 12); + ptr = (ptr ^ 0xc761c23c) ^ (ptr >> 19); + ptr = (ptr + 0x165667b1) + (ptr << 5); + ptr = (ptr + 0xd3a2646c) ^ (ptr << 9); + ptr = (ptr + 0xfd7046c5) + (ptr << 3); + ptr = (ptr ^ 0xb55a4f09) ^ (ptr >> 16); + return (uint16_t) ptr; +} + +const char * +get_color(uintptr_t ptr) { + return _foreground_colors[hash(ptr) % rust_log::LIGHTTEAL]; +} + +char * +copy_string(char *dst, const char *src, size_t length) { + return strncpy(dst, src, length) + length; +} + +char * +append_string(char *buffer, const char *format, ...) { + if (buffer != NULL && format) { + va_list args; + va_start(args, format); + vsprintf(buffer + strlen(buffer), format, args); + va_end(args); } + return buffer; +} +char * +append_string(char *buffer, rust_log::ansi_color color, + const char *format, ...) { + if (buffer != NULL && format) { + append_string(buffer, "\x1b%s", _foreground_colors[color]); + va_list args; + va_start(args, format); + vsprintf(buffer + strlen(buffer), format, args); + va_end(args); + append_string(buffer, "\x1b[0m"); + } + return buffer; +} + +void +rust_log::trace_ln(char *prefix, char *message) { + char buffer[1024] = ""; + _log_lock.lock(); + append_string(buffer, "%-34s", prefix); for (uint32_t i = 0; i < _indent; i++) { - strncat(buffer, "\t", sizeof(buffer) - strlen(buffer) - 1); + append_string(buffer, " "); } - strncat(buffer, message, sizeof(buffer) - strlen(buffer) - 1); + append_string(buffer, "%s", message); _srv->log(buffer); + _log_lock.unlock(); +} + +void +rust_log::trace_ln(rust_task *task, char *message) { +#if defined(__WIN32__) + uint32_t thread_id = 0; +#else + uint32_t thread_id = (uint32_t) pthread_self(); +#endif + char prefix[1024] = ""; + append_string(prefix, "0x%08" PRIxPTR ":0x%08" PRIxPTR ":", + thread_id, (uintptr_t) _dom); + if (task) { + append_string(prefix, "0x%08" PRIxPTR ":", (uintptr_t) task); + } + trace_ln(prefix, message); } /** * Traces a log message if the specified logging type is not filtered. */ -void rust_log::trace_ln(uint32_t type_bits, char *message) { - trace_ln(get_type_color((rust_log::log_type) type_bits), type_bits, - message); +void +rust_log::trace_ln(rust_task *task, uint32_t type_bits, char *message) { + trace_ln(task, get_type_color((rust_log::log_type) type_bits), + type_bits, message); } /** * Traces a log message using the specified ANSI color code. */ -void rust_log::trace_ln(ansi_color color, uint32_t type_bits, char *message) { +void +rust_log::trace_ln(rust_task *task, ansi_color color, + uint32_t type_bits, char *message) { if (is_tracing(type_bits)) { if (_use_colors) { - char buffer[512]; - snprintf(buffer, sizeof(buffer), "\x1b%s%s\x1b[0m", - _foreground_colors[color], message); - trace_ln(buffer); + char buffer[512] = ""; + append_string(buffer, color, "%s", message); + trace_ln(task, buffer); } else { - trace_ln(message); + trace_ln(task, message); } } } -bool rust_log::is_tracing(uint32_t type_bits) { +bool +rust_log::is_tracing(uint32_t type_bits) { return type_bits & _type_bit_mask; } -void rust_log::indent() { +void +rust_log::indent() { _indent++; } -void rust_log::outdent() { +void +rust_log::outdent() { _indent--; } -void rust_log::reset_indent(uint32_t indent) { +void +rust_log::reset_indent(uint32_t indent) { _indent = indent; } diff --git a/src/rt/rust_log.h b/src/rt/rust_log.h index bd32c155..06712066 100644 --- a/src/rt/rust_log.h +++ b/src/rt/rust_log.h @@ -1,22 +1,18 @@ -#ifndef RUST_LOG_H_ -#define RUST_LOG_H_ +#ifndef RUST_LOG_H +#define RUST_LOG_H class rust_dom; +class rust_task; + + class rust_log { - rust_srv *_srv; - rust_dom *_dom; - uint32_t _type_bit_mask; - bool _use_colors; - uint32_t _indent; - void trace_ln(char *message); + public: rust_log(rust_srv *srv, rust_dom *dom); virtual ~rust_log(); enum ansi_color { - BLACK, - GRAY, WHITE, RED, LIGHTRED, @@ -51,10 +47,19 @@ public: void indent(); void outdent(); void reset_indent(uint32_t indent); - void trace_ln(uint32_t type_bits, char *message); - void trace_ln(ansi_color color, uint32_t type_bits, char *message); + void trace_ln(char *prefix, char *message); + void trace_ln(rust_task *task, uint32_t type_bits, char *message); + void trace_ln(rust_task *task, ansi_color color, uint32_t type_bits, char *message); bool is_tracing(uint32_t type_bits); - static ansi_color get_type_color(log_type type); + +private: + rust_srv *_srv; + rust_dom *_dom; + uint32_t _type_bit_mask; + bool _use_labels; + bool _use_colors; + uint32_t _indent; + void trace_ln(rust_task *task, char *message); }; -#endif /* RUST_LOG_H_ */ +#endif /* RUST_LOG_H */ diff --git a/src/rt/rust_proxy.h b/src/rt/rust_proxy.h new file mode 100644 index 00000000..8059dd89 --- /dev/null +++ b/src/rt/rust_proxy.h @@ -0,0 +1,31 @@ +/** + * A proxy object is a wrapper around other Rust objects. One use of the proxy + * object is to mitigate access between tasks in different thread domains. + */ + +#ifndef RUST_PROXY_H +#define RUST_PROXY_H + +template <typename T> struct +rust_proxy_delegate : public rc_base<T> { +protected: + T *_delegate; +public: + rust_proxy_delegate(T * delegate) : _delegate(delegate) { + } + T *delegate() { return _delegate; } +}; + +template <typename T> struct +rust_proxy : public rust_proxy_delegate<T>, + public dom_owned<rust_proxy<T> > { +public: + rust_dom *dom; + rust_proxy(rust_dom *dom, T *delegate) : + rust_proxy_delegate<T> (delegate), + dom(dom) { + delegate->ref(); + } +}; + +#endif /* RUST_PROXY_H */ diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 7c92c4ca..357edbf1 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -53,6 +53,7 @@ align_down(uintptr_t sp) rust_task::rust_task(rust_dom *dom, rust_task *spawner) : + rust_proxy_delegate<rust_task>(this), stk(new_stk(dom, 0)), runtime_sp(0), rust_sp(stk->limit), @@ -61,20 +62,24 @@ rust_task::rust_task(rust_dom *dom, rust_task *spawner) : cache(NULL), state(&dom->running_tasks), cond(NULL), - dptr(0), supervisor(spawner), idx(0), waiting_tasks(dom), + rendezvous_ptr(0), alarm(this) { dom->logptr("new task", (uintptr_t)this); + + if (spawner == NULL) { + ref_count = 0; + } } rust_task::~rust_task() { dom->log(rust_log::MEM|rust_log::TASK, "~rust_task 0x%" PRIxPTR ", refcnt=%d", - (uintptr_t)this, refcnt); + (uintptr_t)this, ref_count); /* for (uintptr_t fp = get_fp(); fp; fp = get_previous_fp(fp)) { @@ -98,8 +103,8 @@ rust_task::~rust_task() /* FIXME: tighten this up, there are some more assertions that hold at task-lifecycle events. */ - I(dom, refcnt == 0 || - (refcnt == 1 && this == dom->root_task)); + I(dom, ref_count == 0 || + (ref_count == 1 && this == dom->root_task)); del_stk(dom, stk); if (cache) @@ -275,9 +280,9 @@ rust_task::run_after_return(size_t nargs, uintptr_t glue) uintptr_t *retpc = ((uintptr_t *) sp) - 1; dom->log(rust_log::TASK|rust_log::MEM, - "run_after_return: overwriting retpc=0x%" PRIxPTR - " @ runtime_sp=0x%" PRIxPTR - " with glue=0x%" PRIxPTR, + "run_after_return: overwriting retpc=x%" PRIxPTR + " @ runtime_sp=x%" PRIxPTR + " with glue=x%" PRIxPTR, *retpc, sp, glue); // Move the current return address (which points into rust code) @@ -296,9 +301,9 @@ rust_task::run_on_resume(uintptr_t glue) uintptr_t* rsp = (uintptr_t*) rust_sp; rsp += n_callee_saves; dom->log(rust_log::TASK|rust_log::MEM, - "run_on_resume: overwriting retpc=0x%" PRIxPTR - " @ rust_sp=0x%" PRIxPTR - " with glue=0x%" PRIxPTR, + "run_on_resume: overwriting retpc=x%" PRIxPTR + " @ rust_sp=x%" PRIxPTR + " with glue=x%" PRIxPTR, *rsp, rsp, glue); *rsp = glue; } @@ -306,8 +311,8 @@ rust_task::run_on_resume(uintptr_t glue) void rust_task::yield(size_t nargs) { - dom->log(rust_log::TASK, - "task 0x%" PRIxPTR " yielding", this); + log(rust_log::TASK, + "task 0x%" PRIxPTR " yielding", this); run_after_return(nargs, dom->root_crate->get_yield_glue()); } @@ -322,11 +327,13 @@ rust_task::kill() { // Note the distinction here: kill() is when you're in an upcall // from task A and want to force-fail task B, you do B->kill(). // If you want to fail yourself you do self->fail(upcall_nargs). - dom->log(rust_log::TASK, "killing task 0x%" PRIxPTR, this); + log(rust_log::TASK, "killing task 0x%" PRIxPTR, this); // Unblock the task so it can unwind. unblock(); + if (this == dom->root_task) dom->fail(); + run_on_resume(dom->root_crate->get_unwind_glue()); } @@ -369,9 +376,12 @@ void rust_task::notify_waiting_tasks() { while (waiting_tasks.length() > 0) { - rust_task *t = waiting_tasks.pop()->receiver; - if (!t->dead()) - t->wakeup(this); + log(rust_log::ALL, "notify_waiting_tasks: %d", + waiting_tasks.length()); + rust_task *waiting_task = waiting_tasks.pop()->receiver; + if (!waiting_task->dead()) { + waiting_task->wakeup(this); + } } } @@ -532,6 +542,9 @@ void rust_task::wakeup(rust_cond *from) { transition(&dom->blocked_tasks, &dom->running_tasks); + // TODO: Signaling every time the task is awaken is kind of silly, + // do this a nicer way. + dom->_progress.signal(); I(dom, cond == from); } @@ -565,6 +578,18 @@ rust_task::get_crate_cache(rust_crate const *curr_crate) return cache; } +void +rust_task::log(uint32_t type_bits, char const *fmt, ...) { + char buf[256]; + if (dom->get_log().is_tracing(type_bits)) { + va_list args; + va_start(args, fmt); + vsnprintf(buf, sizeof(buf), fmt, args); + dom->get_log().trace_ln(this, type_bits, buf); + va_end(args); + } +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h new file mode 100644 index 00000000..879cb61a --- /dev/null +++ b/src/rt/rust_task.h @@ -0,0 +1,107 @@ +/* + * + */ + +#ifndef RUST_TASK_H +#define RUST_TASK_H +struct +rust_task : public rust_proxy_delegate<rust_task>, + public dom_owned<rust_task>, + public rust_cond +{ + // Fields known to the compiler. + stk_seg *stk; + uintptr_t runtime_sp; // Runtime sp while task running. + uintptr_t rust_sp; // Saved sp when not running. + gc_alloc *gc_alloc_chain; // Linked list of GC allocations. + rust_dom *dom; + rust_crate_cache *cache; + + // Fields known only to the runtime. + ptr_vec<rust_task> *state; + rust_cond *cond; + rust_task *supervisor; // Parent-link for failure propagation. + size_t idx; + size_t gc_alloc_thresh; + size_t gc_alloc_accum; + + // Wait queue for tasks waiting for this task. + rust_wait_queue waiting_tasks; + + // Rendezvous pointer for receiving data when blocked on a port. If we're + // trying to read data and no data is available on any incoming channel, + // we block on the port, and yield control to the scheduler. Since, we + // were not able to read anJything, we remember the location where the + // result should go in the rendezvous_ptr, and let the sender write to + // that location before waking us up. + uintptr_t* rendezvous_ptr; + + rust_alarm alarm; + + rust_task(rust_dom *dom, + rust_task *spawner); + ~rust_task(); + + void start(uintptr_t exit_task_glue, + uintptr_t spawnee_fn, + uintptr_t args, + size_t callsz); + void grow(size_t n_frame_bytes); + bool running(); + bool blocked(); + bool blocked_on(rust_cond *cond); + bool dead(); + + void link_gc(gc_alloc *gcm); + void unlink_gc(gc_alloc *gcm); + void *malloc(size_t sz, type_desc *td=0); + void *realloc(void *data, size_t sz, bool gc_mem=false); + void free(void *p, bool gc_mem=false); + + const char *state_str(); + void transition(ptr_vec<rust_task> *svec, ptr_vec<rust_task> *dvec); + + void block(rust_cond *on); + void wakeup(rust_cond *from); + void die(); + void unblock(); + + void check_active() { I(dom, dom->curr_task == this); } + void check_suspended() { I(dom, dom->curr_task != this); } + + void log(uint32_t type_bits, char const *fmt, ...); + + // Swap in some glue code to run when we have returned to the + // task's context (assuming we're the active task). + void run_after_return(size_t nargs, uintptr_t glue); + + // Swap in some glue code to run when we're next activated + // (assuming we're the suspended task). + void run_on_resume(uintptr_t glue); + + // Save callee-saved registers and return to the main loop. + void yield(size_t nargs); + + // Fail this task (assuming caller-on-stack is different task). + void kill(); + + // Fail self, assuming caller-on-stack is this task. + void fail(size_t nargs); + + // Run the gc glue on the task stack. + void gc(size_t nargs); + + // Disconnect from our supervisor. + void unsupervise(); + + // Notify tasks waiting for us that we are about to die. + void notify_waiting_tasks(); + + uintptr_t get_fp(); + uintptr_t get_previous_fp(uintptr_t fp); + frame_glue_fns *get_frame_glue_fns(uintptr_t fp); + rust_crate_cache * get_crate_cache(rust_crate const *curr_crate); +}; + + +#endif /* RUST_TASK_H */ diff --git a/src/rt/rust_timer.cpp b/src/rt/rust_timer.cpp index fdee3075..269942f5 100644 --- a/src/rt/rust_timer.cpp +++ b/src/rt/rust_timer.cpp @@ -1,4 +1,3 @@ - #include "rust_internal.h" #include "valgrind.h" @@ -27,12 +26,11 @@ static void * #else #error "Platform not supported" #endif -timer_loop(void *ptr) -{ +timer_loop(void *ptr) { // We were handed the rust_timer that owns us. rust_timer *timer = (rust_timer *)ptr; - rust_dom &dom = timer->dom; - dom.log(rust_log::TIMER, "in timer 0x%" PRIxPTR, (uintptr_t)timer); + rust_dom *dom = timer->dom; + dom->log(rust_log::TIMER, "in timer 0x%" PRIxPTR, (uintptr_t)timer); size_t ms = TIME_SLICE_IN_MS; if (!RUNNING_ON_VALGRIND) ms = 1; @@ -43,12 +41,10 @@ timer_loop(void *ptr) #else usleep(ms * 1000); #endif - dom.log(rust_log::TIMER, - "timer 0x%" PRIxPTR - " interrupting domain 0x%" PRIxPTR, - (uintptr_t)timer, - (uintptr_t)&dom); - dom.interrupt_flag = 1; + dom->log(rust_log::TIMER, "timer 0x%" PRIxPTR + " interrupting domain 0x%" PRIxPTR, (uintptr_t) timer, + (uintptr_t) dom); + dom->interrupt_flag = 1; } #if defined(__WIN32__) ExitThread(0); @@ -58,10 +54,9 @@ timer_loop(void *ptr) return 0; } - -rust_timer::rust_timer(rust_dom &dom) : dom(dom), exit_flag(0) -{ - dom.log(rust_log::TIMER, "creating timer for domain 0x%" PRIxPTR, &dom); +rust_timer::rust_timer(rust_dom *dom) : + dom(dom), exit_flag(0) { + dom->log(rust_log::TIMER, "creating timer for domain 0x%" PRIxPTR, dom); #if defined(__WIN32__) thread = CreateThread(NULL, 0, timer_loop, this, 0, NULL); dom.win32_require("CreateThread", thread != NULL); @@ -76,13 +71,11 @@ rust_timer::rust_timer(rust_dom &dom) : dom(dom), exit_flag(0) #endif } -rust_timer::~rust_timer() -{ +rust_timer::~rust_timer() { exit_flag = 1; #if defined(__WIN32__) - dom.win32_require("WaitForSingleObject", - WaitForSingleObject(thread, INFINITE) - == WAIT_OBJECT_0); + dom->win32_require("WaitForSingleObject", + WaitForSingleObject(thread, INFINITE) == WAIT_OBJECT_0); #else pthread_join(thread, NULL); #endif diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 99df91a6..1aaf89fb 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -1,127 +1,120 @@ - #include "rust_internal.h" - // Upcalls. #ifdef __GNUC__ #define LOG_UPCALL_ENTRY(task) \ (task)->dom->get_log().reset_indent(0); \ - (task)->dom->log(rust_log::UPCALL, \ - "upcall task: 0x%" PRIxPTR \ - " retpc: 0x%" PRIxPTR, \ - (task), __builtin_return_address(0)); \ + (task)->log(rust_log::UPCALL, \ + "> UPCALL %s - task: 0x%" PRIxPTR \ + " retpc: x%" PRIxPTR, \ + __FUNCTION__, \ + (task), __builtin_return_address(0)); \ (task)->dom->get_log().indent(); #else #define LOG_UPCALL_ENTRY(task) \ (task)->dom->get_log().reset_indent(0); \ - (task)->dom->log(rust_log::UPCALL, \ - "upcall task: 0x%" PRIxPTR (task)); \ + (task)->log(rust_log::UPCALL, \ + "> UPCALL task: x%" PRIxPTR (task)); \ (task)->dom->get_log().indent(); #endif extern "C" CDECL char const *str_buf(rust_task *task, rust_str *s); -extern "C" void -upcall_grow_task(rust_task *task, size_t n_frame_bytes) -{ +inline bool +requires_message_passing(rust_task *sender, rust_task *receiver) { + return sender->dom != receiver->dom; +} + +extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) { LOG_UPCALL_ENTRY(task); task->grow(n_frame_bytes); } -extern "C" CDECL void -upcall_log_int(rust_task *task, int32_t i) -{ +extern "C" CDECL void upcall_log_int(rust_task *task, int32_t i) { LOG_UPCALL_ENTRY(task); - task->dom->log(rust_log::UPCALL|rust_log::ULOG, - "upcall log_int(0x%" PRIx32 " = %" PRId32 " = '%c')", - i, i, (char)i); + task->log(rust_log::UPCALL | rust_log::ULOG, + "upcall log_int(0x%" PRIx32 " = %" PRId32 " = '%c')", i, i, + (char) i); } -extern "C" CDECL void -upcall_log_str(rust_task *task, rust_str *str) -{ +extern "C" CDECL void upcall_log_str(rust_task *task, rust_str *str) { LOG_UPCALL_ENTRY(task); const char *c = str_buf(task, str); - task->dom->log(rust_log::UPCALL|rust_log::ULOG, - "upcall log_str(\"%s\")", - c); + task->log(rust_log::UPCALL | rust_log::ULOG, "upcall log_str(\"%s\")", c); } -extern "C" CDECL void -upcall_trace_word(rust_task *task, uintptr_t i) -{ +extern "C" CDECL void upcall_trace_word(rust_task *task, uintptr_t i) { LOG_UPCALL_ENTRY(task); - task->dom->log(rust_log::UPCALL|rust_log::TRACE, - "trace: 0x%" PRIxPTR "", - i, i, (char)i); + task->log(rust_log::UPCALL | rust_log::TRACE, "trace: 0x%" PRIxPTR "", i, + i, (char) i); } -extern "C" CDECL void -upcall_trace_str(rust_task *task, char const *c) -{ +extern "C" CDECL void upcall_trace_str(rust_task *task, char const *c) { LOG_UPCALL_ENTRY(task); - task->dom->log(rust_log::UPCALL|rust_log::TRACE, - "trace: %s", - c); + task->log(rust_log::UPCALL | rust_log::TRACE, "trace: %s", c); } extern "C" CDECL rust_port* -upcall_new_port(rust_task *task, size_t unit_sz) -{ +upcall_new_port(rust_task *task, size_t unit_sz) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM, - "upcall_new_port(task=0x%" PRIxPTR ", unit_sz=%d)", - (uintptr_t)task, unit_sz); + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, + "upcall_new_port(task=0x%" PRIxPTR ", unit_sz=%d)", + (uintptr_t) task, unit_sz); return new (dom) rust_port(task, unit_sz); } -extern "C" CDECL void -upcall_del_port(rust_task *task, rust_port *port) -{ +extern "C" CDECL void upcall_del_port(rust_task *task, rust_port *port) { LOG_UPCALL_ENTRY(task); - task->dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM, - "upcall del_port(0x%" PRIxPTR ")", (uintptr_t)port); - I(task->dom, !port->refcnt); + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, + "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port); + I(task->dom, !port->ref_count); delete port; } +/** + * Creates a new channel, pointed to a specified port. + */ extern "C" CDECL rust_chan* -upcall_new_chan(rust_task *task, rust_port *port) -{ +upcall_new_chan(rust_task *task, rust_port *port) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM, - "upcall_new_chan(task=0x%" PRIxPTR ", port=0x%" PRIxPTR ")", - (uintptr_t)task, port); + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, + "upcall_new_chan(task=0x%" PRIxPTR ", port=0x%" PRIxPTR ")", + (uintptr_t) task, port); I(dom, port); return new (dom) rust_chan(task, port); } -extern "C" CDECL void -upcall_del_chan(rust_task *task, rust_chan *chan) -{ +/** + * Called whenever the channel's ref count drops to zero. + */ +extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM, - "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t)chan); - I(dom, !chan->refcnt); + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, + "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); + I(dom, !chan->ref_count); delete chan; } +/** + * Clones a channel and stores it in the spawnee's domain. Each spawned task + * has it's own copy of the channel. + */ extern "C" CDECL rust_chan * -upcall_clone_chan(rust_task *task, rust_task *owner, rust_chan *chan) -{ +upcall_clone_chan(rust_task *task, + rust_proxy_delegate<rust_task> *spawnee_proxy, + rust_chan *chan) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM, - "upcall clone_chan(owner 0x%" PRIxPTR ", chan 0x%" PRIxPTR ")", - (uintptr_t)owner, (uintptr_t)chan); - return new (owner->dom) rust_chan(owner, chan->port); + rust_task *spawnee = spawnee_proxy->delegate(); + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, + "spawnee: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR, + (uintptr_t) spawnee, (uintptr_t) chan); + return new (spawnee->dom) rust_chan(spawnee, chan->port); } - /* * Buffering protocol: * @@ -143,74 +136,64 @@ upcall_clone_chan(rust_task *task, rust_task *owner, rust_chan *chan) * - Set blocked writer to running * */ +// +//static int +//attempt_transmission(rust_dom *dom, rust_chan *src, rust_task *dst) { +// I(dom, src); +// I(dom, dst); +// +// rust_port *port = src->port; +// if (!port) { +// dom->log(rust_log::COMM, "src died, transmission incomplete"); +// return 0; +// } +// +// circular_buffer *buf = &src->buffer; +// if (buf->is_empty()) { +// dom->log(rust_log::COMM, "buffer empty, transmission incomplete"); +// return 0; +// } +// +// if (!dst->blocked_on(port)) { +// dom->log(rust_log::COMM, +// "dst in non-reading state, transmission incomplete"); +// return 0; +// } +// +// uintptr_t *dptr = dst->dptr; +// dom->log(rust_log::COMM, "receiving %d bytes into dst_task=0x%" PRIxPTR +// ", dptr=0x%" PRIxPTR, port->unit_sz, dst, dptr); +// buf->dequeue(dptr); +// +// // Wake up the sender if its waiting for the send operation. +// rust_task *sender = src->task; +// rust_token *token = &src->token; +// if (sender->blocked_on(token)) +// sender->wakeup(token); +// +// // Wake up the receiver, there is new data. +// dst->wakeup(port); +// +// dom->log(rust_log::COMM, "transmission complete"); +// return 1; +//} -static int -attempt_transmission(rust_dom *dom, - rust_chan *src, - rust_task *dst) -{ - I(dom, src); - I(dom, dst); - - rust_port *port = src->port; - if (!port) { - dom->log(rust_log::COMM, - "src died, transmission incomplete"); - return 0; - } - - circ_buf *buf = &src->buffer; - if (buf->unread == 0) { - dom->log(rust_log::COMM, - "buffer empty, transmission incomplete"); - return 0; - } - - if (!dst->blocked_on(port)) { - dom->log(rust_log::COMM, - "dst in non-reading state, transmission incomplete"); - return 0; - } - - uintptr_t *dptr = dst->dptr; - dom->log(rust_log::COMM, - "receiving %d bytes into dst_task=0x%" PRIxPTR - ", dptr=0x%" PRIxPTR, - port->unit_sz, dst, dptr); - buf->shift(dptr); - - // Wake up the sender if its waiting for the send operation. - rust_task *sender = src->task; - rust_token *token = &src->token; - if (sender->blocked_on(token)) - sender->wakeup(token); - - // Wake up the receiver, there is new data. - dst->wakeup(port); - - dom->log(rust_log::COMM, "transmission complete"); - return 1; -} - -extern "C" CDECL void -upcall_yield(rust_task *task) -{ +extern "C" CDECL void upcall_yield(rust_task *task) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::COMM, "upcall yield()"); + task->log(rust_log::UPCALL | rust_log::COMM, "upcall yield()"); task->yield(1); } -extern "C" CDECL void -upcall_join(rust_task *task, rust_task *other) -{ +extern "C" CDECL void upcall_join(rust_task *task, + rust_proxy_delegate<rust_task> *proxy) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::COMM, - "upcall join(other=0x%" PRIxPTR ")", - (uintptr_t)other); + task->log(rust_log::UPCALL | rust_log::COMM, + "join proxy 0x%" PRIxPTR " -> task = 0x%" PRIxPTR, + proxy, proxy->delegate()); - // If the other task is already dying, we dont have to wait for it. + rust_task *other = proxy->delegate(); + + // If the other task is already dying, we don't have to wait for it. if (!other->dead()) { other->waiting_tasks.push(&task->alarm); task->block(other); @@ -218,106 +201,91 @@ upcall_join(rust_task *task, rust_task *other) } } +/** + * Sends an chunk of data along the specified channel. + * + * sptr: pointer to a chunk of data to send + */ extern "C" CDECL void -upcall_send(rust_task *task, rust_chan *chan, void *sptr) -{ +upcall_send(rust_task *task, rust_chan *chan, void *sptr) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::COMM, - "upcall send(chan=0x%" PRIxPTR ", sptr=0x%" PRIxPTR ")", - (uintptr_t)chan, - (uintptr_t)sptr); + task->log(rust_log::UPCALL | rust_log::COMM, + "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d", + (uintptr_t) chan, (uintptr_t) sptr, chan->port->unit_sz); - I(dom, chan); - I(dom, sptr); - - rust_port *port = chan->port; - dom->log(rust_log::MEM|rust_log::COMM, - "send to port", (uintptr_t)port); - I(dom, port); - - rust_token *token = &chan->token; - dom->log(rust_log::MEM|rust_log::COMM, - "sending via token 0x%" PRIxPTR, - (uintptr_t)token); - - if (port->task) { - chan->buffer.push(sptr); - task->block(token); - attempt_transmission(dom, chan, port->task); - if (chan->buffer.unread && !token->pending()) - token->submit(); - } else { - dom->log(rust_log::COMM|rust_log::ERR, - "port has no task (possibly throw?)"); - } - - if (!task->running()) - task->yield(3); + chan->buffer.enqueue(sptr); + chan->transmit(); + task->log(rust_log::COMM, "=== WROTE DATA ===>"); } extern "C" CDECL void -upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) -{ +upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::COMM, - "upcall recv(dptr=0x%" PRIxPTR ", port=0x%" PRIxPTR ")", - (uintptr_t)dptr, - (uintptr_t)port); + task->log(rust_log::UPCALL | rust_log::COMM, + "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR + ", size: 0x%" PRIxPTR ", chan_no: %d", + (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, + port->chans.length()); + + for (uint32_t i = 0; i < port->chans.length(); i++) { + rust_chan *chan = port->chans[i]; + if (chan->buffer.is_empty() == false) { + chan->buffer.dequeue(dptr); + task->log(rust_log::COMM, "<=== READ DATA ==="); + return; + } + } - I(dom, port); - I(dom, port->task); - I(dom, task); - I(dom, port->task == task); + // No data was buffered on any incoming channel, so block this task + // on the port. Remember the rendezvous location so that any sender + // task can write to it before waking up this task. + task->rendezvous_ptr = dptr; task->block(port); - - if (port->writers.length() > 0) { - I(dom, task->dom); - size_t i = rand(&dom->rctx); - i %= port->writers.length(); - rust_token *token = port->writers[i]; - rust_chan *chan = token->chan; - if (attempt_transmission(dom, chan, task)) - token->withdraw(); - } else { - dom->log(rust_log::COMM, - "no writers sending to port", (uintptr_t)port); - } - - if (!task->running()) { - task->dptr = dptr; - task->yield(3); - } + task->yield(3); } -extern "C" CDECL void -upcall_fail(rust_task *task, char const *expr, char const *file, size_t line) -{ +extern "C" CDECL void upcall_fail(rust_task *task, char const *expr, + char const *file, size_t line) { LOG_UPCALL_ENTRY(task); - task->dom->log(rust_log::UPCALL|rust_log::ERR, - "upcall fail '%s', %s:%" PRIdPTR, - expr, file, line); + task->log(rust_log::UPCALL | rust_log::ERR, + "upcall fail '%s', %s:%" PRIdPTR, expr, file, line); task->fail(4); } +/** + * Called whenever a task's ref count drops to zero. + */ extern "C" CDECL void -upcall_kill(rust_task *task, rust_task *target) -{ +upcall_kill(rust_task *task, rust_proxy_delegate<rust_task> *target_proxy) { LOG_UPCALL_ENTRY(task); - task->dom->log(rust_log::UPCALL|rust_log::TASK, - "upcall kill target=0x%" PRIxPTR, target); - target->kill(); + rust_task *target_task = target_proxy->delegate(); + if (target_proxy != target_task) { + task->dom->free(target_proxy); + } + task->log(rust_log::UPCALL | rust_log::TASK, + "kill task 0x%" PRIxPTR ", ref count %d", + target_task, + target_task->ref_count); + + if (requires_message_passing(task, target_task)) { + rust_dom *target_domain = target_task->dom; + target_domain->send_message( + new (target_domain) + kill_task_message(target_domain, target_task)); + } else { + target_task->kill(); + } } +/** + * Called by the exit glue when the task terminates. + */ extern "C" CDECL void -upcall_exit(rust_task *task) -{ +upcall_exit(rust_task *task) { LOG_UPCALL_ENTRY(task); - - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::TASK, "upcall exit"); + task->log(rust_log::UPCALL | rust_log::TASK, + "task ref_count: %d", task->ref_count); task->die(); task->notify_waiting_tasks(); task->yield(1); @@ -341,11 +309,13 @@ upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) return (uintptr_t) p; } +/** + * Called whenever an object's ref count drops to zero. + */ extern "C" CDECL void upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; dom->log(rust_log::UPCALL|rust_log::MEM, "upcall free(0x%" PRIxPTR ")", @@ -371,22 +341,19 @@ upcall_mark(rust_task *task, void* ptr) } extern "C" CDECL rust_str * -upcall_new_str(rust_task *task, char const *s, size_t fill) -{ +upcall_new_str(rust_task *task, char const *s, size_t fill) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::MEM, - "upcall new_str('%s', %" PRIdPTR ")", s, fill); size_t alloc = next_power_of_two(sizeof(rust_str) + fill); void *mem = dom->malloc(alloc); if (!mem) { task->fail(3); return NULL; } - rust_str *st = new (mem) rust_str(dom, alloc, fill, (uint8_t const *)s); - dom->log(rust_log::UPCALL|rust_log::MEM, - "upcall new_str('%s', %" PRIdPTR ") = 0x%" PRIxPTR, - s, fill, st); + rust_str *st = new (mem) rust_str(dom, alloc, fill, (uint8_t const *) s); + task->log(rust_log::UPCALL | rust_log::MEM, + "upcall new_str('%s', %" PRIdPTR ") = 0x%" PRIxPTR, + s, fill, st); return st; } @@ -405,34 +372,32 @@ upcall_new_vec(rust_task *task, size_t fill, type_desc *td) return NULL; } rust_vec *v = new (mem) rust_vec(dom, alloc, 0, NULL); - dom->log(rust_log::UPCALL|rust_log::MEM, - "upcall new_vec(%" PRIdPTR ") = 0x%" PRIxPTR, - fill, v); + task->log(rust_log::UPCALL | rust_log::MEM, + "upcall new_vec(%" PRIdPTR ") = 0x%" PRIxPTR, fill, v); return v; } - extern "C" CDECL rust_str * upcall_vec_grow(rust_task *task, rust_vec *v, size_t n_bytes, uintptr_t is_gc) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::MEM, + task->log(rust_log::UPCALL|rust_log::MEM, "upcall vec_grow(%" PRIxPTR ", %" PRIdPTR "), alloc=%" PRIdPTR ", fill=%" PRIdPTR, v, n_bytes, v->alloc, v->fill); size_t alloc = next_power_of_two(sizeof(rust_vec) + v->fill + n_bytes); - if (v->refcnt == 1) { + if (v->ref_count == 1) { // Fastest path: already large enough. if (v->alloc >= alloc) { - dom->log(rust_log::UPCALL|rust_log::MEM, "no-growth path"); + task->log(rust_log::UPCALL | rust_log::MEM, "no-growth path"); return v; } // Second-fastest path: can at least realloc. - dom->log(rust_log::UPCALL|rust_log::MEM, "realloc path"); - v = (rust_vec*)dom->realloc(v, alloc); + task->log(rust_log::UPCALL | rust_log::MEM, "realloc path"); + v = (rust_vec*) dom->realloc(v, alloc); if (!v) { task->fail(4); return NULL; @@ -441,7 +406,7 @@ upcall_vec_grow(rust_task *task, rust_vec *v, size_t n_bytes, uintptr_t is_gc) } else { // Slowest path: make a new vec. - dom->log(rust_log::UPCALL|rust_log::MEM, "new vec path"); + task->log(rust_log::UPCALL | rust_log::MEM, "new vec path"); void *mem = dom->malloc(alloc); if (!mem) { task->fail(4); @@ -454,121 +419,100 @@ upcall_vec_grow(rust_task *task, rust_vec *v, size_t n_bytes, uintptr_t is_gc) return v; } - static rust_crate_cache::c_sym * -fetch_c_sym(rust_task *task, - rust_crate const *curr_crate, - size_t lib_num, - size_t c_sym_num, - char const *library, - char const *symbol) -{ +fetch_c_sym(rust_task *task, rust_crate const *curr_crate, size_t lib_num, + size_t c_sym_num, char const *library, char const *symbol) { rust_crate_cache *cache = task->get_crate_cache(curr_crate); rust_crate_cache::lib *l = cache->get_lib(lib_num, library); return cache->get_c_sym(c_sym_num, l, symbol); } -extern "C" CDECL uintptr_t -upcall_require_rust_sym(rust_task *task, - rust_crate const *curr_crate, - size_t lib_num, // # of lib - size_t c_sym_num, // # of C sym "rust_crate" in lib - size_t rust_sym_num, // # of rust sym - char const *library, - char const **path) -{ +extern "C" CDECL uintptr_t upcall_require_rust_sym(rust_task *task, + rust_crate const *curr_crate, size_t lib_num, // # of lib + size_t c_sym_num, // # of C sym "rust_crate" in lib + size_t rust_sym_num, // # of rust sym + char const *library, char const **path) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::CACHE, - "upcall require rust sym: lib #%" PRIdPTR - " = %s, c_sym #%" PRIdPTR - ", rust_sym #%" PRIdPTR - ", curr_crate = 0x%" PRIxPTR, - lib_num, library, c_sym_num, rust_sym_num, - curr_crate); + task->log(rust_log::UPCALL | rust_log::CACHE, + "upcall require rust sym: lib #%" PRIdPTR + " = %s, c_sym #%" PRIdPTR + ", rust_sym #%" PRIdPTR + ", curr_crate = 0x%" PRIxPTR, lib_num, library, c_sym_num, + rust_sym_num, curr_crate); for (char const **c = crate_rel(curr_crate, path); *c; ++c) { - dom->log(rust_log::UPCALL, " + %s", crate_rel(curr_crate, *c)); + task->log(rust_log::UPCALL, " + %s", crate_rel(curr_crate, *c)); } - dom->log(rust_log::UPCALL|rust_log::CACHE, - "require C symbol 'rust_crate' from lib #%" PRIdPTR,lib_num); + task->log(rust_log::UPCALL | rust_log::CACHE, + "require C symbol 'rust_crate' from lib #%" PRIdPTR, lib_num); rust_crate_cache::c_sym *c = - fetch_c_sym(task, curr_crate, lib_num, c_sym_num, - library, "rust_crate"); + fetch_c_sym(task, curr_crate, lib_num, c_sym_num, library, + "rust_crate"); - dom->log(rust_log::UPCALL|rust_log::CACHE, - "require rust symbol inside crate"); - rust_crate_cache::rust_sym *s = - task->cache->get_rust_sym(rust_sym_num, dom, curr_crate, c, path); + task->log(rust_log::UPCALL | rust_log::CACHE, + "require rust symbol inside crate"); + rust_crate_cache::rust_sym *s = task->cache->get_rust_sym(rust_sym_num, + dom, + curr_crate, c, + path); uintptr_t addr = s->get_val(); if (addr) { - dom->log(rust_log::UPCALL|rust_log::CACHE, - "found-or-cached addr: 0x%" PRIxPTR, addr); + task->log(rust_log::UPCALL | rust_log::CACHE, + "found-or-cached addr: 0x%" PRIxPTR, addr); } else { - dom->log(rust_log::UPCALL|rust_log::CACHE, - "failed to resolve symbol"); + task->log(rust_log::UPCALL | rust_log::CACHE, + "failed to resolve symbol"); task->fail(7); } return addr; } -extern "C" CDECL uintptr_t -upcall_require_c_sym(rust_task *task, - rust_crate const *curr_crate, - size_t lib_num, // # of lib - size_t c_sym_num, // # of C sym - char const *library, - char const *symbol) -{ +extern "C" CDECL uintptr_t upcall_require_c_sym(rust_task *task, + rust_crate const *curr_crate, size_t lib_num, // # of lib + size_t c_sym_num, // # of C sym + char const *library, char const *symbol) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::CACHE, - "upcall require c sym: lib #%" PRIdPTR - " = %s, c_sym #%" PRIdPTR - " = %s" - ", curr_crate = 0x%" PRIxPTR, - lib_num, library, c_sym_num, symbol, curr_crate); + task->log(rust_log::UPCALL | rust_log::CACHE, + "upcall require c sym: lib #%" PRIdPTR + " = %s, c_sym #%" PRIdPTR + " = %s" + ", curr_crate = 0x%" PRIxPTR, lib_num, library, c_sym_num, + symbol, curr_crate); - rust_crate_cache::c_sym *c = - fetch_c_sym(task, curr_crate, lib_num, c_sym_num, library, symbol); + rust_crate_cache::c_sym *c = fetch_c_sym(task, curr_crate, lib_num, + c_sym_num, library, symbol); uintptr_t addr = c->get_val(); if (addr) { - dom->log(rust_log::UPCALL|rust_log::CACHE, - "found-or-cached addr: 0x%" PRIxPTR, addr); + task->log(rust_log::UPCALL | rust_log::CACHE, + "found-or-cached addr: 0x%" PRIxPTR, addr); } else { - dom->log(rust_log::UPCALL|rust_log::CACHE, - "failed to resolve symbol"); + task->log(rust_log::UPCALL | rust_log::CACHE, + "failed to resolve symbol"); task->fail(6); } return addr; } extern "C" CDECL type_desc * -upcall_get_type_desc(rust_task *task, - rust_crate const *curr_crate, - size_t size, - size_t align, - size_t n_descs, - type_desc const **descs) -{ +upcall_get_type_desc(rust_task *task, rust_crate const *curr_crate, + size_t size, size_t align, size_t n_descs, type_desc const **descs) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; - dom->log(rust_log::UPCALL|rust_log::CACHE, - "upcall get_type_desc with size=%" PRIdPTR - ", align=%" PRIdPTR ", %" PRIdPTR " descs", - size, align, n_descs); + task->log(rust_log::UPCALL | rust_log::CACHE, + "upcall get_type_desc with size=%" PRIdPTR + ", align=%" PRIdPTR ", %" PRIdPTR " descs", size, align, + n_descs); rust_crate_cache *cache = task->get_crate_cache(curr_crate); type_desc *td = cache->get_type_desc(size, align, n_descs, descs); - dom->log(rust_log::UPCALL|rust_log::CACHE, - "returning tydesc 0x%" PRIxPTR, td); + task->log(rust_log::UPCALL | rust_log::CACHE, + "returning tydesc 0x%" PRIxPTR, td); return td; } - #if defined(__WIN32__) static DWORD WINAPI rust_thread_start(void *ptr) #elif defined(__GNUC__) @@ -578,10 +522,10 @@ static void *rust_thread_start(void *ptr) #endif { // We were handed the domain we are supposed to run. - rust_dom *dom = (rust_dom *)ptr; + rust_dom *dom = (rust_dom *) ptr; // Start a new rust main loop for this thread. - rust_main_loop(dom); + dom->start_main_loop(); rust_srv *srv = dom->srv; delete dom; @@ -591,81 +535,77 @@ static void *rust_thread_start(void *ptr) } extern "C" CDECL rust_task * -upcall_new_task(rust_task *spawner) -{ +upcall_new_task(rust_task *spawner) { LOG_UPCALL_ENTRY(spawner); rust_dom *dom = spawner->dom; rust_task *task = new (dom) rust_task(dom, spawner); - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::TASK, + dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, "upcall new_task(spawner 0x%" PRIxPTR ") = 0x%" PRIxPTR, spawner, task); return task; } extern "C" CDECL rust_task * -upcall_start_task(rust_task *spawner, - rust_task *task, - uintptr_t exit_task_glue, - uintptr_t spawnee_fn, - size_t callsz) -{ +upcall_start_task(rust_task *spawner, rust_task *task, + uintptr_t exit_task_glue, uintptr_t spawnee_fn, size_t callsz) { LOG_UPCALL_ENTRY(spawner); rust_dom *dom = spawner->dom; - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::TASK, + dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, "upcall start_task(task 0x%" PRIxPTR " exit_task_glue 0x%" PRIxPTR ", spawnee 0x%" PRIxPTR - ", callsz %" PRIdPTR ")", - task, exit_task_glue, spawnee_fn, callsz); + ", callsz %" PRIdPTR ")", task, exit_task_glue, spawnee_fn, + callsz); task->start(exit_task_glue, spawnee_fn, spawner->rust_sp, callsz); return task; } -extern "C" CDECL rust_task * -upcall_new_thread(rust_task *task) -{ +extern "C" CDECL rust_proxy_delegate<rust_task> * +upcall_new_thread(rust_task *task) { LOG_UPCALL_ENTRY(task); rust_dom *old_dom = task->dom; rust_dom *new_dom = new rust_dom(old_dom->srv->clone(), old_dom->root_crate); - new_dom->log(rust_log::UPCALL|rust_log::MEM, - "upcall new_thread() = 0x%" PRIxPTR, - new_dom->root_task); - return new_dom->root_task; + + task->log(rust_log::UPCALL | rust_log::MEM, + "upcall new_thread() = dom 0x%" PRIxPTR " task 0x%" PRIxPTR, + new_dom, new_dom->root_task); + rust_proxy<rust_task> *proxy = + new (old_dom) rust_proxy<rust_task>(old_dom, new_dom->root_task); + task->log(rust_log::UPCALL | rust_log::MEM, + "new proxy = 0x%" PRIxPTR " -> task = 0x%" PRIxPTR, + proxy, proxy->delegate()); + return proxy; } -extern "C" CDECL rust_task * +extern "C" CDECL rust_proxy_delegate<rust_task> * upcall_start_thread(rust_task *spawner, - rust_task *root_task, - uintptr_t exit_task_glue, - uintptr_t spawnee_fn, - size_t callsz) -{ + rust_proxy_delegate<rust_task> *root_task_proxy, + uintptr_t exit_task_glue, uintptr_t spawnee_fn, size_t callsz) { LOG_UPCALL_ENTRY(spawner); rust_dom *dom = spawner->dom; - dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::TASK, + rust_task *root_task = root_task_proxy->delegate(); + dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, "upcall start_thread(exit_task_glue 0x%" PRIxPTR ", spawnee 0x%" PRIxPTR - ", callsz %" PRIdPTR ")", - exit_task_glue, spawnee_fn, callsz); + ", callsz %" PRIdPTR ")", exit_task_glue, spawnee_fn, callsz); root_task->start(exit_task_glue, spawnee_fn, spawner->rust_sp, callsz); #if defined(__WIN32__) HANDLE thread; thread = CreateThread(NULL, 0, rust_thread_start, root_task->dom, - 0, NULL); + 0, NULL); dom->win32_require("CreateThread", thread != NULL); #else pthread_t thread; pthread_create(&thread, &dom->attr, rust_thread_start, - (void *)root_task->dom); + (void *) root_task->dom); #endif - - return 0; + return root_task_proxy; } // diff --git a/src/rt/rust_util.h b/src/rt/rust_util.h index 6f34dad9..62ac7de2 100644 --- a/src/rt/rust_util.h +++ b/src/rt/rust_util.h @@ -5,7 +5,7 @@ template <typename T> rc_base<T>::rc_base() : - refcnt(1) + ref_count(1) { } @@ -85,7 +85,7 @@ ptr_vec<T>::trim(size_t sz) template <typename T> void -ptr_vec<T>::swapdel(T *item) +ptr_vec<T>::swap_delete(T *item) { /* Swap the endpoint into i and decr fill. */ I(dom, data); diff --git a/src/rt/sync/condition_variable.cpp b/src/rt/sync/condition_variable.cpp new file mode 100644 index 00000000..d4032572 --- /dev/null +++ b/src/rt/sync/condition_variable.cpp @@ -0,0 +1,66 @@ +#include "../globals.h" + +/* + * Conditional variable. Implemented using pthreads condition variables, and + * using events on windows. + */ + +#include "condition_variable.h" + +// #define TRACE + +condition_variable::condition_variable() { +#if defined(__WIN32__) + _event = CreateEvent(NULL, FALSE, FALSE, NULL); +#else + pthread_cond_init(&_cond, NULL); + pthread_mutex_init(&_mutex, NULL); +#endif +} + +condition_variable::~condition_variable() { +#if defined(__WIN32__) + CloseHandle(_event); +#else + pthread_cond_destroy(&_cond); + pthread_mutex_destroy(&_mutex); +#endif +} + +/** + * Wait indefinitely until condition is signaled. + */ +void condition_variable::wait() { +#ifdef TRACE + printf("waiting on condition_variable: 0x%" PRIxPTR "\n", + (uintptr_t)this); +#endif +#if defined(__WIN32__) + WaitForSingleObject(_event, INFINITE); +#else + pthread_mutex_lock(&_mutex); + pthread_cond_wait(&_cond, &_mutex); + pthread_mutex_unlock(&_mutex); +#endif +#ifdef TRACE + printf("resumed on condition_variable: 0x%" PRIxPTR "\n", + (uintptr_t)this); +#endif +} + +/** + * Signal condition, and resume the waiting thread. + */ +void condition_variable::signal() { +#if defined(__WIN32__) + SetEvent(_event); +#else + pthread_mutex_lock(&_mutex); + pthread_cond_signal(&_cond); + pthread_mutex_unlock(&_mutex); +#endif +#ifdef TRACE + printf("signal condition_variable: 0x%" PRIxPTR "\n", + (uintptr_t)this); +#endif +} diff --git a/src/rt/sync/condition_variable.h b/src/rt/sync/condition_variable.h new file mode 100644 index 00000000..f847ef99 --- /dev/null +++ b/src/rt/sync/condition_variable.h @@ -0,0 +1,19 @@ +#ifndef CONDITION_VARIABLE_H +#define CONDITION_VARIABLE_H + +class condition_variable { +#if defined(__WIN32__) + HANDLE _event; +#else + pthread_cond_t _cond; + pthread_mutex_t _mutex; +#endif +public: + condition_variable(); + virtual ~condition_variable(); + + void wait(); + void signal(); +}; + +#endif /* CONDITION_VARIABLE_H */ diff --git a/src/rt/sync/lock_free_queue.cpp b/src/rt/sync/lock_free_queue.cpp index 9d1081de..69241ece 100644 --- a/src/rt/sync/lock_free_queue.cpp +++ b/src/rt/sync/lock_free_queue.cpp @@ -5,33 +5,46 @@ * dequeue() is not allowed to interrupt itself. */ +#include "../globals.h" #include "lock_free_queue.h" -lock_free_queue::lock_free_queue() : - tail(this) { +lock_free_queue_node::lock_free_queue_node() : next(NULL) { + +} + +lock_free_queue::lock_free_queue() : _tail(this) { + } -void lock_free_queue::enqueue(lock_free_queue_node *item) { - item->next = (lock_free_queue_node *) 0; - lock_free_queue_node *last = tail; - tail = item; - while (last->next) +void +lock_free_queue::enqueue(lock_free_queue_node *item) { + item->next = (lock_free_queue_node *) NULL; + lock_free_queue_node *last = _tail; + _tail = item; + while (last->next) { last = last->next; + } last->next = item; } -lock_free_queue_node *lockfree_queue::dequeue() { +lock_free_queue_node * +lock_free_queue::dequeue() { lock_free_queue_node *item = next; if (item && !(next = item->next)) { - tail = (lock_free_queue_node *) this; + _tail = (lock_free_queue_node *) this; if (item->next) { lock_free_queue_node *lost = item->next; lock_free_queue_node *help; do { help = lost->next; enqueue(lost); - } while ((lost = help) != (lock_free_queue_node *) 0); + } while ((lost = help) != (lock_free_queue_node *) NULL); } } return item; } + +bool +lock_free_queue::is_empty() { + return next == NULL; +} diff --git a/src/rt/sync/lock_free_queue.h b/src/rt/sync/lock_free_queue.h index fba4aa9a..1f09ec52 100644 --- a/src/rt/sync/lock_free_queue.h +++ b/src/rt/sync/lock_free_queue.h @@ -2,14 +2,18 @@ #define LOCK_FREE_QUEUE_H class lock_free_queue_node { +public: lock_free_queue_node *next; + lock_free_queue_node(); }; -class lock_free_queue { +class lock_free_queue : lock_free_queue_node { + lock_free_queue_node *_tail; public: lock_free_queue(); void enqueue(lock_free_queue_node *item); lock_free_queue_node *dequeue(); + bool is_empty(); }; #endif /* LOCK_FREE_QUEUE_H */ diff --git a/src/rt/sync/spin_lock.cpp b/src/rt/sync/spin_lock.cpp index 11a5cb20..4a113d1a 100644 --- a/src/rt/sync/spin_lock.cpp +++ b/src/rt/sync/spin_lock.cpp @@ -1,9 +1,10 @@ +#include "../globals.h" +#include "spin_lock.h" + /* * Your average spin lock. */ -#include "globals.h" - // #define TRACE spin_lock::spin_lock() { diff --git a/src/rt/sync/spin_lock.h b/src/rt/sync/spin_lock.h index 3684c23a..f15416a2 100644 --- a/src/rt/sync/spin_lock.h +++ b/src/rt/sync/spin_lock.h @@ -1,5 +1,5 @@ -#ifndef UNFAIR_TICKET_LOCK_H -#define UNFAIR_TICKET_LOCK_H +#ifndef SPIN_LOCK_H +#define SPIN_LOCK_H class spin_lock { unsigned ticket; @@ -11,4 +11,4 @@ public: void unlock(); }; -#endif /* UNFAIR_TICKET_LOCK_H */ +#endif /* SPIN_LOCK_H */ diff --git a/src/rt/util/array_list.h b/src/rt/util/array_list.h index 0d112575..04fd833f 100644 --- a/src/rt/util/array_list.h +++ b/src/rt/util/array_list.h @@ -13,38 +13,44 @@ public: array_list(); ~array_list(); size_t size(); - void append(T value); + int32_t append(T value); T replace(T old_value, T new_value); - size_t index_of(T value); + int32_t index_of(T value); T & operator[](size_t index); }; -template<typename T> array_list<T>::array_list() { +template<typename T> +array_list<T>::array_list() { _capacity = INITIAL_CAPACITY; _data = (T *) malloc(sizeof(T) * _capacity); } -template<typename T> array_list<T>::~array_list() { +template<typename T> +array_list<T>::~array_list() { delete _data; } -template<typename T> size_t array_list<T>::size() { +template<typename T> size_t +array_list<T>::size() { return _size; } -template<typename T> void array_list<T>::append(T value) { +template<typename T> int32_t +array_list<T>::append(T value) { if (_size == _capacity) { _capacity = _capacity * 2; _data = (T *) realloc(_data, _capacity * sizeof(T)); } - _data[_size++] = value; + _data[_size ++] = value; + return _size - 1; } /** * Replaces the old_value in the list with the new_value. * Returns the old_value if the replacement succeeded, or NULL otherwise. */ -template<typename T> T array_list<T>::replace(T old_value, T new_value) { +template<typename T> T +array_list<T>::replace(T old_value, T new_value) { int index = index_of(old_value); if (index < 0) { return NULL; @@ -53,7 +59,8 @@ template<typename T> T array_list<T>::replace(T old_value, T new_value) { return old_value; } -template<typename T> size_t array_list<T>::index_of(T value) { +template<typename T> int32_t +array_list<T>::index_of(T value) { for (size_t i = 0; i < _size; i++) { if (_data[i] == value) { return i; @@ -62,7 +69,8 @@ template<typename T> size_t array_list<T>::index_of(T value) { return -1; } -template<typename T> T & array_list<T>::operator[](size_t index) { +template<typename T> T & +array_list<T>::operator[](size_t index) { return _data[index]; } diff --git a/src/test/run-pass/task-comm-0.rs b/src/test/run-pass/task-comm-0.rs new file mode 100644 index 00000000..5992ba5c --- /dev/null +++ b/src/test/run-pass/task-comm-0.rs @@ -0,0 +1,19 @@ +io fn main() -> () { + test05(); +} + +io fn test05_start(chan[int] ch) { + ch <| 10; + ch <| 20; + ch <| 30; +} + +io fn test05() { + let port[int] po = port(); + let chan[int] ch = chan(po); + spawn test05_start(chan(po)); + let int value <- po; + value <- po; + value <- po; + log value; +} diff --git a/src/test/run-pass/task-comm-1.rs b/src/test/run-pass/task-comm-1.rs new file mode 100644 index 00000000..48983b71 --- /dev/null +++ b/src/test/run-pass/task-comm-1.rs @@ -0,0 +1,13 @@ +fn main() -> () { + test00(); +} + +fn start() { + log "Started / Finished Task."; +} + +fn test00() { + let task t = spawn thread start(); + join t; + log "Completing."; +}
\ No newline at end of file diff --git a/src/test/run-pass/task-comm-2.rs b/src/test/run-pass/task-comm-2.rs new file mode 100644 index 00000000..9151c7b1 --- /dev/null +++ b/src/test/run-pass/task-comm-2.rs @@ -0,0 +1,34 @@ +fn main() -> () { + log "===== THREADS ====="; + test00(true); + log "====== TASKS ======"; + test00(false); +} + +fn start(int task_number) { + log "Started task."; + let int i = 0; + while (i < 10000) { + i = i + 1; + } + log "Finished task."; +} + +fn test00(bool create_threads) { + let int number_of_tasks = 32; + + let int i = 0; + let vec[task] tasks = vec(); + while (i < number_of_tasks) { + i = i + 1; + if (create_threads) { + tasks += vec(spawn thread start(i)); + } else { + tasks += vec(spawn start(i)); + } + } + + for (task t in tasks) { + join t; + } +}
\ No newline at end of file diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs new file mode 100644 index 00000000..6dd620cc --- /dev/null +++ b/src/test/run-pass/task-comm-3.rs @@ -0,0 +1,59 @@ +io fn main() -> () { + log "===== THREADS ====="; + test00(false); +} + +io fn test00_start(chan[int] ch, int message, int count) { + log "Starting test00_start"; + let int i = 0; + while (i < count) { + log "Sending Message"; + ch <| message; + i = i + 1; + } + log "Ending test00_start"; +} + +io fn test00(bool is_multithreaded) { + let int number_of_tasks = 1; + let int number_of_messages = 0; + log "Creating tasks"; + + let port[int] po = port(); + let chan[int] ch = chan(po); + + let int i = 0; + + // Create and spawn tasks... + let vec[task] tasks = vec(); + while (i < number_of_tasks) { + i = i + 1; + if (is_multithreaded) { + tasks += vec( + spawn thread test00_start(ch, i, number_of_messages)); + } else { + tasks += vec(spawn test00_start(ch, i, number_of_messages)); + } + } + + // Read from spawned tasks... + let int sum = 0; + for (task t in tasks) { + i = 0; + while (i < number_of_messages) { + let int value <- po; + sum += value; + i = i + 1; + } + } + + // Join spawned tasks... + for (task t in tasks) { + join t; + } + + log "Completed: Final number is: "; + check (sum + 1 == number_of_messages * + (number_of_tasks * number_of_tasks + number_of_tasks) / 2); + log sum; +}
\ No newline at end of file diff --git a/src/test/run-pass/task-comm-4.rs b/src/test/run-pass/task-comm-4.rs new file mode 100644 index 00000000..42ba6992 --- /dev/null +++ b/src/test/run-pass/task-comm-4.rs @@ -0,0 +1,10 @@ +io fn main() -> () { + test00(); +} + +io fn test00() { + let port[int] p = port(); + let chan[int] c = chan(p); + c <| 42; + let int r <- p; +}
\ No newline at end of file diff --git a/src/test/run-pass/task-comm.rs b/src/test/run-pass/task-comm.rs index 4a21b4e4..ef71c6e1 100644 --- a/src/test/run-pass/task-comm.rs +++ b/src/test/run-pass/task-comm.rs @@ -1,17 +1,19 @@ - -io fn main() -> () { - test00(true); +fn main() -> () { + // test00(true); // test01(); // test02(); // test03(); // test04(); + // test05(); + test06(); } io fn test00_start(chan[int] ch, int message, int count) { log "Starting test00_start"; let int i = 0; while (i < count) { + log "Sending Message"; ch <| message; i = i + 1; } @@ -19,7 +21,7 @@ io fn test00_start(chan[int] ch, int message, int count) { } io fn test00(bool is_multithreaded) { - let int number_of_tasks = 4; + let int number_of_tasks = 1; let int number_of_messages = 64; log "Creating tasks"; @@ -109,12 +111,50 @@ fn test04() { log "Finishing up."; } +io fn test05_start(chan[int] ch) { + ch <| 10; + ch <| 20; + ch <| 30; + ch <| 30; + ch <| 30; +} +io fn test05() { + let port[int] po = port(); + let chan[int] ch = chan(po); + spawn thread test05_start(ch); + let int value <- po; + value <- po; + value <- po; + log value; +} - - - - +fn test06_start(int task_number) { + log "Started task."; + let int i = 0; + while (i < 100000000) { + i = i + 1; + } + log "Finished task."; +} + +fn test06() { + let int number_of_tasks = 32; + log "Creating tasks"; + + let int i = 0; + + let vec[task] tasks = vec(); + while (i < number_of_tasks) { + i = i + 1; + tasks += vec(spawn thread test06_start(i)); + // tasks += vec(spawn test06_start(i)); + } + + for (task t in tasks) { + join t; + } +} |