From 8ac6c7a54ed1b98d142dce24b11c6de6a1e239a5 Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Tue, 22 Oct 2024 10:36:11 +0000
Subject: [PATCH] 修改4g拨号为QMI,需要在系统里后台执行quectel-CM

---
 kernel/net/sunrpc/xprtrdma/svc_rdma_sendto.c |  695 ++++++++++++++++++++++++++++++++-------------------------
 1 files changed, 386 insertions(+), 309 deletions(-)

diff --git a/kernel/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/kernel/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 4062cd6..c3d588b 100644
--- a/kernel/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/kernel/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -106,7 +106,6 @@
 #include <rdma/rdma_cm.h>
 
 #include <linux/sunrpc/debug.h>
-#include <linux/sunrpc/rpc_rdma.h>
 #include <linux/sunrpc/svc_rdma.h>
 
 #include "xprt_rdma.h"
@@ -121,6 +120,13 @@
 {
 	return list_first_entry_or_null(list, struct svc_rdma_send_ctxt,
 					sc_list);
+}
+
+static void svc_rdma_send_cid_init(struct svcxprt_rdma *rdma,
+				   struct rpc_rdma_cid *cid)
+{
+	cid->ci_queue_id = rdma->sc_sq_cq->res.id;
+	cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
 }
 
 static struct svc_rdma_send_ctxt *
@@ -145,12 +151,16 @@
 	if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
 		goto fail2;
 
+	svc_rdma_send_cid_init(rdma, &ctxt->sc_cid);
+
 	ctxt->sc_send_wr.next = NULL;
 	ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
 	ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
 	ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
 	ctxt->sc_cqe.done = svc_rdma_wc_send;
 	ctxt->sc_xprt_buf = buffer;
+	xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
+		     rdma->sc_max_req_size);
 	ctxt->sc_sges[0].addr = addr;
 
 	for (i = 0; i < rdma->sc_max_send_sges; i++)
@@ -204,6 +214,10 @@
 	spin_unlock(&rdma->sc_send_lock);
 
 out:
+	rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
+	xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
+			ctxt->sc_xprt_buf, NULL);
+
 	ctxt->sc_send_wr.num_sge = 0;
 	ctxt->sc_cur_sge_no = 0;
 	ctxt->sc_page_count = 0;
@@ -233,11 +247,15 @@
 	/* The first SGE contains the transport header, which
 	 * remains mapped until @ctxt is destroyed.
 	 */
-	for (i = 1; i < ctxt->sc_send_wr.num_sge; i++)
+	for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) {
 		ib_dma_unmap_page(device,
 				  ctxt->sc_sges[i].addr,
 				  ctxt->sc_sges[i].length,
 				  DMA_TO_DEVICE);
+		trace_svcrdma_dma_unmap_page(rdma,
+					     ctxt->sc_sges[i].addr,
+					     ctxt->sc_sges[i].length);
+	}
 
 	for (i = 0; i < ctxt->sc_page_count; ++i)
 		put_page(ctxt->sc_pages[i]);
@@ -259,41 +277,42 @@
 {
 	struct svcxprt_rdma *rdma = cq->cq_context;
 	struct ib_cqe *cqe = wc->wr_cqe;
-	struct svc_rdma_send_ctxt *ctxt;
+	struct svc_rdma_send_ctxt *ctxt =
+		container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
 
-	trace_svcrdma_wc_send(wc);
+	trace_svcrdma_wc_send(wc, &ctxt->sc_cid);
 
 	atomic_inc(&rdma->sc_sq_avail);
 	wake_up(&rdma->sc_send_wait);
 
-	ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
 	svc_rdma_send_ctxt_put(rdma, ctxt);
 
 	if (unlikely(wc->status != IB_WC_SUCCESS)) {
 		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 		svc_xprt_enqueue(&rdma->sc_xprt);
-		if (wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("svcrdma: Send: %s (%u/0x%x)\n",
-			       ib_wc_status_msg(wc->status),
-			       wc->status, wc->vendor_err);
 	}
-
-	svc_xprt_put(&rdma->sc_xprt);
 }
 
 /**
  * svc_rdma_send - Post a single Send WR
  * @rdma: transport on which to post the WR
- * @wr: prepared Send WR to post
+ * @ctxt: send ctxt with a Send WR ready to post
  *
  * Returns zero the Send WR was posted successfully. Otherwise, a
  * negative errno is returned.
  */
-int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
+int svc_rdma_send(struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *ctxt)
 {
+	struct ib_send_wr *wr = &ctxt->sc_send_wr;
 	int ret;
 
 	might_sleep();
+
+	/* Sync the transport header buffer */
+	ib_dma_sync_single_for_device(rdma->sc_pd->device,
+				      wr->sg_list[0].addr,
+				      wr->sg_list[0].length,
+				      DMA_TO_DEVICE);
 
 	/* If the SQ is full, wait until an SQ entry is available */
 	while (1) {
@@ -309,8 +328,7 @@
 			continue;
 		}
 
-		svc_xprt_get(&rdma->sc_xprt);
-		trace_svcrdma_post_send(wr);
+		trace_svcrdma_post_send(ctxt);
 		ret = ib_post_send(rdma->sc_qp, wr, NULL);
 		if (ret)
 			break;
@@ -319,197 +337,173 @@
 
 	trace_svcrdma_sq_post_err(rdma, ret);
 	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
-	svc_xprt_put(&rdma->sc_xprt);
 	wake_up(&rdma->sc_send_wait);
 	return ret;
 }
 
-static u32 xdr_padsize(u32 len)
-{
-	return (len & 3) ? (4 - (len & 3)) : 0;
-}
-
-/* Returns length of transport header, in bytes.
- */
-static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
-{
-	unsigned int nsegs;
-	__be32 *p;
-
-	p = rdma_resp;
-
-	/* RPC-over-RDMA V1 replies never have a Read list. */
-	p += rpcrdma_fixed_maxsz + 1;
-
-	/* Skip Write list. */
-	while (*p++ != xdr_zero) {
-		nsegs = be32_to_cpup(p++);
-		p += nsegs * rpcrdma_segment_maxsz;
-	}
-
-	/* Skip Reply chunk. */
-	if (*p++ != xdr_zero) {
-		nsegs = be32_to_cpup(p++);
-		p += nsegs * rpcrdma_segment_maxsz;
-	}
-
-	return (unsigned long)p - (unsigned long)rdma_resp;
-}
-
-/* One Write chunk is copied from Call transport header to Reply
- * transport header. Each segment's length field is updated to
- * reflect number of bytes consumed in the segment.
+/**
+ * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
+ * @sctxt: Send context for the RPC Reply
  *
- * Returns number of segments in this chunk.
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Reply Read list
+ *   %-EMSGSIZE on XDR buffer overflow
  */
-static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
+{
+	/* RPC-over-RDMA version 1 replies never have a Read list. */
+	return xdr_stream_encode_item_absent(&sctxt->sc_stream);
+}
+
+/**
+ * svc_rdma_encode_write_segment - Encode one Write segment
+ * @src: matching Write chunk in the RPC Call header
+ * @sctxt: Send context for the RPC Reply
+ * @remaining: remaining bytes of the payload left in the Write chunk
+ *
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Write segment
+ *   %-EMSGSIZE on XDR buffer overflow
+ */
+static ssize_t svc_rdma_encode_write_segment(__be32 *src,
+					     struct svc_rdma_send_ctxt *sctxt,
+					     unsigned int *remaining)
+{
+	__be32 *p;
+	const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
+	u32 handle, length;
+	u64 offset;
+
+	p = xdr_reserve_space(&sctxt->sc_stream, len);
+	if (!p)
+		return -EMSGSIZE;
+
+	xdr_decode_rdma_segment(src, &handle, &length, &offset);
+
+	if (*remaining < length) {
+		/* segment only partly filled */
+		length = *remaining;
+		*remaining = 0;
+	} else {
+		/* entire segment was consumed */
+		*remaining -= length;
+	}
+	xdr_encode_rdma_segment(p, handle, length, offset);
+
+	trace_svcrdma_encode_wseg(handle, length, offset);
+	return len;
+}
+
+/**
+ * svc_rdma_encode_write_chunk - Encode one Write chunk
+ * @src: matching Write chunk in the RPC Call header
+ * @sctxt: Send context for the RPC Reply
+ * @remaining: size in bytes of the payload in the Write chunk
+ *
+ * Copy a Write chunk from the Call transport header to the
+ * Reply transport header. Update each segment's length field
+ * to reflect the number of bytes written in that segment.
+ *
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Write chunk
+ *   %-EMSGSIZE on XDR buffer overflow
+ */
+static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
+					   struct svc_rdma_send_ctxt *sctxt,
 					   unsigned int remaining)
 {
 	unsigned int i, nsegs;
-	u32 seg_len;
+	ssize_t len, ret;
 
-	/* Write list discriminator */
-	*dst++ = *src++;
+	len = 0;
+	trace_svcrdma_encode_write_chunk(remaining);
 
-	/* number of segments in this chunk */
-	nsegs = be32_to_cpup(src);
-	*dst++ = *src++;
+	src++;
+	ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
+	if (ret < 0)
+		return -EMSGSIZE;
+	len += ret;
+
+	nsegs = be32_to_cpup(src++);
+	ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
+	if (ret < 0)
+		return -EMSGSIZE;
+	len += ret;
 
 	for (i = nsegs; i; i--) {
-		/* segment's RDMA handle */
-		*dst++ = *src++;
-
-		/* bytes returned in this segment */
-		seg_len = be32_to_cpu(*src);
-		if (remaining >= seg_len) {
-			/* entire segment was consumed */
-			*dst = *src;
-			remaining -= seg_len;
-		} else {
-			/* segment only partly filled */
-			*dst = cpu_to_be32(remaining);
-			remaining = 0;
-		}
-		dst++; src++;
-
-		/* segment's RDMA offset */
-		*dst++ = *src++;
-		*dst++ = *src++;
+		ret = svc_rdma_encode_write_segment(src, sctxt, &remaining);
+		if (ret < 0)
+			return -EMSGSIZE;
+		src += rpcrdma_segment_maxsz;
+		len += ret;
 	}
 
-	return nsegs;
+	return len;
 }
 
-/* The client provided a Write list in the Call message. Fill in
- * the segments in the first Write chunk in the Reply's transport
+/**
+ * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
+ * @length: size in bytes of the payload in the first Write chunk
+ *
+ * The client provides a Write chunk list in the Call message. Fill
+ * in the segments in the first Write chunk in the Reply's transport
  * header with the number of bytes consumed in each segment.
  * Remaining chunks are returned unused.
  *
  * Assumptions:
  *  - Client has provided only one Write chunk
+ *
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Reply's Write list
+ *   %-EMSGSIZE on XDR buffer overflow
  */
-static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
-					   unsigned int consumed)
+static ssize_t
+svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
+			   struct svc_rdma_send_ctxt *sctxt,
+			   unsigned int length)
 {
-	unsigned int nsegs;
-	__be32 *p, *q;
+	ssize_t len, ret;
 
-	/* RPC-over-RDMA V1 replies never have a Read list. */
-	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+	ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt, length);
+	if (ret < 0)
+		return ret;
+	len = ret;
 
-	q = wr_ch;
-	while (*q != xdr_zero) {
-		nsegs = xdr_encode_write_chunk(p, q, consumed);
-		q += 2 + nsegs * rpcrdma_segment_maxsz;
-		p += 2 + nsegs * rpcrdma_segment_maxsz;
-		consumed = 0;
-	}
+	/* Terminate the Write list */
+	ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
+	if (ret < 0)
+		return ret;
 
-	/* Terminate Write list */
-	*p++ = xdr_zero;
-
-	/* Reply chunk discriminator; may be replaced later */
-	*p = xdr_zero;
+	return len + ret;
 }
 
-/* The client provided a Reply chunk in the Call message. Fill in
- * the segments in the Reply chunk in the Reply message with the
- * number of bytes consumed in each segment.
+/**
+ * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
+ * @length: size in bytes of the payload in the Reply chunk
  *
  * Assumptions:
- * - Reply can always fit in the provided Reply chunk
- */
-static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
-					    unsigned int consumed)
-{
-	__be32 *p;
-
-	/* Find the Reply chunk in the Reply's xprt header.
-	 * RPC-over-RDMA V1 replies never have a Read list.
-	 */
-	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
-
-	/* Skip past Write list */
-	while (*p++ != xdr_zero)
-		p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
-
-	xdr_encode_write_chunk(p, rp_ch, consumed);
-}
-
-/* Parse the RPC Call's transport header.
- */
-static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
-				      __be32 **write, __be32 **reply)
-{
-	__be32 *p;
-
-	p = rdma_argp + rpcrdma_fixed_maxsz;
-
-	/* Read list */
-	while (*p++ != xdr_zero)
-		p += 5;
-
-	/* Write list */
-	if (*p != xdr_zero) {
-		*write = p;
-		while (*p++ != xdr_zero)
-			p += 1 + be32_to_cpu(*p) * 4;
-	} else {
-		*write = NULL;
-		p++;
-	}
-
-	/* Reply chunk */
-	if (*p != xdr_zero)
-		*reply = p;
-	else
-		*reply = NULL;
-}
-
-/* RPC-over-RDMA Version One private extension: Remote Invalidation.
- * Responder's choice: requester signals it can handle Send With
- * Invalidate, and responder chooses one rkey to invalidate.
+ * - Reply can always fit in the client-provided Reply chunk
  *
- * Find a candidate rkey to invalidate when sending a reply.  Picks the
- * first R_key it finds in the chunk lists.
- *
- * Returns zero if RPC's chunk lists are empty.
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Reply's Reply chunk
+ *   %-EMSGSIZE on XDR buffer overflow
  */
-static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
-				 __be32 *wr_lst, __be32 *rp_ch)
+static ssize_t
+svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
+			    struct svc_rdma_send_ctxt *sctxt,
+			    unsigned int length)
 {
-	__be32 *p;
-
-	p = rdma_argp + rpcrdma_fixed_maxsz;
-	if (*p != xdr_zero)
-		p += 2;
-	else if (wr_lst && be32_to_cpup(wr_lst + 1))
-		p = wr_lst + 2;
-	else if (rp_ch && be32_to_cpup(rp_ch + 1))
-		p = rp_ch + 2;
-	else
-		return 0;
-	return be32_to_cpup(p);
+	return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
+					   length);
 }
 
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -522,6 +516,7 @@
 	dma_addr_t dma_addr;
 
 	dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
+	trace_svcrdma_dma_map_page(rdma, dma_addr, len);
 	if (ib_dma_mapping_error(dev, dma_addr))
 		goto out_maperr;
 
@@ -531,7 +526,6 @@
 	return 0;
 
 out_maperr:
-	trace_svcrdma_dma_map_page(rdma, page);
 	return -EIO;
 }
 
@@ -548,38 +542,36 @@
 }
 
 /**
- * svc_rdma_sync_reply_hdr - DMA sync the transport header buffer
+ * svc_rdma_pull_up_needed - Determine whether to use pull-up
  * @rdma: controlling transport
- * @ctxt: send_ctxt for the Send WR
- * @len: length of transport header
+ * @sctxt: send_ctxt for the Send WR
+ * @rctxt: Write and Reply chunks provided by client
+ * @xdr: xdr_buf containing RPC message to transmit
  *
- */
-void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
-			     struct svc_rdma_send_ctxt *ctxt,
-			     unsigned int len)
-{
-	ctxt->sc_sges[0].length = len;
-	ctxt->sc_send_wr.num_sge++;
-	ib_dma_sync_single_for_device(rdma->sc_pd->device,
-				      ctxt->sc_sges[0].addr, len,
-				      DMA_TO_DEVICE);
-}
-
-/* If the xdr_buf has more elements than the device can
- * transmit in a single RDMA Send, then the reply will
- * have to be copied into a bounce buffer.
+ * Returns:
+ *	%true if pull-up must be used
+ *	%false otherwise
  */
 static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
-				    struct xdr_buf *xdr,
-				    __be32 *wr_lst)
+				    struct svc_rdma_send_ctxt *sctxt,
+				    const struct svc_rdma_recv_ctxt *rctxt,
+				    struct xdr_buf *xdr)
 {
 	int elements;
 
+	/* For small messages, copying bytes is cheaper than DMA mapping.
+	 */
+	if (sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
+		return true;
+
+	/* Check whether the xdr_buf has more elements than can
+	 * fit in a single RDMA Send.
+	 */
 	/* xdr->head */
 	elements = 1;
 
 	/* xdr->pages */
-	if (!wr_lst) {
+	if (!rctxt || !rctxt->rc_write_list) {
 		unsigned int remaining;
 		unsigned long pageoff;
 
@@ -601,29 +593,36 @@
 	return elements >= rdma->sc_max_send_sges;
 }
 
-/* The device is not capable of sending the reply directly.
- * Assemble the elements of @xdr into the transport header
- * buffer.
+/**
+ * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer
+ * @rdma: controlling transport
+ * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared
+ * @rctxt: Write and Reply chunks provided by client
+ * @xdr: prepared xdr_buf containing RPC message
+ *
+ * The device is not capable of sending the reply directly.
+ * Assemble the elements of @xdr into the transport header buffer.
+ *
+ * Returns zero on success, or a negative errno on failure.
  */
 static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
-				      struct svc_rdma_send_ctxt *ctxt,
-				      struct xdr_buf *xdr, __be32 *wr_lst)
+				      struct svc_rdma_send_ctxt *sctxt,
+				      const struct svc_rdma_recv_ctxt *rctxt,
+				      const struct xdr_buf *xdr)
 {
 	unsigned char *dst, *tailbase;
 	unsigned int taillen;
 
-	dst = ctxt->sc_xprt_buf;
-	dst += ctxt->sc_sges[0].length;
-
+	dst = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len;
 	memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len);
 	dst += xdr->head[0].iov_len;
 
 	tailbase = xdr->tail[0].iov_base;
 	taillen = xdr->tail[0].iov_len;
-	if (wr_lst) {
+	if (rctxt && rctxt->rc_write_list) {
 		u32 xdrpad;
 
-		xdrpad = xdr_padsize(xdr->page_len);
+		xdrpad = xdr_pad_size(xdr->page_len);
 		if (taillen && xdrpad) {
 			tailbase += xdrpad;
 			taillen -= xdrpad;
@@ -650,29 +649,26 @@
 	if (taillen)
 		memcpy(dst, tailbase, taillen);
 
-	ctxt->sc_sges[0].length += xdr->len;
-	ib_dma_sync_single_for_device(rdma->sc_pd->device,
-				      ctxt->sc_sges[0].addr,
-				      ctxt->sc_sges[0].length,
-				      DMA_TO_DEVICE);
-
+	sctxt->sc_sges[0].length += xdr->len;
+	trace_svcrdma_send_pullup(sctxt->sc_sges[0].length);
 	return 0;
 }
 
-/* svc_rdma_map_reply_msg - Map the buffer holding RPC message
+/* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message
  * @rdma: controlling transport
- * @ctxt: send_ctxt for the Send WR
+ * @sctxt: send_ctxt for the Send WR
+ * @rctxt: Write and Reply chunks provided by client
  * @xdr: prepared xdr_buf containing RPC message
- * @wr_lst: pointer to Call header's Write list, or NULL
  *
  * Load the xdr_buf into the ctxt's sge array, and DMA map each
- * element as it is added.
+ * element as it is added. The Send WR's num_sge field is set.
  *
  * Returns zero on success, or a negative errno on failure.
  */
 int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
-			   struct svc_rdma_send_ctxt *ctxt,
-			   struct xdr_buf *xdr, __be32 *wr_lst)
+			   struct svc_rdma_send_ctxt *sctxt,
+			   const struct svc_rdma_recv_ctxt *rctxt,
+			   struct xdr_buf *xdr)
 {
 	unsigned int len, remaining;
 	unsigned long page_off;
@@ -681,11 +677,24 @@
 	u32 xdr_pad;
 	int ret;
 
-	if (svc_rdma_pull_up_needed(rdma, xdr, wr_lst))
-		return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst);
+	/* Set up the (persistently-mapped) transport header SGE. */
+	sctxt->sc_send_wr.num_sge = 1;
+	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
 
-	++ctxt->sc_cur_sge_no;
-	ret = svc_rdma_dma_map_buf(rdma, ctxt,
+	/* If there is a Reply chunk, nothing follows the transport
+	 * header, and we're done here.
+	 */
+	if (rctxt && rctxt->rc_reply_chunk)
+		return 0;
+
+	/* For pull-up, svc_rdma_send() will sync the transport header.
+	 * No additional DMA mapping is necessary.
+	 */
+	if (svc_rdma_pull_up_needed(rdma, sctxt, rctxt, xdr))
+		return svc_rdma_pull_up_reply_msg(rdma, sctxt, rctxt, xdr);
+
+	++sctxt->sc_cur_sge_no;
+	ret = svc_rdma_dma_map_buf(rdma, sctxt,
 				   xdr->head[0].iov_base,
 				   xdr->head[0].iov_len);
 	if (ret < 0)
@@ -696,10 +705,10 @@
 	 * have added XDR padding in the tail buffer, and that
 	 * should not be included inline.
 	 */
-	if (wr_lst) {
+	if (rctxt && rctxt->rc_write_list) {
 		base = xdr->tail[0].iov_base;
 		len = xdr->tail[0].iov_len;
-		xdr_pad = xdr_padsize(xdr->page_len);
+		xdr_pad = xdr_pad_size(xdr->page_len);
 
 		if (len && xdr_pad) {
 			base += xdr_pad;
@@ -715,8 +724,8 @@
 	while (remaining) {
 		len = min_t(u32, PAGE_SIZE - page_off, remaining);
 
-		++ctxt->sc_cur_sge_no;
-		ret = svc_rdma_dma_map_page(rdma, ctxt, *ppages++,
+		++sctxt->sc_cur_sge_no;
+		ret = svc_rdma_dma_map_page(rdma, sctxt, *ppages++,
 					    page_off, len);
 		if (ret < 0)
 			return ret;
@@ -729,8 +738,8 @@
 	len = xdr->tail[0].iov_len;
 tail:
 	if (len) {
-		++ctxt->sc_cur_sge_no;
-		ret = svc_rdma_dma_map_buf(rdma, ctxt, base, len);
+		++sctxt->sc_cur_sge_no;
+		ret = svc_rdma_dma_map_buf(rdma, sctxt, base, len);
 		if (ret < 0)
 			return ret;
 	}
@@ -768,7 +777,7 @@
  *
  * RDMA Send is the last step of transmitting an RPC reply. Pages
  * involved in the earlier RDMA Writes are here transferred out
- * of the rqstp and into the ctxt's page array. These pages are
+ * of the rqstp and into the sctxt's page array. These pages are
  * DMA unmapped by each Write completion, but the subsequent Send
  * completion finally releases these pages.
  *
@@ -776,69 +785,94 @@
  * - The Reply's transport header will never be larger than a page.
  */
 static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
-				   struct svc_rdma_send_ctxt *ctxt,
-				   __be32 *rdma_argp,
-				   struct svc_rqst *rqstp,
-				   __be32 *wr_lst, __be32 *rp_ch)
-{
-	int ret;
-
-	if (!rp_ch) {
-		ret = svc_rdma_map_reply_msg(rdma, ctxt,
-					     &rqstp->rq_res, wr_lst);
-		if (ret < 0)
-			return ret;
-	}
-
-	svc_rdma_save_io_pages(rqstp, ctxt);
-
-	ctxt->sc_send_wr.opcode = IB_WR_SEND;
-	if (rdma->sc_snd_w_inv) {
-		ctxt->sc_send_wr.ex.invalidate_rkey =
-			svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch);
-		if (ctxt->sc_send_wr.ex.invalidate_rkey)
-			ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
-	}
-	dprintk("svcrdma: posting Send WR with %u sge(s)\n",
-		ctxt->sc_send_wr.num_sge);
-	return svc_rdma_send(rdma, &ctxt->sc_send_wr);
-}
-
-/* Given the client-provided Write and Reply chunks, the server was not
- * able to form a complete reply. Return an RDMA_ERROR message so the
- * client can retire this RPC transaction. As above, the Send completion
- * routine releases payload pages that were part of a previous RDMA Write.
- *
- * Remote Invalidation is skipped for simplicity.
- */
-static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
-				   struct svc_rdma_send_ctxt *ctxt,
+				   struct svc_rdma_send_ctxt *sctxt,
+				   const struct svc_rdma_recv_ctxt *rctxt,
 				   struct svc_rqst *rqstp)
 {
-	__be32 *p;
 	int ret;
 
-	p = ctxt->sc_xprt_buf;
-	trace_svcrdma_err_chunk(*p);
-	p += 3;
-	*p++ = rdma_error;
-	*p   = err_chunk;
-	svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR);
-
-	svc_rdma_save_io_pages(rqstp, ctxt);
-
-	ctxt->sc_send_wr.opcode = IB_WR_SEND;
-	ret = svc_rdma_send(rdma, &ctxt->sc_send_wr);
-	if (ret) {
-		svc_rdma_send_ctxt_put(rdma, ctxt);
+	ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqstp->rq_res);
+	if (ret < 0)
 		return ret;
-	}
 
-	return 0;
+	svc_rdma_save_io_pages(rqstp, sctxt);
+
+	if (rctxt->rc_inv_rkey) {
+		sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+		sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey;
+	} else {
+		sctxt->sc_send_wr.opcode = IB_WR_SEND;
+	}
+	return svc_rdma_send(rdma, sctxt);
 }
 
-void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
+/**
+ * svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response
+ * @rdma: controlling transport context
+ * @sctxt: Send context for the response
+ * @rctxt: Receive context for incoming bad message
+ * @status: negative errno indicating error that occurred
+ *
+ * Given the client-provided Read, Write, and Reply chunks, the
+ * server was not able to parse the Call or form a complete Reply.
+ * Return an RDMA_ERROR message so the client can retire the RPC
+ * transaction.
+ *
+ * The caller does not have to release @sctxt. It is released by
+ * Send completion, or by this function on error.
+ */
+void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
+			     struct svc_rdma_send_ctxt *sctxt,
+			     struct svc_rdma_recv_ctxt *rctxt,
+			     int status)
 {
+	__be32 *rdma_argp = rctxt->rc_recv_buf;
+	__be32 *p;
+
+	rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0);
+	xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf,
+			sctxt->sc_xprt_buf, NULL);
+
+	p = xdr_reserve_space(&sctxt->sc_stream,
+			      rpcrdma_fixed_maxsz * sizeof(*p));
+	if (!p)
+		goto put_ctxt;
+
+	*p++ = *rdma_argp;
+	*p++ = *(rdma_argp + 1);
+	*p++ = rdma->sc_fc_credits;
+	*p = rdma_error;
+
+	switch (status) {
+	case -EPROTONOSUPPORT:
+		p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p));
+		if (!p)
+			goto put_ctxt;
+
+		*p++ = err_vers;
+		*p++ = rpcrdma_version;
+		*p = rpcrdma_version;
+		trace_svcrdma_err_vers(*rdma_argp);
+		break;
+	default:
+		p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p));
+		if (!p)
+			goto put_ctxt;
+
+		*p = err_chunk;
+		trace_svcrdma_err_chunk(*rdma_argp);
+	}
+
+	/* Remote Invalidation is skipped for simplicity. */
+	sctxt->sc_send_wr.num_sge = 1;
+	sctxt->sc_send_wr.opcode = IB_WR_SEND;
+	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
+	if (svc_rdma_send(rdma, sctxt))
+		goto put_ctxt;
+	return;
+
+put_ctxt:
+	svc_rdma_send_ctxt_put(rdma, sctxt);
 }
 
 /**
@@ -859,54 +893,68 @@
 	struct svcxprt_rdma *rdma =
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
-	__be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
+	__be32 *rdma_argp = rctxt->rc_recv_buf;
+	__be32 *wr_lst = rctxt->rc_write_list;
+	__be32 *rp_ch = rctxt->rc_reply_chunk;
 	struct xdr_buf *xdr = &rqstp->rq_res;
 	struct svc_rdma_send_ctxt *sctxt;
+	__be32 *p;
 	int ret;
 
-	rdma_argp = rctxt->rc_recv_buf;
-	svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);
+	ret = -ENOTCONN;
+	if (svc_xprt_is_dead(xprt))
+		goto err0;
 
-	/* Create the RDMA response header. xprt->xpt_mutex,
-	 * acquired in svc_send(), serializes RPC replies. The
-	 * code path below that inserts the credit grant value
-	 * into each transport header runs only inside this
-	 * critical section.
-	 */
 	ret = -ENOMEM;
 	sctxt = svc_rdma_send_ctxt_get(rdma);
 	if (!sctxt)
 		goto err0;
-	rdma_resp = sctxt->sc_xprt_buf;
 
-	p = rdma_resp;
+	p = xdr_reserve_space(&sctxt->sc_stream,
+			      rpcrdma_fixed_maxsz * sizeof(*p));
+	if (!p)
+		goto err0;
 	*p++ = *rdma_argp;
 	*p++ = *(rdma_argp + 1);
 	*p++ = rdma->sc_fc_credits;
-	*p++ = rp_ch ? rdma_nomsg : rdma_msg;
+	*p   = rp_ch ? rdma_nomsg : rdma_msg;
 
-	/* Start with empty chunks */
-	*p++ = xdr_zero;
-	*p++ = xdr_zero;
-	*p   = xdr_zero;
-
+	if (svc_rdma_encode_read_list(sctxt) < 0)
+		goto err0;
 	if (wr_lst) {
 		/* XXX: Presume the client sent only one Write chunk */
-		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
+		unsigned long offset;
+		unsigned int length;
+
+		if (rctxt->rc_read_payload_length) {
+			offset = rctxt->rc_read_payload_offset;
+			length = rctxt->rc_read_payload_length;
+		} else {
+			offset = xdr->head[0].iov_len;
+			length = xdr->page_len;
+		}
+		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset,
+						length);
 		if (ret < 0)
 			goto err2;
-		svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
+		if (svc_rdma_encode_write_list(rctxt, sctxt, length) < 0)
+			goto err0;
+	} else {
+		if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
+			goto err0;
 	}
 	if (rp_ch) {
-		ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
+		ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
 		if (ret < 0)
 			goto err2;
-		svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
+		if (svc_rdma_encode_reply_chunk(rctxt, sctxt, ret) < 0)
+			goto err0;
+	} else {
+		if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
+			goto err0;
 	}
 
-	svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
-	ret = svc_rdma_send_reply_msg(rdma, sctxt, rdma_argp, rqstp,
-				      wr_lst, rp_ch);
+	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
 	if (ret < 0)
 		goto err1;
 	return 0;
@@ -915,15 +963,44 @@
 	if (ret != -E2BIG && ret != -EINVAL)
 		goto err1;
 
-	ret = svc_rdma_send_error_msg(rdma, sctxt, rqstp);
-	if (ret < 0)
-		goto err1;
+	/* Send completion releases payload pages that were part
+	 * of previously posted RDMA Writes.
+	 */
+	svc_rdma_save_io_pages(rqstp, sctxt);
+	svc_rdma_send_error_msg(rdma, sctxt, rctxt, ret);
 	return 0;
 
  err1:
 	svc_rdma_send_ctxt_put(rdma, sctxt);
  err0:
-	trace_svcrdma_send_failed(rqstp, ret);
+	trace_svcrdma_send_err(rqstp, ret);
 	set_bit(XPT_CLOSE, &xprt->xpt_flags);
 	return -ENOTCONN;
 }
+
+/**
+ * svc_rdma_read_payload - special processing for a READ payload
+ * @rqstp: svc_rqst to operate on
+ * @offset: payload's byte offset in @xdr
+ * @length: size of payload, in bytes
+ *
+ * Returns zero on success.
+ *
+ * For the moment, just record the xdr_buf location of the READ
+ * payload. svc_rdma_sendto will use that location later when
+ * we actually send the payload.
+ */
+int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
+			  unsigned int length)
+{
+	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
+
+	/* XXX: Just one READ payload slot for now, since our
+	 * transport implementation currently supports only one
+	 * Write chunk.
+	 */
+	rctxt->rc_read_payload_offset = offset;
+	rctxt->rc_read_payload_length = length;
+
+	return 0;
+}

--
Gitblit v1.6.2