/* * Copyright (C) 2011 Philippe Gerum . * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. */ #include #include #include #include #include "rtdm/ipc.h" #include "copperplate/threadobj.h" #include "copperplate/heapobj.h" #include "copperplate/cluster.h" #include "reference.h" #include "internal.h" #include "pipe.h" #include "timer.h" /** * @ingroup alchemy * @defgroup alchemy_pipe Message pipe services * * Two-way communication channel between Xenomai & Linux domains * * A message pipe is a two-way communication channel between Xenomai * threads and normal Linux threads using regular file I/O operations * on a pseudo-device. Pipes can be operated in a message-oriented * fashion so that message boundaries are preserved, and also in * byte-oriented streaming mode from real-time to normal Linux * threads for optimal throughput. * * Xenomai threads open their side of the pipe using the * rt_pipe_create() service; regular Linux threads do the same by * opening one of the /dev/rtpN special devices, where N is the minor * number agreed upon between both ends of each pipe. * * In addition, named pipes are available through the registry * support, which automatically creates a symbolic link from entries * under /proc/xenomai/registry/rtipc/xddp/ to the corresponding * special device file. 
*
 * @note Alchemy's message pipes are fully based on the @ref
 * RTIPC_PROTO "XDDP protocol" available from the RTDM/ipc driver.
 *
 * @{
 */

/*
 * Cluster indexing the pipe control blocks by name: rt_pipe_create()
 * adds entries to it, rt_pipe_delete() removes them, and
 * rt_pipe_bind() hands it to alchemy_bind_object().
 */
struct syncluster alchemy_pipe_table;

/* Name generator passed to generate_name() by rt_pipe_create(). */
static DEFINE_NAME_GENERATOR(pipe_namegen, "pipe", struct alchemy_pipe, name);

/*
 * NOTE(review): presumably expands to the find_alchemy_pipe() lookup
 * helper used by the services below -- confirm against the macro
 * definition.
 */
DEFINE_LOOKUP_PRIVATE(pipe, RT_PIPE);

/**
 * @fn int rt_pipe_create(RT_PIPE *pipe, const char *name, int minor, size_t poolsize)
 * @brief Create a message pipe.
 *
 * This service opens a bi-directional communication channel for
 * exchanging messages between Xenomai threads and regular Linux
 * threads. Pipes natively preserve message boundaries, but can also
 * be used in byte-oriented streaming mode from Xenomai to Linux.
 *
 * rt_pipe_create() always returns immediately, even if no thread has
 * opened the associated special device file yet. On the contrary, the
 * non real-time side could block upon attempt to open the special
 * device file until rt_pipe_create() is issued on the same pipe from
 * a Xenomai thread, unless O_NONBLOCK was given to the open(2) system
 * call.
 *
 * @param pipe The address of a pipe descriptor which can be later used
 * to identify uniquely the created object, upon success of this call.
 *
 * @param name An ASCII string standing for the symbolic name of the
 * pipe. When non-NULL and non-empty, a copy of this string is used
 * for indexing the created pipe into the object registry.
 *
 * Named pipes are supported through the use of the registry. Passing
 * a valid @a name parameter when creating a message pipe causes a
 * symbolic link to be created from
 * /proc/xenomai/registry/rtipc/xddp/@a name to the associated special
 * device (i.e. /dev/rtp*), so that the specific @a minor information
 * does not need to be known from those processes for opening the
 * proper device file.
In such a case, both sides of the pipe only * need to agree upon a symbolic name to refer to the same data path, * which is especially useful whenever the @a minor number is picked * up dynamically using an adaptive algorithm, such as passing * P_MINOR_AUTO as @a minor value. * * @param minor The minor number of the device associated with the * pipe. Passing P_MINOR_AUTO causes the minor number to be * auto-allocated. In such a case, a symbolic link will be * automatically created from * /proc/xenomai/registry/rtipc/xddp/@a name to the allocated pipe * device entry. Valid minor numbers range from 0 to * CONFIG_XENO_OPT_PIPE_NRDEV-1. * * @param poolsize Specifies the size of a dedicated buffer pool for the * pipe. Passing 0 means that all message allocations for this pipe are * performed on the Cobalt core heap. * * @return The @a minor number assigned to the connection is returned * upon success. Otherwise: * * - -ENOMEM is returned if the system fails to get memory from the * main heap in order to create the pipe. * * - -ENODEV is returned if @a minor is different from P_MINOR_AUTO * and is not a valid minor number. * * - -EEXIST is returned if the @a name is conflicting with an already * registered pipe. * * - -EBUSY is returned if @a minor is already open. * * - -EPERM is returned if this service was called from an invalid * context, e.g. interrupt or non-Xenomai thread. 
* * @apitags{xthread-only, mode-unrestricted, switch-secondary} */ #ifndef DOXYGEN_CPP CURRENT_IMPL(int, rt_pipe_create, (RT_PIPE *pipe, const char *name, int minor, size_t poolsize)) #else int rt_pipe_create(RT_PIPE *pipe, const char *name, int minor, size_t poolsize) #endif { struct rtipc_port_label plabel; struct sockaddr_ipc saddr; struct alchemy_pipe *pcb; struct service svc; size_t streambufsz; socklen_t addrlen; int ret, sock; if (threadobj_irq_p()) return -EPERM; CANCEL_DEFER(svc); pcb = xnmalloc(sizeof(*pcb)); if (pcb == NULL) { ret = -ENOMEM; goto out; } sock = __RT(socket(AF_RTIPC, SOCK_DGRAM, IPCPROTO_XDDP)); if (sock < 0) { warning("RTIPC/XDDP protocol not supported by kernel"); ret = -errno; xnfree(pcb); goto out; } if (name && *name) { namecpy(plabel.label, name); ret = __RT(setsockopt(sock, SOL_XDDP, XDDP_LABEL, &plabel, sizeof(plabel))); if (ret) goto fail_sockopt; } if (poolsize > 0) { ret = __RT(setsockopt(sock, SOL_XDDP, XDDP_POOLSZ, &poolsize, sizeof(poolsize))); if (ret) goto fail_sockopt; } streambufsz = ALCHEMY_PIPE_STREAMSZ; ret = __RT(setsockopt(sock, SOL_XDDP, XDDP_BUFSZ, &streambufsz, sizeof(streambufsz))); if (ret) goto fail_sockopt; memset(&saddr, 0, sizeof(saddr)); saddr.sipc_family = AF_RTIPC; saddr.sipc_port = minor; ret = __RT(bind(sock, (struct sockaddr *)&saddr, sizeof(saddr))); if (ret) goto fail_sockopt; if (minor == P_MINOR_AUTO) { /* Fetch the assigned minor device. 
*/ addrlen = sizeof(saddr); ret = __RT(getsockname(sock, (struct sockaddr *)&saddr, &addrlen)); if (ret) goto fail_sockopt; if (addrlen != sizeof(saddr)) { ret = -EINVAL; goto fail_register; } minor = saddr.sipc_port; } generate_name(pcb->name, name, &pipe_namegen); pcb->sock = sock; pcb->minor = minor; pcb->magic = pipe_magic; ret = syncluster_addobj(&alchemy_pipe_table, pcb->name, &pcb->cobj); if (ret) goto fail_register; pipe->handle = mainheap_ref(pcb, uintptr_t); CANCEL_RESTORE(svc); return minor; fail_sockopt: ret = -errno; if (ret == -EADDRINUSE) ret = -EBUSY; fail_register: __RT(close(sock)); xnfree(pcb); out: CANCEL_RESTORE(svc); return ret; } /** * @fn int rt_pipe_delete(RT_PIPE *pipe) * @brief Delete a message pipe. * * This routine deletes a pipe object previously created by a call to * rt_pipe_create(). All resources attached to that pipe are * automatically released, all pending data is flushed. * * @param pipe The pipe descriptor. * * @return Zero is returned upon success. Otherwise: * * - -EINVAL is returned if @a pipe is not a valid pipe descriptor. * * - -EIDRM is returned if @a pipe is a closed pipe descriptor. * * - -EPERM is returned if this service was called from an * asynchronous context. * * @apitags{mode-unrestricted, switch-secondary} */ int rt_pipe_delete(RT_PIPE *pipe) { struct alchemy_pipe *pcb; struct service svc; int ret = 0; if (threadobj_irq_p()) return -EPERM; CANCEL_DEFER(svc); pcb = find_alchemy_pipe(pipe, &ret); if (pcb == NULL) goto out; ret = __RT(close(pcb->sock)); if (ret) { ret = -errno; if (ret == -EBADF) ret = -EIDRM; goto out; } syncluster_delobj(&alchemy_pipe_table, &pcb->cobj); pcb->magic = ~pipe_magic; out: CANCEL_RESTORE(svc); return ret; } /** * @fn ssize_t rt_pipe_read(RT_PIPE *pipe, void *buf, size_t size, RTIME timeout) * @brief Read from a pipe (with relative scalar timeout). * * This routine is a variant of rt_queue_read_timed() accepting a * relative timeout specification expressed as a scalar value. 
* * @param pipe The pipe descriptor. * * @param buf A pointer to a memory area which will be written upon * success with the message received. * * @param size The count of bytes from the received message to read up * into @a buf. If @a size is lower than the actual message size, * -ENOBUFS is returned since the incompletely received message would * be lost. If @a size is zero, this call returns immediately with no * other action. * * @param timeout A delay expressed in clock ticks. Passing * TM_INFINITE causes the caller to block indefinitely until * a message is available. Passing TM_NONBLOCK causes the service * to return immediately without blocking in case no message is * available. * * @apitags{xthread-nowait, switch-primary} */ ssize_t rt_pipe_read(RT_PIPE *pipe, void *buf, size_t size, RTIME timeout) { struct alchemy_pipe *pcb; int err = 0, flags; struct timespec ts; struct timeval tv; ssize_t ret; pcb = find_alchemy_pipe(pipe, &err); if (pcb == NULL) return err; if (timeout == TM_NONBLOCK) flags = MSG_DONTWAIT; else { if (!threadobj_current_p()) return -EPERM; if (timeout != TM_INFINITE) { clockobj_ticks_to_timespec(&alchemy_clock, timeout, &ts); tv.tv_sec = ts.tv_sec; tv.tv_usec = ts.tv_nsec / 1000; } else { tv.tv_sec = 0; tv.tv_usec = 0; } __RT(setsockopt(pcb->sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))); flags = 0; } ret = __RT(recvfrom(pcb->sock, buf, size, flags, NULL, 0)); if (ret < 0) ret = -errno; return ret; } /** * @fn ssize_t rt_pipe_read_until(RT_PIPE *pipe, void *buf, size_t size, RTIME abs_timeout) * @brief Read from a pipe (with absolute scalar timeout). * * This routine is a variant of rt_queue_read_timed() accepting an * absolute timeout specification expressed as a scalar value. * * @param pipe The pipe descriptor. * * @param buf A pointer to a memory area which will be written upon * success with the message received. * * @param size The count of bytes from the received message to read up * into @a buf. 
If @a size is lower than the actual message size, * -ENOBUFS is returned since the incompletely received message would * be lost. If @a size is zero, this call returns immediately with no * other action. * * @param abs_timeout An absolute date expressed in clock ticks. * Passing TM_INFINITE causes the caller to block indefinitely until * a message is available. Passing TM_NONBLOCK causes the service * to return immediately without blocking in case no message is * available. * * @apitags{xthread-nowait, switch-primary} */ /** * @fn ssize_t rt_pipe_read_timed(RT_PIPE *pipe, void *buf, size_t size, const struct timespec *abs_timeout) * @brief Read a message from a pipe. * * This service reads the next available message from a given pipe. * * @param pipe The pipe descriptor. * * @param buf A pointer to a memory area which will be written upon * success with the message received. * * @param size The count of bytes from the received message to read up * into @a buf. If @a size is lower than the actual message size, * -ENOBUFS is returned since the incompletely received message would * be lost. If @a size is zero, this call returns immediately with no * other action. * * @param abs_timeout An absolute date expressed in seconds / nanoseconds, * based on the Alchemy clock, specifying a time limit to wait for a * message to be available from the pipe. Passing NULL causes the caller * to block indefinitely until a message is available. Passing * { .tv_sec = 0, .tv_nsec = 0 } causes the service to return immediately * without blocking in case no message is available. * * @return The number of bytes available from the received message is * returned upon success. Otherwise: * * - -ETIMEDOUT is returned if @a abs_timeout is reached before a * message arrives. * * - -EWOULDBLOCK is returned if @a abs_timeout is { .tv_sec = 0, * .tv_nsec = 0 } and no message is immediately available on entry to * the call. 
* * - -EINTR is returned if rt_task_unblock() was called for the * current task before a message was available. * * - -EINVAL is returned if @a pipe is not a valid pipe descriptor. * * - -EIDRM is returned if @a pipe is deleted while the caller was * waiting for a message. In such event, @a pipe is no more valid upon * return of this service. * * - -EPERM is returned if this service should block, but was not * called from a Xenomai thread. * * @apitags{xthread-nowait, switch-primary} */ ssize_t rt_pipe_read_timed(RT_PIPE *pipe, void *buf, size_t size, const struct timespec *abs_timeout) { struct timespec now, timeout; struct alchemy_pipe *pcb; int err = 0, flags; struct timeval tv; ssize_t ret; pcb = find_alchemy_pipe(pipe, &err); if (pcb == NULL) return err; if (alchemy_poll_mode(abs_timeout)) flags = MSG_DONTWAIT; else { if (!threadobj_current_p()) return -EPERM; if (abs_timeout) { __RT(clock_gettime(CLOCK_COPPERPLATE, &now)); timespec_sub(&timeout, abs_timeout, &now); tv.tv_sec = timeout.tv_sec; tv.tv_usec = timeout.tv_nsec / 1000; } else { tv.tv_sec = 0; tv.tv_usec = 0; } __RT(setsockopt(pcb->sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))); flags = 0; } ret = __RT(recvfrom(pcb->sock, buf, size, flags, NULL, 0)); if (ret < 0) ret = -errno; return ret; } static ssize_t do_write_pipe(RT_PIPE *pipe, const void *buf, size_t size, int flags) { struct alchemy_pipe *pcb; struct service svc; ssize_t ret; int err = 0; CANCEL_DEFER(svc); pcb = find_alchemy_pipe(pipe, &err); if (pcb == NULL) { ret = err; goto out; } ret = __RT(sendto(pcb->sock, buf, size, flags, NULL, 0)); if (ret < 0) { ret = -errno; if (ret == -EBADF) ret = -EIDRM; } out: CANCEL_RESTORE(svc); return ret; } /** * @fn ssize_t rt_pipe_write(RT_PIPE *pipe,const void *buf,size_t size,int mode) * @brief Write a message to a pipe. * * This service writes a complete message to be received from the * associated special device. 
rt_pipe_write() always preserves message * boundaries, which means that all data sent through a single call of * this service will be gathered in a single read(2) operation from * the special device. * * This service differs from rt_pipe_send() in that it accepts a * pointer to the raw data to be sent, instead of a canned message * buffer. * * @param pipe The pipe descriptor. * * @param buf The address of the first data byte to send. The * data will be copied to an internal buffer before transmission. * * @param size The size in bytes of the message (payload data * only). Zero is a valid value, in which case the service returns * immediately without sending any message. * * @param mode A set of flags affecting the operation: * * - P_URGENT causes the message to be prepended to the output * queue, ensuring a LIFO ordering. * * - P_NORMAL causes the message to be appended to the output * queue, ensuring a FIFO ordering. * * @return Upon success, this service returns @a size. Upon error, one * of the following error codes is returned: * * - -EINVAL is returned if @a mode is invalid or @a pipe is not a * pipe descriptor. * * - -ENOMEM is returned if not enough buffer space is available to * complete the operation. * * - -EIDRM is returned if @a pipe is a closed pipe descriptor. * * @note Writing data to a pipe before any peer has opened the * associated special device is allowed. The output will be buffered * until then, only restricted by the available memory in the * associated buffer pool (see rt_pipe_create()). * * @apitags{xcontext, switch-primary} */ ssize_t rt_pipe_write(RT_PIPE *pipe, const void *buf, size_t size, int mode) { int flags = 0; if (mode & ~P_URGENT) return -EINVAL; if (mode & P_URGENT) flags |= MSG_OOB; return do_write_pipe(pipe, buf, size, flags); } /** * @brief Stream bytes through a pipe. * * This service writes a sequence of bytes to be received from the * associated special device. 
Unlike rt_pipe_send(), this service does * not preserve message boundaries. Instead, an internal buffer is * filled on the fly with the data, which will be consumed as soon as * the receiver wakes up. * * Data buffers sent by the rt_pipe_stream() service are always * transmitted in FIFO order (i.e. P_NORMAL mode). * * @param pipe The pipe descriptor. * * @param buf The address of the first data byte to send. The * data will be copied to an internal buffer before transmission. * * @param size The size in bytes of the buffer. Zero is a valid value, * in which case the service returns immediately without sending any * data. * * @return The number of bytes sent upon success; this value may be * lower than @a size, depending on the available space in the * internal buffer. Otherwise: * * - -EINVAL is returned if @a mode is invalid or @a pipe is not a * pipe descriptor. * * - -ENOMEM is returned if not enough buffer space is available to * complete the operation. * * - -EIDRM is returned if @a pipe is a closed pipe descriptor. * * @note Writing data to a pipe before any peer has opened the * associated special device is allowed. The output will be buffered * until then, only restricted by the available memory in the * associated buffer pool (see rt_pipe_create()). * * @apitags{xcontext, switch-primary} */ ssize_t rt_pipe_stream(RT_PIPE *pipe, const void *buf, size_t size) { return do_write_pipe(pipe, buf, size, MSG_MORE); } /** * @fn int rt_pipe_bind(RT_PIPE *pipe, const char *name, RTIME timeout) * @brief Bind to a message pipe. * * This routine creates a new descriptor to refer to an existing * message pipe identified by its symbolic name. If the object does * not exist on entry, the caller may block until a pipe of the given * name is created. * * @param pipe The address of a pipe descriptor filled in by the * operation. Contents of this memory is undefined upon failure. * * @param name A valid NULL-terminated name which identifies the * pipe to bind to. 
This string should match the object name * argument passed to rt_pipe_create(). * * @param timeout The number of clock ticks to wait for the * registration to occur (see note). Passing TM_INFINITE causes the * caller to block indefinitely until the object is * registered. Passing TM_NONBLOCK causes the service to return * immediately without waiting if the object is not registered on * entry. * * @return Zero is returned upon success. Otherwise: * * - -EINTR is returned if rt_task_unblock() was called for the * current task before the retrieval has completed. * * - -EWOULDBLOCK is returned if @a timeout is equal to TM_NONBLOCK * and the searched object is not registered on entry. * * - -ETIMEDOUT is returned if the object cannot be retrieved within * the specified amount of time. * * - -EPERM is returned if this service should block, but was not * called from a Xenomai thread. * * @apitags{xthread-nowait} * * @note The @a timeout value is interpreted as a multiple of the * Alchemy clock resolution (see --alchemy-clock-resolution option, * defaults to 1 nanosecond). */ int rt_pipe_bind(RT_PIPE *pipe, const char *name, RTIME timeout) { return alchemy_bind_object(name, &alchemy_pipe_table, timeout, offsetof(struct alchemy_pipe, cobj), &pipe->handle); } /** * @fn int rt_pipe_unbind(RT_PIPE *pipe) * @brief Unbind from a message pipe. * * @param pipe The pipe descriptor. * * This routine releases a previous binding to a message pipe. After * this call has returned, the descriptor is no more valid for * referencing this object. * * @apitags{thread-unrestricted} */ int rt_pipe_unbind(RT_PIPE *pipe) { pipe->handle = 0; return 0; } /** @} */