From 8ac6c7a54ed1b98d142dce24b11c6de6a1e239a5 Mon Sep 17 00:00:00 2001 From: hc <hc@nodka.com> Date: Tue, 22 Oct 2024 10:36:11 +0000 Subject: [PATCH] 修改4g拨号为QMI,需要在系统里后台执行quectel-CM --- kernel/net/sunrpc/xprtsock.c | 1697 ++++++++++++++++++++++++++------------------------------- 1 files changed, 777 insertions(+), 920 deletions(-) diff --git a/kernel/net/sunrpc/xprtsock.c b/kernel/net/sunrpc/xprtsock.c index 798fbd8..ae5b538 100644 --- a/kernel/net/sunrpc/xprtsock.c +++ b/kernel/net/sunrpc/xprtsock.c @@ -47,12 +47,15 @@ #include <net/checksum.h> #include <net/udp.h> #include <net/tcp.h> +#include <linux/bvec.h> +#include <linux/highmem.h> +#include <linux/uio.h> +#include <linux/sched/mm.h> #include <trace/events/sunrpc.h> +#include "socklib.h" #include "sunrpc.h" - -#define RPC_TCP_READ_CHUNK_SZ (3*512*1024) static void xs_close(struct rpc_xprt *xprt); static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, @@ -67,8 +70,6 @@ static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT; static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT; - -#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) #define XS_TCP_LINGER_TO (15U * HZ) static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO; @@ -158,8 +159,6 @@ }, { }, }; - -#endif /* * Wait duration for a reply from the RPC portmapper. @@ -325,160 +324,452 @@ } } +static size_t +xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp) +{ + size_t i,n; + + if (!want || !(buf->flags & XDRBUF_SPARSE_PAGES)) + return want; + n = (buf->page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT; + for (i = 0; i < n; i++) { + if (buf->pages[i]) + continue; + buf->bvec[i].bv_page = buf->pages[i] = alloc_page(gfp); + if (!buf->pages[i]) { + i *= PAGE_SIZE; + return i > buf->page_base ? i - buf->page_base : 0; + } + } + return want; +} + +static ssize_t +xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek) +{ + ssize_t ret; + if (seek != 0) + iov_iter_advance(&msg->msg_iter, seek); + ret = sock_recvmsg(sock, msg, flags); + return ret > 0 ? ret + seek : ret; +} + +static ssize_t +xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags, + struct kvec *kvec, size_t count, size_t seek) +{ + iov_iter_kvec(&msg->msg_iter, READ, kvec, 1, count); + return xs_sock_recvmsg(sock, msg, flags, seek); +} + +static ssize_t +xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags, + struct bio_vec *bvec, unsigned long nr, size_t count, + size_t seek) +{ + iov_iter_bvec(&msg->msg_iter, READ, bvec, nr, count); + return xs_sock_recvmsg(sock, msg, flags, seek); +} + +static ssize_t +xs_read_discard(struct socket *sock, struct msghdr *msg, int flags, + size_t count) +{ + iov_iter_discard(&msg->msg_iter, READ, count); + return sock_recvmsg(sock, msg, flags); +} + +#if ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE +static void +xs_flush_bvec(const struct bio_vec *bvec, size_t count, size_t seek) +{ + struct bvec_iter bi = { + .bi_size = count, + }; + struct bio_vec bv; + + bvec_iter_advance(bvec, &bi, seek & PAGE_MASK); + for_each_bvec(bv, bvec, bi, bi) + flush_dcache_page(bv.bv_page); +} +#else +static inline void +xs_flush_bvec(const struct bio_vec *bvec, size_t count, size_t seek) +{ +} +#endif + +static ssize_t +xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags, + struct xdr_buf *buf, size_t count, size_t seek, size_t *read) +{ + size_t want, seek_init = seek, offset = 0; + ssize_t ret; + + want = min_t(size_t, count, buf->head[0].iov_len); + if (seek < want) { + ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek); + if (ret <= 0) + goto sock_err; + offset += ret; + if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC)) + goto out; + if (ret != want) + goto out; + seek = 0; + } else { + seek -= want; + offset += want; + } + + want = xs_alloc_sparse_pages(buf, + min_t(size_t, count - offset, buf->page_len), + GFP_KERNEL); + if (seek < want) { + ret = xs_read_bvec(sock, msg, flags, buf->bvec, + xdr_buf_pagecount(buf), + want + buf->page_base, + seek + buf->page_base); + if (ret <= 0) + goto sock_err; + xs_flush_bvec(buf->bvec, ret, seek + buf->page_base); + ret -= buf->page_base; + offset += ret; + if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC)) + goto out; + if (ret != want) + goto out; + seek = 0; + } else { + seek -= want; + offset += want; + } + + want = min_t(size_t, count - offset, buf->tail[0].iov_len); + if (seek < want) { + ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek); + if (ret <= 0) + goto sock_err; + offset += ret; + if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC)) + goto out; + if (ret != want) + goto out; + } else if (offset < seek_init) + offset = seek_init; + ret = -EMSGSIZE; +out: + *read = offset - seek_init; + return ret; +sock_err: + offset += seek; + goto out; +} + +static void +xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf) +{ + if (!transport->recv.copied) { + if (buf->head[0].iov_len >= transport->recv.offset) + memcpy(buf->head[0].iov_base, + &transport->recv.xid, + transport->recv.offset); + transport->recv.copied = transport->recv.offset; + } +} + +static bool +xs_read_stream_request_done(struct sock_xprt *transport) +{ + return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT); +} + +static void +xs_read_stream_check_eor(struct sock_xprt *transport, + struct msghdr *msg) +{ + if (xs_read_stream_request_done(transport)) + msg->msg_flags |= MSG_EOR; +} + +static ssize_t +xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg, + int flags, struct rpc_rqst *req) +{ + struct xdr_buf *buf = &req->rq_private_buf; + size_t want, read; + ssize_t ret; + + xs_read_header(transport, buf); + + want = transport->recv.len - transport->recv.offset; + if (want != 0) { + ret = xs_read_xdr_buf(transport->sock, msg, flags, buf, + transport->recv.copied + want, + transport->recv.copied, + &read); + transport->recv.offset += read; + transport->recv.copied += read; + } + + if (transport->recv.offset == transport->recv.len) + xs_read_stream_check_eor(transport, msg); + + if (want == 0) + return 0; + + switch (ret) { + default: + break; + case -EFAULT: + case -EMSGSIZE: + msg->msg_flags |= MSG_TRUNC; + return read; + case 0: + return -ESHUTDOWN; + } + return ret < 0 ? ret : read; +} + +static size_t +xs_read_stream_headersize(bool isfrag) +{ + if (isfrag) + return sizeof(__be32); + return 3 * sizeof(__be32); +} + +static ssize_t +xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg, + int flags, size_t want, size_t seek) +{ + struct kvec kvec = { + .iov_base = &transport->recv.fraghdr, + .iov_len = want, + }; + return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek); +} + +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +static ssize_t +xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags) +{ + struct rpc_xprt *xprt = &transport->xprt; + struct rpc_rqst *req; + ssize_t ret; + + /* Look up and lock the request corresponding to the given XID */ + req = xprt_lookup_bc_request(xprt, transport->recv.xid); + if (!req) { + printk(KERN_WARNING "Callback slot table overflowed\n"); + return -ESHUTDOWN; + } + if (transport->recv.copied && !req->rq_private_buf.len) + return -ESHUTDOWN; + + ret = xs_read_stream_request(transport, msg, flags, req); + if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) + xprt_complete_bc_request(req, transport->recv.copied); + else + req->rq_private_buf.len = transport->recv.copied; + + return ret; +} +#else /* CONFIG_SUNRPC_BACKCHANNEL */ +static ssize_t +xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags) +{ + return -ESHUTDOWN; +} +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ + +static ssize_t +xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags) +{ + struct rpc_xprt *xprt = &transport->xprt; + struct rpc_rqst *req; + ssize_t ret = 0; + + /* Look up and lock the request corresponding to the given XID */ + spin_lock(&xprt->queue_lock); + req = xprt_lookup_rqst(xprt, transport->recv.xid); + if (!req || (transport->recv.copied && !req->rq_private_buf.len)) { + msg->msg_flags |= MSG_TRUNC; + goto out; + } + xprt_pin_rqst(req); + spin_unlock(&xprt->queue_lock); + + ret = xs_read_stream_request(transport, msg, flags, req); + + spin_lock(&xprt->queue_lock); + if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) + xprt_complete_rqst(req->rq_task, transport->recv.copied); + else + req->rq_private_buf.len = transport->recv.copied; + xprt_unpin_rqst(req); +out: + spin_unlock(&xprt->queue_lock); + return ret; +} + +static ssize_t +xs_read_stream(struct sock_xprt *transport, int flags) +{ + struct msghdr msg = { 0 }; + size_t want, read = 0; + ssize_t ret = 0; + + if (transport->recv.len == 0) { + want = xs_read_stream_headersize(transport->recv.copied != 0); + ret = xs_read_stream_header(transport, &msg, flags, want, + transport->recv.offset); + if (ret <= 0) + goto out_err; + transport->recv.offset = ret; + if (transport->recv.offset != want) + return transport->recv.offset; + transport->recv.len = be32_to_cpu(transport->recv.fraghdr) & + RPC_FRAGMENT_SIZE_MASK; + transport->recv.offset -= sizeof(transport->recv.fraghdr); + read = ret; + } + + switch (be32_to_cpu(transport->recv.calldir)) { + default: + msg.msg_flags |= MSG_TRUNC; + break; + case RPC_CALL: + ret = xs_read_stream_call(transport, &msg, flags); + break; + case RPC_REPLY: + ret = xs_read_stream_reply(transport, &msg, flags); + } + if (msg.msg_flags & MSG_TRUNC) { + transport->recv.calldir = cpu_to_be32(-1); + transport->recv.copied = -1; + } + if (ret < 0) + goto out_err; + read += ret; + if (transport->recv.offset < transport->recv.len) { + if (!(msg.msg_flags & MSG_TRUNC)) + return read; + msg.msg_flags = 0; + ret = xs_read_discard(transport->sock, &msg, flags, + transport->recv.len - transport->recv.offset); + if (ret <= 0) + goto out_err; + transport->recv.offset += ret; + read += ret; + if (transport->recv.offset != transport->recv.len) + return read; + } + if (xs_read_stream_request_done(transport)) { + trace_xs_stream_read_request(transport); + transport->recv.copied = 0; + } + transport->recv.offset = 0; + transport->recv.len = 0; + return read; +out_err: + return ret != 0 ? ret : -ESHUTDOWN; +} + +static __poll_t xs_poll_socket(struct sock_xprt *transport) +{ + return transport->sock->ops->poll(transport->file, transport->sock, + NULL); +} + +static bool xs_poll_socket_readable(struct sock_xprt *transport) +{ + __poll_t events = xs_poll_socket(transport); + + return (events & (EPOLLIN | EPOLLRDNORM)) && !(events & EPOLLRDHUP); +} + +static void xs_poll_check_readable(struct sock_xprt *transport) +{ + + clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); + if (!xs_poll_socket_readable(transport)) + return; + if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) + queue_work(xprtiod_workqueue, &transport->recv_worker); +} + +static void xs_stream_data_receive(struct sock_xprt *transport) +{ + size_t read = 0; + ssize_t ret = 0; + + mutex_lock(&transport->recv_mutex); + if (transport->sock == NULL) + goto out; + for (;;) { + ret = xs_read_stream(transport, MSG_DONTWAIT); + if (ret < 0) + break; + read += ret; + cond_resched(); + } + if (ret == -ESHUTDOWN) + kernel_sock_shutdown(transport->sock, SHUT_RDWR); + else + xs_poll_check_readable(transport); +out: + mutex_unlock(&transport->recv_mutex); + trace_xs_stream_read_data(&transport->xprt, ret, read); +} + +static void xs_stream_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + unsigned int pflags = memalloc_nofs_save(); + + xs_stream_data_receive(transport); + memalloc_nofs_restore(pflags); +} + +static void +xs_stream_reset_connect(struct sock_xprt *transport) +{ + transport->recv.offset = 0; + transport->recv.len = 0; + transport->recv.copied = 0; + transport->xmit.offset = 0; +} + +static void +xs_stream_start_connect(struct sock_xprt *transport) +{ + transport->xprt.stat.connect_count++; + transport->xprt.stat.connect_start = jiffies; +} + #define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) -static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more) -{ - struct msghdr msg = { - .msg_name = addr, - .msg_namelen = addrlen, - .msg_flags = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0), - }; - struct kvec iov = { - .iov_base = vec->iov_base + base, - .iov_len = vec->iov_len - base, - }; - - if (iov.iov_len != 0) - return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); - return kernel_sendmsg(sock, &msg, NULL, 0, 0); -} - -static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy, int *sent_p) -{ - ssize_t (*do_sendpage)(struct socket *sock, struct page *page, - int offset, size_t size, int flags); - struct page **ppage; - unsigned int remainder; - int err; - - remainder = xdr->page_len - base; - base += xdr->page_base; - ppage = xdr->pages + (base >> PAGE_SHIFT); - base &= ~PAGE_MASK; - do_sendpage = sock->ops->sendpage; - if (!zerocopy) - do_sendpage = sock_no_sendpage; - for(;;) { - unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder); - int flags = XS_SENDMSG_FLAGS; - - remainder -= len; - if (more) - flags |= MSG_MORE; - if (remainder != 0) - flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE; - err = do_sendpage(sock, *ppage, base, len, flags); - if (remainder == 0 || err != len) - break; - *sent_p += err; - ppage++; - base = 0; - } - if (err > 0) { - *sent_p += err; - err = 0; - } - return err; -} - /** - * xs_sendpages - write pages directly to a socket - * @sock: socket to send on - * @addr: UDP only -- address of destination - * @addrlen: UDP only -- length of destination address - * @xdr: buffer containing this request - * @base: starting position in the buffer - * @zerocopy: true if it is safe to use sendpage() - * @sent_p: return the total number of bytes successfully queued for sending + * xs_nospace - handle transmit was incomplete + * @req: pointer to RPC request + * @transport: pointer to struct sock_xprt * */ -static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy, int *sent_p) +static int xs_nospace(struct rpc_rqst *req, struct sock_xprt *transport) { - unsigned int remainder = xdr->len - base; - int err = 0; - int sent = 0; - - if (unlikely(!sock)) - return -ENOTSOCK; - - if (base != 0) { - addr = NULL; - addrlen = 0; - } - - if (base < xdr->head[0].iov_len || addr != NULL) { - unsigned int len = xdr->head[0].iov_len - base; - remainder -= len; - err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0); - if (remainder == 0 || err != len) - goto out; - *sent_p += err; - base = 0; - } else - base -= xdr->head[0].iov_len; - - if (base < xdr->page_len) { - unsigned int len = xdr->page_len - base; - remainder -= len; - err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy, &sent); - *sent_p += sent; - if (remainder == 0 || sent != len) - goto out; - base = 0; - } else - base -= xdr->page_len; - - if (base >= xdr->tail[0].iov_len) - return 0; - err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0); -out: - if (err > 0) { - *sent_p += err; - err = 0; - } - return err; -} - -static void xs_nospace_callback(struct rpc_task *task) -{ - struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt); - - transport->inet->sk_write_pending--; -} - -/** - * xs_nospace - place task on wait queue if transmit was incomplete - * @task: task to put to sleep - * - */ -static int xs_nospace(struct rpc_task *task) -{ - struct rpc_rqst *req = task->tk_rqstp; - struct rpc_xprt *xprt = req->rq_xprt; - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + struct rpc_xprt *xprt = &transport->xprt; struct sock *sk = transport->inet; int ret = -EAGAIN; - dprintk("RPC: %5u xmit incomplete (%u left of %u)\n", - task->tk_pid, req->rq_slen - req->rq_bytes_sent, - req->rq_slen); + trace_rpc_socket_nospace(req, transport); /* Protect against races with write_space */ - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); /* Don't race with disconnect */ if (xprt_connected(xprt)) { - /* wait for more buffer space */ - sk->sk_write_pending++; - xprt_wait_for_buffer_space(task, xs_nospace_callback); - } else - ret = -ENOTCONN; - - spin_unlock_bh(&xprt->transport_lock); - - /* Race breaker in case memory is freed before above code is called */ - if (ret == -EAGAIN) { struct socket_wq *wq; rcu_read_lock(); @@ -486,24 +777,76 @@ set_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags); rcu_read_unlock(); - sk->sk_write_space(sk); - } + /* wait for more buffer space */ + set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); + sk->sk_write_pending++; + xprt_wait_for_buffer_space(xprt); + } else + ret = -ENOTCONN; + + spin_unlock(&xprt->transport_lock); return ret; } -/* - * Construct a stream transport record marker in @buf. - */ -static inline void xs_encode_stream_record_marker(struct xdr_buf *buf) +static int xs_sock_nospace(struct rpc_rqst *req) { - u32 reclen = buf->len - sizeof(rpc_fraghdr); - rpc_fraghdr *base = buf->head[0].iov_base; - *base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen); + struct sock_xprt *transport = + container_of(req->rq_xprt, struct sock_xprt, xprt); + struct sock *sk = transport->inet; + int ret = -EAGAIN; + + lock_sock(sk); + if (!sock_writeable(sk)) + ret = xs_nospace(req, transport); + release_sock(sk); + return ret; +} + +static int xs_stream_nospace(struct rpc_rqst *req) +{ + struct sock_xprt *transport = + container_of(req->rq_xprt, struct sock_xprt, xprt); + struct sock *sk = transport->inet; + int ret = -EAGAIN; + + lock_sock(sk); + if (!sk_stream_memory_free(sk)) + ret = xs_nospace(req, transport); + release_sock(sk); + return ret; +} + +static void +xs_stream_prepare_request(struct rpc_rqst *req) +{ + xdr_free_bvec(&req->rq_rcv_buf); + req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_KERNEL); +} + +/* + * Determine if the previous message in the stream was aborted before it + * could complete transmission. + */ +static bool +xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req) +{ + return transport->xmit.offset != 0 && req->rq_bytes_sent == 0; +} + +/* + * Return the stream record marker field for a record of length < 2^31-1 + */ +static rpc_fraghdr +xs_stream_record_marker(struct xdr_buf *xdr) +{ + if (!xdr->len) + return 0; + return cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | (u32)xdr->len); } /** * xs_local_send_request - write an RPC request to an AF_LOCAL socket - * @task: RPC task that manages the state of an RPC request + * @req: pointer to RPC request * * Return values: * 0: The request has been sent @@ -512,35 +855,44 @@ * ENOTCONN: Caller needs to invoke connect logic then call again * other: Some other error occured, the request was not sent */ -static int xs_local_send_request(struct rpc_task *task) +static int xs_local_send_request(struct rpc_rqst *req) { - struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; + rpc_fraghdr rm = xs_stream_record_marker(xdr); + unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen; + struct msghdr msg = { + .msg_flags = XS_SENDMSG_FLAGS, + }; + unsigned int sent; int status; - int sent = 0; - xs_encode_stream_record_marker(&req->rq_snd_buf); + /* Close the stream if the previous transmission was incomplete */ + if (xs_send_request_was_aborted(transport, req)) { + xprt_force_disconnect(xprt); + return -ENOTCONN; + } xs_pktdump("packet data:", req->rq_svec->iov_base, req->rq_svec->iov_len); req->rq_xtime = ktime_get(); - status = xs_sendpages(transport->sock, NULL, 0, xdr, req->rq_bytes_sent, - true, &sent); + status = xprt_sock_sendmsg(transport->sock, &msg, xdr, + transport->xmit.offset, rm, &sent); dprintk("RPC: %s(%u) = %d\n", - __func__, xdr->len - req->rq_bytes_sent, status); + __func__, xdr->len - transport->xmit.offset, status); if (status == -EAGAIN && sock_writeable(transport->inet)) status = -ENOBUFS; if (likely(sent > 0) || status == 0) { - req->rq_bytes_sent += sent; - req->rq_xmit_bytes_sent += sent; - if (likely(req->rq_bytes_sent >= req->rq_slen)) { - req->rq_bytes_sent = 0; + transport->xmit.offset += sent; + req->rq_bytes_sent = transport->xmit.offset; + if (likely(req->rq_bytes_sent >= msglen)) { + req->rq_xmit_bytes_sent += transport->xmit.offset; + transport->xmit.offset = 0; return 0; } status = -EAGAIN; @@ -550,14 +902,14 @@ case -ENOBUFS: break; case -EAGAIN: - status = xs_nospace(task); + status = xs_stream_nospace(req); break; default: dprintk("RPC: sendmsg returned unrecognized error %d\n", -status); - /* fall through */ + fallthrough; case -EPIPE: - xs_close(xprt); + xprt_force_disconnect(xprt); status = -ENOTCONN; } @@ -566,7 +918,7 @@ /** * xs_udp_send_request - write an RPC request to a UDP socket - * @task: address of RPC task that manages the state of an RPC request + * @req: pointer to RPC request * * Return values: * 0: The request has been sent @@ -575,13 +927,17 @@ * ENOTCONN: Caller needs to invoke connect logic then call again * other: Some other error occurred, the request was not sent */ -static int xs_udp_send_request(struct rpc_task *task) +static int xs_udp_send_request(struct rpc_rqst *req) { - struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; - int sent = 0; + struct msghdr msg = { + .msg_name = xs_addr(xprt), + .msg_namelen = xprt->addrlen, + .msg_flags = XS_SENDMSG_FLAGS, + }; + unsigned int sent; int status; xs_pktdump("packet data:", @@ -590,12 +946,15 @@ if (!xprt_bound(xprt)) return -ENOTCONN; + + if (!xprt_request_get_cong(xprt, req)) + return -EBADSLT; + req->rq_xtime = ktime_get(); - status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen, - xdr, req->rq_bytes_sent, true, &sent); + status = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, 0, &sent); dprintk("RPC: xs_udp_send_request(%u) = %d\n", - xdr->len - req->rq_bytes_sent, status); + xdr->len, status); /* firewall is blocking us, don't return -EAGAIN or we end up looping */ if (status == -EPERM) @@ -619,7 +978,7 @@ /* Should we call xs_close() here? */ break; case -EAGAIN: - status = xs_nospace(task); + status = xs_sock_nospace(req); break; case -ENETUNREACH: case -ENOBUFS: @@ -639,7 +998,7 @@ /** * xs_tcp_send_request - write an RPC request to a TCP socket - * @task: address of RPC task that manages the state of an RPC request + * @req: pointer to RPC request * * Return values: * 0: The request has been sent @@ -651,28 +1010,30 @@ * XXX: In the case of soft timeouts, should we eventually give up * if sendmsg is not able to make progress? */ -static int xs_tcp_send_request(struct rpc_task *task) +static int xs_tcp_send_request(struct rpc_rqst *req) { - struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; - bool zerocopy = true; + rpc_fraghdr rm = xs_stream_record_marker(xdr); + unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen; + struct msghdr msg = { + .msg_flags = XS_SENDMSG_FLAGS, + }; bool vm_wait = false; + unsigned int sent; int status; - int sent; - xs_encode_stream_record_marker(&req->rq_snd_buf); + /* Close the stream if the previous transmission was incomplete */ + if (xs_send_request_was_aborted(transport, req)) { + if (transport->sock != NULL) + kernel_sock_shutdown(transport->sock, SHUT_RDWR); + return -ENOTCONN; + } xs_pktdump("packet data:", req->rq_svec->iov_base, req->rq_svec->iov_len); - /* Don't use zero copy if this is a resend. If the RPC call - * completes while the socket holds a reference to the pages, - * then we may end up resending corrupted data. - */ - if (task->tk_flags & RPC_TASK_SENT) - zerocopy = false; if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state)) xs_tcp_set_socket_timeouts(xprt, transport->sock); @@ -682,19 +1043,19 @@ * called sendmsg(). */ req->rq_xtime = ktime_get(); while (1) { - sent = 0; - status = xs_sendpages(transport->sock, NULL, 0, xdr, - req->rq_bytes_sent, zerocopy, &sent); + status = xprt_sock_sendmsg(transport->sock, &msg, xdr, + transport->xmit.offset, rm, &sent); dprintk("RPC: xs_tcp_send_request(%u) = %d\n", - xdr->len - req->rq_bytes_sent, status); + xdr->len - transport->xmit.offset, status); /* If we've sent the entire packet, immediately * reset the count of bytes sent. */ - req->rq_bytes_sent += sent; - req->rq_xmit_bytes_sent += sent; - if (likely(req->rq_bytes_sent >= req->rq_slen)) { - req->rq_bytes_sent = 0; + transport->xmit.offset += sent; + req->rq_bytes_sent = transport->xmit.offset; + if (likely(req->rq_bytes_sent >= msglen)) { + req->rq_xmit_bytes_sent += transport->xmit.offset; + transport->xmit.offset = 0; return 0; } @@ -732,7 +1093,7 @@ /* Should we call xs_close() here? */ break; case -EAGAIN: - status = xs_nospace(task); + status = xs_stream_nospace(req); break; case -ECONNRESET: case -ECONNREFUSED: @@ -747,35 +1108,6 @@ } return status; -} - -/** - * xs_tcp_release_xprt - clean up after a tcp transmission - * @xprt: transport - * @task: rpc task - * - * This cleans up if an error causes us to abort the transmission of a request. - * In this case, the socket may need to be reset in order to avoid confusing - * the server. - */ -static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) -{ - struct rpc_rqst *req; - - if (task != xprt->snd_task) - return; - if (task == NULL) - goto out_release; - req = task->tk_rqstp; - if (req == NULL) - goto out_release; - if (req->rq_bytes_sent == 0) - goto out_release; - if (req->rq_bytes_sent == req->rq_snd_buf.len) - goto out_release; - set_bit(XPRT_CLOSE_WAIT, &xprt->state); -out_release: - xprt_release_xprt(xprt, task); } static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk) @@ -799,6 +1131,15 @@ struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); + clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state); + clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state); + clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state); +} + +static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr) +{ + set_bit(nr, &transport->sock_state); + queue_work(xprtiod_workqueue, &transport->error_worker); } static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) @@ -819,20 +1160,24 @@ */ static void xs_error_report(struct sock *sk) { + struct sock_xprt *transport; struct rpc_xprt *xprt; - int err; read_lock_bh(&sk->sk_callback_lock); if (!(xprt = xprt_from_sock(sk))) goto out; - err = -sk->sk_err; - if (err == 0) + transport = container_of(xprt, struct sock_xprt, xprt); + transport->xprt_err = -sk->sk_err; + if (transport->xprt_err == 0) goto out; dprintk("RPC: xs_error_report client %p, error=%d...\n", - xprt, -err); - trace_rpc_socket_error(xprt, sk->sk_socket, err); - xprt_wake_pending_tasks(xprt, err); + xprt, -transport->xprt_err); + trace_rpc_socket_error(xprt, sk->sk_socket, transport->xprt_err); + + /* barrier ensures xprt_err is set before XPRT_SOCK_WAKE_ERROR */ + smp_mb__before_atomic(); + xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR); out: read_unlock_bh(&sk->sk_callback_lock); } @@ -842,9 +1187,20 @@ struct socket *sock = transport->sock; struct sock *sk = transport->inet; struct rpc_xprt *xprt = &transport->xprt; + struct file *filp = transport->file; if (sk == NULL) return; + /* + * Make sure we're calling this in a context from which it is safe + * to call __fput_sync(). In practice that means rpciod and the + * system workqueue. + */ + if (!(current->flags & PF_WQ_WORKER)) { + WARN_ON_ONCE(1); + set_bit(XPRT_CLOSE_WAIT, &xprt->state); + return; + } if (atomic_read(&transport->xprt.swapper)) sk_clear_memalloc(sk); @@ -855,6 +1211,7 @@ write_lock_bh(&sk->sk_callback_lock); transport->inet = NULL; transport->sock = NULL; + transport->file = NULL; sk->sk_user_data = NULL; @@ -862,10 +1219,14 @@ xprt_clear_connected(xprt); write_unlock_bh(&sk->sk_callback_lock); xs_sock_reset_connection_flags(xprt); + /* Reset stream record info */ + xs_stream_reset_connect(transport); mutex_unlock(&transport->recv_mutex); trace_rpc_socket_close(xprt, sock); - sock_release(sock); + __fput_sync(filp); + + xprt_disconnect_done(xprt); } /** @@ -886,8 +1247,6 @@ xs_reset_transport(transport); xprt->reestablish_timeout = 0; - - xprt_disconnect_done(xprt); } static void xs_inject_disconnect(struct rpc_xprt *xprt) @@ -917,116 +1276,9 @@ cancel_delayed_work_sync(&transport->connect_worker); xs_close(xprt); cancel_work_sync(&transport->recv_worker); + cancel_work_sync(&transport->error_worker); xs_xprt_free(xprt); module_put(THIS_MODULE); -} - -static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) -{ - struct xdr_skb_reader desc = { - .skb = skb, - .offset = sizeof(rpc_fraghdr), - .count = skb->len - sizeof(rpc_fraghdr), - }; - - if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0) - return -1; - if (desc.count) - return -1; - return 0; -} - -/** - * xs_local_data_read_skb - * @xprt: transport - * @sk: socket - * @skb: skbuff - * - * Currently this assumes we can read the whole reply in a single gulp. - */ -static void xs_local_data_read_skb(struct rpc_xprt *xprt, - struct sock *sk, - struct sk_buff *skb) -{ - struct rpc_task *task; - struct rpc_rqst *rovr; - int repsize, copied; - u32 _xid; - __be32 *xp; - - repsize = skb->len - sizeof(rpc_fraghdr); - if (repsize < 4) { - dprintk("RPC: impossible RPC reply size %d\n", repsize); - return; - } - - /* Copy the XID from the skb... */ - xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid); - if (xp == NULL) - return; - - /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->recv_lock); - rovr = xprt_lookup_rqst(xprt, *xp); - if (!rovr) - goto out_unlock; - xprt_pin_rqst(rovr); - spin_unlock(&xprt->recv_lock); - task = rovr->rq_task; - - copied = rovr->rq_private_buf.buflen; - if (copied > repsize) - copied = repsize; - - if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) { - dprintk("RPC: sk_buff copy failed\n"); - spin_lock(&xprt->recv_lock); - goto out_unpin; - } - - spin_lock(&xprt->recv_lock); - xprt_complete_rqst(task, copied); -out_unpin: - xprt_unpin_rqst(rovr); - out_unlock: - spin_unlock(&xprt->recv_lock); -} - -static void xs_local_data_receive(struct sock_xprt *transport) -{ - struct sk_buff *skb; - struct sock *sk; - int err; - -restart: - mutex_lock(&transport->recv_mutex); - sk = transport->inet; - if (sk == NULL) - goto out; - for (;;) { - skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb != NULL) { - xs_local_data_read_skb(&transport->xprt, sk, skb); - skb_free_datagram(sk, skb); - continue; - } - if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) - break; - if (need_resched()) { - mutex_unlock(&transport->recv_mutex); - cond_resched(); - goto restart; - } - } -out: - mutex_unlock(&transport->recv_mutex); -} - -static void xs_local_data_receive_workfn(struct work_struct *work) -{ - struct sock_xprt *transport = - container_of(work, struct sock_xprt, recv_worker); - xs_local_data_receive(transport); } /** @@ -1058,13 +1310,13 @@ return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; xprt_pin_rqst(rovr); xprt_update_rtt(rovr->rq_task); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); task = rovr->rq_task; if ((copied = rovr->rq_private_buf.buflen) > repsize) @@ -1072,22 +1324,22 @@ /* Suck it into the iovec, verify checksum if not done by hw. */ if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) { - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); __UDPX_INC_STATS(sk, UDP_MIB_INERRORS); goto out_unpin; } - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); xprt_adjust_cwnd(xprt, task, copied); - spin_unlock_bh(&xprt->transport_lock); - spin_lock(&xprt->recv_lock); + spin_unlock(&xprt->transport_lock); + spin_lock(&xprt->queue_lock); xprt_complete_rqst(task, copied); __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS); out_unpin: xprt_unpin_rqst(rovr); out_unlock: - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); } static void xs_udp_data_receive(struct sock_xprt *transport) @@ -1096,26 +1348,19 @@ struct sock *sk; int err; -restart: mutex_lock(&transport->recv_mutex); sk = transport->inet; if (sk == NULL) goto out; for (;;) { skb = skb_recv_udp(sk, 0, 1, &err); - if (skb != NULL) { - xs_udp_data_read_skb(&transport->xprt, sk, skb); - consume_skb(skb); - continue; - } - if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) + if (skb == NULL) break; - if (need_resched()) { - mutex_unlock(&transport->recv_mutex); - cond_resched(); - goto restart; - } + xs_udp_data_read_skb(&transport->xprt, sk, skb); + consume_skb(skb); + cond_resched(); } + xs_poll_check_readable(transport); out: mutex_unlock(&transport->recv_mutex); } @@ -1124,7 +1369,10 @@ { struct sock_xprt *transport = container_of(work, struct sock_xprt, recv_worker); + unsigned int pflags = memalloc_nofs_save(); + xs_udp_data_receive(transport); + memalloc_nofs_restore(pflags); } /** @@ -1163,416 +1411,12 @@ xprt_force_disconnect(xprt); } -static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) -{ - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - size_t len, used; - char *p; - - p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset; - len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset; - used = xdr_skb_read_bits(desc, p, len); - transport->tcp_offset += used; - if (used != len) - return; - - transport->tcp_reclen = ntohl(transport->tcp_fraghdr); - if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT) - transport->tcp_flags |= TCP_RCV_LAST_FRAG; - else - transport->tcp_flags &= ~TCP_RCV_LAST_FRAG; - transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK; - - transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR; - transport->tcp_offset = 0; - - /* Sanity check of the record length */ - if (unlikely(transport->tcp_reclen < 8)) { - dprintk("RPC: invalid TCP record fragment length\n"); - xs_tcp_force_close(xprt); - return; - } - dprintk("RPC: reading TCP record fragment of length %d\n", - transport->tcp_reclen); -} - -static void xs_tcp_check_fraghdr(struct sock_xprt *transport) -{ - if (transport->tcp_offset == transport->tcp_reclen) { - transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR; - transport->tcp_offset = 0; - if (transport->tcp_flags & TCP_RCV_LAST_FRAG) { - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; - transport->tcp_flags |= TCP_RCV_COPY_XID; - transport->tcp_copied = 0; - } - } -} - -static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc) -{ - size_t len, used; - char *p; - - len = sizeof(transport->tcp_xid) - transport->tcp_offset; - dprintk("RPC: reading XID (%zu bytes)\n", len); - p = ((char *) &transport->tcp_xid) + transport->tcp_offset; - used = xdr_skb_read_bits(desc, p, len); - transport->tcp_offset += used; - if (used != len) - return; - transport->tcp_flags &= ~TCP_RCV_COPY_XID; - transport->tcp_flags |= TCP_RCV_READ_CALLDIR; - transport->tcp_copied = 4; - dprintk("RPC: reading %s XID %08x\n", - (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for" - : "request with", - ntohl(transport->tcp_xid)); - xs_tcp_check_fraghdr(transport); -} - -static inline void xs_tcp_read_calldir(struct sock_xprt *transport, - struct xdr_skb_reader *desc) -{ - size_t len, used; - u32 offset; - char *p; - - /* - * We want transport->tcp_offset to be 8 at the end of this routine - * (4 bytes for the xid and 4 bytes for the call/reply flag). - * When this function is called for the first time, - * transport->tcp_offset is 4 (after having already read the xid). - */ - offset = transport->tcp_offset - sizeof(transport->tcp_xid); - len = sizeof(transport->tcp_calldir) - offset; - dprintk("RPC: reading CALL/REPLY flag (%zu bytes)\n", len); - p = ((char *) &transport->tcp_calldir) + offset; - used = xdr_skb_read_bits(desc, p, len); - transport->tcp_offset += used; - if (used != len) - return; - transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR; - /* - * We don't yet have the XDR buffer, so we will write the calldir - * out after we get the buffer from the 'struct rpc_rqst' - */ - switch (ntohl(transport->tcp_calldir)) { - case RPC_REPLY: - transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; - transport->tcp_flags |= TCP_RCV_COPY_DATA; - transport->tcp_flags |= TCP_RPC_REPLY; - break; - case RPC_CALL: - transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; - transport->tcp_flags |= TCP_RCV_COPY_DATA; - transport->tcp_flags &= ~TCP_RPC_REPLY; - break; - default: - dprintk("RPC: invalid request message type\n"); - xs_tcp_force_close(&transport->xprt); - } - xs_tcp_check_fraghdr(transport); -} - -static inline void xs_tcp_read_common(struct rpc_xprt *xprt, - struct xdr_skb_reader *desc, - struct rpc_rqst *req) -{ - struct sock_xprt *transport = - container_of(xprt, struct sock_xprt, xprt); - struct xdr_buf *rcvbuf; - size_t len; - ssize_t r; - - rcvbuf = &req->rq_private_buf; - - if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) { - /* - * Save the RPC direction in the XDR buffer - */ - memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied, - &transport->tcp_calldir, - sizeof(transport->tcp_calldir)); - transport->tcp_copied += sizeof(transport->tcp_calldir); - transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR; - } - - len = desc->count; - if (len > transport->tcp_reclen - transport->tcp_offset) - desc->count = transport->tcp_reclen - transport->tcp_offset; - r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied, - desc, xdr_skb_read_bits); - - if (desc->count) { - /* Error when copying to the receive buffer, - * usually because we weren't able to allocate - * additional buffer pages. All we can do now - * is turn off TCP_RCV_COPY_DATA, so the request - * will not receive any additional updates, - * and time out. - * Any remaining data from this record will - * be discarded. - */ - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; - dprintk("RPC: XID %08x truncated request\n", - ntohl(transport->tcp_xid)); - dprintk("RPC: xprt = %p, tcp_copied = %lu, " - "tcp_offset = %u, tcp_reclen = %u\n", - xprt, transport->tcp_copied, - transport->tcp_offset, transport->tcp_reclen); - return; - } - - transport->tcp_copied += r; - transport->tcp_offset += r; - desc->count = len - r; - - dprintk("RPC: XID %08x read %zd bytes\n", - ntohl(transport->tcp_xid), r); - dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, " - "tcp_reclen = %u\n", xprt, transport->tcp_copied, - transport->tcp_offset, transport->tcp_reclen); - - if (transport->tcp_copied == req->rq_private_buf.buflen) - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; - else if (transport->tcp_offset == transport->tcp_reclen) { - if (transport->tcp_flags & TCP_RCV_LAST_FRAG) - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; - } -} - -/* - * Finds the request corresponding to the RPC xid and invokes the common - * tcp read code to read the data. - */ -static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, - struct xdr_skb_reader *desc) -{ - struct sock_xprt *transport = - container_of(xprt, struct sock_xprt, xprt); - struct rpc_rqst *req; - - dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); - - /* Find and lock the request corresponding to this xid */ - spin_lock(&xprt->recv_lock); - req = xprt_lookup_rqst(xprt, transport->tcp_xid); - if (!req) { - dprintk("RPC: XID %08x request not found!\n", - ntohl(transport->tcp_xid)); - spin_unlock(&xprt->recv_lock); - return -1; - } - xprt_pin_rqst(req); - spin_unlock(&xprt->recv_lock); - - xs_tcp_read_common(xprt, desc, req); - - spin_lock(&xprt->recv_lock); - if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) - xprt_complete_rqst(req->rq_task, transport->tcp_copied); - xprt_unpin_rqst(req); - spin_unlock(&xprt->recv_lock); - return 0; -} - #if defined(CONFIG_SUNRPC_BACKCHANNEL) -/* - * Obtains an rpc_rqst previously allocated and invokes the common - * tcp read code to read the data. The result is placed in the callback - * queue. - * If we're unable to obtain the rpc_rqst we schedule the closing of the - * connection and return -1. - */ -static int xs_tcp_read_callback(struct rpc_xprt *xprt, - struct xdr_skb_reader *desc) -{ - struct sock_xprt *transport = - container_of(xprt, struct sock_xprt, xprt); - struct rpc_rqst *req; - - /* Look up the request corresponding to the given XID */ - req = xprt_lookup_bc_request(xprt, transport->tcp_xid); - if (req == NULL) { - printk(KERN_WARNING "Callback slot table overflowed\n"); - xprt_force_disconnect(xprt); - return -1; - } - - dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid)); - xs_tcp_read_common(xprt, desc, req); - - if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) - xprt_complete_bc_request(req, transport->tcp_copied); - - return 0; -} - -static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, - struct xdr_skb_reader *desc) -{ - struct sock_xprt *transport = - container_of(xprt, struct sock_xprt, xprt); - - return (transport->tcp_flags & TCP_RPC_REPLY) ? - xs_tcp_read_reply(xprt, desc) : - xs_tcp_read_callback(xprt, desc); -} - -static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net) -{ - int ret; - - ret = svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0, - SVC_SOCK_ANONYMOUS); - if (ret < 0) - return ret; - return 0; -} - static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt) { return PAGE_SIZE; } -#else -static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, - struct xdr_skb_reader *desc) -{ - return xs_tcp_read_reply(xprt, desc); -} #endif /* CONFIG_SUNRPC_BACKCHANNEL */ - -/* - * Read data off the transport. This can be either an RPC_CALL or an - * RPC_REPLY. Relay the processing to helper functions. - */ -static void xs_tcp_read_data(struct rpc_xprt *xprt, - struct xdr_skb_reader *desc) -{ - struct sock_xprt *transport = - container_of(xprt, struct sock_xprt, xprt); - - if (_xs_tcp_read_data(xprt, desc) == 0) - xs_tcp_check_fraghdr(transport); - else { - /* - * The transport_lock protects the request handling. - * There's no need to hold it to update the tcp_flags. - */ - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; - } -} - -static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc) -{ - size_t len; - - len = transport->tcp_reclen - transport->tcp_offset; - if (len > desc->count) - len = desc->count; - desc->count -= len; - desc->offset += len; - transport->tcp_offset += len; - dprintk("RPC: discarded %zu bytes\n", len); - xs_tcp_check_fraghdr(transport); -} - -static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len) -{ - struct rpc_xprt *xprt = rd_desc->arg.data; - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - struct xdr_skb_reader desc = { - .skb = skb, - .offset = offset, - .count = len, - }; - size_t ret; - - dprintk("RPC: xs_tcp_data_recv started\n"); - do { - trace_xs_tcp_data_recv(transport); - /* Read in a new fragment marker if necessary */ - /* Can we ever really expect to get completely empty fragments? */ - if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) { - xs_tcp_read_fraghdr(xprt, &desc); - continue; - } - /* Read in the xid if necessary */ - if (transport->tcp_flags & TCP_RCV_COPY_XID) { - xs_tcp_read_xid(transport, &desc); - continue; - } - /* Read in the call/reply flag */ - if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) { - xs_tcp_read_calldir(transport, &desc); - continue; - } - /* Read in the request data */ - if (transport->tcp_flags & TCP_RCV_COPY_DATA) { - xs_tcp_read_data(xprt, &desc); - continue; - } - /* Skip over any trailing bytes on short reads */ - xs_tcp_read_discard(transport, &desc); - } while (desc.count); - ret = len - desc.count; - if (ret < rd_desc->count) - rd_desc->count -= ret; - else - rd_desc->count = 0; - trace_xs_tcp_data_recv(transport); - dprintk("RPC: xs_tcp_data_recv done\n"); - return ret; -} - -static void xs_tcp_data_receive(struct sock_xprt *transport) -{ - struct rpc_xprt *xprt = &transport->xprt; - struct sock *sk; - read_descriptor_t rd_desc = { - .arg.data = xprt, - }; - unsigned long total = 0; - int read = 0; - -restart: - mutex_lock(&transport->recv_mutex); - sk = transport->inet; - if (sk == NULL) - goto out; - - /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ - for (;;) { - rd_desc.count = RPC_TCP_READ_CHUNK_SZ; - lock_sock(sk); - read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); - if (rd_desc.count != 0 || read < 0) { - clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); - release_sock(sk); - break; - } - release_sock(sk); - total += read; - if (need_resched()) { - mutex_unlock(&transport->recv_mutex); - cond_resched(); - goto restart; - } - } - if (test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) - queue_work(xprtiod_workqueue, &transport->recv_worker); -out: - mutex_unlock(&transport->recv_mutex); - trace_xs_tcp_data_ready(xprt, read, total); -} - -static void xs_tcp_data_receive_workfn(struct work_struct *work) -{ - struct sock_xprt *transport = - container_of(work, struct sock_xprt, recv_worker); - xs_tcp_data_receive(transport); -} /** * xs_tcp_state_change - callback to handle TCP socket state changes @@ -1598,15 +1442,7 @@ trace_rpc_socket_state_change(xprt, sk->sk_socket); switch (sk->sk_state) { case TCP_ESTABLISHED: - spin_lock(&xprt->transport_lock); if (!xprt_test_and_set_connected(xprt)) { - - /* Reset TCP record info */ - transport->tcp_offset = 0; - transport->tcp_reclen = 0; - transport->tcp_copied = 0; - transport->tcp_flags = - TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; xprt->connect_cookie++; clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); xprt_clear_connecting(xprt); @@ -1614,9 +1450,8 @@ xprt->stat.connect_count++; xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; - xprt_wake_pending_tasks(xprt, -EAGAIN); + xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING); } - spin_unlock(&xprt->transport_lock); break; case TCP_FIN_WAIT1: /* The client initiated a shutdown of the socket */ @@ -1632,8 +1467,8 @@ /* The server initiated a shutdown of the socket */ xprt->connect_cookie++; clear_bit(XPRT_CONNECTED, &xprt->state); - xs_tcp_force_close(xprt); - /* fall through */ + xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT); + fallthrough; case TCP_CLOSING: /* * If the server closed down the connection, make sure that @@ -1653,10 +1488,8 @@ &transport->sock_state)) xprt_clear_connecting(xprt); clear_bit(XPRT_CLOSING, &xprt->state); - if (sk->sk_err) - xprt_wake_pending_tasks(xprt, -sk->sk_err); /* Trigger the socket release */ - xs_tcp_force_close(xprt); + xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT); } out: read_unlock_bh(&sk->sk_callback_lock); @@ -1665,6 +1498,7 @@ static void xs_write_space(struct sock *sk) { struct socket_wq *wq; + struct sock_xprt *transport; struct rpc_xprt *xprt; if (!sk->sk_socket) @@ -1673,12 +1507,14 @@ if (unlikely(!(xprt = xprt_from_sock(sk)))) return; + transport = container_of(xprt, struct sock_xprt, xprt); rcu_read_lock(); wq = rcu_dereference(sk->sk_wq); if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0) goto out; - xprt_write_space(xprt); + xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE); + sk->sk_write_pending--; out: rcu_read_unlock(); } @@ -1765,15 +1601,16 @@ /** * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport + * @xprt: controlling transport * @task: task that timed out * * Adjust the congestion window after a retransmit timeout has occurred. */ static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task) { - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); xprt_adjust_cwnd(xprt, task, -ETIMEDOUT); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); } static int xs_get_random_port(void) @@ -1787,21 +1624,6 @@ range = max - min + 1; rand = (unsigned short) prandom_u32() % range; return rand + min; -} - -/** - * xs_set_reuseaddr_port - set the socket's port and address reuse options - * @sock: socket - * - * Note that this function has to be called on all sockets that share the - * same port, and it must be called before binding. - */ -static void xs_sock_set_reuseport(struct socket *sock) -{ - int opt = 1; - - kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, - (char *)&opt, sizeof(opt)); } static unsigned short xs_sock_getport(struct socket *sock) @@ -1838,7 +1660,7 @@ static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock) { - if (transport->srcport == 0) + if (transport->srcport == 0 && transport->xprt.reuseport) transport->srcport = xs_sock_getport(sock); } @@ -1850,6 +1672,13 @@ port = xs_get_random_port(); return port; } + +unsigned short get_srcport(struct rpc_xprt *xprt) +{ + struct sock_xprt *sock = container_of(xprt, struct sock_xprt, xprt); + return xs_sock_getport(sock->sock); +} +EXPORT_SYMBOL(get_srcport); static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port) { @@ -1892,7 +1721,8 @@ err = kernel_bind(sock, (struct sockaddr *)&myaddr, transport->xprt.addrlen); if (err == 0) { - transport->srcport = port; + if (transport->xprt.reuseport) + transport->srcport = port; break; } last = port; @@ -1983,6 +1813,7 @@ struct sock_xprt *transport, int family, int type, int protocol, bool reuseport) { + struct file *filp; struct socket *sock; int err; @@ -1995,13 +1826,18 @@ xs_reclassify_socket(family, sock); if (reuseport) - xs_sock_set_reuseport(sock); + sock_set_reuseport(sock->sk); err = xs_bind(transport, sock); if (err) { sock_release(sock); goto out; } + + filp = sock_alloc_file(sock, O_NONBLOCK, NULL); + if (IS_ERR(filp)) + return ERR_CAST(filp); + transport->file = filp; return sock; out: @@ -2026,7 +1862,6 @@ sk->sk_write_space = xs_udp_write_space; sock_set_flag(sk, SOCK_FASYNC); sk->sk_error_report = xs_error_report; - sk->sk_allocation = GFP_NOIO; xprt_clear_connected(xprt); @@ -2037,7 +1872,8 @@ write_unlock_bh(&sk->sk_callback_lock); } - /* Tell the socket layer to start connecting... */ + xs_stream_start_connect(transport); + return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0); } @@ -2048,8 +1884,9 @@ static int xs_local_setup_socket(struct sock_xprt *transport) { struct rpc_xprt *xprt = &transport->xprt; + struct file *filp; struct socket *sock; - int status = -EIO; + int status; status = __sock_create(xprt->xprt_net, AF_LOCAL, SOCK_STREAM, 0, &sock, 1); @@ -2059,6 +1896,13 @@ goto out; } xs_reclassify_socket(AF_LOCAL, sock); + + filp = sock_alloc_file(sock, O_NONBLOCK, NULL); + if (IS_ERR(filp)) { + status = PTR_ERR(filp); + goto out; + } + transport->file = filp; dprintk("RPC: worker connecting xprt %p via AF_LOCAL to %s\n", xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); @@ -2073,6 +1917,7 @@ xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; xprt_set_connected(xprt); + break; case -ENOBUFS: break; case -ENOENT: @@ -2110,6 +1955,7 @@ * we'll need to figure out how to pass a namespace to * connect. */ + task->tk_rpc_status = -ENOTCONN; rpc_exit(task, -ENOTCONN); return; } @@ -2213,7 +2059,6 @@ sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; sock_set_flag(sk, SOCK_FASYNC); - sk->sk_allocation = GFP_NOIO; xprt_set_connected(xprt); @@ -2291,30 +2136,24 @@ struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); unsigned int keepidle; unsigned int keepcnt; - unsigned int opt_on = 1; unsigned int timeo; - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); keepidle = DIV_ROUND_UP(xprt->timeout->to_initval, HZ); keepcnt = xprt->timeout->to_retries + 1; timeo = jiffies_to_msecs(xprt->timeout->to_initval) * (xprt->timeout->to_retries + 1); clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); /* TCP Keepalive options */ - kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, - (char *)&opt_on, sizeof(opt_on)); - kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, - (char *)&keepidle, sizeof(keepidle)); - kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, - (char *)&keepidle, sizeof(keepidle)); - kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT, - (char *)&keepcnt, sizeof(keepcnt)); + sock_set_keepalive(sock->sk); + tcp_sock_set_keepidle(sock->sk, keepidle); + tcp_sock_set_keepintvl(sock->sk, keepidle); + tcp_sock_set_keepcnt(sock->sk, keepcnt); /* TCP user timeout (see RFC5482) */ - kernel_setsockopt(sock, SOL_TCP, TCP_USER_TIMEOUT, - (char *)&timeo, sizeof(timeo)); + tcp_sock_set_user_timeout(sock->sk, timeo); } static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt, @@ -2325,7 +2164,7 @@ struct rpc_timeout to; unsigned long initval; - spin_lock_bh(&xprt->transport_lock); + spin_lock(&xprt->transport_lock); if (reconnect_timeout < xprt->max_reconnect_timeout) xprt->max_reconnect_timeout = reconnect_timeout; if (connect_timeout < xprt->connect_timeout) { @@ -2342,7 +2181,7 @@ xprt->connect_timeout = connect_timeout; } set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->transport_lock); } static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) @@ -2352,7 +2191,6 @@ if (!transport->inet) { struct sock *sk = sock->sk; - unsigned int addr_pref = IPV6_PREFER_SRC_PUBLIC; /* Avoid temporary address, they are bad for long-lived * connections such as NFS mounts. @@ -2361,8 +2199,10 @@ * knowledge about the normal duration of connections, * MAY override this as appropriate. */ - kernel_setsockopt(sock, SOL_IPV6, IPV6_ADDR_PREFERENCES, - (char *)&addr_pref, sizeof(addr_pref)); + if (xs_addr(xprt)->sa_family == PF_INET6) { + ip6_sock_set_addr_preferences(sk, + IPV6_PREFER_SRC_PUBLIC); + } xs_tcp_set_socket_timeouts(xprt, sock); @@ -2376,7 +2216,6 @@ sk->sk_write_space = xs_tcp_write_space; sock_set_flag(sk, SOCK_FASYNC); sk->sk_error_report = xs_error_report; - sk->sk_allocation = GFP_NOIO; /* socket options */ sock_reset_flag(sk, SOCK_LINGER); @@ -2396,13 +2235,15 @@ xs_set_memalloc(xprt); + xs_stream_start_connect(transport); + /* Tell the socket layer to start connecting... */ set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); switch (ret) { case 0: xs_set_srcport(transport, sock); - /* fall through */ + fallthrough; case -EINPROGRESS: /* SYN_SENT! */ if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) @@ -2418,6 +2259,7 @@ /** * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint + * @work: queued work item * * Invoked by a work queue tasklet. */ @@ -2429,10 +2271,14 @@ struct rpc_xprt *xprt = &transport->xprt; int status = -EIO; - if (!sock) { - sock = xs_create_sock(xprt, transport, - xs_addr(xprt)->sa_family, SOCK_STREAM, - IPPROTO_TCP, true); + if (xprt_connected(xprt)) + goto out; + if (test_and_clear_bit(XPRT_SOCK_CONNECT_SENT, + &transport->sock_state) || + !sock) { + xs_reset_transport(transport); + sock = xs_create_sock(xprt, transport, xs_addr(xprt)->sa_family, + SOCK_STREAM, IPPROTO_TCP, true); if (IS_ERR(sock)) { status = PTR_ERR(sock); goto out; @@ -2454,7 +2300,7 @@ default: printk("%s: connect returned unhandled error %d\n", __func__, status); - /* fall through */ + fallthrough; case -EADDRNOTAVAIL: /* We're probably in TIME_WAIT. Get rid of existing socket, * and retry @@ -2463,6 +2309,8 @@ break; case 0: case -EINPROGRESS: + set_bit(XPRT_SOCK_CONNECT_SENT, &transport->sock_state); + fallthrough; case -EALREADY: xprt_unlock_connect(xprt, transport); return; @@ -2493,25 +2341,6 @@ xprt_wake_pending_tasks(xprt, status); } -static unsigned long xs_reconnect_delay(const struct rpc_xprt *xprt) -{ - unsigned long start, now = jiffies; - - start = xprt->stat.connect_start + xprt->reestablish_timeout; - if (time_after(start, now)) - return start - now; - return 0; -} - -static void xs_reconnect_backoff(struct rpc_xprt *xprt) -{ - xprt->reestablish_timeout <<= 1; - if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) - xprt->reestablish_timeout = xprt->max_reconnect_timeout; - if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) - xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; -} - /** * xs_connect - connect a socket to a remote endpoint * @xprt: pointer to transport structure @@ -2535,14 +2364,10 @@ if (transport->sock != NULL) { dprintk("RPC: xs_connect delayed xprt %p for %lu " - "seconds\n", - xprt, xprt->reestablish_timeout / HZ); + "seconds\n", xprt, xprt->reestablish_timeout / HZ); - /* Start by resetting any existing state */ - xs_reset_transport(transport); - - delay = xs_reconnect_delay(xprt); - xs_reconnect_backoff(xprt); + delay = xprt_reconnect_delay(xprt); + xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO); } else dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); @@ -2550,6 +2375,53 @@ queue_delayed_work(xprtiod_workqueue, &transport->connect_worker, delay); +} + +static void xs_wake_disconnect(struct sock_xprt *transport) +{ + if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state)) + xs_tcp_force_close(&transport->xprt); +} + +static void xs_wake_write(struct sock_xprt *transport) +{ + if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state)) + xprt_write_space(&transport->xprt); +} + +static void xs_wake_error(struct sock_xprt *transport) +{ + int sockerr; + + if (!test_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state)) + return; + mutex_lock(&transport->recv_mutex); + if (transport->sock == NULL) + goto out; + if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state)) + goto out; + sockerr = xchg(&transport->xprt_err, 0); + if (sockerr < 0) + xprt_wake_pending_tasks(&transport->xprt, sockerr); +out: + mutex_unlock(&transport->recv_mutex); +} + +static void xs_wake_pending(struct sock_xprt *transport) +{ + if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state)) + xprt_wake_pending_tasks(&transport->xprt, -EAGAIN); +} + +static void xs_error_handle(struct work_struct *work) +{ + struct sock_xprt *transport = container_of(work, + struct sock_xprt, error_worker); + + xs_wake_disconnect(transport); + xs_wake_write(transport); + xs_wake_error(transport); + xs_wake_pending(transport); } /** @@ -2569,7 +2441,7 @@ "%llu %llu %lu %llu %llu\n", xprt->stat.bind_count, xprt->stat.connect_count, - xprt->stat.connect_time, + xprt->stat.connect_time / HZ, idle_time, xprt->stat.sends, xprt->stat.recvs, @@ -2624,7 +2496,7 @@ transport->srcport, xprt->stat.bind_count, xprt->stat.connect_count, - xprt->stat.connect_time, + xprt->stat.connect_time / HZ, idle_time, xprt->stat.sends, xprt->stat.recvs, @@ -2678,47 +2550,43 @@ free_page((unsigned long)buf); } -/* - * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex - * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request. - */ static int bc_sendto(struct rpc_rqst *req) { - int len; - struct xdr_buf *xbufp = &req->rq_snd_buf; - struct rpc_xprt *xprt = req->rq_xprt; + struct xdr_buf *xdr = &req->rq_snd_buf; struct sock_xprt *transport = - container_of(xprt, struct sock_xprt, xprt); - struct socket *sock = transport->sock; - unsigned long headoff; - unsigned long tailoff; + container_of(req->rq_xprt, struct sock_xprt, xprt); + struct msghdr msg = { + .msg_flags = 0, + }; + rpc_fraghdr marker = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | + (u32)xdr->len); + unsigned int sent = 0; + int err; - xs_encode_stream_record_marker(xbufp); - - tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK; - headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK; - len = svc_send_common(sock, xbufp, - virt_to_page(xbufp->head[0].iov_base), headoff, - xbufp->tail[0].iov_base, tailoff); - - if (len != xbufp->len) { - printk(KERN_NOTICE "Error sending entire callback!\n"); - len = -EAGAIN; - } - - return len; + req->rq_xtime = ktime_get(); + err = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, marker, &sent); + xdr_free_bvec(xdr); + if (err < 0 || sent != (xdr->len + sizeof(marker))) + return -EAGAIN; + return sent; } -/* - * The send routine. Borrows from svc_send +/** + * bc_send_request - Send a backchannel Call on a TCP socket + * @req: rpc_rqst containing Call message to be sent + * + * xpt_mutex ensures @rqstp's whole message is written to the socket + * without interruption. + * + * Return values: + * %0 if the message was sent successfully + * %ENOTCONN if the message was not sent */ -static int bc_send_request(struct rpc_task *task) +static int bc_send_request(struct rpc_rqst *req) { - struct rpc_rqst *req = task->tk_rqstp; struct svc_xprt *xprt; int len; - dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid)); /* * Get the server socket associated with this callback xprt */ @@ -2728,12 +2596,7 @@ * Grab the mutex to serialize data as the connection is shared * with the fore channel */ - if (!mutex_trylock(&xprt->xpt_mutex)) { - rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL); - if (!mutex_trylock(&xprt->xpt_mutex)) - return -EAGAIN; - rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task); - } + mutex_lock(&xprt->xpt_mutex); if (test_bit(XPT_DEAD, &xprt->xpt_flags)) len = -ENOTCONN; else @@ -2752,6 +2615,7 @@ static void bc_close(struct rpc_xprt *xprt) { + xprt_disconnect_done(xprt); } /* @@ -2769,7 +2633,7 @@ static const struct rpc_xprt_ops xs_local_ops = { .reserve_xprt = xprt_reserve_xprt, - .release_xprt = xs_tcp_release_xprt, + .release_xprt = xprt_release_xprt, .alloc_slot = xprt_alloc_slot, .free_slot = xprt_free_slot, .rpcbind = xs_local_rpcbind, @@ -2777,8 +2641,9 @@ .connect = xs_local_connect, .buf_alloc = rpc_malloc, .buf_free = rpc_free, + .prepare_request = xs_stream_prepare_request, .send_request = xs_local_send_request, - .set_retrans_timeout = xprt_set_retrans_timeout_def, + .wait_for_reply_request = xprt_wait_for_reply_request_def, .close = xs_close, .destroy = xs_destroy, .print_stats = xs_local_print_stats, @@ -2798,7 +2663,7 @@ .buf_alloc = rpc_malloc, .buf_free = rpc_free, .send_request = xs_udp_send_request, - .set_retrans_timeout = xprt_set_retrans_timeout_rtt, + .wait_for_reply_request = xprt_wait_for_reply_request_rtt, .timer = xs_udp_timer, .release_request = xprt_release_rqst_cong, .close = xs_close, @@ -2811,16 +2676,17 @@ static const struct rpc_xprt_ops xs_tcp_ops = { .reserve_xprt = xprt_reserve_xprt, - .release_xprt = xs_tcp_release_xprt, - .alloc_slot = xprt_lock_and_alloc_slot, + .release_xprt = xprt_release_xprt, + .alloc_slot = xprt_alloc_slot, .free_slot = xprt_free_slot, .rpcbind = rpcb_getport_async, .set_port = xs_set_port, .connect = xs_connect, .buf_alloc = rpc_malloc, .buf_free = rpc_free, + .prepare_request = xs_stream_prepare_request, .send_request = xs_tcp_send_request, - .set_retrans_timeout = xprt_set_retrans_timeout_def, + .wait_for_reply_request = xprt_wait_for_reply_request_def, .close = xs_tcp_shutdown, .destroy = xs_destroy, .set_connect_timeout = xs_tcp_set_connect_timeout, @@ -2830,8 +2696,8 @@ .inject_disconnect = xs_inject_disconnect, #ifdef CONFIG_SUNRPC_BACKCHANNEL .bc_setup = xprt_setup_bc, - .bc_up = xs_tcp_bc_up, .bc_maxpayload = xs_tcp_bc_maxpayload, + .bc_num_slots = xprt_bc_max_slots, .bc_free_rqst = xprt_free_bc_rqst, .bc_destroy = xprt_destroy_bc, #endif @@ -2849,7 +2715,7 @@ .buf_alloc = bc_malloc, .buf_free = bc_free, .send_request = bc_send_request, - .set_retrans_timeout = xprt_set_retrans_timeout_def, + .wait_for_reply_request = xprt_wait_for_reply_request_def, .close = bc_close, .destroy = bc_destroy, .print_stats = xs_tcp_print_stats, @@ -2950,7 +2816,6 @@ transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = 0; - xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; xprt->bind_timeout = XS_BIND_TO; @@ -2960,9 +2825,9 @@ xprt->ops = &xs_local_ops; xprt->timeout = &xs_local_default_timeout; - INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_dummy_setup_socket); + INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); + INIT_WORK(&transport->error_worker, xs_error_handle); + INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); switch (sun->sun_family) { case AF_LOCAL: @@ -2974,9 +2839,6 @@ } xprt_set_bound(xprt); xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL); - ret = ERR_PTR(xs_local_setup_socket(transport)); - if (ret) - goto out_err; break; default: ret = ERR_PTR(-EAFNOSUPPORT); @@ -3020,7 +2882,6 @@ transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = IPPROTO_UDP; - xprt->tsh_size = 0; /* XXX: header size can vary due to auth type, IPv6, etc. */ xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); @@ -3033,6 +2894,7 @@ xprt->timeout = &xs_udp_default_timeout; INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn); + INIT_WORK(&transport->error_worker, xs_error_handle); INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); switch (addr->sa_family) { @@ -3100,7 +2962,6 @@ transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = IPPROTO_TCP; - xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; xprt->bind_timeout = XS_BIND_TO; @@ -3114,7 +2975,8 @@ xprt->connect_timeout = xprt->timeout->to_initval * (xprt->timeout->to_retries + 1); - INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn); + INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); + INIT_WORK(&transport->error_worker, xs_error_handle); INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); switch (addr->sa_family) { @@ -3173,7 +3035,6 @@ transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = IPPROTO_TCP; - xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; xprt->timeout = &xs_tcp_default_timeout; @@ -3277,10 +3138,8 @@ */ int init_socket_xprt(void) { -#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) if (!sunrpc_table_header) sunrpc_table_header = register_sysctl_table(sunrpc_table); -#endif xprt_register_transport(&xs_local_transport); xprt_register_transport(&xs_udp_transport); @@ -3296,12 +3155,10 @@ */ void cleanup_socket_xprt(void) { -#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) if (sunrpc_table_header) { unregister_sysctl_table(sunrpc_table_header); sunrpc_table_header = NULL; } -#endif xprt_unregister_transport(&xs_local_transport); xprt_unregister_transport(&xs_udp_transport); -- Gitblit v1.6.2