From 37f49e37ab4cb5d0bc4c60eb5c6d4dd57db767bb Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Fri, 10 May 2024 07:44:59 +0000
Subject: [PATCH] svcrdma: Refactor Send path and Reply transport header encoding
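
Convert the server-side RPC/RDMA Reply path to build transport
headers with the generic xdr_stream encoders instead of hand-rolled
pointer arithmetic, and tidy the Send path around it:

- Give each Send WR a completion ID (ci_queue_id, ci_completion_id)
  so Send completions can be matched to their WRs in trace output.
- Carry an xdr_buf/xdr_stream (sc_hdrbuf, sc_stream) in the send
  ctxt, and encode the Read list, Write list, and Reply chunk with
  xdr_reserve_space() and the xdr_stream_encode_item_present() /
  xdr_stream_encode_item_absent() helpers, which return the number
  of header bytes consumed or -EMSGSIZE on overflow.
- Move the DMA sync of the transport header into svc_rdma_send(),
  which now takes the send ctxt instead of a bare ib_send_wr; the
  per-Send svc_xprt_get()/svc_xprt_put() pair is no longer needed.
- Pull up small messages (below RPCRDMA_PULLUP_THRESH) instead of
  DMA-mapping them, since copying is cheaper for short replies.
- Rewrite svc_rdma_send_error_msg() to encode RDMA_ERROR responses
  (err_vers and err_chunk) through the same xdr_stream.
- Add svc_rdma_read_payload() to record where a READ payload sits in
  the reply's xdr_buf so svc_rdma_sendto() can send it via the
  client-provided Write chunk.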
---
kernel/net/sunrpc/xprtrdma/svc_rdma_sendto.c | 695 ++++++++++++++++++++++++++++++++-------------------------
 1 file changed, 386 insertions(+), 309 deletions(-)
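
A minimal sketch of the caller-visible change, distilled from the
hunks below (setup and error handling elided; not a literal excerpt):

    /* Before: the caller measured and synced the transport header,
     * then posted a bare ib_send_wr.
     */
    svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
    ret = svc_rdma_send(rdma, &sctxt->sc_send_wr);

    /* After: sctxt->sc_hdrbuf tracks the header length as it is
     * encoded via sctxt->sc_stream, and svc_rdma_send() syncs the
     * header buffer for the device before posting.
     */
    sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
    ret = svc_rdma_send(rdma, sctxt);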
diff --git a/kernel/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/kernel/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 4062cd6..c3d588b 100644
--- a/kernel/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/kernel/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -106,7 +106,6 @@
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/debug.h>
-#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>
#include "xprt_rdma.h"
@@ -121,6 +120,13 @@
{
return list_first_entry_or_null(list, struct svc_rdma_send_ctxt,
sc_list);
+}
+
+static void svc_rdma_send_cid_init(struct svcxprt_rdma *rdma,
+ struct rpc_rdma_cid *cid)
+{
+ cid->ci_queue_id = rdma->sc_sq_cq->res.id;
+ cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
}
static struct svc_rdma_send_ctxt *
@@ -145,12 +151,16 @@
if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
goto fail2;
+ svc_rdma_send_cid_init(rdma, &ctxt->sc_cid);
+
ctxt->sc_send_wr.next = NULL;
ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
ctxt->sc_cqe.done = svc_rdma_wc_send;
ctxt->sc_xprt_buf = buffer;
+ xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
+ rdma->sc_max_req_size);
ctxt->sc_sges[0].addr = addr;
for (i = 0; i < rdma->sc_max_send_sges; i++)
@@ -204,6 +214,10 @@
spin_unlock(&rdma->sc_send_lock);
out:
+ rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
+ xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
+ ctxt->sc_xprt_buf, NULL);
+
ctxt->sc_send_wr.num_sge = 0;
ctxt->sc_cur_sge_no = 0;
ctxt->sc_page_count = 0;
@@ -233,11 +247,15 @@
/* The first SGE contains the transport header, which
* remains mapped until @ctxt is destroyed.
*/
- for (i = 1; i < ctxt->sc_send_wr.num_sge; i++)
+ for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) {
ib_dma_unmap_page(device,
ctxt->sc_sges[i].addr,
ctxt->sc_sges[i].length,
DMA_TO_DEVICE);
+ trace_svcrdma_dma_unmap_page(rdma,
+ ctxt->sc_sges[i].addr,
+ ctxt->sc_sges[i].length);
+ }
for (i = 0; i < ctxt->sc_page_count; ++i)
put_page(ctxt->sc_pages[i]);
@@ -259,41 +277,42 @@
{
struct svcxprt_rdma *rdma = cq->cq_context;
struct ib_cqe *cqe = wc->wr_cqe;
- struct svc_rdma_send_ctxt *ctxt;
+ struct svc_rdma_send_ctxt *ctxt =
+ container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
- trace_svcrdma_wc_send(wc);
+ trace_svcrdma_wc_send(wc, &ctxt->sc_cid);
atomic_inc(&rdma->sc_sq_avail);
wake_up(&rdma->sc_send_wait);
- ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
svc_rdma_send_ctxt_put(rdma, ctxt);
if (unlikely(wc->status != IB_WC_SUCCESS)) {
set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
svc_xprt_enqueue(&rdma->sc_xprt);
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("svcrdma: Send: %s (%u/0x%x)\n",
- ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
}
-
- svc_xprt_put(&rdma->sc_xprt);
}
/**
* svc_rdma_send - Post a single Send WR
* @rdma: transport on which to post the WR
- * @wr: prepared Send WR to post
+ * @ctxt: send ctxt with a Send WR ready to post
*
* Returns zero if the Send WR was posted successfully. Otherwise, a
* negative errno is returned.
*/
-int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
+int svc_rdma_send(struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *ctxt)
{
+ struct ib_send_wr *wr = &ctxt->sc_send_wr;
int ret;
might_sleep();
+
+ /* Sync the transport header buffer */
+ ib_dma_sync_single_for_device(rdma->sc_pd->device,
+ wr->sg_list[0].addr,
+ wr->sg_list[0].length,
+ DMA_TO_DEVICE);
/* If the SQ is full, wait until an SQ entry is available */
while (1) {
@@ -309,8 +328,7 @@
continue;
}
- svc_xprt_get(&rdma->sc_xprt);
- trace_svcrdma_post_send(wr);
+ trace_svcrdma_post_send(ctxt);
ret = ib_post_send(rdma->sc_qp, wr, NULL);
if (ret)
break;
@@ -319,197 +337,173 @@
trace_svcrdma_sq_post_err(rdma, ret);
set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
- svc_xprt_put(&rdma->sc_xprt);
wake_up(&rdma->sc_send_wait);
return ret;
}
-static u32 xdr_padsize(u32 len)
-{
- return (len & 3) ? (4 - (len & 3)) : 0;
-}
-
-/* Returns length of transport header, in bytes.
- */
-static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
-{
- unsigned int nsegs;
- __be32 *p;
-
- p = rdma_resp;
-
- /* RPC-over-RDMA V1 replies never have a Read list. */
- p += rpcrdma_fixed_maxsz + 1;
-
- /* Skip Write list. */
- while (*p++ != xdr_zero) {
- nsegs = be32_to_cpup(p++);
- p += nsegs * rpcrdma_segment_maxsz;
- }
-
- /* Skip Reply chunk. */
- if (*p++ != xdr_zero) {
- nsegs = be32_to_cpup(p++);
- p += nsegs * rpcrdma_segment_maxsz;
- }
-
- return (unsigned long)p - (unsigned long)rdma_resp;
-}
-
-/* One Write chunk is copied from Call transport header to Reply
- * transport header. Each segment's length field is updated to
- * reflect number of bytes consumed in the segment.
+/**
+ * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
+ * @sctxt: Send context for the RPC Reply
*
- * Returns number of segments in this chunk.
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Reply Read list
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
+{
+ /* RPC-over-RDMA version 1 replies never have a Read list. */
+ return xdr_stream_encode_item_absent(&sctxt->sc_stream);
+}
+
+/**
+ * svc_rdma_encode_write_segment - Encode one Write segment
+ * @src: matching Write chunk in the RPC Call header
+ * @sctxt: Send context for the RPC Reply
+ * @remaining: bytes of the payload remaining in the Write chunk
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Write segment
+ * %-EMSGSIZE on XDR buffer overflow
+ */
+static ssize_t svc_rdma_encode_write_segment(__be32 *src,
+ struct svc_rdma_send_ctxt *sctxt,
+ unsigned int *remaining)
+{
+ __be32 *p;
+ const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
+ u32 handle, length;
+ u64 offset;
+
+ p = xdr_reserve_space(&sctxt->sc_stream, len);
+ if (!p)
+ return -EMSGSIZE;
+
+ xdr_decode_rdma_segment(src, &handle, &length, &offset);
+
+ if (*remaining < length) {
+ /* segment only partly filled */
+ length = *remaining;
+ *remaining = 0;
+ } else {
+ /* entire segment was consumed */
+ *remaining -= length;
+ }
+ xdr_encode_rdma_segment(p, handle, length, offset);
+
+ trace_svcrdma_encode_wseg(handle, length, offset);
+ return len;
+}
+
+/**
+ * svc_rdma_encode_write_chunk - Encode one Write chunk
+ * @src: matching Write chunk in the RPC Call header
+ * @sctxt: Send context for the RPC Reply
+ * @remaining: size in bytes of the payload in the Write chunk
+ *
+ * Copy a Write chunk from the Call transport header to the
+ * Reply transport header. Update each segment's length field
+ * to reflect the number of bytes written in that segment.
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Write chunk
+ * %-EMSGSIZE on XDR buffer overflow
+ */
+static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
+ struct svc_rdma_send_ctxt *sctxt,
unsigned int remaining)
{
unsigned int i, nsegs;
- u32 seg_len;
+ ssize_t len, ret;
- /* Write list discriminator */
- *dst++ = *src++;
+ len = 0;
+ trace_svcrdma_encode_write_chunk(remaining);
- /* number of segments in this chunk */
- nsegs = be32_to_cpup(src);
- *dst++ = *src++;
+ src++;
+ ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
+ if (ret < 0)
+ return -EMSGSIZE;
+ len += ret;
+
+ nsegs = be32_to_cpup(src++);
+ ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
+ if (ret < 0)
+ return -EMSGSIZE;
+ len += ret;
for (i = nsegs; i; i--) {
- /* segment's RDMA handle */
- *dst++ = *src++;
-
- /* bytes returned in this segment */
- seg_len = be32_to_cpu(*src);
- if (remaining >= seg_len) {
- /* entire segment was consumed */
- *dst = *src;
- remaining -= seg_len;
- } else {
- /* segment only partly filled */
- *dst = cpu_to_be32(remaining);
- remaining = 0;
- }
- dst++; src++;
-
- /* segment's RDMA offset */
- *dst++ = *src++;
- *dst++ = *src++;
+ ret = svc_rdma_encode_write_segment(src, sctxt, &remaining);
+ if (ret < 0)
+ return -EMSGSIZE;
+ src += rpcrdma_segment_maxsz;
+ len += ret;
}
- return nsegs;
+ return len;
}
-/* The client provided a Write list in the Call message. Fill in
- * the segments in the first Write chunk in the Reply's transport
+/**
+ * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
+ * @length: size in bytes of the payload in the first Write chunk
+ *
+ * The client provides a Write chunk list in the Call message. Fill
+ * in the segments in the first Write chunk in the Reply's transport
* header with the number of bytes consumed in each segment.
* Remaining chunks are returned unused.
*
* Assumptions:
* - Client has provided only one Write chunk
+ *
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Reply's Write list
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
- unsigned int consumed)
+static ssize_t
+svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
+ unsigned int length)
{
- unsigned int nsegs;
- __be32 *p, *q;
+ ssize_t len, ret;
- /* RPC-over-RDMA V1 replies never have a Read list. */
- p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+ ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt, length);
+ if (ret < 0)
+ return ret;
+ len = ret;
- q = wr_ch;
- while (*q != xdr_zero) {
- nsegs = xdr_encode_write_chunk(p, q, consumed);
- q += 2 + nsegs * rpcrdma_segment_maxsz;
- p += 2 + nsegs * rpcrdma_segment_maxsz;
- consumed = 0;
- }
+ /* Terminate the Write list */
+ ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
+ if (ret < 0)
+ return ret;
- /* Terminate Write list */
- *p++ = xdr_zero;
-
- /* Reply chunk discriminator; may be replaced later */
- *p = xdr_zero;
+ return len + ret;
}
-/* The client provided a Reply chunk in the Call message. Fill in
- * the segments in the Reply chunk in the Reply message with the
- * number of bytes consumed in each segment.
+/**
+ * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
+ * @length: size in bytes of the payload in the Reply chunk
*
* Assumptions:
- * - Reply can always fit in the provided Reply chunk
- */
-static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
- unsigned int consumed)
-{
- __be32 *p;
-
- /* Find the Reply chunk in the Reply's xprt header.
- * RPC-over-RDMA V1 replies never have a Read list.
- */
- p = rdma_resp + rpcrdma_fixed_maxsz + 1;
-
- /* Skip past Write list */
- while (*p++ != xdr_zero)
- p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
-
- xdr_encode_write_chunk(p, rp_ch, consumed);
-}
-
-/* Parse the RPC Call's transport header.
- */
-static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
- __be32 **write, __be32 **reply)
-{
- __be32 *p;
-
- p = rdma_argp + rpcrdma_fixed_maxsz;
-
- /* Read list */
- while (*p++ != xdr_zero)
- p += 5;
-
- /* Write list */
- if (*p != xdr_zero) {
- *write = p;
- while (*p++ != xdr_zero)
- p += 1 + be32_to_cpu(*p) * 4;
- } else {
- *write = NULL;
- p++;
- }
-
- /* Reply chunk */
- if (*p != xdr_zero)
- *reply = p;
- else
- *reply = NULL;
-}
-
-/* RPC-over-RDMA Version One private extension: Remote Invalidation.
- * Responder's choice: requester signals it can handle Send With
- * Invalidate, and responder chooses one rkey to invalidate.
+ * - Reply can always fit in the client-provided Reply chunk
*
- * Find a candidate rkey to invalidate when sending a reply. Picks the
- * first R_key it finds in the chunk lists.
- *
- * Returns zero if RPC's chunk lists are empty.
+ * Return values:
+ * On success, returns length in bytes of the Reply XDR buffer
+ * that was consumed by the Reply's Reply chunk
+ * %-EMSGSIZE on XDR buffer overflow
*/
-static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
- __be32 *wr_lst, __be32 *rp_ch)
+static ssize_t
+svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
+ unsigned int length)
{
- __be32 *p;
-
- p = rdma_argp + rpcrdma_fixed_maxsz;
- if (*p != xdr_zero)
- p += 2;
- else if (wr_lst && be32_to_cpup(wr_lst + 1))
- p = wr_lst + 2;
- else if (rp_ch && be32_to_cpup(rp_ch + 1))
- p = rp_ch + 2;
- else
- return 0;
- return be32_to_cpup(p);
+ return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
+ length);
}
static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -522,6 +516,7 @@
dma_addr_t dma_addr;
dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
+ trace_svcrdma_dma_map_page(rdma, dma_addr, len);
if (ib_dma_mapping_error(dev, dma_addr))
goto out_maperr;
@@ -531,7 +526,6 @@
return 0;
out_maperr:
- trace_svcrdma_dma_map_page(rdma, page);
return -EIO;
}
@@ -548,38 +542,36 @@
}
/**
- * svc_rdma_sync_reply_hdr - DMA sync the transport header buffer
+ * svc_rdma_pull_up_needed - Determine whether to use pull-up
* @rdma: controlling transport
- * @ctxt: send_ctxt for the Send WR
- * @len: length of transport header
+ * @sctxt: send_ctxt for the Send WR
+ * @rctxt: Write and Reply chunks provided by client
+ * @xdr: xdr_buf containing RPC message to transmit
*
- */
-void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
- unsigned int len)
-{
- ctxt->sc_sges[0].length = len;
- ctxt->sc_send_wr.num_sge++;
- ib_dma_sync_single_for_device(rdma->sc_pd->device,
- ctxt->sc_sges[0].addr, len,
- DMA_TO_DEVICE);
-}
-
-/* If the xdr_buf has more elements than the device can
- * transmit in a single RDMA Send, then the reply will
- * have to be copied into a bounce buffer.
+ * Returns:
+ * %true if pull-up must be used
+ * %false otherwise
*/
static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
- struct xdr_buf *xdr,
- __be32 *wr_lst)
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_recv_ctxt *rctxt,
+ struct xdr_buf *xdr)
{
int elements;
+ /* For small messages, copying bytes is cheaper than DMA mapping.
+ */
+ if (sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
+ return true;
+
+ /* Check whether the xdr_buf has more elements than can
+ * fit in a single RDMA Send.
+ */
/* xdr->head */
elements = 1;
/* xdr->pages */
- if (!wr_lst) {
+ if (!rctxt || !rctxt->rc_write_list) {
unsigned int remaining;
unsigned long pageoff;
@@ -601,29 +593,36 @@
return elements >= rdma->sc_max_send_sges;
}
-/* The device is not capable of sending the reply directly.
- * Assemble the elements of @xdr into the transport header
- * buffer.
+/**
+ * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer
+ * @rdma: controlling transport
+ * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared
+ * @rctxt: Write and Reply chunks provided by client
+ * @xdr: prepared xdr_buf containing RPC message
+ *
+ * The device is not capable of sending the reply directly.
+ * Assemble the elements of @xdr into the transport header buffer.
+ *
+ * Returns zero on success, or a negative errno on failure.
*/
static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
- struct xdr_buf *xdr, __be32 *wr_lst)
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_recv_ctxt *rctxt,
+ const struct xdr_buf *xdr)
{
unsigned char *dst, *tailbase;
unsigned int taillen;
- dst = ctxt->sc_xprt_buf;
- dst += ctxt->sc_sges[0].length;
-
+ dst = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len;
memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len);
dst += xdr->head[0].iov_len;
tailbase = xdr->tail[0].iov_base;
taillen = xdr->tail[0].iov_len;
- if (wr_lst) {
+ if (rctxt && rctxt->rc_write_list) {
u32 xdrpad;
- xdrpad = xdr_padsize(xdr->page_len);
+ xdrpad = xdr_pad_size(xdr->page_len);
if (taillen && xdrpad) {
tailbase += xdrpad;
taillen -= xdrpad;
@@ -650,29 +649,26 @@
if (taillen)
memcpy(dst, tailbase, taillen);
- ctxt->sc_sges[0].length += xdr->len;
- ib_dma_sync_single_for_device(rdma->sc_pd->device,
- ctxt->sc_sges[0].addr,
- ctxt->sc_sges[0].length,
- DMA_TO_DEVICE);
-
+ sctxt->sc_sges[0].length += xdr->len;
+ trace_svcrdma_send_pullup(sctxt->sc_sges[0].length);
return 0;
}
-/* svc_rdma_map_reply_msg - Map the buffer holding RPC message
+/* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message
* @rdma: controlling transport
- * @ctxt: send_ctxt for the Send WR
+ * @sctxt: send_ctxt for the Send WR
+ * @rctxt: Write and Reply chunks provided by client
* @xdr: prepared xdr_buf containing RPC message
- * @wr_lst: pointer to Call header's Write list, or NULL
*
* Load the xdr_buf into the ctxt's sge array, and DMA map each
- * element as it is added.
+ * element as it is added. The Send WR's num_sge field is set.
*
* Returns zero on success, or a negative errno on failure.
*/
int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
- struct xdr_buf *xdr, __be32 *wr_lst)
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_recv_ctxt *rctxt,
+ struct xdr_buf *xdr)
{
unsigned int len, remaining;
unsigned long page_off;
@@ -681,11 +677,24 @@
u32 xdr_pad;
int ret;
- if (svc_rdma_pull_up_needed(rdma, xdr, wr_lst))
- return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst);
+ /* Set up the (persistently-mapped) transport header SGE. */
+ sctxt->sc_send_wr.num_sge = 1;
+ sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
- ++ctxt->sc_cur_sge_no;
- ret = svc_rdma_dma_map_buf(rdma, ctxt,
+ /* If there is a Reply chunk, nothing follows the transport
+ * header, and we're done here.
+ */
+ if (rctxt && rctxt->rc_reply_chunk)
+ return 0;
+
+ /* For pull-up, svc_rdma_send() will sync the transport header.
+ * No additional DMA mapping is necessary.
+ */
+ if (svc_rdma_pull_up_needed(rdma, sctxt, rctxt, xdr))
+ return svc_rdma_pull_up_reply_msg(rdma, sctxt, rctxt, xdr);
+
+ ++sctxt->sc_cur_sge_no;
+ ret = svc_rdma_dma_map_buf(rdma, sctxt,
xdr->head[0].iov_base,
xdr->head[0].iov_len);
if (ret < 0)
@@ -696,10 +705,10 @@
* have added XDR padding in the tail buffer, and that
* should not be included inline.
*/
- if (wr_lst) {
+ if (rctxt && rctxt->rc_write_list) {
base = xdr->tail[0].iov_base;
len = xdr->tail[0].iov_len;
- xdr_pad = xdr_padsize(xdr->page_len);
+ xdr_pad = xdr_pad_size(xdr->page_len);
if (len && xdr_pad) {
base += xdr_pad;
@@ -715,8 +724,8 @@
while (remaining) {
len = min_t(u32, PAGE_SIZE - page_off, remaining);
- ++ctxt->sc_cur_sge_no;
- ret = svc_rdma_dma_map_page(rdma, ctxt, *ppages++,
+ ++sctxt->sc_cur_sge_no;
+ ret = svc_rdma_dma_map_page(rdma, sctxt, *ppages++,
page_off, len);
if (ret < 0)
return ret;
@@ -729,8 +738,8 @@
len = xdr->tail[0].iov_len;
tail:
if (len) {
- ++ctxt->sc_cur_sge_no;
- ret = svc_rdma_dma_map_buf(rdma, ctxt, base, len);
+ ++sctxt->sc_cur_sge_no;
+ ret = svc_rdma_dma_map_buf(rdma, sctxt, base, len);
if (ret < 0)
return ret;
}
@@ -768,7 +777,7 @@
*
* RDMA Send is the last step of transmitting an RPC reply. Pages
* involved in the earlier RDMA Writes are here transferred out
- * of the rqstp and into the ctxt's page array. These pages are
+ * of the rqstp and into the sctxt's page array. These pages are
* DMA unmapped by each Write completion, but the subsequent Send
* completion finally releases these pages.
*
@@ -776,69 +785,94 @@
* - The Reply's transport header will never be larger than a page.
*/
static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
- __be32 *rdma_argp,
- struct svc_rqst *rqstp,
- __be32 *wr_lst, __be32 *rp_ch)
-{
- int ret;
-
- if (!rp_ch) {
- ret = svc_rdma_map_reply_msg(rdma, ctxt,
- &rqstp->rq_res, wr_lst);
- if (ret < 0)
- return ret;
- }
-
- svc_rdma_save_io_pages(rqstp, ctxt);
-
- ctxt->sc_send_wr.opcode = IB_WR_SEND;
- if (rdma->sc_snd_w_inv) {
- ctxt->sc_send_wr.ex.invalidate_rkey =
- svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch);
- if (ctxt->sc_send_wr.ex.invalidate_rkey)
- ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
- }
- dprintk("svcrdma: posting Send WR with %u sge(s)\n",
- ctxt->sc_send_wr.num_sge);
- return svc_rdma_send(rdma, &ctxt->sc_send_wr);
-}
-
-/* Given the client-provided Write and Reply chunks, the server was not
- * able to form a complete reply. Return an RDMA_ERROR message so the
- * client can retire this RPC transaction. As above, the Send completion
- * routine releases payload pages that were part of a previous RDMA Write.
- *
- * Remote Invalidation is skipped for simplicity.
- */
-static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt,
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_recv_ctxt *rctxt,
struct svc_rqst *rqstp)
{
- __be32 *p;
int ret;
- p = ctxt->sc_xprt_buf;
- trace_svcrdma_err_chunk(*p);
- p += 3;
- *p++ = rdma_error;
- *p = err_chunk;
- svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR);
-
- svc_rdma_save_io_pages(rqstp, ctxt);
-
- ctxt->sc_send_wr.opcode = IB_WR_SEND;
- ret = svc_rdma_send(rdma, &ctxt->sc_send_wr);
- if (ret) {
- svc_rdma_send_ctxt_put(rdma, ctxt);
+ ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqstp->rq_res);
+ if (ret < 0)
return ret;
- }
- return 0;
+ svc_rdma_save_io_pages(rqstp, sctxt);
+
+ if (rctxt->rc_inv_rkey) {
+ sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+ sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey;
+ } else {
+ sctxt->sc_send_wr.opcode = IB_WR_SEND;
+ }
+ return svc_rdma_send(rdma, sctxt);
}
-void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
+/**
+ * svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response
+ * @rdma: controlling transport context
+ * @sctxt: Send context for the response
+ * @rctxt: Receive context for incoming bad message
+ * @status: negative errno indicating error that occurred
+ *
+ * Given the client-provided Read, Write, and Reply chunks, the
+ * server was not able to parse the Call or form a complete Reply.
+ * Return an RDMA_ERROR message so the client can retire the RPC
+ * transaction.
+ *
+ * The caller does not have to release @sctxt. It is released by
+ * Send completion, or by this function on error.
+ */
+void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *sctxt,
+ struct svc_rdma_recv_ctxt *rctxt,
+ int status)
{
+ __be32 *rdma_argp = rctxt->rc_recv_buf;
+ __be32 *p;
+
+ rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0);
+ xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf,
+ sctxt->sc_xprt_buf, NULL);
+
+ p = xdr_reserve_space(&sctxt->sc_stream,
+ rpcrdma_fixed_maxsz * sizeof(*p));
+ if (!p)
+ goto put_ctxt;
+
+ *p++ = *rdma_argp;
+ *p++ = *(rdma_argp + 1);
+ *p++ = rdma->sc_fc_credits;
+ *p = rdma_error;
+
+ switch (status) {
+ case -EPROTONOSUPPORT:
+ p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p));
+ if (!p)
+ goto put_ctxt;
+
+ *p++ = err_vers;
+ *p++ = rpcrdma_version;
+ *p = rpcrdma_version;
+ trace_svcrdma_err_vers(*rdma_argp);
+ break;
+ default:
+ p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p));
+ if (!p)
+ goto put_ctxt;
+
+ *p = err_chunk;
+ trace_svcrdma_err_chunk(*rdma_argp);
+ }
+
+ /* Remote Invalidation is skipped for simplicity. */
+ sctxt->sc_send_wr.num_sge = 1;
+ sctxt->sc_send_wr.opcode = IB_WR_SEND;
+ sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
+ if (svc_rdma_send(rdma, sctxt))
+ goto put_ctxt;
+ return;
+
+put_ctxt:
+ svc_rdma_send_ctxt_put(rdma, sctxt);
}
/**
@@ -859,54 +893,68 @@
struct svcxprt_rdma *rdma =
container_of(xprt, struct svcxprt_rdma, sc_xprt);
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
- __be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
+ __be32 *rdma_argp = rctxt->rc_recv_buf;
+ __be32 *wr_lst = rctxt->rc_write_list;
+ __be32 *rp_ch = rctxt->rc_reply_chunk;
struct xdr_buf *xdr = &rqstp->rq_res;
struct svc_rdma_send_ctxt *sctxt;
+ __be32 *p;
int ret;
- rdma_argp = rctxt->rc_recv_buf;
- svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);
+ ret = -ENOTCONN;
+ if (svc_xprt_is_dead(xprt))
+ goto err0;
- /* Create the RDMA response header. xprt->xpt_mutex,
- * acquired in svc_send(), serializes RPC replies. The
- * code path below that inserts the credit grant value
- * into each transport header runs only inside this
- * critical section.
- */
ret = -ENOMEM;
sctxt = svc_rdma_send_ctxt_get(rdma);
if (!sctxt)
goto err0;
- rdma_resp = sctxt->sc_xprt_buf;
- p = rdma_resp;
+ p = xdr_reserve_space(&sctxt->sc_stream,
+ rpcrdma_fixed_maxsz * sizeof(*p));
+ if (!p)
+ goto err0;
*p++ = *rdma_argp;
*p++ = *(rdma_argp + 1);
*p++ = rdma->sc_fc_credits;
- *p++ = rp_ch ? rdma_nomsg : rdma_msg;
+ *p = rp_ch ? rdma_nomsg : rdma_msg;
- /* Start with empty chunks */
- *p++ = xdr_zero;
- *p++ = xdr_zero;
- *p = xdr_zero;
-
+ if (svc_rdma_encode_read_list(sctxt) < 0)
+ goto err0;
if (wr_lst) {
/* XXX: Presume the client sent only one Write chunk */
- ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
+ unsigned long offset;
+ unsigned int length;
+
+ if (rctxt->rc_read_payload_length) {
+ offset = rctxt->rc_read_payload_offset;
+ length = rctxt->rc_read_payload_length;
+ } else {
+ offset = xdr->head[0].iov_len;
+ length = xdr->page_len;
+ }
+ ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset,
+ length);
if (ret < 0)
goto err2;
- svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
+ if (svc_rdma_encode_write_list(rctxt, sctxt, length) < 0)
+ goto err0;
+ } else {
+ if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
+ goto err0;
}
if (rp_ch) {
- ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
+ ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
if (ret < 0)
goto err2;
- svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
+ if (svc_rdma_encode_reply_chunk(rctxt, sctxt, ret) < 0)
+ goto err0;
+ } else {
+ if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
+ goto err0;
}
- svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
- ret = svc_rdma_send_reply_msg(rdma, sctxt, rdma_argp, rqstp,
- wr_lst, rp_ch);
+ ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
if (ret < 0)
goto err1;
return 0;
@@ -915,15 +963,44 @@
if (ret != -E2BIG && ret != -EINVAL)
goto err1;
- ret = svc_rdma_send_error_msg(rdma, sctxt, rqstp);
- if (ret < 0)
- goto err1;
+ /* Send completion releases payload pages that were part
+ * of previously posted RDMA Writes.
+ */
+ svc_rdma_save_io_pages(rqstp, sctxt);
+ svc_rdma_send_error_msg(rdma, sctxt, rctxt, ret);
return 0;
err1:
svc_rdma_send_ctxt_put(rdma, sctxt);
err0:
- trace_svcrdma_send_failed(rqstp, ret);
+ trace_svcrdma_send_err(rqstp, ret);
set_bit(XPT_CLOSE, &xprt->xpt_flags);
return -ENOTCONN;
}
+
+/**
+ * svc_rdma_read_payload - special processing for a READ payload
+ * @rqstp: svc_rqst to operate on
+ * @offset: payload's byte offset in rqstp->rq_res
+ * @length: size of payload, in bytes
+ *
+ * Returns zero on success.
+ *
+ * For the moment, just record the xdr_buf location of the READ
+ * payload. svc_rdma_sendto will use that location later when
+ * we actually send the payload.
+ */
+int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
+ unsigned int length)
+{
+ struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
+
+ /* XXX: Just one READ payload slot for now, since our
+ * transport implementation currently supports only one
+ * Write chunk.
+ */
+ rctxt->rc_read_payload_offset = offset;
+ rctxt->rc_read_payload_length = length;
+
+ return 0;
+}
--
Gitblit v1.6.2