aboutsummaryrefslogtreecommitdiff
path: root/src/rt/rust_upcall.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/rt/rust_upcall.cpp')
-rw-r--r--src/rt/rust_upcall.cpp142
1 files changed, 89 insertions, 53 deletions
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index 574cb703..90d6f6d9 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -6,19 +6,37 @@
#define LOG_UPCALL_ENTRY(task) \
(task)->dom->get_log().reset_indent(0); \
(task)->log(rust_log::UPCALL, \
- "> UPCALL %s - task: 0x%" PRIxPTR \
- " retpc: x%" PRIxPTR, \
+ "> UPCALL %s - task: %s 0x%" PRIxPTR \
+ " retpc: x%" PRIxPTR \
+ " ref_count: %d", \
__FUNCTION__, \
- (task), __builtin_return_address(0)); \
+ (task)->name, (task), \
+ __builtin_return_address(0), \
+ (task->ref_count)); \
(task)->dom->get_log().indent();
#else
#define LOG_UPCALL_ENTRY(task) \
(task)->dom->get_log().reset_indent(0); \
(task)->log(rust_log::UPCALL, \
- "> UPCALL task: x%" PRIxPTR (task)); \
+ "> UPCALL task: %s @x%" PRIxPTR, \
+ (task)->name, (task)); \
(task)->dom->get_log().indent();
#endif
+void
+log_task_state(rust_task *task, maybe_proxy<rust_task> *target) {
+ rust_task *delegate = target->delegate();
+ if (target->is_proxy()) {
+ task->log(rust_log::TASK,
+ "remote task: 0x%" PRIxPTR ", ref_count: %d state: %s",
+ delegate, delegate->ref_count, delegate->state_str());
+ } else {
+ task->log(rust_log::TASK,
+ "local task: 0x%" PRIxPTR ", ref_count: %d state: %s",
+ delegate, delegate->ref_count, delegate->state_str());
+ }
+}
+
extern "C" CDECL char const *str_buf(rust_task *task, rust_str *s);
extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) {
@@ -29,14 +47,13 @@ extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) {
extern "C" CDECL void upcall_log_int(rust_task *task, int32_t i) {
LOG_UPCALL_ENTRY(task);
task->log(rust_log::UPCALL | rust_log::ULOG,
- "upcall log_int(0x%" PRIx32 " = %" PRId32 " = '%c')", i, i,
- (char) i);
+ "rust: %" PRId32 " (0x%" PRIx32 ")", i, i);
}
extern "C" CDECL void upcall_log_str(rust_task *task, rust_str *str) {
LOG_UPCALL_ENTRY(task);
const char *c = str_buf(task, str);
- task->log(rust_log::UPCALL | rust_log::ULOG, "upcall log_str(\"%s\")", c);
+ task->log(rust_log::UPCALL | rust_log::ULOG, "rust: %s", c);
}
extern "C" CDECL void upcall_trace_word(rust_task *task, uintptr_t i) {
@@ -55,8 +72,8 @@ upcall_new_port(rust_task *task, size_t unit_sz) {
LOG_UPCALL_ENTRY(task);
rust_dom *dom = task->dom;
task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
- "upcall_new_port(task=0x%" PRIxPTR ", unit_sz=%d)",
- (uintptr_t) task, unit_sz);
+ "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)",
+ (uintptr_t) task, task->name, unit_sz);
return new (dom) rust_port(task, unit_sz);
}
@@ -76,33 +93,58 @@ upcall_new_chan(rust_task *task, rust_port *port) {
LOG_UPCALL_ENTRY(task);
rust_dom *dom = task->dom;
task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
- "upcall_new_chan(task=0x%" PRIxPTR ", port=0x%" PRIxPTR ")",
- (uintptr_t) task, port);
+ "upcall_new_chan("
+ "task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")",
+ (uintptr_t) task, task->name, port);
I(dom, port);
return new (dom) rust_chan(task, port);
}
/**
+ * Called whenever this channel needs to be flushed. This can happen due to a
+ * flush statement, or automatically whenever a channel's ref count is
+ * about to drop to zero.
+ */
+extern "C" CDECL void
+upcall_flush_chan(rust_task *task, rust_chan *chan) {
+ LOG_UPCALL_ENTRY(task);
+ rust_dom *dom = task->dom;
+ task->log(rust_log::UPCALL | rust_log::COMM,
+ "flush chan: 0x%" PRIxPTR, chan);
+
+ if (chan->buffer.is_empty()) {
+ return;
+ }
+
+ A(dom, chan->port->is_proxy() == false,
+ "Channels to remote ports should be flushed automatically.");
+
+ // Block on the port until this channel has been completely drained
+ // by the port.
+ task->block(chan->port);
+ task->yield(2);
+}
+
+/**
* Called whenever the channel's ref count drops to zero.
+ *
+ * Cannot Yield: If the task were to unwind, the dropped ref would still
+ * appear to be live, causing modify-after-free errors.
*/
extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
- rust_dom *dom = task->dom;
+
task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
"upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
- I(dom, !chan->ref_count);
-
- if (!chan->buffer.is_empty() && chan->is_associated()) {
- A(dom, !chan->port->is_proxy(),
- "Channels to remote ports should be flushed automatically.");
- // A target port may still be reading from this channel.
- // Block on this channel until it has been completely drained
- // by the port.
- task->block(chan);
- task->yield(2);
- return;
- }
+ A(task->dom, chan->ref_count == 0,
+ "Channel's ref count should be zero.");
+
+ if (chan->is_associated()) {
+ A(task->dom, chan->buffer.is_empty(),
+ "Channel's buffer should be empty.");
+ chan->disassociate();
+ }
delete chan;
}
@@ -136,11 +178,11 @@ extern "C" CDECL void upcall_yield(rust_task *task) {
extern "C" CDECL void
upcall_join(rust_task *task, maybe_proxy<rust_task> *target) {
LOG_UPCALL_ENTRY(task);
+ rust_task *target_task = target->delegate();
task->log(rust_log::UPCALL | rust_log::COMM,
- "target: 0x%" PRIxPTR ", task: 0x%" PRIxPTR,
- target, target->delegate());
+ "target: 0x%" PRIxPTR ", task: %s @0x%" PRIxPTR,
+ target, target_task->name, target_task);
- rust_task *target_task = target->delegate();
if (target->is_proxy()) {
notify_message::
send(notify_message::JOIN, "join", task, target->as_proxy());
@@ -168,8 +210,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
"chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
(uintptr_t) chan, (uintptr_t) sptr,
chan->port->delegate()->unit_sz);
- chan->buffer.enqueue(sptr);
- chan->transmit();
+ chan->send(sptr);
task->log(rust_log::COMM, "=== sent data ===>");
}
@@ -182,17 +223,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
(uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
port->chans.length());
- for (uint32_t i = 0; i < port->chans.length(); i++) {
- rust_chan *chan = port->chans[i];
- if (chan->buffer.is_empty() == false) {
- chan->buffer.dequeue(dptr);
- if (chan->buffer.is_empty() && chan->task->blocked()) {
- chan->task->wakeup(chan);
- delete chan;
- }
- task->log(rust_log::COMM, "<=== read data ===");
- return;
- }
+ if (port->receive(dptr)) {
+ return;
}
// No data was buffered on any incoming channel, so block this task
@@ -219,11 +251,12 @@ extern "C" CDECL void upcall_fail(rust_task *task, char const *expr,
extern "C" CDECL void
upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) {
LOG_UPCALL_ENTRY(task);
+ log_task_state(task, target);
rust_task *target_task = target->delegate();
task->log(rust_log::UPCALL | rust_log::TASK,
- "kill task 0x%" PRIxPTR ", ref count %d",
- target_task,
+ "kill task %s @0x%" PRIxPTR ", ref count %d",
+ target_task->name, target_task,
target_task->ref_count);
if (target->is_proxy()) {
@@ -244,6 +277,8 @@ upcall_exit(rust_task *task) {
LOG_UPCALL_ENTRY(task);
task->log(rust_log::UPCALL | rust_log::TASK,
"task ref_count: %d", task->ref_count);
+ A(task->dom, task->ref_count >= 0,
+ "Task ref_count should not be negative on exit!");
task->die();
task->notify_tasks_waiting_to_join();
task->yield(1);
@@ -498,14 +533,14 @@ static void *rust_thread_start(void *ptr)
}
extern "C" CDECL rust_task *
-upcall_new_task(rust_task *spawner) {
+upcall_new_task(rust_task *spawner, const char *name) {
LOG_UPCALL_ENTRY(spawner);
rust_dom *dom = spawner->dom;
- rust_task *task = new (dom) rust_task(dom, spawner);
+ rust_task *task = new (dom) rust_task(dom, spawner, name);
dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK,
- "upcall new_task(spawner 0x%" PRIxPTR ") = 0x%" PRIxPTR,
- spawner, task);
+ "upcall new_task(spawner %s @0x%" PRIxPTR ", %s) = 0x%" PRIxPTR,
+ spawner->name, spawner, name, task);
return task;
}
@@ -516,26 +551,27 @@ upcall_start_task(rust_task *spawner, rust_task *task,
rust_dom *dom = spawner->dom;
dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK,
- "upcall start_task(task 0x%" PRIxPTR
+ "upcall start_task(task %s @0x%" PRIxPTR
" exit_task_glue 0x%" PRIxPTR
", spawnee 0x%" PRIxPTR
- ", callsz %" PRIdPTR ")", task, exit_task_glue, spawnee_fn,
- callsz);
+ ", callsz %" PRIdPTR ")", task->name, task, exit_task_glue,
+ spawnee_fn, callsz);
task->start(exit_task_glue, spawnee_fn, spawner->rust_sp, callsz);
return task;
}
extern "C" CDECL maybe_proxy<rust_task> *
-upcall_new_thread(rust_task *task) {
+upcall_new_thread(rust_task *task, const char *name) {
LOG_UPCALL_ENTRY(task);
rust_dom *old_dom = task->dom;
rust_dom *new_dom = new rust_dom(old_dom->srv->clone(),
- old_dom->root_crate);
+ old_dom->root_crate,
+ name);
task->log(rust_log::UPCALL | rust_log::MEM,
- "upcall new_thread() = dom 0x%" PRIxPTR " task 0x%" PRIxPTR,
- new_dom, new_dom->root_task);
+ "upcall new_thread(%s) = dom 0x%" PRIxPTR " task 0x%" PRIxPTR,
+ name, new_dom, new_dom->root_task);
rust_proxy<rust_task> *proxy =
new (old_dom) rust_proxy<rust_task>(old_dom,
new_dom->root_task, true);