#include #include #include //------------------------------------------------------------------------------ struct alignas(INTERFERENCE_SIZE) Job { StringView name; void (*entry)(void*) = nullptr; void* data = nullptr; Job* fence = nullptr; Job* first_child = nullptr; Job* next_sibling = nullptr; atomic_int16 num_refs = 0; atomic_int16 dep_refs = 0; uint16 _unused; atomic_uint8 flags = 0; uint8 wait_bit = 0; enum : uint8 { Flag_Locked = 1 << 0, Flag_Scheduled = 1 << 1, Flag_Running = 1 << 2, Flag_Finished = 1 << 3, Flag_HasWaiter = 1 << 4, }; }; static_assert(sizeof(Job) <= INTERFERENCE_SIZE); //------------------------------------------------------------------------------ class JobLock : NoMove, NoCopy { public: JobLock(Job* job); ~JobLock() { _job->flags.xor_release(Job::Flag_Locked); } bool has_finished() const { return !!(_prev_flags & Job::Flag_Finished); } private: void claim(); Job* _job; uint8 _prev_flags; }; //------------------------------------------------------------------------------ JobLock::JobLock(Job* job) : _job(job) { _prev_flags = job->flags.or_acquire(Job::Flag_Locked); if (_prev_flags & Job::Flag_Locked) claim(); } //------------------------------------------------------------------------------ void JobLock::claim() { while (true) { _prev_flags = _job->flags.or_acquire(Job::Flag_Locked); if ((_prev_flags & Job::Flag_Locked) == 0) return; } } //------------------------------------------------------------------------------ Task::Task(Job* job) : _job(job) { assert(job->num_refs.load_relaxed() >= 1); } //------------------------------------------------------------------------------ Task::~Task() { if (!is_valid()) return; int32 prev_ref = _job->num_refs.add_relaxed(-1); if (prev_ref != 1) return; tt_free(_job); } //------------------------------------------------------------------------------ Task::operator Job* () { return _job; } Job* Task::operator -> () { return _job; } //------------------------------------------------------------------------------ Scheduler::Scheduler(const Setup& desc) { uint32 concurrency = desc.concurrency; if (concurrency == 0) concurrency = std::thread::hardware_concurrency(); Setup::ThreadFactory* factory = desc.thread_factory; if (factory == nullptr) { auto default_factory = [] (uintptr, Setup::WorkLoop* loop, uintptr loop_param) { return Thread(loop, loop_param); }; factory = default_factory; } _threads.resize(concurrency); for (Thread& thread : _threads) { thread = factory(desc.factory_param, [] (uintptr param) { auto& scheduler = *(Scheduler*)param; scheduler.do_work(); }, uintptr(this)); } } //------------------------------------------------------------------------------ Scheduler::~Scheduler() { _running.store_release(0); _list_cond_var.notify_all(); for (Thread& thread : _threads) thread.join(); assert(_job_list == nullptr); } //------------------------------------------------------------------------------ Task Scheduler::create(StringView name) { return create_impl(name, nullptr, nullptr); } //------------------------------------------------------------------------------ void Scheduler::depends_on(Task& fence, Task& that) { uint8 prev_flags = fence->flags.or_relaxed(Job::Flag_Scheduled); if ((prev_flags & Job::Flag_Scheduled) == 0) fence->num_refs.add_relaxed(1); assert((that->flags.load_relaxed() & Job::Flag_Scheduled) == 0); // "Build fences first" assert(that->fence == nullptr); // "Jobs may only have one dependant" that->fence = fence; fence->dep_refs.add_relaxed(1); } //------------------------------------------------------------------------------ void Scheduler::start_after(Task& self, Task& runs_first) { uint8 prev_flags = self->flags.or_relaxed(Job::Flag_Scheduled); assert((prev_flags & Job::Flag_Scheduled) == 0); self->num_refs.add_relaxed(1); JobLock lock(runs_first); if (lock.has_finished()) return submit((Job*)self); self->next_sibling = runs_first->first_child; runs_first->first_child = self; } //------------------------------------------------------------------------------ void Scheduler::submit(Task& task) { if (!task.is_valid()) return; uint8 prev_flags = task->flags.or_relaxed(Job::Flag_Scheduled); assert((prev_flags & Job::Flag_Scheduled) == 0); Job* job = task; job->num_refs.add_relaxed(1); submit(job); } //------------------------------------------------------------------------------ Task Scheduler::create_impl(StringView name, void* data, JobEntry entry) { Job* job = tt_new(); job->name = name; job->entry = entry; job->data = data; job->num_refs.add_relaxed(1); return Task(job); } //------------------------------------------------------------------------------ void Scheduler::do_work() { while (true) { UniqueLock lock(_list_lock); Job* job; while (true) { if (!_running.load_relaxed()) return; job = _job_list; if (job != nullptr) { _job_list = job->next_sibling; lock.unlock(); break; } _list_cond_var.wait(lock); } do { job = do_work(job); } while (job != nullptr); } } //------------------------------------------------------------------------------ Job* Scheduler::do_work(Job* job) { Task task(job); job->flags.or_acquire(Job::Flag_Running); if (job->entry != nullptr) job->entry(job->data); uint8 prev_flags = job->flags.or_release(Job::Flag_Finished); if (prev_flags & Job::Flag_HasWaiter) unwait(job); Job* next = nullptr; if (Job* fence = job->fence) { uint32 prev_refs = fence->dep_refs.add_release(-1); if (prev_refs == 1) next = fence; } if (Job* child = job->first_child) { if (next == nullptr) { next = child; child = child->next_sibling; } if (child != nullptr) submit(child); } return next; } //------------------------------------------------------------------------------ void Scheduler::submit(Job* job) { uint32 count = 1; Job* tail = job; for (; tail->next_sibling != nullptr; ++count) { assert(tail->dep_refs.load_relaxed() == 0); // don't recall why I put this here JobLock lock(tail); tail = tail->next_sibling; } _list_lock.lock(); tail->next_sibling = _job_list; _job_list = job; _list_lock.unlock(); (count > 1) ? _list_cond_var.notify_all() : _list_cond_var.notify_one(); } //------------------------------------------------------------------------------ Scheduler::WaitSlot& Scheduler::get_wait_slot(Job* job) { enum { A_RANDOM_PRIME_I_PICKED_ALL_ON_MY_OWN = 29 }; uint32 index = uint32(uintptr(job)) / sizeof(Job); index *= A_RANDOM_PRIME_I_PICKED_ALL_ON_MY_OWN; index %= sizeof_array(_wait_slots); return _wait_slots[index]; } //------------------------------------------------------------------------------ void* Scheduler::wait_impl(Task& task) { if (!task.is_valid()) return nullptr; uint8 seen_flags; for (uint32 i = 0; i < 50; ++i) { seen_flags = task->flags.load_acquire(); if (seen_flags & Job::Flag_Finished) return task->data; std::this_thread::yield(); } seen_flags = task->flags.or_acquire(Job::Flag_HasWaiter); if (seen_flags & Job::Flag_Finished) return task->data; WaitSlot& slot = get_wait_slot(task); UniqueLock lock(slot.lock); while (true) { seen_flags = task->flags.load_acquire(); if (seen_flags & Job::Flag_Finished) return task->data; slot.cond_var.wait(lock); } } //------------------------------------------------------------------------------ void Scheduler::unwait(Job* job) { WaitSlot& slot = get_wait_slot(job); slot.cond_var.notify_all(); }