/*
 * Copyright (C) 2008-2011 Philippe Gerum.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 */
#include <assert.h>
#include <errno.h>
#include "boilerplate/lock.h"
#include "copperplate/threadobj.h"
#include "copperplate/syncobj.h"
#include "copperplate/debug.h"
#include "internal.h"

/*
 * XXX: The POSIX spec states that "Synchronization primitives that
 * attempt to interfere with scheduling policy by specifying an
 * ordering rule are considered undesirable. Threads waiting on
 * mutexes and condition variables are selected to proceed in an order
 * dependent upon the scheduling policy rather than in some fixed
 * order (for example, FIFO or priority). Thus, the scheduling policy
 * determines which thread(s) are awakened and allowed to proceed.".
 * Linux enforces this by always queuing SCHED_FIFO waiters by
 * priority when sleeping on futex objects, which underlie mutexes and
 * condition variables.
 *
 * Unfortunately, most non-POSIX RTOSes do allow specifying the
 * queuing order which applies to their synchronization objects at
 * creation time, and ignoring the FIFO queuing requirement may break
 * the application when a fair attribution of the resource is
 * expected. Therefore, we must emulate FIFO ordering, and we do that
 * using an internal queue. We also use this queue to implement the
 * flush operation on synchronization objects, which POSIX does not
 * provide either.
 *
 * The syncobj abstraction is based on a complex monitor object to
 * wait for resources, either implemented natively by Cobalt or
 * emulated via a mutex and two condition variables over Mercury (one
 * of which is hosted by the thread object implementation).
 *
 * NOTE: we don't do error backtracing in this file, since error
 * returns when locking, pending or deleting sync objects usually
 * express normal runtime conditions.
 */
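/*
 * Minimal usage sketch (illustration only, kept out of the build):
 * how a caller is typically expected to combine syncobj_lock(),
 * syncobj_wait_grant(), syncobj_grant_one() and syncobj_unlock() to
 * wait for and release a resource. The "struct xres" type and its
 * "count" field are hypothetical; the return code handling follows
 * the semantics implemented below, i.e. -EIDRM means the object was
 * deleted and the lock already dropped, while on -ETIMEDOUT/-EINTR
 * the caller still holds the lock and must release it.
 */
#if 0
struct xres {
	struct syncobj sobj;
	int count;		/* Hypothetical resource count. */
};

static int xres_acquire(struct xres *r, const struct timespec *timeout)
{
	struct syncstate syns;
	int ret;

	ret = syncobj_lock(&r->sobj, &syns);
	if (ret)
		return ret;

	while (r->count == 0) {
		ret = syncobj_wait_grant(&r->sobj, timeout, &syns);
		if (ret == -EIDRM)
			return ret;	/* Object deleted, lock dropped. */
		if (ret) {
			syncobj_unlock(&r->sobj, &syns);
			return ret;	/* -ETIMEDOUT or -EINTR. */
		}
	}
	r->count--;

	syncobj_unlock(&r->sobj, &syns);

	return 0;
}

static void xres_release(struct xres *r)
{
	struct syncstate syns;

	if (syncobj_lock(&r->sobj, &syns))
		return;

	r->count++;
	/* Wake up the first waiter in FIFO/priority order, if any. */
	syncobj_grant_one(&r->sobj);

	syncobj_unlock(&r->sobj, &syns);
}
#endif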
#ifdef CONFIG_XENO_COBALT

#include "cobalt/internal.h"

static inline int monitor_enter(struct syncobj *sobj)
{
	return cobalt_monitor_enter(&sobj->core.monitor);
}

static inline void monitor_exit(struct syncobj *sobj)
{
	int ret;

	ret = cobalt_monitor_exit(&sobj->core.monitor);
	assert(ret == 0);
	(void)ret;
}

static inline int monitor_wait_grant(struct syncobj *sobj,
				     struct threadobj *current,
				     const struct timespec *timeout)
{
	return cobalt_monitor_wait(&sobj->core.monitor,
				   COBALT_MONITOR_WAITGRANT, timeout);
}

static inline int monitor_wait_drain(struct syncobj *sobj,
				     struct threadobj *current,
				     const struct timespec *timeout)
{
	return cobalt_monitor_wait(&sobj->core.monitor,
				   COBALT_MONITOR_WAITDRAIN, timeout);
}

static inline void monitor_grant(struct syncobj *sobj,
				 struct threadobj *thobj)
{
	cobalt_monitor_grant(&sobj->core.monitor,
			     threadobj_get_window(&thobj->core));
}

static inline void monitor_drain_all(struct syncobj *sobj)
{
	cobalt_monitor_drain_all(&sobj->core.monitor);
}

static inline int syncobj_init_corespec(struct syncobj *sobj,
					clockid_t clk_id)
{
	int flags = monitor_scope_attribute;

	return __bt(cobalt_monitor_init(&sobj->core.monitor, clk_id, flags));
}

static inline void syncobj_cleanup_corespec(struct syncobj *sobj)
{
	/* We hold the gate lock while destroying. */
	int ret = cobalt_monitor_destroy(&sobj->core.monitor);
	/* Let earlier EPERM condition propagate, don't trap. */
	assert(ret == 0 || ret == -EPERM);
	(void)ret;
}

#else /* CONFIG_XENO_MERCURY */

static inline int monitor_enter(struct syncobj *sobj)
{
	return -pthread_mutex_lock(&sobj->core.lock);
}

static inline void monitor_exit(struct syncobj *sobj)
{
	int ret;

	ret = pthread_mutex_unlock(&sobj->core.lock);
	assert(ret == 0);
	(void)ret;
}

static inline int monitor_wait_grant(struct syncobj *sobj,
				     struct threadobj *current,
				     const struct timespec *timeout)
{
	if (timeout)
		return -threadobj_cond_timedwait(&current->core.grant_sync,
						 &sobj->core.lock, timeout);

	return -threadobj_cond_wait(&current->core.grant_sync,
				    &sobj->core.lock);
}

static inline int monitor_wait_drain(struct syncobj *sobj,
				     struct threadobj *current,
				     const struct timespec *timeout)
{
	if (timeout)
		return -threadobj_cond_timedwait(&sobj->core.drain_sync,
						 &sobj->core.lock, timeout);

	return -threadobj_cond_wait(&sobj->core.drain_sync,
				    &sobj->core.lock);
}

static inline void monitor_grant(struct syncobj *sobj,
				 struct threadobj *thobj)
{
	threadobj_cond_signal(&thobj->core.grant_sync);
}

static inline void monitor_drain_all(struct syncobj *sobj)
{
	threadobj_cond_broadcast(&sobj->core.drain_sync);
}

/*
 * Over Mercury, we implement a complex monitor via a mutex and a
 * couple of condvars, one in the syncobj and the other owned by the
 * thread object.
 */
static inline int syncobj_init_corespec(struct syncobj *sobj,
					clockid_t clk_id)
{
	pthread_mutexattr_t mattr;
	pthread_condattr_t cattr;
	int ret;

	pthread_mutexattr_init(&mattr);
	pthread_mutexattr_settype(&mattr, mutex_type_attribute);
	pthread_mutexattr_setprotocol(&mattr, PTHREAD_PRIO_INHERIT);
	ret = __bt(-pthread_mutexattr_setpshared(&mattr, mutex_scope_attribute));
	if (ret) {
		pthread_mutexattr_destroy(&mattr);
		return ret;
	}
	ret = __bt(-pthread_mutex_init(&sobj->core.lock, &mattr));
	pthread_mutexattr_destroy(&mattr);
	if (ret)
		return ret;

	pthread_condattr_init(&cattr);
	pthread_condattr_setpshared(&cattr, mutex_scope_attribute);
	ret = __bt(-pthread_condattr_setclock(&cattr, clk_id));
	if (ret)
		goto fail;
	ret = __bt(-pthread_cond_init(&sobj->core.drain_sync, &cattr));
	pthread_condattr_destroy(&cattr);
	if (ret) {
	fail:
		pthread_mutex_destroy(&sobj->core.lock);
		return ret;
	}

	return 0;
}

static inline void syncobj_cleanup_corespec(struct syncobj *sobj)
{
	monitor_exit(sobj);
	pthread_cond_destroy(&sobj->core.drain_sync);
	pthread_mutex_destroy(&sobj->core.lock);
}

#endif	/* CONFIG_XENO_MERCURY */

int syncobj_init(struct syncobj *sobj, clockid_t clk_id, int flags,
		 fnref_type(void (*)(struct syncobj *sobj)) finalizer)
{
	sobj->flags = flags;
	list_init(&sobj->grant_list);
	list_init(&sobj->drain_list);
	sobj->grant_count = 0;
	sobj->drain_count = 0;
	sobj->wait_count = 0;
	sobj->finalizer = finalizer;
	sobj->magic = SYNCOBJ_MAGIC;

	return __bt(syncobj_init_corespec(sobj, clk_id));
}

int syncobj_lock(struct syncobj *sobj, struct syncstate *syns)
{
	int ret, oldstate;

	/*
	 * This magic prevents concurrent locking while a deletion is
	 * in progress, waiting for the release count to drop to zero.
	 */
	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);

	ret = monitor_enter(sobj);
	if (ret)
		goto fail;

	/* Check for an ongoing deletion. */
	if (sobj->magic != SYNCOBJ_MAGIC) {
		monitor_exit(sobj);
		ret = -EINVAL;
		goto fail;
	}

	syns->state = oldstate;
	__syncobj_tag_locked(sobj);

	return 0;
fail:
	pthread_setcancelstate(oldstate, NULL);

	return ret;
}

void syncobj_unlock(struct syncobj *sobj, struct syncstate *syns)
{
	__syncobj_tag_unlocked(sobj);
	monitor_exit(sobj);
	pthread_setcancelstate(syns->state, NULL);
}

static void __syncobj_finalize(struct syncobj *sobj)
{
	void (*finalizer)(struct syncobj *sobj);

	/*
	 * Cancelability is still disabled or we are running over the
	 * thread finalizer, therefore we can't be wiped off in the
	 * middle of the finalization process.
	 */
	syncobj_cleanup_corespec(sobj);
	fnref_get(finalizer, sobj->finalizer);
	if (finalizer)
		finalizer(sobj);
}

int __syncobj_broadcast_grant(struct syncobj *sobj, int reason)
{
	struct threadobj *thobj;
	int ret;

	assert(!list_empty(&sobj->grant_list));

	do {
		thobj = list_pop_entry(&sobj->grant_list,
				       struct threadobj, wait_link);
		thobj->wait_status |= reason;
		thobj->wait_sobj = NULL;
		monitor_grant(sobj, thobj);
	} while (!list_empty(&sobj->grant_list));

	ret = sobj->grant_count;
	sobj->grant_count = 0;

	return ret;
}

int __syncobj_broadcast_drain(struct syncobj *sobj, int reason)
{
	struct threadobj *thobj;
	int ret;

	assert(!list_empty(&sobj->drain_list));

	do {
		thobj = list_pop_entry(&sobj->drain_list,
				       struct threadobj, wait_link);
		thobj->wait_sobj = NULL;
		thobj->wait_status |= reason;
	} while (!list_empty(&sobj->drain_list));

	monitor_drain_all(sobj);

	ret = sobj->drain_count;
	sobj->drain_count = 0;

	return ret;
}

static inline void enqueue_waiter(struct syncobj *sobj,
				  struct threadobj *thobj)
{
	struct threadobj *__thobj;

	thobj->wait_prio = thobj->global_priority;
	if (list_empty(&sobj->grant_list) || (sobj->flags & SYNCOBJ_PRIO) == 0) {
		list_append(&thobj->wait_link, &sobj->grant_list);
		return;
	}

	/*
	 * Walk the grant queue backward to find the last waiter with
	 * a priority not lower than ours, then queue right after it,
	 * so that FIFO ordering applies among waiters of equal
	 * priority.
	 */
	list_for_each_entry_reverse(__thobj, &sobj->grant_list, wait_link) {
		if (thobj->wait_prio <= __thobj->wait_prio)
			break;
	}
	ath(&__thobj->wait_link, &thobj->wait_link);
}

static inline void dequeue_waiter(struct syncobj *sobj,
				  struct threadobj *thobj)
{
	list_remove(&thobj->wait_link);
	if (thobj->wait_status & SYNCOBJ_DRAINWAIT)
		sobj->drain_count--;
	else
		sobj->grant_count--;

	assert(sobj->wait_count > 0);
}

/*
 * NOTE: we don't use POSIX cleanup handlers in syncobj_wait_grant()
 * and syncobj_wait_drain() on purpose: these may have a significant
 * impact on latency due to I-cache misses on low-end hardware
 * (e.g. ~6 us on MPC5200), particularly when unwinding the cancel
 * frame. So the cleanup handler below is called by the threadobj
 * finalizer instead when appropriate, since we have enough internal
 * information to handle this situation.
 */
void __syncobj_cleanup_wait(struct syncobj *sobj,
			    struct threadobj *thobj)
{
	/*
	 * We don't care about resetting the original cancel type
	 * saved in the syncstate struct since we are there precisely
	 * because the caller got cancelled while sleeping on the
	 * GRANT/DRAIN condition.
	 */
	dequeue_waiter(sobj, thobj);

	if (--sobj->wait_count == 0 && sobj->magic != SYNCOBJ_MAGIC) {
		__syncobj_finalize(sobj);
		return;
	}

	monitor_exit(sobj);
}

struct threadobj *syncobj_grant_one(struct syncobj *sobj)
{
	struct threadobj *thobj;

	__syncobj_check_locked(sobj);

	if (list_empty(&sobj->grant_list))
		return NULL;

	thobj = list_pop_entry(&sobj->grant_list, struct threadobj, wait_link);
	thobj->wait_status |= SYNCOBJ_SIGNALED;
	thobj->wait_sobj = NULL;
	sobj->grant_count--;
	monitor_grant(sobj, thobj);

	return thobj;
}

void syncobj_grant_to(struct syncobj *sobj, struct threadobj *thobj)
{
	__syncobj_check_locked(sobj);

	list_remove(&thobj->wait_link);
	thobj->wait_status |= SYNCOBJ_SIGNALED;
	thobj->wait_sobj = NULL;
	sobj->grant_count--;
	monitor_grant(sobj, thobj);
}

struct threadobj *syncobj_peek_grant(struct syncobj *sobj)
{
	struct threadobj *thobj;

	__syncobj_check_locked(sobj);

	if (list_empty(&sobj->grant_list))
		return NULL;

	thobj = list_first_entry(&sobj->grant_list, struct threadobj,
				 wait_link);
	return thobj;
}

struct threadobj *syncobj_peek_drain(struct syncobj *sobj)
{
	struct threadobj *thobj;

	__syncobj_check_locked(sobj);

	if (list_empty(&sobj->drain_list))
		return NULL;

	thobj = list_first_entry(&sobj->drain_list, struct threadobj,
				 wait_link);
	return thobj;
}

static int wait_epilogue(struct syncobj *sobj,
			 struct syncstate *syns,
			 struct threadobj *current,
			 int ret)
{
	current->run_state = __THREAD_S_RUNNING;

	/*
	 * Fixup a potential race upon return from grant/drain_wait
	 * operations, e.g. given two threads A and B:
	 *
	 * A:enqueue_waiter(self)
	 * A:monitor_wait
	 * A:monitor_unlock
	 * A:[timed] sleep
	 * A:wakeup on timeout/interrupt
	 * B:monitor_lock
	 * B:look_for_queued_waiter
	 *   (found A, update A's state)
	 * B:monitor_unlock
	 * A:dequeue_waiter(self)
	 * A:return -ETIMEDOUT/-EINTR
	 *
	 * The race may happen anytime between the timeout/interrupt
	 * event is received by A, and the moment it grabs back the
	 * monitor lock before unqueuing. When the race happens, B can
	 * squeeze in a signal before A unqueues after resumption on
	 * error.
	 *
	 * Problem: A's internal state has been updated (e.g. some
	 * data transferred to it), but it will receive
	 * -ETIMEDOUT/-EINTR, causing it to miss the update
	 * eventually.
	 *
	 * Solution: fixup the status code upon return from
	 * wait_grant/drain operations, so that -ETIMEDOUT/-EINTR is
	 * never returned to the caller if the syncobj was actually
	 * signaled. We still allow the SYNCOBJ_FLUSHED condition to
	 * override that success code though.
	 *
	 * Whether a condition should be deemed satisfied if it is
	 * signaled during the race window described above is
	 * debatable, but this is a simple and straightforward way to
	 * handle such grey area.
	 */
	if (current->wait_sobj) {
		dequeue_waiter(sobj, current);
		current->wait_sobj = NULL;
	} else if (ret == -ETIMEDOUT || ret == -EINTR)
		ret = 0;

	sobj->wait_count--;
	assert(sobj->wait_count >= 0);

	if (sobj->magic != SYNCOBJ_MAGIC) {
		if (sobj->wait_count == 0)
			__syncobj_finalize(sobj);
		else
			monitor_exit(sobj);
		pthread_setcancelstate(syns->state, NULL);
		return -EIDRM;
	}

	if (current->wait_status & SYNCOBJ_FLUSHED)
		return -EINTR;

	return ret;
}

int syncobj_wait_grant(struct syncobj *sobj, const struct timespec *timeout,
		       struct syncstate *syns)
{
	struct threadobj *current = threadobj_current();
	int ret, state;

	__syncobj_check_locked(sobj);

	assert(current != NULL);

	current->run_state = timeout ?
		__THREAD_S_TIMEDWAIT : __THREAD_S_WAIT;
	threadobj_save_timeout(&current->core, timeout);
	current->wait_status = 0;
	enqueue_waiter(sobj, current);
	current->wait_sobj = sobj;
	sobj->grant_count++;
	sobj->wait_count++;

	/*
	 * NOTE: we are guaranteed to be in deferred cancel mode, with
	 * cancelability disabled (in syncobj_lock); re-enable it
	 * before pending on the condvar.
	 */
	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &state);
	assert(state == PTHREAD_CANCEL_DISABLE);

	do {
		__syncobj_tag_unlocked(sobj);
		ret = monitor_wait_grant(sobj, current, timeout);
		__syncobj_tag_locked(sobj);
		/* Check for spurious wake up. */
	} while (ret == 0 && current->wait_sobj);

	pthread_setcancelstate(state, NULL);

	return wait_epilogue(sobj, syns, current, ret);
}

int syncobj_wait_drain(struct syncobj *sobj, const struct timespec *timeout,
		       struct syncstate *syns)
{
	struct threadobj *current = threadobj_current();
	int ret, state;

	__syncobj_check_locked(sobj);

	assert(current != NULL);

	current->run_state = timeout ? __THREAD_S_TIMEDWAIT : __THREAD_S_WAIT;
	threadobj_save_timeout(&current->core, timeout);
	current->wait_status = SYNCOBJ_DRAINWAIT;
	list_append(&current->wait_link, &sobj->drain_list);
	current->wait_sobj = sobj;
	sobj->drain_count++;
	sobj->wait_count++;

	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &state);
	assert(state == PTHREAD_CANCEL_DISABLE);

	/*
	 * NOTE: Since the DRAINED signal is broadcast to all waiters,
	 * a race may exist for acting upon it among those
	 * threads. Therefore the caller must check that the drain
	 * condition is still true before proceeding.
	 */
	do {
		__syncobj_tag_unlocked(sobj);
		ret = monitor_wait_drain(sobj, current, timeout);
		__syncobj_tag_locked(sobj);
	} while (ret == 0 && current->wait_sobj);

	pthread_setcancelstate(state, NULL);

	return wait_epilogue(sobj, syns, current, ret);
}

int syncobj_destroy(struct syncobj *sobj, struct syncstate *syns)
{
	int ret;

	__syncobj_check_locked(sobj);

	sobj->magic = ~SYNCOBJ_MAGIC;
	ret = syncobj_flush(sobj);
	if (ret) {
		syncobj_unlock(sobj, syns);
		return ret;
	}

	/* No thread was awakened - we may dispose immediately. */
	__syncobj_finalize(sobj);
	pthread_setcancelstate(syns->state, NULL);

	return 0;
}

void syncobj_uninit(struct syncobj *sobj)
{
	monitor_enter(sobj);
	assert(sobj->wait_count == 0);
	syncobj_cleanup_corespec(sobj);
}
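
/*
 * Minimal sketch (illustration only, kept out of the build) of the
 * re-checking pattern syncobj_wait_drain() requires: since the
 * DRAINED signal is broadcast to all waiters, the caller must test
 * the drain condition again after each wakeup. The "struct xq" type
 * and its "pending" counter are hypothetical; error handling follows
 * wait_epilogue() above, i.e. -EIDRM means the object is gone and the
 * lock was dropped, any other failure leaves the lock held.
 */
#if 0
struct xq {
	struct syncobj sobj;
	int pending;		/* Hypothetical count of in-flight items. */
};

static int xq_wait_empty(struct xq *q, const struct timespec *timeout)
{
	struct syncstate syns;
	int ret;

	ret = syncobj_lock(&q->sobj, &syns);
	if (ret)
		return ret;

	while (q->pending > 0) {	/* Re-check after every wakeup. */
		ret = syncobj_wait_drain(&q->sobj, timeout, &syns);
		if (ret == -EIDRM)
			return ret;	/* Object deleted, lock dropped. */
		if (ret) {
			syncobj_unlock(&q->sobj, &syns);
			return ret;	/* -ETIMEDOUT or -EINTR. */
		}
	}

	syncobj_unlock(&q->sobj, &syns);

	return 0;
}
#endif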