From 8ac6c7a54ed1b98d142dce24b11c6de6a1e239a5 Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Tue, 22 Oct 2024 10:36:11 +0000
Subject: [PATCH] Switch 4G dial-up to QMI mode; quectel-CM must be run in the background on the system

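The subject requires quectel-CM to be started in the background by the system.
How that is done is outside this diff; as a purely illustrative sketch (the
helper below is an assumption, not part of this patch — real deployments
typically use an init script, and quectel-CM's command-line arguments such as
the APN are deployment-specific and therefore omitted), a small C helper could
detach and exec the tool like this:

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

int main(void)
{
	pid_t pid = fork();

	if (pid < 0) {
		perror("fork");
		return EXIT_FAILURE;
	}
	if (pid == 0) {
		/* Child: start a new session so quectel-CM keeps running
		 * after the parent exits, then exec it from PATH.
		 * Arguments (APN and so on) are intentionally omitted here.
		 */
		setsid();
		execlp("quectel-CM", "quectel-CM", (char *)NULL);
		perror("execlp quectel-CM");
		_exit(127);
	}
	return EXIT_SUCCESS;	/* parent returns; child runs in background */
}

In practice the equivalent is usually a single backgrounded command in a
startup script.
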
---
 kernel/net/sunrpc/xprtrdma/rpc_rdma.c |  908 +++++++++++++++++++++++++++++++------------------------
 1 file changed, 511 insertions(+), 397 deletions(-)

diff --git a/kernel/net/sunrpc/xprtrdma/rpc_rdma.c b/kernel/net/sunrpc/xprtrdma/rpc_rdma.c
index f2eaf26..b8174c7 100644
--- a/kernel/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/kernel/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -71,16 +71,13 @@
 	size = RPCRDMA_HDRLEN_MIN;
 
 	/* Maximum Read list size */
-	maxsegs += 2;	/* segment for head and tail buffers */
-	size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
+	size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
 
 	/* Minimal Read chunk size */
 	size += sizeof(__be32);	/* segment count */
 	size += rpcrdma_segment_maxsz * sizeof(__be32);
 	size += sizeof(__be32);	/* list discriminator */
 
-	dprintk("RPC:       %s: max call header size = %u\n",
-		__func__, size);
 	return size;
 }
 
@@ -97,26 +94,29 @@
 	size = RPCRDMA_HDRLEN_MIN;
 
 	/* Maximum Write list size */
-	maxsegs += 2;	/* segment for head and tail buffers */
-	size = sizeof(__be32);		/* segment count */
+	size += sizeof(__be32);		/* segment count */
 	size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
 	size += sizeof(__be32);	/* list discriminator */
 
-	dprintk("RPC:       %s: max reply header size = %u\n",
-		__func__, size);
 	return size;
 }
 
-void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
+/**
+ * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
+ * @ep: endpoint to initialize
+ *
+ * The max_inline fields contain the maximum size of an RPC message
+ * so the marshaling code doesn't have to repeat this calculation
+ * for every RPC.
+ */
+void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep)
 {
-	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	unsigned int maxsegs = ia->ri_max_segs;
+	unsigned int maxsegs = ep->re_max_rdma_segs;
 
-	ia->ri_max_inline_write = cdata->inline_wsize -
-				  rpcrdma_max_call_header_size(maxsegs);
-	ia->ri_max_inline_read = cdata->inline_rsize -
-				 rpcrdma_max_reply_header_size(maxsegs);
+	ep->re_max_inline_send =
+		ep->re_inline_send - rpcrdma_max_call_header_size(maxsegs);
+	ep->re_max_inline_recv =
+		ep->re_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
 }
 
 /* The client can send a request inline as long as the RPCRDMA header
@@ -131,9 +131,10 @@
 				struct rpc_rqst *rqst)
 {
 	struct xdr_buf *xdr = &rqst->rq_snd_buf;
+	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 	unsigned int count, remaining, offset;
 
-	if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
+	if (xdr->len > ep->re_max_inline_send)
 		return false;
 
 	if (xdr->page_len) {
@@ -144,7 +145,7 @@
 			remaining -= min_t(unsigned int,
 					   PAGE_SIZE - offset, remaining);
 			offset = 0;
-			if (++count > r_xprt->rx_ia.ri_max_send_sges)
+			if (++count > ep->re_attr.cap.max_send_sge)
 				return false;
 		}
 	}
@@ -161,9 +162,46 @@
 static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 				   struct rpc_rqst *rqst)
 {
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv;
+}
 
-	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
+/* The client is required to provide a Reply chunk if the maximum
+ * size of the non-payload part of the RPC Reply is larger than
+ * the inline threshold.
+ */
+static bool
+rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
+			  const struct rpc_rqst *rqst)
+{
+	const struct xdr_buf *buf = &rqst->rq_rcv_buf;
+
+	return (buf->head[0].iov_len + buf->tail[0].iov_len) <
+		r_xprt->rx_ep->re_max_inline_recv;
+}
+
+/* ACL likes to be lazy in allocating pages. For TCP, these
+ * pages can be allocated during receive processing. Not true
+ * for RDMA, which must always provision receive buffers
+ * up front.
+ */
+static noinline int
+rpcrdma_alloc_sparse_pages(struct xdr_buf *buf)
+{
+	struct page **ppages;
+	int len;
+
+	len = buf->page_len;
+	ppages = buf->pages + (buf->page_base >> PAGE_SHIFT);
+	while (len > 0) {
+		if (!*ppages)
+			*ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
+		if (!*ppages)
+			return -ENOBUFS;
+		ppages++;
+		len -= PAGE_SIZE;
+	}
+
+	return 0;
 }
 
 /* Split @vec on page boundaries into SGEs. FMR registers pages, not
@@ -220,14 +258,6 @@
 	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
 	page_base = offset_in_page(xdrbuf->page_base);
 	while (len) {
-		if (unlikely(!*ppages)) {
-			/* XXX: Certain upper layer operations do
-			 *	not provide receive buffer pages.
-			 */
-			*ppages = alloc_page(GFP_ATOMIC);
-			if (!*ppages)
-				return -ENOBUFS;
-		}
 		seg->mr_page = *ppages;
 		seg->mr_offset = (char *)page_base;
 		seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
@@ -241,7 +271,7 @@
 	/* When encoding a Read chunk, the tail iovec contains an
 	 * XDR pad and may be omitted.
 	 */
-	if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
+	if (type == rpcrdma_readch && r_xprt->rx_ep->re_implicit_roundup)
 		goto out;
 
 	/* When encoding a Write chunk, some servers need to see an
@@ -249,7 +279,7 @@
 	 * layer provides space in the tail iovec that may be used
 	 * for this purpose.
 	 */
-	if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
+	if (type == rpcrdma_writech && r_xprt->rx_ep->re_implicit_roundup)
 		goto out;
 
 	if (xdrbuf->tail[0].iov_len)
@@ -261,40 +291,6 @@
 	return n;
 }
 
-static inline int
-encode_item_present(struct xdr_stream *xdr)
-{
-	__be32 *p;
-
-	p = xdr_reserve_space(xdr, sizeof(*p));
-	if (unlikely(!p))
-		return -EMSGSIZE;
-
-	*p = xdr_one;
-	return 0;
-}
-
-static inline int
-encode_item_not_present(struct xdr_stream *xdr)
-{
-	__be32 *p;
-
-	p = xdr_reserve_space(xdr, sizeof(*p));
-	if (unlikely(!p))
-		return -EMSGSIZE;
-
-	*p = xdr_zero;
-	return 0;
-}
-
-static void
-xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr)
-{
-	*iptr++ = cpu_to_be32(mr->mr_handle);
-	*iptr++ = cpu_to_be32(mr->mr_length);
-	xdr_encode_hyper(iptr, mr->mr_offset);
-}
-
 static int
 encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
 {
@@ -304,7 +300,7 @@
 	if (unlikely(!p))
 		return -EMSGSIZE;
 
-	xdr_encode_rdma_segment(p, mr);
+	xdr_encode_rdma_segment(p, mr->mr_handle, mr->mr_length, mr->mr_offset);
 	return 0;
 }
 
@@ -319,9 +315,34 @@
 		return -EMSGSIZE;
 
 	*p++ = xdr_one;			/* Item present */
-	*p++ = cpu_to_be32(position);
-	xdr_encode_rdma_segment(p, mr);
+	xdr_encode_read_segment(p, position, mr->mr_handle, mr->mr_length,
+				mr->mr_offset);
 	return 0;
+}
+
+static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
+						 struct rpcrdma_req *req,
+						 struct rpcrdma_mr_seg *seg,
+						 int nsegs, bool writing,
+						 struct rpcrdma_mr **mr)
+{
+	*mr = rpcrdma_mr_pop(&req->rl_free_mrs);
+	if (!*mr) {
+		*mr = rpcrdma_mr_get(r_xprt);
+		if (!*mr)
+			goto out_getmr_err;
+		trace_xprtrdma_mr_get(req);
+		(*mr)->mr_req = req;
+	}
+
+	rpcrdma_mr_push(*mr, &req->rl_registered);
+	return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
+
+out_getmr_err:
+	trace_xprtrdma_nomrs(req);
+	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
+	rpcrdma_mrs_refresh(r_xprt);
+	return ERR_PTR(-EAGAIN);
 }
 
 /* Register and XDR encode the Read list. Supports encoding a list of read
@@ -338,15 +359,19 @@
  *
  * Only a single @pos value is currently supported.
  */
-static noinline int
-rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-			 struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
+static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+				    struct rpcrdma_req *req,
+				    struct rpc_rqst *rqst,
+				    enum rpcrdma_chunktype rtype)
 {
 	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
 	struct rpcrdma_mr *mr;
 	unsigned int pos;
 	int nsegs;
+
+	if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
+		goto done;
 
 	pos = rqst->rq_snd_buf.head[0].iov_len;
 	if (rtype == rpcrdma_areadch)
@@ -358,20 +383,21 @@
 		return nsegs;
 
 	do {
-		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						   false, &mr);
+		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
 		if (IS_ERR(seg))
 			return PTR_ERR(seg);
-		rpcrdma_mr_push(mr, &req->rl_registered);
 
 		if (encode_read_segment(xdr, mr, pos) < 0)
 			return -EMSGSIZE;
 
-		trace_xprtrdma_read_chunk(rqst->rq_task, pos, mr, nsegs);
+		trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
 		r_xprt->rx_stats.read_chunk_count++;
 		nsegs -= mr->mr_nents;
 	} while (nsegs);
 
+done:
+	if (xdr_stream_encode_item_absent(xdr) < 0)
+		return -EMSGSIZE;
 	return 0;
 }
 
@@ -390,15 +416,19 @@
  *
  * Only a single Write chunk is currently supported.
  */
-static noinline int
-rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-			  struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
+				     struct rpcrdma_req *req,
+				     struct rpc_rqst *rqst,
+				     enum rpcrdma_chunktype wtype)
 {
 	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
 	struct rpcrdma_mr *mr;
 	int nsegs, nchunks;
 	__be32 *segcount;
+
+	if (wtype != rpcrdma_writech)
+		goto done;
 
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
@@ -407,7 +437,7 @@
 	if (nsegs < 0)
 		return nsegs;
 
-	if (encode_item_present(xdr) < 0)
+	if (xdr_stream_encode_item_present(xdr) < 0)
 		return -EMSGSIZE;
 	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
 	if (unlikely(!segcount))
@@ -416,16 +446,14 @@
 
 	nchunks = 0;
 	do {
-		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						   true, &mr);
+		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
 		if (IS_ERR(seg))
 			return PTR_ERR(seg);
-		rpcrdma_mr_push(mr, &req->rl_registered);
 
 		if (encode_rdma_segment(xdr, mr) < 0)
 			return -EMSGSIZE;
 
-		trace_xprtrdma_write_chunk(rqst->rq_task, mr, nsegs);
+		trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
 		r_xprt->rx_stats.write_chunk_count++;
 		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
 		nchunks++;
@@ -435,6 +463,9 @@
 	/* Update count of segments in this Write chunk */
 	*segcount = cpu_to_be32(nchunks);
 
+done:
+	if (xdr_stream_encode_item_absent(xdr) < 0)
+		return -EMSGSIZE;
 	return 0;
 }
 
@@ -450,9 +481,10 @@
  * Returns zero on success, or a negative errno if a failure occurred.
  * @xdr is advanced to the next position in the stream.
  */
-static noinline int
-rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-			   struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
+static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+				      struct rpcrdma_req *req,
+				      struct rpc_rqst *rqst,
+				      enum rpcrdma_chunktype wtype)
 {
 	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
@@ -460,12 +492,18 @@
 	int nsegs, nchunks;
 	__be32 *segcount;
 
+	if (wtype != rpcrdma_replych) {
+		if (xdr_stream_encode_item_absent(xdr) < 0)
+			return -EMSGSIZE;
+		return 0;
+	}
+
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
 	if (nsegs < 0)
 		return nsegs;
 
-	if (encode_item_present(xdr) < 0)
+	if (xdr_stream_encode_item_present(xdr) < 0)
 		return -EMSGSIZE;
 	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
 	if (unlikely(!segcount))
@@ -474,16 +512,14 @@
 
 	nchunks = 0;
 	do {
-		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						   true, &mr);
+		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
 		if (IS_ERR(seg))
 			return PTR_ERR(seg);
-		rpcrdma_mr_push(mr, &req->rl_registered);
 
 		if (encode_rdma_segment(xdr, mr) < 0)
 			return -EMSGSIZE;
 
-		trace_xprtrdma_reply_chunk(rqst->rq_task, mr, nsegs);
+		trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
 		r_xprt->rx_stats.reply_chunk_count++;
 		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
 		nchunks++;
@@ -496,181 +532,265 @@
 	return 0;
 }
 
+static void rpcrdma_sendctx_done(struct kref *kref)
+{
+	struct rpcrdma_req *req =
+		container_of(kref, struct rpcrdma_req, rl_kref);
+	struct rpcrdma_rep *rep = req->rl_reply;
+
+	rpcrdma_complete_rqst(rep);
+	rep->rr_rxprt->rx_stats.reply_waits_for_send++;
+}
+
 /**
- * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
+ * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
  * @sc: sendctx containing SGEs to unmap
  *
  */
-void
-rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
+void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 {
-	struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
+	struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
 	struct ib_sge *sge;
-	unsigned int count;
+
+	if (!sc->sc_unmap_count)
+		return;
 
 	/* The first two SGEs contain the transport header and
 	 * the inline buffer. These are always left mapped so
 	 * they can be cheaply re-used.
 	 */
-	sge = &sc->sc_sges[2];
-	for (count = sc->sc_unmap_count; count; ++sge, --count)
-		ib_dma_unmap_page(ia->ri_device,
-				  sge->addr, sge->length, DMA_TO_DEVICE);
+	for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
+	     ++sge, --sc->sc_unmap_count)
+		ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length,
+				  DMA_TO_DEVICE);
 
-	if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
-		smp_mb__after_atomic();
-		wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
-	}
+	kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
 }
 
 /* Prepare an SGE for the RPC-over-RDMA transport header.
  */
-static bool
-rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
-			u32 len)
+static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
+				    struct rpcrdma_req *req, u32 len)
 {
 	struct rpcrdma_sendctx *sc = req->rl_sendctx;
 	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
-	struct ib_sge *sge = sc->sc_sges;
+	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
 
-	if (!rpcrdma_dma_map_regbuf(ia, rb))
-		goto out_regbuf;
 	sge->addr = rdmab_addr(rb);
 	sge->length = len;
 	sge->lkey = rdmab_lkey(rb);
 
-	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
-				      sge->length, DMA_TO_DEVICE);
-	sc->sc_wr.num_sge++;
+	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
+				      DMA_TO_DEVICE);
+}
+
+/* The head iovec is straightforward, as it is usually already
+ * DMA-mapped. Sync the content that has changed.
+ */
+static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt,
+				     struct rpcrdma_req *req, unsigned int len)
+{
+	struct rpcrdma_sendctx *sc = req->rl_sendctx;
+	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
+	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
+
+	if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
+		return false;
+
+	sge->addr = rdmab_addr(rb);
+	sge->length = len;
+	sge->lkey = rdmab_lkey(rb);
+
+	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
+				      DMA_TO_DEVICE);
+	return true;
+}
+
+/* If there is a page list present, DMA map and prepare an
+ * SGE for each page to be sent.
+ */
+static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req,
+				     struct xdr_buf *xdr)
+{
+	struct rpcrdma_sendctx *sc = req->rl_sendctx;
+	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
+	unsigned int page_base, len, remaining;
+	struct page **ppages;
+	struct ib_sge *sge;
+
+	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+	page_base = offset_in_page(xdr->page_base);
+	remaining = xdr->page_len;
+	while (remaining) {
+		sge = &sc->sc_sges[req->rl_wr.num_sge++];
+		len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
+		sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages,
+					    page_base, len, DMA_TO_DEVICE);
+		if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
+			goto out_mapping_err;
+
+		sge->length = len;
+		sge->lkey = rdmab_lkey(rb);
+
+		sc->sc_unmap_count++;
+		ppages++;
+		remaining -= len;
+		page_base = 0;
+	}
+
 	return true;
 
-out_regbuf:
-	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+out_mapping_err:
+	trace_xprtrdma_dma_maperr(sge->addr);
 	return false;
 }
 
-/* Prepare the Send SGEs. The head and tail iovec, and each entry
- * in the page list, gets its own SGE.
+/* The tail iovec may include an XDR pad for the page list,
+ * as well as additional content, and may not reside in the
+ * same page as the head iovec.
  */
-static bool
-rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
-			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
+				     struct xdr_buf *xdr,
+				     unsigned int page_base, unsigned int len)
 {
 	struct rpcrdma_sendctx *sc = req->rl_sendctx;
-	unsigned int sge_no, page_base, len, remaining;
+	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
 	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
-	struct ib_device *device = ia->ri_device;
-	struct ib_sge *sge = sc->sc_sges;
-	u32 lkey = ia->ri_pd->local_dma_lkey;
-	struct page *page, **ppages;
+	struct page *page = virt_to_page(xdr->tail[0].iov_base);
 
-	/* The head iovec is straightforward, as it is already
-	 * DMA-mapped. Sync the content that has changed.
-	 */
-	if (!rpcrdma_dma_map_regbuf(ia, rb))
-		goto out_regbuf;
-	sge_no = 1;
-	sge[sge_no].addr = rdmab_addr(rb);
-	sge[sge_no].length = xdr->head[0].iov_len;
-	sge[sge_no].lkey = rdmab_lkey(rb);
-	ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
-				      sge[sge_no].length, DMA_TO_DEVICE);
+	sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len,
+				    DMA_TO_DEVICE);
+	if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
+		goto out_mapping_err;
 
-	/* If there is a Read chunk, the page list is being handled
-	 * via explicit RDMA, and thus is skipped here. However, the
-	 * tail iovec may include an XDR pad for the page list, as
-	 * well as additional content, and may not reside in the
-	 * same page as the head iovec.
-	 */
-	if (rtype == rpcrdma_readch) {
-		len = xdr->tail[0].iov_len;
-
-		/* Do not include the tail if it is only an XDR pad */
-		if (len < 4)
-			goto out;
-
-		page = virt_to_page(xdr->tail[0].iov_base);
-		page_base = offset_in_page(xdr->tail[0].iov_base);
-
-		/* If the content in the page list is an odd length,
-		 * xdr_write_pages() has added a pad at the beginning
-		 * of the tail iovec. Force the tail's non-pad content
-		 * to land at the next XDR position in the Send message.
-		 */
-		page_base += len & 3;
-		len -= len & 3;
-		goto map_tail;
-	}
-
-	/* If there is a page list present, temporarily DMA map
-	 * and prepare an SGE for each page to be sent.
-	 */
-	if (xdr->page_len) {
-		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
-		page_base = offset_in_page(xdr->page_base);
-		remaining = xdr->page_len;
-		while (remaining) {
-			sge_no++;
-			if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
-				goto out_mapping_overflow;
-
-			len = min_t(u32, PAGE_SIZE - page_base, remaining);
-			sge[sge_no].addr = ib_dma_map_page(device, *ppages,
-							   page_base, len,
-							   DMA_TO_DEVICE);
-			if (ib_dma_mapping_error(device, sge[sge_no].addr))
-				goto out_mapping_err;
-			sge[sge_no].length = len;
-			sge[sge_no].lkey = lkey;
-
-			sc->sc_unmap_count++;
-			ppages++;
-			remaining -= len;
-			page_base = 0;
-		}
-	}
-
-	/* The tail iovec is not always constructed in the same
-	 * page where the head iovec resides (see, for example,
-	 * gss_wrap_req_priv). To neatly accommodate that case,
-	 * DMA map it separately.
-	 */
-	if (xdr->tail[0].iov_len) {
-		page = virt_to_page(xdr->tail[0].iov_base);
-		page_base = offset_in_page(xdr->tail[0].iov_base);
-		len = xdr->tail[0].iov_len;
-
-map_tail:
-		sge_no++;
-		sge[sge_no].addr = ib_dma_map_page(device, page,
-						   page_base, len,
-						   DMA_TO_DEVICE);
-		if (ib_dma_mapping_error(device, sge[sge_no].addr))
-			goto out_mapping_err;
-		sge[sge_no].length = len;
-		sge[sge_no].lkey = lkey;
-		sc->sc_unmap_count++;
-	}
-
-out:
-	sc->sc_wr.num_sge += sge_no;
-	if (sc->sc_unmap_count)
-		__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+	sge->length = len;
+	sge->lkey = rdmab_lkey(rb);
+	++sc->sc_unmap_count;
 	return true;
 
-out_regbuf:
-	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
-	return false;
-
-out_mapping_overflow:
-	rpcrdma_unmap_sendctx(sc);
-	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
-	return false;
-
 out_mapping_err:
-	rpcrdma_unmap_sendctx(sc);
-	pr_err("rpcrdma: Send mapping error\n");
+	trace_xprtrdma_dma_maperr(sge->addr);
 	return false;
+}
+
+/* Copy the tail to the end of the head buffer.
+ */
+static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
+				    struct rpcrdma_req *req,
+				    struct xdr_buf *xdr)
+{
+	unsigned char *dst;
+
+	dst = (unsigned char *)xdr->head[0].iov_base;
+	dst += xdr->head[0].iov_len + xdr->page_len;
+	memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
+	r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len;
+}
+
+/* Copy pagelist content into the head buffer.
+ */
+static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt,
+				    struct rpcrdma_req *req,
+				    struct xdr_buf *xdr)
+{
+	unsigned int len, page_base, remaining;
+	struct page **ppages;
+	unsigned char *src, *dst;
+
+	dst = (unsigned char *)xdr->head[0].iov_base;
+	dst += xdr->head[0].iov_len;
+	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+	page_base = offset_in_page(xdr->page_base);
+	remaining = xdr->page_len;
+	while (remaining) {
+		src = page_address(*ppages);
+		src += page_base;
+		len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
+		memcpy(dst, src, len);
+		r_xprt->rx_stats.pullup_copy_count += len;
+
+		ppages++;
+		dst += len;
+		remaining -= len;
+		page_base = 0;
+	}
+}
+
+/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
+ * When the head, pagelist, and tail are small, a pull-up copy
+ * is considerably less costly than DMA mapping the components
+ * of @xdr.
+ *
+ * Assumptions:
+ *  - the caller has already verified that the total length
+ *    of the RPC Call body will fit into @rl_sendbuf.
+ */
+static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
+					struct rpcrdma_req *req,
+					struct xdr_buf *xdr)
+{
+	if (unlikely(xdr->tail[0].iov_len))
+		rpcrdma_pullup_tail_iov(r_xprt, req, xdr);
+
+	if (unlikely(xdr->page_len))
+		rpcrdma_pullup_pagelist(r_xprt, req, xdr);
+
+	/* The whole RPC message resides in the head iovec now */
+	return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
+}
+
+static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
+					struct rpcrdma_req *req,
+					struct xdr_buf *xdr)
+{
+	struct kvec *tail = &xdr->tail[0];
+
+	if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
+		return false;
+	if (xdr->page_len)
+		if (!rpcrdma_prepare_pagelist(req, xdr))
+			return false;
+	if (tail->iov_len)
+		if (!rpcrdma_prepare_tail_iov(req, xdr,
+					      offset_in_page(tail->iov_base),
+					      tail->iov_len))
+			return false;
+
+	if (req->rl_sendctx->sc_unmap_count)
+		kref_get(&req->rl_kref);
+	return true;
+}
+
+static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt,
+				   struct rpcrdma_req *req,
+				   struct xdr_buf *xdr)
+{
+	if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
+		return false;
+
+	/* If there is a Read chunk, the page list is being handled
+	 * via explicit RDMA, and thus is skipped here.
+	 */
+
+	/* Do not include the tail if it is only an XDR pad */
+	if (xdr->tail[0].iov_len > 3) {
+		unsigned int page_base, len;
+
+		/* If the content in the page list is an odd length,
+		 * xdr_write_pages() adds a pad at the beginning of
+		 * the tail iovec. Force the tail's non-pad content to
+		 * land at the next XDR position in the Send message.
+		 */
+		page_base = offset_in_page(xdr->tail[0].iov_base);
+		len = xdr->tail[0].iov_len;
+		page_base += len & 3;
+		len -= len & 3;
+		if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
+			return false;
+		kref_get(&req->rl_kref);
+	}
+
+	return true;
 }
 
 /**
@@ -683,27 +803,54 @@
  *
  * Returns 0 on success; otherwise a negative errno is returned.
  */
-int
-rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
-			  struct rpcrdma_req *req, u32 hdrlen,
-			  struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
+				     struct rpcrdma_req *req, u32 hdrlen,
+				     struct xdr_buf *xdr,
+				     enum rpcrdma_chunktype rtype)
 {
-	req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
+	int ret;
+
+	ret = -EAGAIN;
+	req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
 	if (!req->rl_sendctx)
-		return -EAGAIN;
-	req->rl_sendctx->sc_wr.num_sge = 0;
+		goto out_nosc;
 	req->rl_sendctx->sc_unmap_count = 0;
 	req->rl_sendctx->sc_req = req;
-	__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+	kref_init(&req->rl_kref);
+	req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
+	req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
+	req->rl_wr.num_sge = 0;
+	req->rl_wr.opcode = IB_WR_SEND;
 
-	if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
-		return -EIO;
+	rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen);
 
-	if (rtype != rpcrdma_areadch)
-		if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
-			return -EIO;
+	ret = -EIO;
+	switch (rtype) {
+	case rpcrdma_noch_pullup:
+		if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
+			goto out_unmap;
+		break;
+	case rpcrdma_noch_mapped:
+		if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
+			goto out_unmap;
+		break;
+	case rpcrdma_readch:
+		if (!rpcrdma_prepare_readch(r_xprt, req, xdr))
+			goto out_unmap;
+		break;
+	case rpcrdma_areadch:
+		break;
+	default:
+		goto out_unmap;
+	}
 
 	return 0;
+
+out_unmap:
+	rpcrdma_sendctx_unmap(req->rl_sendctx);
+out_nosc:
+	trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
+	return ret;
 }
 
 /**
@@ -731,13 +878,20 @@
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct xdr_stream *xdr = &req->rl_stream;
 	enum rpcrdma_chunktype rtype, wtype;
+	struct xdr_buf *buf = &rqst->rq_snd_buf;
 	bool ddp_allowed;
 	__be32 *p;
 	int ret;
 
+	if (unlikely(rqst->rq_rcv_buf.flags & XDRBUF_SPARSE_PAGES)) {
+		ret = rpcrdma_alloc_sparse_pages(&rqst->rq_rcv_buf);
+		if (ret)
+			return ret;
+	}
+
 	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
-	xdr_init_encode(xdr, &req->rl_hdrbuf,
-			req->rl_rdmabuf->rg_base);
+	xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
+			rqst);
 
 	/* Fixed header fields */
 	ret = -EMSGSIZE;
@@ -746,14 +900,14 @@
 		goto out_err;
 	*p++ = rqst->rq_xid;
 	*p++ = rpcrdma_version;
-	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
+	*p++ = r_xprt->rx_buf.rb_max_requests;
 
 	/* When the ULP employs a GSS flavor that guarantees integrity
 	 * or privacy, direct data placement of individual data items
 	 * is not allowed.
 	 */
-	ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
-						RPCAUTH_AUTH_DATATOUCH);
+	ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH,
+				&rqst->rq_cred->cr_auth->au_flags);
 
 	/*
 	 * Chunks needed for results?
@@ -766,7 +920,8 @@
 	 */
 	if (rpcrdma_results_inline(r_xprt, rqst))
 		wtype = rpcrdma_noch;
-	else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
+	else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
+		 rpcrdma_nonpayload_inline(r_xprt, rqst))
 		wtype = rpcrdma_writech;
 	else
 		wtype = rpcrdma_replych;
@@ -787,25 +942,15 @@
 	 */
 	if (rpcrdma_args_inline(r_xprt, rqst)) {
 		*p++ = rdma_msg;
-		rtype = rpcrdma_noch;
-	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
+		rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
+			rpcrdma_noch_pullup : rpcrdma_noch_mapped;
+	} else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
 		*p++ = rdma_msg;
 		rtype = rpcrdma_readch;
 	} else {
 		r_xprt->rx_stats.nomsg_call_count++;
 		*p++ = rdma_nomsg;
 		rtype = rpcrdma_areadch;
-	}
-
-	/* If this is a retransmit, discard previously registered
-	 * chunks. Very likely the connection has been replaced,
-	 * so these registrations are invalid and unusable.
-	 */
-	while (unlikely(!list_empty(&req->rl_registered))) {
-		struct rpcrdma_mr *mr;
-
-		mr = rpcrdma_mr_pop(&req->rl_registered);
-		rpcrdma_mr_defer_recovery(mr);
 	}
 
 	/* This implementation supports the following combinations
@@ -830,50 +975,63 @@
 	 * send a Call message with a Position Zero Read chunk and a
 	 * regular Read chunk at the same time.
 	 */
-	if (rtype != rpcrdma_noch) {
-		ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
-		if (ret)
-			goto out_err;
-	}
-	ret = encode_item_not_present(xdr);
+	ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
+	if (ret)
+		goto out_err;
+	ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
+	if (ret)
+		goto out_err;
+	ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
 	if (ret)
 		goto out_err;
 
-	if (wtype == rpcrdma_writech) {
-		ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
-		if (ret)
-			goto out_err;
-	}
-	ret = encode_item_not_present(xdr);
+	ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
+					buf, rtype);
 	if (ret)
 		goto out_err;
 
-	if (wtype != rpcrdma_replych)
-		ret = encode_item_not_present(xdr);
-	else
-		ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
-	if (ret)
-		goto out_err;
-
-	trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
-
-	ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
-					&rqst->rq_snd_buf, rtype);
-	if (ret)
-		goto out_err;
+	trace_xprtrdma_marshal(req, rtype, wtype);
 	return 0;
 
 out_err:
-	switch (ret) {
-	case -EAGAIN:
-		xprt_wait_for_buffer_space(rqst->rq_task, NULL);
-		break;
-	case -ENOBUFS:
-		break;
-	default:
-		r_xprt->rx_stats.failed_marshal_count++;
-	}
+	trace_xprtrdma_marshal_failed(rqst, ret);
+	r_xprt->rx_stats.failed_marshal_count++;
+	frwr_reset(req);
 	return ret;
+}
+
+static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt,
+					 struct rpcrdma_buffer *buf,
+					 u32 grant)
+{
+	buf->rb_credits = grant;
+	xprt->cwnd = grant << RPC_CWNDSHIFT;
+}
+
+static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant)
+{
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+
+	spin_lock(&xprt->transport_lock);
+	__rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant);
+	spin_unlock(&xprt->transport_lock);
+}
+
+/**
+ * rpcrdma_reset_cwnd - Reset the xprt's congestion window
+ * @r_xprt: controlling transport instance
+ *
+ * Prepare @r_xprt for the next connection by reinitializing
+ * its credit grant to one (see RFC 8166, Section 3.3.3).
+ */
+void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt)
+{
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+
+	spin_lock(&xprt->transport_lock);
+	xprt->cong = 0;
+	__rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1);
+	spin_unlock(&xprt->transport_lock);
 }
 
 /**
@@ -915,7 +1073,6 @@
 	curlen = rqst->rq_rcv_buf.head[0].iov_len;
 	if (curlen > copy_len)
 		curlen = copy_len;
-	trace_xprtrdma_fixup(rqst, copy_len, curlen);
 	srcp += curlen;
 	copy_len -= curlen;
 
@@ -935,8 +1092,6 @@
 			if (curlen > pagelist_len)
 				curlen = pagelist_len;
 
-			trace_xprtrdma_fixup_pg(rqst, i, srcp,
-						copy_len, curlen);
 			destp = kmap_atomic(ppages[i]);
 			memcpy(destp + page_base, srcp, curlen);
 			flush_dcache_page(ppages[i]);
@@ -968,6 +1123,8 @@
 		rqst->rq_private_buf.tail[0].iov_base = srcp;
 	}
 
+	if (fixup_copy_count)
+		trace_xprtrdma_fixup(rqst, fixup_copy_count);
 	return fixup_copy_count;
 }
 
@@ -980,6 +1137,7 @@
 rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 {
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	struct xdr_stream *xdr = &rep->rr_stream;
 	__be32 *p;
 
@@ -990,17 +1148,21 @@
 	p = xdr_inline_decode(xdr, 0);
 
 	/* Chunk lists */
-	if (*p++ != xdr_zero)
+	if (xdr_item_is_present(p++))
 		return false;
-	if (*p++ != xdr_zero)
+	if (xdr_item_is_present(p++))
 		return false;
-	if (*p++ != xdr_zero)
+	if (xdr_item_is_present(p++))
 		return false;
 
 	/* RPC header */
 	if (*p++ != rep->rr_xid)
 		return false;
 	if (*p != cpu_to_be32(RPC_CALL))
+		return false;
+
+	/* No bc service. */
+	if (xprt->bc_serv == NULL)
 		return false;
 
 	/* Now that we are sure this is a backchannel call,
@@ -1033,10 +1195,7 @@
 	if (unlikely(!p))
 		return -EIO;
 
-	handle = be32_to_cpup(p++);
-	*length = be32_to_cpup(p++);
-	xdr_decode_hyper(p, &offset);
-
+	xdr_decode_rdma_segment(p, &handle, length, &offset);
 	trace_xprtrdma_decode_seg(handle, *length, offset);
 	return 0;
 }
@@ -1072,7 +1231,7 @@
 	p = xdr_inline_decode(xdr, sizeof(*p));
 	if (unlikely(!p))
 		return -EIO;
-	if (unlikely(*p != xdr_zero))
+	if (unlikely(xdr_item_is_present(p)))
 		return -EIO;
 	return 0;
 }
@@ -1091,7 +1250,7 @@
 		p = xdr_inline_decode(xdr, sizeof(*p));
 		if (unlikely(!p))
 			return -EIO;
-		if (*p == xdr_zero)
+		if (xdr_item_is_absent(p))
 			break;
 		if (!first)
 			return -EIO;
@@ -1113,7 +1272,7 @@
 		return -EIO;
 
 	*length = 0;
-	if (*p != xdr_zero)
+	if (xdr_item_is_present(p))
 		if (decode_write_chunk(xdr, length))
 			return -EIO;
 	return 0;
@@ -1190,21 +1349,23 @@
 		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
 		if (!p)
 			break;
-		dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
-			rqst->rq_task->tk_pid, __func__,
-			be32_to_cpup(p), be32_to_cpu(*(p + 1)));
+		dprintk("RPC:       %s: server reports "
+			"version error (%u-%u), xid %08x\n", __func__,
+			be32_to_cpup(p), be32_to_cpu(*(p + 1)),
+			be32_to_cpu(rep->rr_xid));
 		break;
 	case err_chunk:
-		dprintk("RPC: %5u: %s: server reports header decoding error\n",
-			rqst->rq_task->tk_pid, __func__);
+		dprintk("RPC:       %s: server reports "
+			"header decoding error, xid %08x\n", __func__,
+			be32_to_cpu(rep->rr_xid));
 		break;
 	default:
-		dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
-			rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
+		dprintk("RPC:       %s: server reports "
+			"unrecognized error %d, xid %08x\n", __func__,
+			be32_to_cpup(p), be32_to_cpu(rep->rr_xid));
 	}
 
-	r_xprt->rx_stats.bad_reply_count++;
-	return -EREMOTEIO;
+	return -EIO;
 }
 
 /* Perform XID lookup, reconstruction of the RPC reply, and
@@ -1216,10 +1377,7 @@
 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	struct rpc_rqst *rqst = rep->rr_rqst;
-	unsigned long cwnd;
 	int status;
-
-	xprt->reestablish_timeout = 0;
 
 	switch (rep->rr_proc) {
 	case rdma_msg:
@@ -1238,74 +1396,31 @@
 		goto out_badheader;
 
 out:
-	spin_lock(&xprt->recv_lock);
-	cwnd = xprt->cwnd;
-	xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT;
-	if (xprt->cwnd > cwnd)
-		xprt_release_rqst_cong(rqst->rq_task);
-
+	spin_lock(&xprt->queue_lock);
 	xprt_complete_rqst(rqst->rq_task, status);
 	xprt_unpin_rqst(rqst);
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 	return;
 
-/* If the incoming reply terminated a pending RPC, the next
- * RPC call will post a replacement receive buffer as it is
- * being marshaled.
- */
 out_badheader:
 	trace_xprtrdma_reply_hdr(rep);
 	r_xprt->rx_stats.bad_reply_count++;
-	status = -EIO;
+	rqst->rq_task->tk_status = status;
+	status = 0;
 	goto out;
 }
 
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+static void rpcrdma_reply_done(struct kref *kref)
 {
-	/* Invalidate and unmap the data payloads before waking
-	 * the waiting application. This guarantees the memory
-	 * regions are properly fenced from the server before the
-	 * application accesses the data. It also ensures proper
-	 * send flow control: waking the next RPC waits until this
-	 * RPC has relinquished all its Send Queue entries.
-	 */
-	if (!list_empty(&req->rl_registered))
-		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
-						    &req->rl_registered);
+	struct rpcrdma_req *req =
+		container_of(kref, struct rpcrdma_req, rl_kref);
 
-	/* Ensure that any DMA mapped pages associated with
-	 * the Send of the RPC Call have been unmapped before
-	 * allowing the RPC to complete. This protects argument
-	 * memory not controlled by the RPC client from being
-	 * re-used before we're done with it.
-	 */
-	if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
-		r_xprt->rx_stats.reply_waits_for_send++;
-		out_of_line_wait_on_bit(&req->rl_flags,
-					RPCRDMA_REQ_F_TX_RESOURCES,
-					bit_wait,
-					TASK_UNINTERRUPTIBLE);
-	}
+	rpcrdma_complete_rqst(req->rl_reply);
 }
 
-/* Reply handling runs in the poll worker thread. Anything that
- * might wait is deferred to a separate workqueue.
- */
-void rpcrdma_deferred_completion(struct work_struct *work)
-{
-	struct rpcrdma_rep *rep =
-			container_of(work, struct rpcrdma_rep, rr_work);
-	struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
-	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-
-	trace_xprtrdma_defer_cmp(rep);
-	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
-		r_xprt->rx_ia.ri_ops->ro_reminv(rep, &req->rl_registered);
-	rpcrdma_release_rqst(r_xprt, req);
-	rpcrdma_complete_rqst(rep);
-}
-
-/* Process received RPC/RDMA messages.
+/**
+ * rpcrdma_reply_handler - Process received RPC/RDMA messages
+ * @rep: Incoming rpcrdma_rep object to process
  *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
@@ -1320,14 +1435,15 @@
 	u32 credits;
 	__be32 *p;
 
-	--buf->rb_posted_receives;
-
-	if (rep->rr_hdrbuf.head[0].iov_len == 0)
-		goto out_badstatus;
+	/* Any data means we had a useful conversation, so
+	 * then we don't need to delay the next reconnect.
+	 */
+	if (xprt->reestablish_timeout)
+		xprt->reestablish_timeout = 0;
 
 	/* Fixed transport header fields */
 	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
-			rep->rr_hdrbuf.head[0].iov_base);
+			rep->rr_hdrbuf.head[0].iov_base, NULL);
 	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
 	if (unlikely(!p))
 		goto out_shortreply;
@@ -1345,19 +1461,21 @@
 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 	 * get context for handling any incoming chunks.
 	 */
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
 	if (!rqst)
 		goto out_norqst;
 	xprt_pin_rqst(rqst);
+	spin_unlock(&xprt->queue_lock);
 
 	if (credits == 0)
 		credits = 1;	/* don't deadlock */
-	else if (credits > buf->rb_max_requests)
-		credits = buf->rb_max_requests;
-	buf->rb_credits = credits;
-
-	spin_unlock(&xprt->recv_lock);
+	else if (credits > r_xprt->rx_ep->re_max_requests)
+		credits = r_xprt->rx_ep->re_max_requests;
+	rpcrdma_post_recvs(r_xprt, credits + (buf->rb_bc_srv_max_requests << 1),
+			   false);
+	if (buf->rb_credits != credits)
+		rpcrdma_update_cwnd(r_xprt, credits);
 
 	req = rpcr_to_rdmar(rqst);
 	if (req->rl_reply) {
@@ -1366,34 +1484,30 @@
 	}
 	req->rl_reply = rep;
 	rep->rr_rqst = rqst;
-	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 
 	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
 
-	rpcrdma_post_recvs(r_xprt, false);
-	queue_work(rpcrdma_receive_wq, &rep->rr_work);
+	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
+		frwr_reminv(rep, &req->rl_registered);
+	if (!list_empty(&req->rl_registered))
+		frwr_unmap_async(r_xprt, req);
+		/* LocalInv completion will complete the RPC */
+	else
+		kref_put(&req->rl_kref, rpcrdma_reply_done);
 	return;
 
 out_badversion:
 	trace_xprtrdma_reply_vers(rep);
-	goto repost;
+	goto out;
 
-/* The RPC transaction has already been terminated, or the header
- * is corrupt.
- */
 out_norqst:
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 	trace_xprtrdma_reply_rqst(rep);
-	goto repost;
+	goto out;
 
 out_shortreply:
 	trace_xprtrdma_reply_short(rep);
 
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
- */
-repost:
-	rpcrdma_post_recvs(r_xprt, false);
-out_badstatus:
+out:
 	rpcrdma_recv_buffer_put(rep);
 }

--
Gitblit v1.6.2