/* * Copyright (C) 2011 Philippe Gerum . * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. */ #include #include #include #include #include #include "reference.h" #include "internal.h" #include "queue.h" #include "timer.h" /** * @ingroup alchemy * @defgroup alchemy_queue Message queue services * * real-time IPC mechanism for sending messages of arbitrary size * * Message queueing is a method by which real-time tasks can exchange * or pass data through a Xenomai-managed queue of messages. Messages * can vary in length and be assigned different types or usages. A * message queue can be created by one task and used by multiple tasks * that send and/or receive messages to the queue. 
* * @{ */ struct syncluster alchemy_queue_table; static DEFINE_NAME_GENERATOR(queue_namegen, "queue", struct alchemy_queue, name); DEFINE_SYNC_LOOKUP(queue, RT_QUEUE); #ifdef CONFIG_XENO_REGISTRY static int prepare_waiter_cache(struct fsobstack *o, struct obstack *cache, int item_count) { const struct alchemy_queue *qcb; fsobstack_grow_format(o, "--\n[WAITER]\n"); obstack_blank(cache, item_count * sizeof(qcb->name)); return 0; } static size_t collect_waiter_data(void *p, struct threadobj *thobj) { const char *name = threadobj_get_name(thobj); int len = strlen(name); strcpy(p, name); *(char *)(p + len) = '\n'; return len + 1; } static struct fsobstack_syncops fill_ops = { .prepare_cache = prepare_waiter_cache, .collect_data = collect_waiter_data, }; static int queue_registry_open(struct fsobj *fsobj, void *priv) { size_t usable_mem, used_mem, limit; struct fsobstack *o = priv; struct alchemy_queue *qcb; struct syncstate syns; unsigned int mcount; int mode, ret; qcb = container_of(fsobj, struct alchemy_queue, fsobj); ret = syncobj_lock(&qcb->sobj, &syns); if (ret) return -EIO; usable_mem = heapobj_size(&qcb->hobj); used_mem = heapobj_inquire(&qcb->hobj); limit = qcb->limit; mcount = qcb->mcount; mode = qcb->mode; syncobj_unlock(&qcb->sobj, &syns); fsobstack_init(o); fsobstack_grow_format(o, "%6s %10s %9s %8s %s\n", "[TYPE]", "[TOTALMEM]", "[USEDMEM]", "[QLIMIT]", "[MCOUNT]"); fsobstack_grow_format(o, " %s %9Zu %9Zu %8Zu %8u\n", mode & Q_PRIO ? 
"PRIO" : "FIFO", usable_mem, used_mem, limit, mcount); fsobstack_grow_syncobj_grant(o, &qcb->sobj, &fill_ops); fsobstack_finish(o); return 0; } static struct registry_operations registry_ops = { .open = queue_registry_open, .release = fsobj_obstack_release, .read = fsobj_obstack_read }; #else /* !CONFIG_XENO_REGISTRY */ static struct registry_operations registry_ops; #endif /* CONFIG_XENO_REGISTRY */ static void queue_finalize(struct syncobj *sobj) { struct alchemy_queue *qcb; qcb = container_of(sobj, struct alchemy_queue, sobj); registry_destroy_file(&qcb->fsobj); heapobj_destroy(&qcb->hobj); xnfree(qcb); } fnref_register(libalchemy, queue_finalize); /** * @fn int rt_queue_create(RT_QUEUE *q, const char *name, size_t poolsize, size_t qlimit, int mode) * @brief Create a message queue. * * Create a message queue object which allows multiple tasks to * exchange data through the use of variable-sized messages. A message * queue is created empty. * * @param q The address of a queue descriptor which can be later used * to identify uniquely the created object, upon success of this call. * * @param name An ASCII string standing for the symbolic name of the * queue. When non-NULL and non-empty, a copy of this string is used * for indexing the created queue into the object registry. * * @param poolsize The size (in bytes) of the message buffer pool to * be pre-allocated for holding messages. Message buffers will be * claimed and released to this pool. The buffer pool memory cannot * be extended. See note. * * @param qlimit This parameter allows to limit the maximum number of * messages which can be queued at any point in time, sending to a * full queue begets an error. The special value Q_UNLIMITED can be * passed to disable the limit check. * * @param mode The queue creation mode. The following flags can be * OR'ed into this bitmask, each of them affecting the new queue: * * - Q_FIFO makes tasks pend in FIFO order on the queue for consuming * messages. 
* * - Q_PRIO makes tasks pend in priority order on the queue. * * @return Zero is returned upon success. Otherwise: * * - -EINVAL is returned if @a mode is invalid or @a poolsize is zero. * * - -ENOMEM is returned if the system fails to get memory from the * main heap in order to create the queue. * * - -EEXIST is returned if the @a name is conflicting with an already * registered queue. * * - -EPERM is returned if this service was called from an invalid * context, e.g. interrupt or non-Xenomai thread. * * @apitags{xthread-only, mode-unrestricted, switch-secondary} * * @note Queues can be shared by multiple processes which belong to * the same Xenomai session. * * @note Each message pending into the queue consumes four long words * plus the actual payload size, aligned to the next long word * boundary. e.g. a 6 byte message on a 32 bit platform would require * 24 bytes of storage into the pool. * * When @a qlimit is given (i.e. different from Q_UNLIMITED), this * overhead is accounted for automatically, so that @a qlimit messages * of @a poolsize / @a qlimit bytes can be stored into the pool * concurrently. Otherwise, @a poolsize is increased by 5% internally * to cope with such overhead. */ int rt_queue_create(RT_QUEUE *queue, const char *name, size_t poolsize, size_t qlimit, int mode) { struct alchemy_queue *qcb; int sobj_flags = 0, ret; struct service svc; if (threadobj_irq_p()) return -EPERM; if (poolsize == 0 || (mode & ~Q_PRIO) != 0) return -EINVAL; CANCEL_DEFER(svc); ret = -ENOMEM; qcb = xnmalloc(sizeof(*qcb)); if (qcb == NULL) goto fail_cballoc; generate_name(qcb->name, name, &queue_namegen); /* * The message pool has to be part of the main heap for proper * sharing between processes. * * We have the message descriptor overhead to cope with when * allocating the buffer pool. When the queue limit is not * known, assume 5% overhead. 
*/ if (qlimit == Q_UNLIMITED) ret = heapobj_init(&qcb->hobj, qcb->name, poolsize + (poolsize * 5 / 100)); else ret = heapobj_init_array(&qcb->hobj, qcb->name, (poolsize / qlimit) + sizeof(struct alchemy_queue_msg), qlimit); if (ret) goto fail_bufalloc; qcb->mode = mode; qcb->limit = qlimit; list_init(&qcb->mq); qcb->mcount = 0; if (mode & Q_PRIO) sobj_flags = SYNCOBJ_PRIO; ret = syncobj_init(&qcb->sobj, CLOCK_COPPERPLATE, sobj_flags, fnref_put(libalchemy, queue_finalize)); if (ret) goto fail_syncinit; qcb->magic = queue_magic; registry_init_file_obstack(&qcb->fsobj, ®istry_ops); ret = __bt(registry_add_file(&qcb->fsobj, O_RDONLY, "/alchemy/queues/%s", qcb->name)); if (ret) warning("failed to export queue %s to registry, %s", qcb->name, symerror(ret)); ret = syncluster_addobj(&alchemy_queue_table, qcb->name, &qcb->cobj); if (ret) goto fail_register; queue->handle = mainheap_ref(qcb, uintptr_t); CANCEL_RESTORE(svc); return 0; fail_register: registry_destroy_file(&qcb->fsobj); syncobj_uninit(&qcb->sobj); fail_syncinit: heapobj_destroy(&qcb->hobj); fail_bufalloc: xnfree(qcb); fail_cballoc: CANCEL_RESTORE(svc); return ret; } /** * @fn int rt_queue_delete(RT_QUEUE *q) * @brief Delete a message queue. * * This routine deletes a queue object previously created by a call to * rt_queue_create(). All resources attached to that queue are * automatically released, including all pending messages. * * @param q The queue descriptor. * * @return Zero is returned upon success. Otherwise: * * - -EINVAL is returned if @a q is not a valid queue descriptor. * * - -EPERM is returned if this service was called from an * asynchronous context. 
* * @apitags{mode-unrestricted, switch-secondary} */ int rt_queue_delete(RT_QUEUE *queue) { struct alchemy_queue *qcb; struct syncstate syns; struct service svc; int ret = 0; if (threadobj_irq_p()) return -EPERM; CANCEL_DEFER(svc); qcb = get_alchemy_queue(queue, &syns, &ret); if (qcb == NULL) goto out; syncluster_delobj(&alchemy_queue_table, &qcb->cobj); qcb->magic = ~queue_magic; syncobj_destroy(&qcb->sobj, &syns); out: CANCEL_RESTORE(svc); return ret; } /** * @fn void *rt_queue_alloc(RT_QUEUE *q, size_t size) * @brief Allocate a message buffer. * * This service allocates a message buffer from the queue's internal * pool. This buffer can be filled in with payload information, prior * enqueuing it by a call to rt_queue_send(). When used in pair, * these services provide a zero-copy interface for sending messages. * * @param q The queue descriptor. * * @param size The requested size in bytes of the buffer. Zero is an * acceptable value, which means that the message conveys no payload; * in this case, the receiver will get a zero-sized message. * * @return The address of the allocated buffer upon success, or NULL * if the call fails. * * @apitags{unrestricted, switch-primary} */ void *rt_queue_alloc(RT_QUEUE *queue, size_t size) { struct alchemy_queue_msg *msg = NULL; struct alchemy_queue *qcb; struct syncstate syns; struct service svc; int ret; CANCEL_DEFER(svc); qcb = get_alchemy_queue(queue, &syns, &ret); if (qcb == NULL) goto out; msg = heapobj_alloc(&qcb->hobj, size + sizeof(*msg)); if (msg == NULL) goto done; /* * XXX: no need to init the ->next holder, list_*pend() do not * require this, and this ends up being costly on low end. */ msg->size = size; /* Zero is allowed. */ msg->refcount = 1; ++msg; done: put_alchemy_queue(qcb, &syns); out: CANCEL_RESTORE(svc); return msg; } /** * @fn int rt_queue_free(RT_QUEUE *q, void *buf) * @brief Free a message buffer. * * This service releases a message buffer to the queue's internal * pool. 
* * @param q The queue descriptor. * * @param buf The address of the message buffer to free. Even * zero-sized messages carrying no payload data must be freed, since * they are assigned a valid memory space to store internal * information. * * @return Zero is returned upon success, or -EINVAL if @a buf is not * a valid message buffer previously allocated by the rt_queue_alloc() * service, or the caller did not get ownership of the message through * a successful return from rt_queue_receive(). * * @apitags{unrestricted, switch-primary} */ int rt_queue_free(RT_QUEUE *queue, void *buf) { struct alchemy_queue_msg *msg; struct alchemy_queue *qcb; struct syncstate syns; struct service svc; int ret = 0; if (buf == NULL) return -EINVAL; msg = (struct alchemy_queue_msg *)buf - 1; CANCEL_DEFER(svc); qcb = get_alchemy_queue(queue, &syns, &ret); if (qcb == NULL) goto out; if (heapobj_validate(&qcb->hobj, msg) == 0) { ret = -EINVAL; goto done; } /* * Check the reference count under lock, so that we properly * serialize with rt_queue_send() and rt_queue_receive() which * may update it. */ if (msg->refcount == 0) { /* Mm, double-free? */ ret = -EINVAL; goto done; } if (--msg->refcount == 0) heapobj_free(&qcb->hobj, msg); done: put_alchemy_queue(qcb, &syns); out: CANCEL_RESTORE(svc); return ret; } /** * @fn int rt_queue_send(RT_QUEUE *q, const void *buf, size_t size, int mode) * @brief Send a message to a queue. * * This service sends a complete message to a given queue. The message * must have been allocated by a previous call to rt_queue_alloc(). * * @param q The queue descriptor. * * @param buf The address of the message buffer to be sent. The * message buffer must have been allocated using the rt_queue_alloc() * service. Once passed to rt_queue_send(), the memory pointed to by * @a buf is no more under the control of the sender and thus should * not be referenced by it anymore; deallocation of this memory must * be handled on the receiving side. 
*
 * @param size The actual size in bytes of the message, which may be
 * lower than the allocated size for the buffer obtained from
 * rt_queue_alloc(). Zero is a valid value, in which case an empty
 * message will be sent.
 *
 * @param mode A set of flags affecting the operation:
 *
 * - Q_URGENT causes the message to be prepended to the message queue,
 * ensuring a LIFO ordering.
 *
 * - Q_NORMAL causes the message to be appended to the message queue,
 * ensuring a FIFO ordering.
 *
 * - Q_BROADCAST causes the message to be sent to all tasks currently
 * waiting for messages. The message is not copied; a reference count
 * is maintained instead so that the message will remain valid until
 * the last receiver releases its own reference using rt_queue_free(),
 * after which the message space will be returned to the queue's
 * internal pool.
 *
 * @return Upon success, this service returns the number of receivers
 * which were awakened as a result of the operation. If zero is
 * returned, no task was waiting on the receiving side of the queue,
 * and the message has been enqueued. Upon error, one of the following
 * error codes is returned:
 *
 * - -EINVAL is returned if @a q is not a message queue descriptor, @a
 * mode is invalid, or @a buf is NULL.
 *
 * - -ENOMEM is returned if queuing the message would exceed the limit
 * defined for the queue at creation.
*
 * @apitags{unrestricted, switch-primary}
 */
int rt_queue_send(RT_QUEUE *queue,
		  const void *buf, size_t size, int mode)
{
	struct alchemy_queue_wait *wait;
	struct alchemy_queue_msg *msg;
	struct alchemy_queue *qcb;
	struct threadobj *waiter;
	struct syncstate syns;
	struct service svc;
	int ret = 0;

	if (buf == NULL || (mode & ~(Q_URGENT|Q_BROADCAST)) != 0)
		return -EINVAL;

	/* The message header sits right before the payload area. */
	msg = (struct alchemy_queue_msg *)buf - 1;

	CANCEL_DEFER(svc);

	qcb = get_alchemy_queue(queue, &syns, &ret);
	if (qcb == NULL)
		goto out;

	/* A zero limit means no limit check. */
	if (qcb->limit && qcb->mcount >= qcb->limit) {
		ret = -ENOMEM;
		goto done;
	}

	/* No reference left to hand over: not a sendable buffer. */
	if (msg->refcount == 0) {
		ret = -EINVAL;
		goto done;
	}

	/*
	 * Drop the sender's reference; each receiver granted below
	 * takes its own.
	 */
	msg->refcount--;
	msg->size = size;
	ret = 0;  /* # of tasks unblocked. */

	/* Single pass unless broadcasting, then loop until no waiter remains. */
	do {
		waiter = syncobj_grant_one(&qcb->sobj);
		if (waiter == NULL)
			break;
		wait = threadobj_get_wait(waiter);
		wait->msg = __moff(msg);
		msg->refcount++;
		ret++;
	} while (mode & Q_BROADCAST);

	if (ret)
		goto done;
	/*
	 * We need to queue the message if no task was waiting for it,
	 * except in broadcast mode, in which case we only fix up the
	 * reference count.
	 */
	if (mode & Q_BROADCAST)
		msg->refcount++;
	else {
		qcb->mcount++;
		if (mode & Q_URGENT)
			list_prepend(&msg->next, &qcb->mq);
		else
			list_append(&msg->next, &qcb->mq);
	}
done:
	put_alchemy_queue(qcb, &syns);
out:
	CANCEL_RESTORE(svc);

	return ret;
}

/**
 * @fn int rt_queue_write(RT_QUEUE *q, const void *buf, size_t size, int mode)
 * @brief Write data to a queue.
 *
 * This service builds a message out of a raw data buffer, then sends
 * it to a given queue.
 *
 * @param q The queue descriptor.
 *
 * @param buf The address of the payload data to be written to the
 * queue. The payload is copied to a message buffer allocated
 * internally by this service.
 *
 * @param size The size in bytes of the payload data. Zero is a valid
 * value, in which case an empty message is queued.
 *
 * @param mode A set of flags affecting the operation:
 *
 * - Q_URGENT causes the message to be prepended to the message queue,
 * ensuring a LIFO ordering.
* * - Q_NORMAL causes the message to be appended to the message queue, * ensuring a FIFO ordering. * * - Q_BROADCAST causes the message to be sent to all tasks currently * waiting for messages. The message is not copied multiple times; a * reference count is maintained instead so that the message will * remain valid until the last receiver releases its own reference * using rt_queue_free(), after which the message space will be * returned to the queue's internal pool. * * @return Upon success, this service returns the number of receivers * which got awaken as a result of the operation. If zero is returned, * no task was waiting on the receiving side of the queue, and the * message has been enqueued. Upon error, one of the following error * codes is returned: * * - -EINVAL is returned if @a mode is invalid, @a buf is NULL with a * non-zero @a size, or @a q is not a essage queue descriptor. * * - -ENOMEM is returned if queuing the message would exceed the limit * defined for the queue at creation, or if no memory can be obtained * to convey the message data internally. * * @apitags{unrestricted, switch-primary} */ int rt_queue_write(RT_QUEUE *queue, const void *buf, size_t size, int mode) { struct alchemy_queue_wait *wait; struct alchemy_queue_msg *msg; struct alchemy_queue *qcb; struct threadobj *waiter; struct syncstate syns; int ret = 0, nwaiters; struct service svc; size_t bufsz; if (mode & ~(Q_URGENT|Q_BROADCAST)) return -EINVAL; if (buf == NULL && size > 0) return -EINVAL; CANCEL_DEFER(svc); qcb = get_alchemy_queue(queue, &syns, &ret); if (qcb == NULL) goto out; if (mode & Q_BROADCAST) /* No buffer-to-buffer copy in broadcast mode. */ goto enqueue; waiter = syncobj_peek_grant(&qcb->sobj); if (waiter && threadobj_local_p(waiter)) { /* * Fast path for local threads already waiting for * data via rt_queue_read(): do direct copy to the * reader's buffer. 
*/ wait = threadobj_get_wait(waiter); bufsz = wait->local_bufsz; if (bufsz == 0) /* no buffer provided, enqueue normally. */ goto enqueue; if (size > bufsz) size = bufsz; if (size > 0) memcpy(wait->local_buf, buf, size); wait->local_bufsz = size; syncobj_grant_to(&qcb->sobj, waiter); ret = 1; goto done; } enqueue: nwaiters = syncobj_count_grant(&qcb->sobj); if (nwaiters == 0 && (mode & Q_BROADCAST) != 0) goto done; ret = -ENOMEM; if (qcb->limit && qcb->mcount >= qcb->limit) goto done; msg = heapobj_alloc(&qcb->hobj, size + sizeof(*msg)); if (msg == NULL) goto done; msg->size = size; msg->refcount = 0; if (size > 0) memcpy(msg + 1, buf, size); ret = 0; /* # of tasks unblocked. */ if (nwaiters == 0) { qcb->mcount++; if (mode & Q_URGENT) list_prepend(&msg->next, &qcb->mq); else list_append(&msg->next, &qcb->mq); goto done; } do { waiter = syncobj_grant_one(&qcb->sobj); if (waiter == NULL) break; wait = threadobj_get_wait(waiter); wait->msg = __moff(msg); msg->refcount++; ret++; } while (mode & Q_BROADCAST); done: put_alchemy_queue(qcb, &syns); out: CANCEL_RESTORE(svc); return ret; } /** * @fn ssize_t rt_queue_receive(RT_QUEUE *q, void **bufp, RTIME timeout) * @brief Receive from a queue (with relative scalar timeout). * * This routine is a variant of rt_queue_receive_timed() accepting a * relative timeout specification expressed as a scalar value. * * @param q The queue descriptor. * * @param bufp A pointer to a memory location which will be written * with the address of the received message. * * @param timeout A delay expressed in clock ticks. Passing * TM_INFINITE causes the caller to block indefinitely until * a message is available. Passing TM_NONBLOCK causes the service * to return immediately without blocking in case no message is * available. * * @apitags{xthread-nowait, switch-primary} */ /** * @fn ssize_t rt_queue_receive_until(RT_QUEUE *q, void **bufp, RTIME abs_timeout) * @brief Receive from a queue (with absolute scalar timeout). 
* * This routine is a variant of rt_queue_receive_timed() accepting an * absolute timeout specification expressed as a scalar value. * * @param q The queue descriptor. * * @param bufp A pointer to a memory location which will be written * with the address of the received message. * * @param abs_timeout An absolute date expressed in clock ticks. * Passing TM_INFINITE causes the caller to block indefinitely until * a message is available. Passing TM_NONBLOCK causes the service * to return immediately without blocking in case no message is * available. * * @apitags{xthread-nowait, switch-primary} */ /** * @fn ssize_t rt_queue_receive_timed(RT_QUEUE *q, void **bufp, const struct timespec *abs_timeout) * @brief Receive a message from a queue (with absolute timeout date). * * This service receives the next available message from a given * queue. * * @param q The queue descriptor. * * @param bufp A pointer to a memory location which will be written * with the address of the received message, upon success. Once * consumed, the message space should be freed using rt_queue_free(). * * @param abs_timeout An absolute date expressed in seconds / nanoseconds, * based on the Alchemy clock, specifying a time limit to wait for a * message to be available from the queue. Passing NULL causes the caller * to block indefinitely until a message is available. Passing * { .tv_sec = 0, .tv_nsec = 0 } causes the service to return immediately * without blocking in case no message is available. * * @return The number of bytes available from the received message is * returned upon success. Zero is a possible value corresponding to a * zero-sized message passed to rt_queue_send() or * rt_queue_write(). Otherwise: * * - -ETIMEDOUT is returned if @a abs_timeout is reached before a * message arrives. * * - -EWOULDBLOCK is returned if @a abs_timeout is { .tv_sec = 0, * .tv_nsec = 0 } and no message is immediately available on entry to * the call. 
*
 * - -EINTR is returned if rt_task_unblock() was called for the
 * current task before a message was available.
 *
 * - -EINVAL is returned if @a q is not a valid queue descriptor.
 *
 * - -EIDRM is returned if @a q is deleted while the caller was
 * waiting for a message. In such event, @a q is no more valid upon
 * return of this service.
 *
 * - -EPERM is returned if this service should block, but was not
 * called from a Xenomai thread.
 *
 * @apitags{xthread-nowait, switch-primary}
 */
ssize_t rt_queue_receive_timed(RT_QUEUE *queue, void **bufp,
			       const struct timespec *abs_timeout)
{
	struct alchemy_queue_wait *wait;
	struct alchemy_queue_msg *msg;
	struct alchemy_queue *qcb;
	struct syncstate syns;
	struct service svc;
	ssize_t ret;
	int err = 0;

	/* Blocking is refused when threadobj_current_p() is false. */
	if (!threadobj_current_p() && !alchemy_poll_mode(abs_timeout))
		return -EPERM;

	CANCEL_DEFER(svc);

	qcb = get_alchemy_queue(queue, &syns, &err);
	if (qcb == NULL) {
		ret = err;
		goto out;
	}

	if (list_empty(&qcb->mq))
		goto wait;

	/* A message is pending: take a reference and hand out its payload. */
	msg = list_pop_entry(&qcb->mq, struct alchemy_queue_msg, next);
	msg->refcount++;
	*bufp = msg + 1;
	ret = (ssize_t)msg->size;
	qcb->mcount--;
	goto done;
wait:
	if (alchemy_poll_mode(abs_timeout)) {
		ret = -EWOULDBLOCK;
		goto done;
	}

	wait = threadobj_prepare_wait(struct alchemy_queue_wait);
	/*
	 * No local buffer is offered: rt_queue_write() checks this
	 * field and falls back to passing a message descriptor.
	 */
	wait->local_bufsz = 0;

	ret = syncobj_wait_grant(&qcb->sobj, abs_timeout, &syns);
	if (ret) {
		/* On -EIDRM, leave without calling put_alchemy_queue(). */
		if (ret == -EIDRM) {
			threadobj_finish_wait();
			goto out;
		}
	} else {
		/* The sender already accounted for our reference. */
		msg = __mptr(wait->msg);
		*bufp = msg + 1;
		ret = (ssize_t)msg->size;
	}

	threadobj_finish_wait();
done:
	put_alchemy_queue(qcb, &syns);
out:
	CANCEL_RESTORE(svc);

	return ret;
}

/**
 * @fn ssize_t rt_queue_read(RT_QUEUE *q, void *buf, size_t size, RTIME timeout)
 * @brief Read from a queue (with relative scalar timeout).
 *
 * This routine is a variant of rt_queue_read_timed() accepting a
 * relative timeout specification expressed as a scalar value.
 *
 * @param q The queue descriptor.
 *
 * @param buf A pointer to a memory area which will be written upon
 * success with the received message payload.
* * @param size The length in bytes of the memory area pointed to by @a * buf. * * @param timeout A delay expressed in clock ticks. Passing * TM_INFINITE causes the caller to block indefinitely until * a message is available. Passing TM_NONBLOCK causes the service * to return immediately without blocking in case no message is * available. * * @apitags{xthread-nowait, switch-primary} */ /** * @fn ssize_t rt_queue_read_until(RT_QUEUE *q, void *buf, size_t size, RTIME abs_timeout) * @brief Read from a queue (with absolute scalar timeout). * * This routine is a variant of rt_queue_read_timed() accepting an * absolute timeout specification expressed as a scalar value. * * @param q The queue descriptor. * * @param buf A pointer to a memory area which will be written upon * success with the received message payload. * * @param size The length in bytes of the memory area pointed to by @a * buf. * * @param abs_timeout An absolute date expressed in clock ticks. * Passing TM_INFINITE causes the caller to block indefinitely until * a message is available. Passing TM_NONBLOCK causes the service * to return immediately without blocking in case no message is * available. * * @apitags{xthread-nowait, switch-primary} */ /** * @fn ssize_t rt_queue_read_timed(RT_QUEUE *q, void *buf, size_t size, const struct timespec *abs_timeout) * @brief Read from a queue. * * This service reads the next available message from a given * queue. * * @param q The queue descriptor. * * @param buf A pointer to a memory area which will be written upon * success with the received message payload. The internal message * buffer conveying the data is automatically freed by this call. If * --enable-pshared is enabled in the configuration, @a buf must have * been obtained from the Xenomai memory allocator via xnmalloc() or * any service based on it, such as rt_heap_alloc(). * * @param size The length in bytes of the memory area pointed to by @a * buf. Messages larger than @a size are truncated appropriately. 
* * @param abs_timeout An absolute date expressed in seconds / nanoseconds, * based on the Alchemy clock, specifying a time limit to wait for a * message to be available from the queue. Passing NULL causes the * caller to block indefinitely until a message is available. Passing * { .tv_sec = 0, .tv_nsec = 0 } causes the service to return immediately * without blocking in case no message is available. * * @return The number of bytes copied to @a buf is returned upon * success. Zero is a possible value corresponding to a zero-sized * message passed to rt_queue_send() or rt_queue_write(). Otherwise: * * - -ETIMEDOUT is returned if @a abs_timeout is reached before a * message arrives. * * - -EWOULDBLOCK is returned if @a abs_timeout is { .tv_sec = 0, * .tv_nsec = 0 } and no message is immediately available on entry to * the call. * - -EINTR is returned if rt_task_unblock() was called for the * current task before a message was available. * * - -EINVAL is returned if @a q is not a valid queue descriptor. * * - -EIDRM is returned if @a q is deleted while the caller was * waiting for a message. In such event, @a q is no more valid upon * return of this service. * * - -EPERM is returned if this service should block, but was not * called from a Xenomai thread. 
*
 * @apitags{xthread-nowait, switch-primary}
 */
ssize_t rt_queue_read_timed(RT_QUEUE *queue,
			    void *buf, size_t size,
			    const struct timespec *abs_timeout)
{
	struct alchemy_queue_wait *wait;
	struct alchemy_queue_msg *msg;
	struct alchemy_queue *qcb;
	struct syncstate syns;
	struct service svc;
	ssize_t ret;
	int err = 0;

	/* Blocking is refused when threadobj_current_p() is false. */
	if (!threadobj_current_p() && !alchemy_poll_mode(abs_timeout))
		return -EPERM;

	/* Nothing can be copied to an empty buffer; no message is consumed. */
	if (size == 0)
		return 0;

	CANCEL_DEFER(svc);

	qcb = get_alchemy_queue(queue, &syns, &err);
	if (qcb == NULL) {
		ret = err;
		goto out;
	}

	if (list_empty(&qcb->mq))
		goto wait;

	/* A message is pending: dequeue it and jump to the copy-out code. */
	msg = list_pop_entry(&qcb->mq, struct alchemy_queue_msg, next);
	qcb->mcount--;
	goto transfer;
wait:
	if (alchemy_poll_mode(abs_timeout)) {
		ret = -EWOULDBLOCK;
		goto done;
	}

	/*
	 * Publish our buffer so that rt_queue_write() may copy into
	 * it directly; a null ->msg on wakeup tells such a direct
	 * copy apart from a message descriptor hand-over.
	 */
	wait = threadobj_prepare_wait(struct alchemy_queue_wait);
	wait->local_buf = buf;
	wait->local_bufsz = size;
	wait->msg = __moff_nullable(NULL);

	ret = syncobj_wait_grant(&qcb->sobj, abs_timeout, &syns);
	if (ret) {
		/* On -EIDRM, leave without calling put_alchemy_queue(). */
		if (ret == -EIDRM) {
			threadobj_finish_wait();
			goto out;
		}
	} else if (__mptr_nullable(wait->msg)) {
		msg = __mptr(wait->msg);
	transfer:
		/* Copy at most size bytes, then release the message buffer. */
		ret = (ssize_t)(msg->size > size ? size : msg->size);
		if (ret > 0)
			memcpy(buf, msg + 1, ret);
		heapobj_free(&qcb->hobj, msg);
	} else	/* A direct copy took place. */
		ret = (ssize_t)wait->local_bufsz;

	/*
	 * NOTE(review): this is also reached through the transfer
	 * label by the non-blocking path, i.e. without any prior
	 * threadobj_prepare_wait() -- confirm this is harmless.
	 */
	threadobj_finish_wait();
done:
	put_alchemy_queue(qcb, &syns);
out:
	CANCEL_RESTORE(svc);

	return ret;
}

/**
 * @fn int rt_queue_flush(RT_QUEUE *q)
 * @brief Flush pending messages from a queue.
 *
 * This routine flushes all messages currently pending in a queue,
 * releasing all message buffers appropriately.
 *
 * @param q The queue descriptor.
 *
 * @return The number of messages flushed (possibly zero) is returned
 * upon success. Otherwise:
 *
 * - -EINVAL is returned if @a q is not a valid queue descriptor.
* * @apitags{unrestricted, switch-primary} */ int rt_queue_flush(RT_QUEUE *queue) { struct alchemy_queue_msg *msg, *tmp; struct alchemy_queue *qcb; struct syncstate syns; struct service svc; int ret = 0; CANCEL_DEFER(svc); qcb = get_alchemy_queue(queue, &syns, &ret); if (qcb == NULL) goto out; ret = qcb->mcount; qcb->mcount = 0; /* * Flushing a message queue is not an operation we should see * in any fast path within an application, so locking out * other threads from using that queue while we flush it is * acceptable. */ if (!list_empty(&qcb->mq)) { list_for_each_entry_safe(msg, tmp, &qcb->mq, next) { list_remove(&msg->next); heapobj_free(&qcb->hobj, msg); } } put_alchemy_queue(qcb, &syns); out: CANCEL_RESTORE(svc); return ret; } /** * @fn int rt_queue_inquire(RT_QUEUE *q, RT_QUEUE_INFO *info) * @brief Query queue status. * * This routine returns the status information about the specified * queue. * * @param q The queue descriptor. * * @param info A pointer to the @ref RT_QUEUE_INFO "return * buffer" to copy the information to. * * @return Zero is returned and status information is written to the * structure pointed at by @a info upon success. Otherwise: * * - -EINVAL is returned if @a q is not a valid queue descriptor. * * @apitags{unrestricted, switch-primary} */ int rt_queue_inquire(RT_QUEUE *queue, RT_QUEUE_INFO *info) { struct alchemy_queue *qcb; struct syncstate syns; struct service svc; int ret = 0; CANCEL_DEFER(svc); qcb = get_alchemy_queue(queue, &syns, &ret); if (qcb == NULL) goto out; info->nwaiters = syncobj_count_grant(&qcb->sobj); info->nmessages = qcb->mcount; info->mode = qcb->mode; info->qlimit = qcb->limit; info->poolsize = heapobj_size(&qcb->hobj); info->usedmem = heapobj_inquire(&qcb->hobj); strcpy(info->name, qcb->name); put_alchemy_queue(qcb, &syns); out: CANCEL_RESTORE(svc); return ret; } /** * @fn int rt_queue_bind(RT_QUEUE *q, const char *name, RTIME timeout) * @brief Bind to a message queue. 
* * This routine creates a new descriptor to refer to an existing * message queue identified by its symbolic name. If the object does * not exist on entry, the caller may block until a queue of the * given name is created. * * @param q The address of a queue descriptor filled in by the * operation. Contents of this memory is undefined upon failure. * * @param name A valid NULL-terminated name which identifies the * queue to bind to. This string should match the object name * argument passed to rt_queue_create(). * * @param timeout The number of clock ticks to wait for the * registration to occur (see note). Passing TM_INFINITE causes the * caller to block indefinitely until the object is * registered. Passing TM_NONBLOCK causes the service to return * immediately without waiting if the object is not registered on * entry. * * @return Zero is returned upon success. Otherwise: * * - -EINTR is returned if rt_task_unblock() was called for the * current task before the retrieval has completed. * * - -EWOULDBLOCK is returned if @a timeout is equal to TM_NONBLOCK * and the searched object is not registered on entry. * * - -ETIMEDOUT is returned if the object cannot be retrieved within * the specified amount of time. * * - -EPERM is returned if this service should block, but was not * called from a Xenomai thread. * * @apitags{xthread-nowait, switch-primary} * * @note The @a timeout value is interpreted as a multiple of the * Alchemy clock resolution (see --alchemy-clock-resolution option, * defaults to 1 nanosecond). */ int rt_queue_bind(RT_QUEUE *queue, const char *name, RTIME timeout) { return alchemy_bind_object(name, &alchemy_queue_table, timeout, offsetof(struct alchemy_queue, cobj), &queue->handle); } /** * @fn int rt_queue_unbind(RT_QUEUE *q) * @brief Unbind from a message queue. * * @param q The queue descriptor. * * This routine releases a previous binding to a message queue. 
After * this call has returned, the descriptor is no more valid for * referencing this object. * * @apitags{thread-unrestricted} */ int rt_queue_unbind(RT_QUEUE *queue) { queue->handle = 0; return 0; } /** @} */