// Copyright 2017 The Fuchsia Authors. All rights reserved.
|
// Use of this source code is governed by a BSD-style license that can be
|
// found in the LICENSE file.
|
|
#ifndef _ALL_SOURCE
|
#define _ALL_SOURCE // Enables thrd_create_with_name in <threads.h>.
|
#endif
|
#include <lib/async-loop/loop.h>
|
|
#include <assert.h>
|
#include <stdatomic.h>
|
#include <stdlib.h>
|
|
#include <zircon/assert.h>
|
#include <zircon/listnode.h>
|
#include <zircon/syscalls.h>
|
#include <zircon/syscalls/hypervisor.h>
|
|
#include <lib/async/default.h>
|
#include <lib/async/exception.h>
|
#include <lib/async/receiver.h>
|
#include <lib/async/task.h>
|
#include <lib/async/trap.h>
|
#include <lib/async/wait.h>
|
|
// The port wait key associated with the dispatcher's control messages.
|
#define KEY_CONTROL (0u)
|
|
static zx_time_t async_loop_now(async_dispatcher_t* dispatcher);
|
static zx_status_t async_loop_begin_wait(async_dispatcher_t* dispatcher, async_wait_t* wait);
|
static zx_status_t async_loop_cancel_wait(async_dispatcher_t* dispatcher, async_wait_t* wait);
|
static zx_status_t async_loop_post_task(async_dispatcher_t* dispatcher, async_task_t* task);
|
static zx_status_t async_loop_cancel_task(async_dispatcher_t* dispatcher, async_task_t* task);
|
static zx_status_t async_loop_queue_packet(async_dispatcher_t* dispatcher, async_receiver_t* receiver,
|
const zx_packet_user_t* data);
|
static zx_status_t async_loop_set_guest_bell_trap(
|
async_dispatcher_t* dispatcher, async_guest_bell_trap_t* trap,
|
zx_handle_t guest, zx_vaddr_t addr, size_t length);
|
static zx_status_t async_loop_bind_exception_port(async_dispatcher_t* async,
|
async_exception_t* exception);
|
static zx_status_t async_loop_unbind_exception_port(async_dispatcher_t* async,
|
async_exception_t* exception);
|
static zx_status_t async_loop_resume_from_exception(async_dispatcher_t* async,
|
async_exception_t* exception,
|
zx_handle_t task,
|
uint32_t options);
|
|
static const async_ops_t async_loop_ops = {
|
.version = ASYNC_OPS_V2,
|
.reserved = 0,
|
.v1 = {
|
.now = async_loop_now,
|
.begin_wait = async_loop_begin_wait,
|
.cancel_wait = async_loop_cancel_wait,
|
.post_task = async_loop_post_task,
|
.cancel_task = async_loop_cancel_task,
|
.queue_packet = async_loop_queue_packet,
|
.set_guest_bell_trap = async_loop_set_guest_bell_trap,
|
},
|
.v2 = {
|
.bind_exception_port = async_loop_bind_exception_port,
|
.unbind_exception_port = async_loop_unbind_exception_port,
|
.resume_from_exception = async_loop_resume_from_exception,
|
},
|
};
|
|
typedef struct thread_record {
|
list_node_t node;
|
thrd_t thread;
|
} thread_record_t;
|
|
const async_loop_config_t kAsyncLoopConfigAttachToThread = {
|
.make_default_for_current_thread = true};
|
const async_loop_config_t kAsyncLoopConfigNoAttachToThread = {
|
.make_default_for_current_thread = false};
|
|
typedef struct async_loop {
|
async_dispatcher_t dispatcher; // must be first (the loop inherits from async_dispatcher_t)
|
async_loop_config_t config; // immutable
|
zx_handle_t port; // immutable
|
zx_handle_t timer; // immutable
|
|
_Atomic async_loop_state_t state;
|
atomic_uint active_threads; // number of active dispatch threads
|
|
mtx_t lock; // guards the lists and the dispatching tasks flag
|
bool dispatching_tasks; // true while the loop is busy dispatching tasks
|
list_node_t wait_list; // most recently added first
|
list_node_t task_list; // pending tasks, earliest deadline first
|
list_node_t due_list; // due tasks, earliest deadline first
|
list_node_t thread_list; // earliest created thread first
|
list_node_t exception_list; // most recently added first
|
} async_loop_t;
|
|
static zx_status_t async_loop_run_once(async_loop_t* loop, zx_time_t deadline);
|
static zx_status_t async_loop_dispatch_wait(async_loop_t* loop, async_wait_t* wait,
|
zx_status_t status, const zx_packet_signal_t* signal);
|
static zx_status_t async_loop_dispatch_tasks(async_loop_t* loop);
|
static void async_loop_dispatch_task(async_loop_t* loop, async_task_t* task,
|
zx_status_t status);
|
static zx_status_t async_loop_dispatch_packet(async_loop_t* loop, async_receiver_t* receiver,
|
zx_status_t status, const zx_packet_user_t* data);
|
static zx_status_t async_loop_dispatch_guest_bell_trap(async_loop_t* loop,
|
async_guest_bell_trap_t* trap,
|
zx_status_t status,
|
const zx_packet_guest_bell_t* bell);
|
static zx_status_t async_loop_dispatch_exception(async_loop_t* loop,
|
async_exception_t* exception,
|
zx_status_t status,
|
const zx_port_packet_t* report);
|
static void async_loop_wake_threads(async_loop_t* loop);
|
static void async_loop_insert_task_locked(async_loop_t* loop, async_task_t* task);
|
static void async_loop_restart_timer_locked(async_loop_t* loop);
|
static void async_loop_invoke_prologue(async_loop_t* loop);
|
static void async_loop_invoke_epilogue(async_loop_t* loop);
|
|
static_assert(sizeof(list_node_t) <= sizeof(async_state_t),
|
"async_state_t too small");
|
|
#define TO_NODE(type, ptr) ((list_node_t*)&ptr->state)
|
#define FROM_NODE(type, ptr) ((type*)((char*)(ptr)-offsetof(type, state)))
|
|
static inline list_node_t* wait_to_node(async_wait_t* wait) {
|
return TO_NODE(async_wait_t, wait);
|
}
|
|
static inline async_wait_t* node_to_wait(list_node_t* node) {
|
return FROM_NODE(async_wait_t, node);
|
}
|
|
static inline list_node_t* task_to_node(async_task_t* task) {
|
return TO_NODE(async_task_t, task);
|
}
|
|
static inline async_task_t* node_to_task(list_node_t* node) {
|
return FROM_NODE(async_task_t, node);
|
}
|
|
static inline list_node_t* exception_to_node(async_exception_t* exception) {
|
return TO_NODE(async_exception_t, exception);
|
}
|
|
static inline async_exception_t* node_to_exception(list_node_t* node) {
|
return FROM_NODE(async_exception_t, node);
|
}
|
|
zx_status_t async_loop_create(const async_loop_config_t* config, async_loop_t** out_loop) {
|
ZX_DEBUG_ASSERT(out_loop);
|
ZX_DEBUG_ASSERT(config != NULL);
|
|
async_loop_t* loop = calloc(1u, sizeof(async_loop_t));
|
if (!loop)
|
return ZX_ERR_NO_MEMORY;
|
atomic_init(&loop->state, ASYNC_LOOP_RUNNABLE);
|
atomic_init(&loop->active_threads, 0u);
|
|
loop->dispatcher.ops = &async_loop_ops;
|
loop->config = *config;
|
mtx_init(&loop->lock, mtx_plain);
|
list_initialize(&loop->wait_list);
|
list_initialize(&loop->task_list);
|
list_initialize(&loop->due_list);
|
list_initialize(&loop->thread_list);
|
list_initialize(&loop->exception_list);
|
|
zx_status_t status = zx_port_create(0u, &loop->port);
|
if (status == ZX_OK)
|
status = zx_timer_create(0u, ZX_CLOCK_MONOTONIC, &loop->timer);
|
if (status == ZX_OK) {
|
status = zx_object_wait_async(loop->timer, loop->port, KEY_CONTROL,
|
ZX_TIMER_SIGNALED,
|
ZX_WAIT_ASYNC_REPEATING);
|
}
|
if (status == ZX_OK) {
|
*out_loop = loop;
|
if (loop->config.make_default_for_current_thread) {
|
ZX_DEBUG_ASSERT(async_get_default_dispatcher() == NULL);
|
async_set_default_dispatcher(&loop->dispatcher);
|
}
|
} else {
|
loop->config.make_default_for_current_thread = false;
|
async_loop_destroy(loop);
|
}
|
return status;
|
}
|
|
void async_loop_destroy(async_loop_t* loop) {
|
ZX_DEBUG_ASSERT(loop);
|
|
async_loop_shutdown(loop);
|
|
zx_handle_close(loop->port);
|
zx_handle_close(loop->timer);
|
mtx_destroy(&loop->lock);
|
free(loop);
|
}
|
|
void async_loop_shutdown(async_loop_t* loop) {
|
ZX_DEBUG_ASSERT(loop);
|
|
async_loop_state_t prior_state =
|
atomic_exchange_explicit(&loop->state, ASYNC_LOOP_SHUTDOWN,
|
memory_order_acq_rel);
|
if (prior_state == ASYNC_LOOP_SHUTDOWN)
|
return;
|
|
async_loop_wake_threads(loop);
|
async_loop_join_threads(loop);
|
|
list_node_t* node;
|
while ((node = list_remove_head(&loop->wait_list))) {
|
async_wait_t* wait = node_to_wait(node);
|
async_loop_dispatch_wait(loop, wait, ZX_ERR_CANCELED, NULL);
|
}
|
while ((node = list_remove_head(&loop->due_list))) {
|
async_task_t* task = node_to_task(node);
|
async_loop_dispatch_task(loop, task, ZX_ERR_CANCELED);
|
}
|
while ((node = list_remove_head(&loop->task_list))) {
|
async_task_t* task = node_to_task(node);
|
async_loop_dispatch_task(loop, task, ZX_ERR_CANCELED);
|
}
|
while ((node = list_remove_head(&loop->exception_list))) {
|
async_exception_t* exception = node_to_exception(node);
|
async_loop_dispatch_exception(loop, exception, ZX_ERR_CANCELED, NULL);
|
}
|
|
if (loop->config.make_default_for_current_thread) {
|
ZX_DEBUG_ASSERT(async_get_default_dispatcher() == &loop->dispatcher);
|
async_set_default_dispatcher(NULL);
|
}
|
}
|
|
zx_status_t async_loop_run(async_loop_t* loop, zx_time_t deadline, bool once) {
|
ZX_DEBUG_ASSERT(loop);
|
|
zx_status_t status;
|
atomic_fetch_add_explicit(&loop->active_threads, 1u, memory_order_acq_rel);
|
do {
|
status = async_loop_run_once(loop, deadline);
|
} while (status == ZX_OK && !once);
|
atomic_fetch_sub_explicit(&loop->active_threads, 1u, memory_order_acq_rel);
|
return status;
|
}
|
|
zx_status_t async_loop_run_until_idle(async_loop_t* loop) {
|
zx_status_t status = async_loop_run(loop, 0, false);
|
if (status == ZX_ERR_TIMED_OUT) {
|
status = ZX_OK;
|
}
|
return status;
|
}
|
|
static zx_status_t async_loop_run_once(async_loop_t* loop, zx_time_t deadline) {
|
async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);
|
if (state == ASYNC_LOOP_SHUTDOWN)
|
return ZX_ERR_BAD_STATE;
|
if (state != ASYNC_LOOP_RUNNABLE)
|
return ZX_ERR_CANCELED;
|
|
zx_port_packet_t packet;
|
zx_status_t status = zx_port_wait(loop->port, deadline, &packet);
|
if (status != ZX_OK)
|
return status;
|
|
if (packet.key == KEY_CONTROL) {
|
// Handle wake-up packets.
|
if (packet.type == ZX_PKT_TYPE_USER)
|
return ZX_OK;
|
|
// Handle task timer expirations.
|
if (packet.type == ZX_PKT_TYPE_SIGNAL_REP &&
|
packet.signal.observed & ZX_TIMER_SIGNALED) {
|
return async_loop_dispatch_tasks(loop);
|
}
|
} else {
|
// Handle wait completion packets.
|
if (packet.type == ZX_PKT_TYPE_SIGNAL_ONE) {
|
async_wait_t* wait = (void*)(uintptr_t)packet.key;
|
mtx_lock(&loop->lock);
|
list_delete(wait_to_node(wait));
|
mtx_unlock(&loop->lock);
|
return async_loop_dispatch_wait(loop, wait, packet.status, &packet.signal);
|
}
|
|
// Handle queued user packets.
|
if (packet.type == ZX_PKT_TYPE_USER) {
|
async_receiver_t* receiver = (void*)(uintptr_t)packet.key;
|
return async_loop_dispatch_packet(loop, receiver, packet.status, &packet.user);
|
}
|
|
// Handle guest bell trap packets.
|
if (packet.type == ZX_PKT_TYPE_GUEST_BELL) {
|
async_guest_bell_trap_t* trap = (void*)(uintptr_t)packet.key;
|
return async_loop_dispatch_guest_bell_trap(
|
loop, trap, packet.status, &packet.guest_bell);
|
}
|
|
// Handle exception packets.
|
if (ZX_PKT_IS_EXCEPTION(packet.type)) {
|
async_exception_t* exception = (void*)(uintptr_t)packet.key;
|
return async_loop_dispatch_exception(loop, exception, packet.status,
|
&packet);
|
}
|
}
|
|
ZX_DEBUG_ASSERT(false);
|
return ZX_ERR_INTERNAL;
|
}
|
|
async_dispatcher_t* async_loop_get_dispatcher(async_loop_t* loop) {
|
// Note: The loop's implementation inherits from async_t so we can upcast to it.
|
return (async_dispatcher_t*)loop;
|
}
|
|
async_loop_t* async_loop_from_dispatcher(async_dispatcher_t* async) {
|
return (async_loop_t*)async;
|
}
|
|
static zx_status_t async_loop_dispatch_guest_bell_trap(async_loop_t* loop,
|
async_guest_bell_trap_t* trap,
|
zx_status_t status,
|
const zx_packet_guest_bell_t* bell) {
|
async_loop_invoke_prologue(loop);
|
trap->handler((async_dispatcher_t*)loop, trap, status, bell);
|
async_loop_invoke_epilogue(loop);
|
return ZX_OK;
|
}
|
|
static zx_status_t async_loop_dispatch_wait(async_loop_t* loop, async_wait_t* wait,
|
zx_status_t status, const zx_packet_signal_t* signal) {
|
async_loop_invoke_prologue(loop);
|
wait->handler((async_dispatcher_t*)loop, wait, status, signal);
|
async_loop_invoke_epilogue(loop);
|
return ZX_OK;
|
}
|
|
static zx_status_t async_loop_dispatch_tasks(async_loop_t* loop) {
|
// Dequeue and dispatch one task at a time in case an earlier task wants
|
// to cancel a later task which has also come due. At most one thread
|
// can dispatch tasks at any given moment (to preserve serial ordering).
|
// Timer restarts are suppressed until we run out of tasks to dispatch.
|
mtx_lock(&loop->lock);
|
if (!loop->dispatching_tasks) {
|
loop->dispatching_tasks = true;
|
|
// Extract all of the tasks that are due into |due_list| for dispatch
|
// unless we already have some waiting from a previous iteration which
|
// we would like to process in order.
|
list_node_t* node;
|
if (list_is_empty(&loop->due_list)) {
|
zx_time_t due_time = async_loop_now((async_dispatcher_t*)loop);
|
list_node_t* tail = NULL;
|
list_for_every(&loop->task_list, node) {
|
if (node_to_task(node)->deadline > due_time)
|
break;
|
tail = node;
|
}
|
if (tail) {
|
list_node_t* head = loop->task_list.next;
|
loop->task_list.next = tail->next;
|
tail->next->prev = &loop->task_list;
|
loop->due_list.next = head;
|
head->prev = &loop->due_list;
|
loop->due_list.prev = tail;
|
tail->next = &loop->due_list;
|
}
|
}
|
|
// Dispatch all due tasks. Note that they might be canceled concurrently
|
// so we need to grab the lock during each iteration to fetch the next
|
// item from the list.
|
while ((node = list_remove_head(&loop->due_list))) {
|
mtx_unlock(&loop->lock);
|
|
// Invoke the handler. Note that it might destroy itself.
|
async_task_t* task = node_to_task(node);
|
async_loop_dispatch_task(loop, task, ZX_OK);
|
|
mtx_lock(&loop->lock);
|
async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);
|
if (state != ASYNC_LOOP_RUNNABLE)
|
break;
|
}
|
|
loop->dispatching_tasks = false;
|
async_loop_restart_timer_locked(loop);
|
}
|
mtx_unlock(&loop->lock);
|
return ZX_OK;
|
}
|
|
static void async_loop_dispatch_task(async_loop_t* loop,
|
async_task_t* task,
|
zx_status_t status) {
|
// Invoke the handler. Note that it might destroy itself.
|
async_loop_invoke_prologue(loop);
|
task->handler((async_dispatcher_t*)loop, task, status);
|
async_loop_invoke_epilogue(loop);
|
}
|
|
static zx_status_t async_loop_dispatch_packet(async_loop_t* loop, async_receiver_t* receiver,
|
zx_status_t status, const zx_packet_user_t* data) {
|
// Invoke the handler. Note that it might destroy itself.
|
async_loop_invoke_prologue(loop);
|
receiver->handler((async_dispatcher_t*)loop, receiver, status, data);
|
async_loop_invoke_epilogue(loop);
|
return ZX_OK;
|
}
|
|
static zx_status_t async_loop_dispatch_exception(async_loop_t* loop,
|
async_exception_t* exception,
|
zx_status_t status,
|
const zx_port_packet_t* report) {
|
// Invoke the handler. Note that it might destroy itself.
|
async_loop_invoke_prologue(loop);
|
exception->handler((async_dispatcher_t*)loop, exception, status, report);
|
async_loop_invoke_epilogue(loop);
|
return ZX_OK;
|
}
|
|
void async_loop_quit(async_loop_t* loop) {
|
ZX_DEBUG_ASSERT(loop);
|
|
async_loop_state_t expected_state = ASYNC_LOOP_RUNNABLE;
|
if (!atomic_compare_exchange_strong_explicit(&loop->state, &expected_state,
|
ASYNC_LOOP_QUIT,
|
memory_order_acq_rel, memory_order_acquire))
|
return;
|
|
async_loop_wake_threads(loop);
|
}
|
|
static void async_loop_wake_threads(async_loop_t* loop) {
|
// Queue enough packets to awaken all active threads.
|
// This is safe because any new threads which join the pool first increment the
|
// active thread count then check the loop state, so the count we observe here
|
// cannot be less than the number of threads which might be blocked in |port_wait|.
|
// Issuing too many packets is also harmless.
|
uint32_t n = atomic_load_explicit(&loop->active_threads, memory_order_acquire);
|
for (uint32_t i = 0u; i < n; i++) {
|
zx_port_packet_t packet = {
|
.key = KEY_CONTROL,
|
.type = ZX_PKT_TYPE_USER,
|
.status = ZX_OK};
|
zx_status_t status = zx_port_queue(loop->port, &packet);
|
ZX_ASSERT_MSG(status == ZX_OK, "zx_port_queue: status=%d", status);
|
}
|
}
|
|
zx_status_t async_loop_reset_quit(async_loop_t* loop) {
|
ZX_DEBUG_ASSERT(loop);
|
|
// Ensure that there are no active threads before resetting the quit state.
|
// This check is inherently racy but not dangerously so. It's mainly a
|
// sanity check for client code so we can make a stronger statement about
|
// how |async_loop_reset_quit()| is supposed to be used.
|
uint32_t n = atomic_load_explicit(&loop->active_threads, memory_order_acquire);
|
if (n != 0)
|
return ZX_ERR_BAD_STATE;
|
|
async_loop_state_t expected_state = ASYNC_LOOP_QUIT;
|
if (atomic_compare_exchange_strong_explicit(&loop->state, &expected_state,
|
ASYNC_LOOP_RUNNABLE,
|
memory_order_acq_rel, memory_order_acquire)) {
|
return ZX_OK;
|
}
|
|
async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);
|
if (state == ASYNC_LOOP_RUNNABLE)
|
return ZX_OK;
|
return ZX_ERR_BAD_STATE;
|
}
|
|
async_loop_state_t async_loop_get_state(async_loop_t* loop) {
|
ZX_DEBUG_ASSERT(loop);
|
|
return atomic_load_explicit(&loop->state, memory_order_acquire);
|
}
|
|
zx_time_t async_loop_now(async_dispatcher_t* dispatcher) {
|
return zx_clock_get_monotonic();
|
}
|
|
static zx_status_t async_loop_begin_wait(async_dispatcher_t* async, async_wait_t* wait) {
|
async_loop_t* loop = (async_loop_t*)async;
|
ZX_DEBUG_ASSERT(loop);
|
ZX_DEBUG_ASSERT(wait);
|
|
if (atomic_load_explicit(&loop->state, memory_order_acquire) == ASYNC_LOOP_SHUTDOWN)
|
return ZX_ERR_BAD_STATE;
|
|
mtx_lock(&loop->lock);
|
|
zx_status_t status = zx_object_wait_async(
|
wait->object, loop->port, (uintptr_t)wait, wait->trigger, ZX_WAIT_ASYNC_ONCE);
|
if (status == ZX_OK) {
|
list_add_head(&loop->wait_list, wait_to_node(wait));
|
} else {
|
ZX_ASSERT_MSG(status == ZX_ERR_ACCESS_DENIED,
|
"zx_object_wait_async: status=%d", status);
|
}
|
|
mtx_unlock(&loop->lock);
|
return status;
|
}
|
|
static zx_status_t async_loop_cancel_wait(async_dispatcher_t* async, async_wait_t* wait) {
|
async_loop_t* loop = (async_loop_t*)async;
|
ZX_DEBUG_ASSERT(loop);
|
ZX_DEBUG_ASSERT(wait);
|
|
// Note: We need to process cancelations even while the loop is being
|
// destroyed in case the client is counting on the handler not being
|
// invoked again past this point.
|
|
mtx_lock(&loop->lock);
|
|
// First, confirm that the wait is actually pending.
|
list_node_t* node = wait_to_node(wait);
|
if (!list_in_list(node)) {
|
mtx_unlock(&loop->lock);
|
return ZX_ERR_NOT_FOUND;
|
}
|
|
// Next, cancel the wait. This may be racing with another thread that
|
// has read the wait's packet but not yet dispatched it. So if we fail
|
// to cancel then we assume we lost the race.
|
zx_status_t status = zx_port_cancel(loop->port, wait->object,
|
(uintptr_t)wait);
|
if (status == ZX_OK) {
|
list_delete(node);
|
} else {
|
ZX_ASSERT_MSG(status == ZX_ERR_NOT_FOUND,
|
"zx_port_cancel: status=%d", status);
|
}
|
|
mtx_unlock(&loop->lock);
|
return status;
|
}
|
|
static zx_status_t async_loop_post_task(async_dispatcher_t* async, async_task_t* task) {
|
async_loop_t* loop = (async_loop_t*)async;
|
ZX_DEBUG_ASSERT(loop);
|
ZX_DEBUG_ASSERT(task);
|
|
if (atomic_load_explicit(&loop->state, memory_order_acquire) == ASYNC_LOOP_SHUTDOWN)
|
return ZX_ERR_BAD_STATE;
|
|
mtx_lock(&loop->lock);
|
|
async_loop_insert_task_locked(loop, task);
|
if (!loop->dispatching_tasks &&
|
task_to_node(task)->prev == &loop->task_list) {
|
// Task inserted at head. Earliest deadline changed.
|
async_loop_restart_timer_locked(loop);
|
}
|
|
mtx_unlock(&loop->lock);
|
return ZX_OK;
|
}
|
|
static zx_status_t async_loop_cancel_task(async_dispatcher_t* async, async_task_t* task) {
|
async_loop_t* loop = (async_loop_t*)async;
|
ZX_DEBUG_ASSERT(loop);
|
ZX_DEBUG_ASSERT(task);
|
|
// Note: We need to process cancelations even while the loop is being
|
// destroyed in case the client is counting on the handler not being
|
// invoked again past this point. Also, the task we're removing here
|
// might be present in the dispatcher's |due_list| if it is pending
|
// dispatch instead of in the loop's |task_list| as usual. The same
|
// logic works in both cases.
|
|
mtx_lock(&loop->lock);
|
list_node_t* node = task_to_node(task);
|
if (!list_in_list(node)) {
|
mtx_unlock(&loop->lock);
|
return ZX_ERR_NOT_FOUND;
|
}
|
|
// Determine whether the head task was canceled and following task has
|
// a later deadline. If so, we will bump the timer along to that deadline.
|
bool must_restart = !loop->dispatching_tasks &&
|
node->prev == &loop->task_list &&
|
node->next != &loop->task_list &&
|
node_to_task(node->next)->deadline > task->deadline;
|
list_delete(node);
|
if (must_restart)
|
async_loop_restart_timer_locked(loop);
|
|
mtx_unlock(&loop->lock);
|
return ZX_OK;
|
}
|
|
static zx_status_t async_loop_queue_packet(async_dispatcher_t* async, async_receiver_t* receiver,
|
const zx_packet_user_t* data) {
|
async_loop_t* loop = (async_loop_t*)async;
|
ZX_DEBUG_ASSERT(loop);
|
ZX_DEBUG_ASSERT(receiver);
|
|
if (atomic_load_explicit(&loop->state, memory_order_acquire) == ASYNC_LOOP_SHUTDOWN)
|
return ZX_ERR_BAD_STATE;
|
|
zx_port_packet_t packet = {
|
.key = (uintptr_t)receiver,
|
.type = ZX_PKT_TYPE_USER,
|
.status = ZX_OK};
|
if (data)
|
packet.user = *data;
|
return zx_port_queue(loop->port, &packet);
|
}
|
|
static zx_status_t async_loop_set_guest_bell_trap(
|
async_dispatcher_t* async, async_guest_bell_trap_t* trap,
|
zx_handle_t guest, zx_vaddr_t addr, size_t length) {
|
async_loop_t* loop = (async_loop_t*)async;
|
ZX_DEBUG_ASSERT(loop);
|
ZX_DEBUG_ASSERT(trap);
|
|
if (atomic_load_explicit(&loop->state, memory_order_acquire) == ASYNC_LOOP_SHUTDOWN)
|
return ZX_ERR_BAD_STATE;
|
|
zx_status_t status = zx_guest_set_trap(guest, ZX_GUEST_TRAP_BELL, addr,
|
length, loop->port, (uintptr_t)trap);
|
if (status != ZX_OK) {
|
ZX_ASSERT_MSG(status == ZX_ERR_ACCESS_DENIED ||
|
status == ZX_ERR_ALREADY_EXISTS ||
|
status == ZX_ERR_INVALID_ARGS ||
|
status == ZX_ERR_OUT_OF_RANGE ||
|
status == ZX_ERR_WRONG_TYPE,
|
"zx_guest_set_trap: status=%d", status);
|
}
|
return status;
|
}
|
|
static zx_status_t async_loop_bind_exception_port(async_dispatcher_t* async,
|
async_exception_t* exception) {
|
async_loop_t* loop = (async_loop_t*)async;
|
ZX_DEBUG_ASSERT(loop);
|
ZX_DEBUG_ASSERT(exception);
|
|
if (atomic_load_explicit(&loop->state, memory_order_acquire) == ASYNC_LOOP_SHUTDOWN)
|
return ZX_ERR_BAD_STATE;
|
|
mtx_lock(&loop->lock);
|
|
uint64_t key = (uintptr_t)(void*) exception;
|
zx_status_t status = zx_task_bind_exception_port(exception->task, loop->port,
|
key, exception->options);
|
if (status == ZX_OK) {
|
list_add_head(&loop->exception_list, exception_to_node(exception));
|
}
|
|
mtx_unlock(&loop->lock);
|
return status;
|
}
|
|
static zx_status_t async_loop_unbind_exception_port(async_dispatcher_t* async,
|
async_exception_t* exception) {
|
async_loop_t* loop = (async_loop_t*)async;
|
ZX_DEBUG_ASSERT(loop);
|
ZX_DEBUG_ASSERT(exception);
|
|
// Note: We need to process unbindings even while the loop is being
|
// destroyed in case the client is counting on the handler not being
|
// invoked again past this point.
|
|
mtx_lock(&loop->lock);
|
|
// First, confirm that the port is actually bound.
|
list_node_t* node = exception_to_node(exception);
|
if (!list_in_list(node)) {
|
mtx_unlock(&loop->lock);
|
return ZX_ERR_NOT_FOUND;
|
}
|
|
uint64_t key = (uintptr_t)(void*) exception;
|
zx_status_t status = zx_task_bind_exception_port(exception->task,
|
ZX_HANDLE_INVALID, key, 0);
|
|
if (status == ZX_OK) {
|
list_delete(node);
|
}
|
|
mtx_unlock(&loop->lock);
|
return status;
|
}
|
|
static zx_status_t async_loop_resume_from_exception(async_dispatcher_t* async,
|
async_exception_t* exception,
|
zx_handle_t task,
|
uint32_t options) {
|
async_loop_t* loop = (async_loop_t*)async;
|
ZX_DEBUG_ASSERT(loop);
|
ZX_DEBUG_ASSERT(exception);
|
|
if (atomic_load_explicit(&loop->state, memory_order_acquire) == ASYNC_LOOP_SHUTDOWN)
|
return ZX_ERR_BAD_STATE;
|
|
return zx_task_resume_from_exception(task, loop->port, options);
|
}
|
|
static void async_loop_insert_task_locked(async_loop_t* loop, async_task_t* task) {
|
// TODO(ZX-976): We assume that tasks are inserted in quasi-monotonic order and
|
// that insertion into the task queue will typically take no more than a few steps.
|
// If this assumption proves false and the cost of insertion becomes a problem, we
|
// should consider using a more efficient representation for maintaining order.
|
list_node_t* node;
|
for (node = loop->task_list.prev; node != &loop->task_list; node = node->prev) {
|
if (task->deadline >= node_to_task(node)->deadline)
|
break;
|
}
|
list_add_after(node, task_to_node(task));
|
}
|
|
static void async_loop_restart_timer_locked(async_loop_t* loop) {
|
zx_time_t deadline;
|
if (list_is_empty(&loop->due_list)) {
|
list_node_t* head = list_peek_head(&loop->task_list);
|
if (!head)
|
return;
|
async_task_t* task = node_to_task(head);
|
deadline = task->deadline;
|
if (deadline == ZX_TIME_INFINITE)
|
return;
|
} else {
|
// Fire now.
|
deadline = 0ULL;
|
}
|
|
zx_status_t status = zx_timer_set(loop->timer, deadline, 0);
|
ZX_ASSERT_MSG(status == ZX_OK, "zx_timer_set: status=%d", status);
|
}
|
|
static void async_loop_invoke_prologue(async_loop_t* loop) {
|
if (loop->config.prologue)
|
loop->config.prologue(loop, loop->config.data);
|
}
|
|
static void async_loop_invoke_epilogue(async_loop_t* loop) {
|
if (loop->config.epilogue)
|
loop->config.epilogue(loop, loop->config.data);
|
}
|
|
static int async_loop_run_thread(void* data) {
|
async_loop_t* loop = (async_loop_t*)data;
|
async_set_default_dispatcher(&loop->dispatcher);
|
async_loop_run(loop, ZX_TIME_INFINITE, false);
|
return 0;
|
}
|
|
zx_status_t async_loop_start_thread(async_loop_t* loop, const char* name, thrd_t* out_thread) {
|
ZX_DEBUG_ASSERT(loop);
|
|
// This check is inherently racy. The client should not be racing shutdown
|
// with attemps to start new threads. This is mainly a sanity check.
|
async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);
|
if (state == ASYNC_LOOP_SHUTDOWN)
|
return ZX_ERR_BAD_STATE;
|
|
thread_record_t* rec = calloc(1u, sizeof(thread_record_t));
|
if (!rec)
|
return ZX_ERR_NO_MEMORY;
|
|
if (thrd_create_with_name(&rec->thread, async_loop_run_thread, loop, name) != thrd_success) {
|
free(rec);
|
return ZX_ERR_NO_MEMORY;
|
}
|
|
mtx_lock(&loop->lock);
|
list_add_tail(&loop->thread_list, &rec->node);
|
mtx_unlock(&loop->lock);
|
|
if (out_thread)
|
*out_thread = rec->thread;
|
return ZX_OK;
|
}
|
|
void async_loop_join_threads(async_loop_t* loop) {
|
ZX_DEBUG_ASSERT(loop);
|
|
mtx_lock(&loop->lock);
|
for (;;) {
|
thread_record_t* rec = (thread_record_t*)list_remove_head(&loop->thread_list);
|
if (!rec)
|
break;
|
|
mtx_unlock(&loop->lock);
|
thrd_t thread = rec->thread;
|
free(rec);
|
int result = thrd_join(thread, NULL);
|
ZX_DEBUG_ASSERT(result == thrd_success);
|
mtx_lock(&loop->lock);
|
}
|
mtx_unlock(&loop->lock);
|
}
|