diff options
| -rw-r--r-- | src/rt/rust_kernel.cpp | 47 | ||||
| -rw-r--r-- | src/rt/rust_kernel.h | 3 | ||||
| -rw-r--r-- | src/rt/sync/sync.cpp | 9 | ||||
| -rw-r--r-- | src/rt/sync/sync.h | 3 | ||||
| -rw-r--r-- | src/test/run-pass/task-comm-9.rs | 3 |
5 files changed, 45 insertions, 20 deletions
diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 9ea1f2eb..910e33c6 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -129,20 +129,25 @@ rust_kernel::log(uint32_t type_bits, char const *fmt, ...) { } void -rust_kernel::start_kernel_loop() { - while (_interrupt_kernel_loop == false) { - message_queues.global.lock(); - for (size_t i = 0; i < message_queues.length(); i++) { - rust_message_queue *queue = message_queues[i]; - if (queue->is_associated() == false) { - rust_message *message = NULL; - while (queue->dequeue(&message)) { - message->kernel_process(); - delete message; - } +rust_kernel::pump_message_queues() { + message_queues.global.lock(); + for (size_t i = 0; i < message_queues.length(); i++) { + rust_message_queue *queue = message_queues[i]; + if (queue->is_associated() == false) { + rust_message *message = NULL; + while (queue->dequeue(&message)) { + message->kernel_process(); + delete message; } } - message_queues.global.unlock(); + } + message_queues.global.unlock(); +} + +void +rust_kernel::start_kernel_loop() { + while (_interrupt_kernel_loop == false) { + pump_message_queues(); } } @@ -153,16 +158,24 @@ rust_kernel::run() { log(rust_log::KERN, "finished kernel loop"); } +void +rust_kernel::terminate_kernel_loop() { + _interrupt_kernel_loop = true; + join(); +} + rust_kernel::~rust_kernel() { K(_srv, domains.length() == 0, "Kernel has %d live domain(s), join all domains before killing " "the kernel.", domains.length()); - // If the kernel loop is running, interrupt it, join and exit. - if (is_running()) { - _interrupt_kernel_loop = true; - join(); - } + terminate_kernel_loop(); + + // It's possible that the message pump misses some messages because + // of races, so pump any remaining messages here. By now all domain + // threads should have been joined, so we shouldn't miss any more + // messages. + pump_message_queues(); free_handles(_task_handles); free_handles(_port_handles); diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h index 902a9a2f..8c599c1f 100644 --- a/src/rt/rust_kernel.h +++ b/src/rt/rust_kernel.h @@ -60,6 +60,9 @@ class rust_kernel : public rust_thread { */ spin_lock _message_queues_lock; + void terminate_kernel_loop(); + void pump_message_queues(); + public: /** diff --git a/src/rt/sync/sync.cpp b/src/rt/sync/sync.cpp index 70e5077c..fdfc0652 100644 --- a/src/rt/sync/sync.cpp +++ b/src/rt/sync/sync.cpp @@ -11,6 +11,10 @@ void sync::yield() { #endif } +rust_thread::rust_thread() : _is_running(false) { + // Nop. +} + #if defined(__WIN32__) static DWORD WINAPI #elif defined(__GNUC__) @@ -36,6 +40,7 @@ rust_thread::start() { pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); pthread_create(&thread, &attr, rust_thread_start, (void *) this); #endif + _is_running = true; } void @@ -46,10 +51,10 @@ rust_thread::join() { pthread_join(thread, NULL); #endif thread = 0; + _is_running = false; } bool rust_thread::is_running() { - // TODO: This may be broken because of possible races. - return thread; + return _is_running; } diff --git a/src/rt/sync/sync.h b/src/rt/sync/sync.h index 562d2a1b..3829dafe 100644 --- a/src/rt/sync/sync.h +++ b/src/rt/sync/sync.h @@ -15,12 +15,15 @@ public: * Thread utility class. Derive and implement your own run() method. */ class rust_thread { +private: + volatile bool _is_running; public: #if defined(__WIN32__) HANDLE thread; #else pthread_t thread; #endif + rust_thread(); void start(); virtual void run() { diff --git a/src/test/run-pass/task-comm-9.rs b/src/test/run-pass/task-comm-9.rs index e6ca84f5..a2c9d5c9 100644 --- a/src/test/run-pass/task-comm-9.rs +++ b/src/test/run-pass/task-comm-9.rs @@ -16,7 +16,8 @@ io fn test00() { let port[int] p = port(); let int number_of_messages = 10; - let task t0 = spawn thread test00_start(chan(p), number_of_messages); + let task t0 = spawn thread "child" + test00_start(chan(p), number_of_messages); let int i = 0; while (i < number_of_messages) { |