// Copyright 2018 The Fuchsia Authors. All rights reserved.
|
// Use of this source code is governed by a BSD-style license that can be
|
// found in the LICENSE file.
|
|
// Can't compile this for Zircon userspace yet since libstdc++ isn't available.
|
#ifndef FIT_NO_STD_FOR_ZIRCON_USERSPACE
|
|
#include <condition_variable>
|
#include <mutex>
|
|
#include <lib/fit/single_threaded_executor.h>
|
#include <lib/fit/thread_safety.h>
|
|
namespace fit {
|
|
// The dispatcher runs tasks and provides the suspended task resolver.
|
//
|
// The lifetime of this object is somewhat complex since there are pointers
|
// to it from multiple sources which are released in different ways.
|
//
|
// - |single_threaded_executor| holds a pointer in |dispatcher_| which it releases
|
// after calling |shutdown()| to inform the dispatcher of its own demise
|
// - |suspended_task| holds a pointer to the dispatcher's resolver
|
// interface and the number of outstanding pointers corresponds to the
|
// number of outstanding suspended task tickets tracked by |scheduler_|.
|
//
|
// The dispatcher deletes itself once all pointers have been released.
|
class single_threaded_executor::dispatcher_impl final
|
: public suspended_task::resolver {
|
public:
|
dispatcher_impl();
|
|
void shutdown();
|
void schedule_task(pending_task task);
|
void run(context_impl& context);
|
suspended_task suspend_current_task();
|
|
suspended_task::ticket duplicate_ticket(
|
suspended_task::ticket ticket) override;
|
void resolve_ticket(
|
suspended_task::ticket ticket, bool resume_task) override;
|
|
private:
|
~dispatcher_impl() override;
|
|
void wait_for_runnable_tasks(
|
fit::subtle::scheduler::task_queue* out_tasks);
|
void run_task(pending_task* task, context& context);
|
|
suspended_task::ticket current_task_ticket_ = 0;
|
std::condition_variable wake_;
|
|
// A bunch of state that is guarded by a mutex.
|
struct {
|
std::mutex mutex_;
|
bool was_shutdown_ FIT_GUARDED(mutex_) = false;
|
bool need_wake_ FIT_GUARDED(mutex_) = false;
|
fit::subtle::scheduler scheduler_ FIT_GUARDED(mutex_);
|
} guarded_;
|
};
|
|
single_threaded_executor::single_threaded_executor()
|
: context_(this), dispatcher_(new dispatcher_impl()) {}
|
|
single_threaded_executor::~single_threaded_executor() {
|
dispatcher_->shutdown();
|
}
|
|
void single_threaded_executor::schedule_task(pending_task task) {
|
assert(task);
|
dispatcher_->schedule_task(std::move(task));
|
}
|
|
void single_threaded_executor::run() {
|
dispatcher_->run(context_);
|
}
|
|
single_threaded_executor::context_impl::context_impl(single_threaded_executor* executor)
|
: executor_(executor) {}
|
|
single_threaded_executor::context_impl::~context_impl() = default;
|
|
single_threaded_executor* single_threaded_executor::context_impl::executor() const {
|
return executor_;
|
}
|
|
suspended_task single_threaded_executor::context_impl::suspend_task() {
|
return executor_->dispatcher_->suspend_current_task();
|
}
|
|
single_threaded_executor::dispatcher_impl::dispatcher_impl() = default;
|
|
single_threaded_executor::dispatcher_impl::~dispatcher_impl() {
|
std::lock_guard<std::mutex> lock(guarded_.mutex_);
|
assert(guarded_.was_shutdown_);
|
assert(!guarded_.scheduler_.has_runnable_tasks());
|
assert(!guarded_.scheduler_.has_suspended_tasks());
|
assert(!guarded_.scheduler_.has_outstanding_tickets());
|
}
|
|
void single_threaded_executor::dispatcher_impl::shutdown() {
|
fit::subtle::scheduler::task_queue tasks; // drop outside of the lock
|
{
|
std::lock_guard<std::mutex> lock(guarded_.mutex_);
|
assert(!guarded_.was_shutdown_);
|
guarded_.was_shutdown_ = true;
|
guarded_.scheduler_.take_all_tasks(&tasks);
|
if (guarded_.scheduler_.has_outstanding_tickets()) {
|
return; // can't delete self yet
|
}
|
}
|
|
// Must destroy self outside of the lock.
|
delete this;
|
}
|
|
void single_threaded_executor::dispatcher_impl::schedule_task(pending_task task) {
|
{
|
std::lock_guard<std::mutex> lock(guarded_.mutex_);
|
assert(!guarded_.was_shutdown_);
|
guarded_.scheduler_.schedule_task(std::move(task));
|
if (!guarded_.need_wake_) {
|
return; // don't need to wake
|
}
|
guarded_.need_wake_ = false;
|
}
|
|
// It is more efficient to notify outside the lock.
|
wake_.notify_one();
|
}
|
|
void single_threaded_executor::dispatcher_impl::run(context_impl& context) {
|
fit::subtle::scheduler::task_queue tasks;
|
for (;;) {
|
wait_for_runnable_tasks(&tasks);
|
if (tasks.empty()) {
|
return; // all done!
|
}
|
|
do {
|
run_task(&tasks.front(), context);
|
tasks.pop(); // the task may be destroyed here if it was not suspended
|
} while (!tasks.empty());
|
}
|
}
|
|
// Must only be called while |run_task()| is running a task.
|
// This happens when the task's continuation calls |context::suspend_task()|
|
// upon the context it received as an argument.
|
suspended_task single_threaded_executor::dispatcher_impl::suspend_current_task() {
|
std::lock_guard<std::mutex> lock(guarded_.mutex_);
|
assert(!guarded_.was_shutdown_);
|
if (current_task_ticket_ == 0) {
|
current_task_ticket_ = guarded_.scheduler_.obtain_ticket(
|
2 /*initial_refs*/);
|
} else {
|
guarded_.scheduler_.duplicate_ticket(current_task_ticket_);
|
}
|
return suspended_task(this, current_task_ticket_);
|
}
|
|
// Unfortunately std::unique_lock does not support thread-safety annotations
|
void single_threaded_executor::dispatcher_impl::wait_for_runnable_tasks(
|
fit::subtle::scheduler::task_queue* out_tasks) FIT_NO_THREAD_SAFETY_ANALYSIS {
|
std::unique_lock<std::mutex> lock(guarded_.mutex_);
|
for (;;) {
|
assert(!guarded_.was_shutdown_);
|
guarded_.scheduler_.take_runnable_tasks(out_tasks);
|
if (!out_tasks->empty()) {
|
return; // got some tasks
|
}
|
if (!guarded_.scheduler_.has_suspended_tasks()) {
|
return; // all done!
|
}
|
guarded_.need_wake_ = true;
|
wake_.wait(lock);
|
guarded_.need_wake_ = false;
|
}
|
}
|
|
void single_threaded_executor::dispatcher_impl::run_task(pending_task* task,
|
context& context) {
|
assert(current_task_ticket_ == 0);
|
const bool finished = (*task)(context);
|
assert(!*task == finished);
|
(void)finished;
|
if (current_task_ticket_ == 0) {
|
return; // task was not suspended, no ticket was produced
|
}
|
|
std::lock_guard<std::mutex> lock(guarded_.mutex_);
|
assert(!guarded_.was_shutdown_);
|
guarded_.scheduler_.finalize_ticket(current_task_ticket_, task);
|
current_task_ticket_ = 0;
|
}
|
|
suspended_task::ticket single_threaded_executor::dispatcher_impl::duplicate_ticket(
|
suspended_task::ticket ticket) {
|
std::lock_guard<std::mutex> lock(guarded_.mutex_);
|
guarded_.scheduler_.duplicate_ticket(ticket);
|
return ticket;
|
}
|
|
void single_threaded_executor::dispatcher_impl::resolve_ticket(
|
suspended_task::ticket ticket, bool resume_task) {
|
pending_task abandoned_task; // drop outside of the lock
|
bool do_wake = false;
|
{
|
std::lock_guard<std::mutex> lock(guarded_.mutex_);
|
if (resume_task) {
|
guarded_.scheduler_.resume_task_with_ticket(ticket);
|
} else {
|
abandoned_task = guarded_.scheduler_.release_ticket(ticket);
|
}
|
if (guarded_.was_shutdown_) {
|
assert(!guarded_.need_wake_);
|
if (guarded_.scheduler_.has_outstanding_tickets()) {
|
return; // can't shutdown yet
|
}
|
} else if (guarded_.need_wake_ &&
|
(guarded_.scheduler_.has_runnable_tasks() ||
|
!guarded_.scheduler_.has_suspended_tasks())) {
|
guarded_.need_wake_ = false;
|
do_wake = true;
|
} else {
|
return; // nothing else to do
|
}
|
}
|
|
// Must do this outside of the lock.
|
if (do_wake) {
|
wake_.notify_one();
|
} else {
|
delete this;
|
}
|
}
|
|
} // namespace fit
|
|
#endif // FIT_NO_STD_FOR_ZIRCON_USERSPACE
|