aboutsummaryrefslogtreecommitdiff
path: root/src/rt/rust_comm.cpp
diff options
context:
space:
mode:
authorMichael Bebenita <[email protected]>2010-07-19 14:05:18 -0700
committerMichael Bebenita <[email protected]>2010-07-19 14:05:18 -0700
commit00d1465d13980fc3acf650f182ee0723fbda0e06 (patch)
treea73cf5f0f20c0bee6722b33d975eb930919fefdf /src/rt/rust_comm.cpp
parentAdd a test for an obvious-seeming (but not actually legal) kind of cast attem... (diff)
downloadrust-00d1465d13980fc3acf650f182ee0723fbda0e06.tar.xz
rust-00d1465d13980fc3acf650f182ee0723fbda0e06.zip
Added a message passing system based on lock free queues for inter-thread communication. Channels now buffer on the sending side, and no longer require blocking when sending. Lots of other refactoring and bug fixes.
Diffstat (limited to 'src/rt/rust_comm.cpp')
-rw-r--r--src/rt/rust_comm.cpp119
1 files changed, 7 insertions, 112 deletions
diff --git a/src/rt/rust_comm.cpp b/src/rt/rust_comm.cpp
index 58b9ef4c..37929b8b 100644
--- a/src/rt/rust_comm.cpp
+++ b/src/rt/rust_comm.cpp
@@ -10,109 +10,6 @@ rust_alarm::rust_alarm(rust_task *receiver) :
{
}
-
-// Circular buffers.
-
-circ_buf::circ_buf(rust_dom *dom, size_t unit_sz) :
- dom(dom),
- alloc(INIT_CIRC_BUF_UNITS * unit_sz),
- unit_sz(unit_sz),
- next(0),
- unread(0),
- data((uint8_t *)dom->calloc(alloc))
-{
- I(dom, unit_sz);
- dom->log(rust_log::MEM|rust_log::COMM,
- "new circ_buf(alloc=%d, unread=%d) -> circ_buf=0x%" PRIxPTR,
- alloc, unread, this);
- I(dom, data);
-}
-
-circ_buf::~circ_buf()
-{
- dom->log(rust_log::MEM|rust_log::COMM,
- "~circ_buf 0x%" PRIxPTR,
- this);
- I(dom, data);
- // I(dom, unread == 0);
- dom->free(data);
-}
-
-void
-circ_buf::transfer(void *dst)
-{
- size_t i;
- uint8_t *d = (uint8_t *)dst;
- I(dom, dst);
- for (i = 0; i < unread; i += unit_sz)
- memcpy(&d[i], &data[next + i % alloc], unit_sz);
-}
-
-void
-circ_buf::push(void *src)
-{
- size_t i;
- void *tmp;
-
- I(dom, src);
- I(dom, unread <= alloc);
-
- /* Grow if necessary. */
- if (unread == alloc) {
- I(dom, alloc <= MAX_CIRC_BUF_SIZE);
- tmp = dom->malloc(alloc << 1);
- transfer(tmp);
- alloc <<= 1;
- dom->free(data);
- data = (uint8_t *)tmp;
- }
-
- dom->log(rust_log::MEM|rust_log::COMM,
- "circ buf push, unread=%d, alloc=%d, unit_sz=%d",
- unread, alloc, unit_sz);
-
- I(dom, unread < alloc);
- I(dom, unread + unit_sz <= alloc);
-
- i = (next + unread) % alloc;
- memcpy(&data[i], src, unit_sz);
-
- dom->log(rust_log::MEM|rust_log::COMM, "pushed data at index %d", i);
- unread += unit_sz;
-}
-
-void
-circ_buf::shift(void *dst)
-{
- size_t i;
- void *tmp;
-
- I(dom, dst);
- I(dom, unit_sz > 0);
- I(dom, unread >= unit_sz);
- I(dom, unread <= alloc);
- I(dom, data);
- i = next;
- memcpy(dst, &data[i], unit_sz);
- dom->log(rust_log::MEM|rust_log::COMM, "shifted data from index %d", i);
- unread -= unit_sz;
- next += unit_sz;
- I(dom, next <= alloc);
- if (next == alloc)
- next = 0;
-
- /* Shrink if necessary. */
- if (alloc >= INIT_CIRC_BUF_UNITS * unit_sz &&
- unread <= alloc / 4) {
- tmp = dom->malloc(alloc / 2);
- transfer(tmp);
- alloc >>= 1;
- dom->free(data);
- data = (uint8_t *)tmp;
- }
-}
-
-
// Ports.
rust_port::rust_port(rust_task *task, size_t unit_sz) :
@@ -121,18 +18,16 @@ rust_port::rust_port(rust_task *task, size_t unit_sz) :
writers(task->dom),
chans(task->dom)
{
- rust_dom *dom = task->dom;
- dom->log(rust_log::MEM|rust_log::COMM,
- "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
- PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
+ task->log(rust_log::MEM|rust_log::COMM,
+ "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
+ PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
}
rust_port::~rust_port()
{
- rust_dom *dom = task->dom;
- dom->log(rust_log::COMM|rust_log::MEM,
- "~rust_port 0x%" PRIxPTR,
- (uintptr_t)this);
+ task->log(rust_log::COMM|rust_log::MEM,
+ "~rust_port 0x%" PRIxPTR,
+ (uintptr_t)this);
while (chans.length() > 0)
chans.pop()->disassociate();
}
@@ -182,7 +77,7 @@ rust_token::withdraw()
if (task->blocked())
task->wakeup(this); // must be blocked on us (or dead)
- port->writers.swapdel(this);
+ port->writers.swap_delete(this);
submitted = false;
}