diff options
Diffstat (limited to 'thirdparty/tourist/foundation/src/scheduler.cpp')
| -rw-r--r-- | thirdparty/tourist/foundation/src/scheduler.cpp | 339 |
1 files changed, 339 insertions, 0 deletions
diff --git a/thirdparty/tourist/foundation/src/scheduler.cpp b/thirdparty/tourist/foundation/src/scheduler.cpp new file mode 100644 index 000000000..bb2499e0d --- /dev/null +++ b/thirdparty/tourist/foundation/src/scheduler.cpp @@ -0,0 +1,339 @@ +#include <foundation/malloc.h> +#include <foundation/scheduler.h> + +#include <cassert> + +//------------------------------------------------------------------------------ +struct alignas(INTERFERENCE_SIZE) Job +{ + StringView name; + + void (*entry)(void*) = nullptr; + void* data = nullptr; + + Job* fence = nullptr; + Job* first_child = nullptr; + Job* next_sibling = nullptr; + + atomic_int16 num_refs = 0; + atomic_int16 dep_refs = 0; + uint16 _unused; + atomic_uint8 flags = 0; + uint8 wait_bit = 0; + + enum : uint8 + { + Flag_Locked = 1 << 0, + Flag_Scheduled = 1 << 1, + Flag_Running = 1 << 2, + Flag_Finished = 1 << 3, + Flag_HasWaiter = 1 << 4, + }; +}; +static_assert(sizeof(Job) <= INTERFERENCE_SIZE); + + + +//------------------------------------------------------------------------------ +class JobLock : NoMove, NoCopy +{ +public: + JobLock(Job* job); + ~JobLock() { _job->flags.xor_release(Job::Flag_Locked); } + bool has_finished() const { return !!(_prev_flags & Job::Flag_Finished); } + +private: + void claim(); + Job* _job; + uint8 _prev_flags; +}; + +//------------------------------------------------------------------------------ +JobLock::JobLock(Job* job) +: _job(job) +{ + _prev_flags = job->flags.or_acquire(Job::Flag_Locked); + if (_prev_flags & Job::Flag_Locked) + claim(); +} + +//------------------------------------------------------------------------------ +void JobLock::claim() +{ + while (true) + { + _prev_flags = _job->flags.or_acquire(Job::Flag_Locked); + if ((_prev_flags & Job::Flag_Locked) == 0) + return; + } +} + + + +//------------------------------------------------------------------------------ +Task::Task(Job* job) +: _job(job) +{ + assert(job->num_refs.load_relaxed() >= 1); +} + +//------------------------------------------------------------------------------ +Task::~Task() +{ + if (!is_valid()) + return; + + int32 prev_ref = _job->num_refs.add_relaxed(-1); + if (prev_ref != 1) + return; + + tt_free(_job); +} + +//------------------------------------------------------------------------------ +Task::operator Job* () { return _job; } +Job* Task::operator -> () { return _job; } + + + +//------------------------------------------------------------------------------ +Scheduler::Scheduler(const Setup& desc) +{ + uint32 concurrency = desc.concurrency; + if (concurrency == 0) + concurrency = std::thread::hardware_concurrency(); + + Setup::ThreadFactory* factory = desc.thread_factory; + if (factory == nullptr) + { + auto default_factory = [] (uintptr, Setup::WorkLoop* loop, uintptr loop_param) { + return Thread(loop, loop_param); + }; + factory = default_factory; + } + + _threads.resize(concurrency); + for (Thread& thread : _threads) + { + thread = factory(desc.factory_param, [] (uintptr param) { + auto& scheduler = *(Scheduler*)param; + scheduler.do_work(); + }, uintptr(this)); + } +} + +//------------------------------------------------------------------------------ +Scheduler::~Scheduler() +{ + _running.store_release(0); + + _list_cond_var.notify_all(); + + for (Thread& thread : _threads) + thread.join(); + + assert(_job_list == nullptr); +} + +//------------------------------------------------------------------------------ +Task Scheduler::create(StringView name) +{ + return create_impl(name, nullptr, nullptr); +} + +//------------------------------------------------------------------------------ +void Scheduler::depends_on(Task& fence, Task& that) +{ + uint8 prev_flags = fence->flags.or_relaxed(Job::Flag_Scheduled); + if ((prev_flags & Job::Flag_Scheduled) == 0) + fence->num_refs.add_relaxed(1); + + assert((that->flags.load_relaxed() & Job::Flag_Scheduled) == 0); // "Build fences first" + assert(that->fence == nullptr); // "Jobs may only have one dependant" + + that->fence = fence; + fence->dep_refs.add_relaxed(1); +} + +//------------------------------------------------------------------------------ +void Scheduler::start_after(Task& self, Task& runs_first) +{ + uint8 prev_flags = self->flags.or_relaxed(Job::Flag_Scheduled); + assert((prev_flags & Job::Flag_Scheduled) == 0); + + self->num_refs.add_relaxed(1); + + JobLock lock(runs_first); + if (lock.has_finished()) + return submit((Job*)self); + + self->next_sibling = runs_first->first_child; + runs_first->first_child = self; +} + +//------------------------------------------------------------------------------ +void Scheduler::submit(Task& task) +{ + if (!task.is_valid()) + return; + + uint8 prev_flags = task->flags.or_relaxed(Job::Flag_Scheduled); + assert((prev_flags & Job::Flag_Scheduled) == 0); + + Job* job = task; + job->num_refs.add_relaxed(1); + submit(job); +} + +//------------------------------------------------------------------------------ +Task Scheduler::create_impl(StringView name, void* data, JobEntry entry) +{ + Job* job = tt_new<Job>(); + job->name = name; + job->entry = entry; + job->data = data; + + job->num_refs.add_relaxed(1); + return Task(job); +} + +//------------------------------------------------------------------------------ +void Scheduler::do_work() +{ + while (true) + { + UniqueLock lock(_list_lock); + + Job* job; + while (true) + { + if (!_running.load_relaxed()) + return; + + job = _job_list; + if (job != nullptr) + { + _job_list = job->next_sibling; + lock.unlock(); + break; + } + + _list_cond_var.wait(lock); + } + + do + { + job = do_work(job); + } + while (job != nullptr); + } +} + +//------------------------------------------------------------------------------ +Job* Scheduler::do_work(Job* job) +{ + Task task(job); + + job->flags.or_acquire(Job::Flag_Running); + + if (job->entry != nullptr) + job->entry(job->data); + + uint8 prev_flags = job->flags.or_release(Job::Flag_Finished); + + if (prev_flags & Job::Flag_HasWaiter) + unwait(job); + + Job* next = nullptr; + if (Job* fence = job->fence) + { + uint32 prev_refs = fence->dep_refs.add_release(-1); + if (prev_refs == 1) + next = fence; + } + + if (Job* child = job->first_child) + { + if (next == nullptr) + { + next = child; + child = child->next_sibling; + } + + if (child != nullptr) + submit(child); + } + + return next; +} + +//------------------------------------------------------------------------------ +void Scheduler::submit(Job* job) +{ + uint32 count = 1; + Job* tail = job; + for (; tail->next_sibling != nullptr; ++count) + { + assert(tail->dep_refs.load_relaxed() == 0); // don't recall why I put this here + JobLock lock(tail); + tail = tail->next_sibling; + } + + _list_lock.lock(); + tail->next_sibling = _job_list; + _job_list = job; + _list_lock.unlock(); + + (count > 1) ? _list_cond_var.notify_all() : _list_cond_var.notify_one(); +} + +//------------------------------------------------------------------------------ +Scheduler::WaitSlot& Scheduler::get_wait_slot(Job* job) +{ + enum { A_RANDOM_PRIME_I_PICKED_ALL_ON_MY_OWN = 29 }; + uint32 index = uint32(uintptr(job)) / sizeof(Job); + index *= A_RANDOM_PRIME_I_PICKED_ALL_ON_MY_OWN; + index %= sizeof_array(_wait_slots); + return _wait_slots[index]; +} + +//------------------------------------------------------------------------------ +void* Scheduler::wait_impl(Task& task) +{ + if (!task.is_valid()) + return nullptr; + + uint8 seen_flags; + + for (uint32 i = 0; i < 50; ++i) + { + seen_flags = task->flags.load_acquire(); + if (seen_flags & Job::Flag_Finished) + return task->data; + + std::this_thread::yield(); + } + + seen_flags = task->flags.or_acquire(Job::Flag_HasWaiter); + if (seen_flags & Job::Flag_Finished) + return task->data; + + WaitSlot& slot = get_wait_slot(task); + + UniqueLock lock(slot.lock); + while (true) + { + seen_flags = task->flags.load_acquire(); + if (seen_flags & Job::Flag_Finished) + return task->data; + + slot.cond_var.wait(lock); + } +} + +//------------------------------------------------------------------------------ +void Scheduler::unwait(Job* job) +{ + WaitSlot& slot = get_wait_slot(job); + slot.cond_var.notify_all(); +} |