hc
2024-10-22 8ac6c7a54ed1b98d142dce24b11c6de6a1e239a5
kernel/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -106,7 +106,6 @@
 #include <rdma/rdma_cm.h>
 
 #include <linux/sunrpc/debug.h>
-#include <linux/sunrpc/rpc_rdma.h>
 #include <linux/sunrpc/svc_rdma.h>
 
 #include "xprt_rdma.h"
@@ -121,6 +120,13 @@
 {
 	return list_first_entry_or_null(list, struct svc_rdma_send_ctxt,
 					sc_list);
+}
+
+static void svc_rdma_send_cid_init(struct svcxprt_rdma *rdma,
+				   struct rpc_rdma_cid *cid)
+{
+	cid->ci_queue_id = rdma->sc_sq_cq->res.id;
+	cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
 }
 
 static struct svc_rdma_send_ctxt *
@@ -145,12 +151,16 @@
 	if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
 		goto fail2;
 
+	svc_rdma_send_cid_init(rdma, &ctxt->sc_cid);
+
 	ctxt->sc_send_wr.next = NULL;
 	ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
 	ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
 	ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
 	ctxt->sc_cqe.done = svc_rdma_wc_send;
 	ctxt->sc_xprt_buf = buffer;
+	xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
+		     rdma->sc_max_req_size);
 	ctxt->sc_sges[0].addr = addr;
 
 	for (i = 0; i < rdma->sc_max_send_sges; i++)
@@ -204,6 +214,10 @@
 	spin_unlock(&rdma->sc_send_lock);
 
 out:
+	rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
+	xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
+			ctxt->sc_xprt_buf, NULL);
+
 	ctxt->sc_send_wr.num_sge = 0;
 	ctxt->sc_cur_sge_no = 0;
 	ctxt->sc_page_count = 0;
@@ -233,11 +247,15 @@
 	/* The first SGE contains the transport header, which
 	 * remains mapped until @ctxt is destroyed.
 	 */
-	for (i = 1; i < ctxt->sc_send_wr.num_sge; i++)
+	for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) {
 		ib_dma_unmap_page(device,
 				  ctxt->sc_sges[i].addr,
 				  ctxt->sc_sges[i].length,
 				  DMA_TO_DEVICE);
+		trace_svcrdma_dma_unmap_page(rdma,
+					     ctxt->sc_sges[i].addr,
+					     ctxt->sc_sges[i].length);
+	}
 
 	for (i = 0; i < ctxt->sc_page_count; ++i)
 		put_page(ctxt->sc_pages[i]);
@@ -259,41 +277,42 @@
 {
 	struct svcxprt_rdma *rdma = cq->cq_context;
 	struct ib_cqe *cqe = wc->wr_cqe;
-	struct svc_rdma_send_ctxt *ctxt;
+	struct svc_rdma_send_ctxt *ctxt =
+		container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
 
-	trace_svcrdma_wc_send(wc);
+	trace_svcrdma_wc_send(wc, &ctxt->sc_cid);
 
 	atomic_inc(&rdma->sc_sq_avail);
 	wake_up(&rdma->sc_send_wait);
 
-	ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
 	svc_rdma_send_ctxt_put(rdma, ctxt);
 
 	if (unlikely(wc->status != IB_WC_SUCCESS)) {
 		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 		svc_xprt_enqueue(&rdma->sc_xprt);
-		if (wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("svcrdma: Send: %s (%u/0x%x)\n",
-			       ib_wc_status_msg(wc->status),
-			       wc->status, wc->vendor_err);
 	}
-
-	svc_xprt_put(&rdma->sc_xprt);
 }
 
 /**
  * svc_rdma_send - Post a single Send WR
  * @rdma: transport on which to post the WR
- * @wr: prepared Send WR to post
+ * @ctxt: send ctxt with a Send WR ready to post
  *
  * Returns zero the Send WR was posted successfully. Otherwise, a
  * negative errno is returned.
  */
-int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
+int svc_rdma_send(struct svcxprt_rdma *rdma, struct svc_rdma_send_ctxt *ctxt)
 {
+	struct ib_send_wr *wr = &ctxt->sc_send_wr;
 	int ret;
 
 	might_sleep();
+
+	/* Sync the transport header buffer */
+	ib_dma_sync_single_for_device(rdma->sc_pd->device,
+				      wr->sg_list[0].addr,
+				      wr->sg_list[0].length,
+				      DMA_TO_DEVICE);
 
 	/* If the SQ is full, wait until an SQ entry is available */
 	while (1) {
@@ -309,8 +328,7 @@
 			continue;
 		}
 
-		svc_xprt_get(&rdma->sc_xprt);
-		trace_svcrdma_post_send(wr);
+		trace_svcrdma_post_send(ctxt);
 		ret = ib_post_send(rdma->sc_qp, wr, NULL);
 		if (ret)
 			break;
@@ -319,197 +337,173 @@
 
 	trace_svcrdma_sq_post_err(rdma, ret);
 	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
-	svc_xprt_put(&rdma->sc_xprt);
 	wake_up(&rdma->sc_send_wait);
 	return ret;
 }
 
-static u32 xdr_padsize(u32 len)
-{
-	return (len & 3) ? (4 - (len & 3)) : 0;
-}
-
-/* Returns length of transport header, in bytes.
- */
-static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
-{
-	unsigned int nsegs;
-	__be32 *p;
-
-	p = rdma_resp;
-
-	/* RPC-over-RDMA V1 replies never have a Read list. */
-	p += rpcrdma_fixed_maxsz + 1;
-
-	/* Skip Write list. */
-	while (*p++ != xdr_zero) {
-		nsegs = be32_to_cpup(p++);
-		p += nsegs * rpcrdma_segment_maxsz;
-	}
-
-	/* Skip Reply chunk. */
-	if (*p++ != xdr_zero) {
-		nsegs = be32_to_cpup(p++);
-		p += nsegs * rpcrdma_segment_maxsz;
-	}
-
-	return (unsigned long)p - (unsigned long)rdma_resp;
-}
-
-/* One Write chunk is copied from Call transport header to Reply
- * transport header. Each segment's length field is updated to
- * reflect number of bytes consumed in the segment.
+/**
+ * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
+ * @sctxt: Send context for the RPC Reply
  *
- * Returns number of segments in this chunk.
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Reply Read list
+ *   %-EMSGSIZE on XDR buffer overflow
  */
-static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
+{
+	/* RPC-over-RDMA version 1 replies never have a Read list. */
+	return xdr_stream_encode_item_absent(&sctxt->sc_stream);
+}
+
+/**
+ * svc_rdma_encode_write_segment - Encode one Write segment
+ * @src: matching Write chunk in the RPC Call header
+ * @sctxt: Send context for the RPC Reply
+ * @remaining: remaining bytes of the payload left in the Write chunk
+ *
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Write segment
+ *   %-EMSGSIZE on XDR buffer overflow
+ */
+static ssize_t svc_rdma_encode_write_segment(__be32 *src,
+					     struct svc_rdma_send_ctxt *sctxt,
+					     unsigned int *remaining)
+{
+	__be32 *p;
+	const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
+	u32 handle, length;
+	u64 offset;
+
+	p = xdr_reserve_space(&sctxt->sc_stream, len);
+	if (!p)
+		return -EMSGSIZE;
+
+	xdr_decode_rdma_segment(src, &handle, &length, &offset);
+
+	if (*remaining < length) {
+		/* segment only partly filled */
+		length = *remaining;
+		*remaining = 0;
+	} else {
+		/* entire segment was consumed */
+		*remaining -= length;
+	}
+	xdr_encode_rdma_segment(p, handle, length, offset);
+
+	trace_svcrdma_encode_wseg(handle, length, offset);
+	return len;
+}
+
+/**
+ * svc_rdma_encode_write_chunk - Encode one Write chunk
+ * @src: matching Write chunk in the RPC Call header
+ * @sctxt: Send context for the RPC Reply
+ * @remaining: size in bytes of the payload in the Write chunk
+ *
+ * Copy a Write chunk from the Call transport header to the
+ * Reply transport header. Update each segment's length field
+ * to reflect the number of bytes written in that segment.
+ *
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Write chunk
+ *   %-EMSGSIZE on XDR buffer overflow
+ */
+static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
+					   struct svc_rdma_send_ctxt *sctxt,
 					   unsigned int remaining)
 {
 	unsigned int i, nsegs;
-	u32 seg_len;
+	ssize_t len, ret;
 
-	/* Write list discriminator */
-	*dst++ = *src++;
+	len = 0;
+	trace_svcrdma_encode_write_chunk(remaining);
 
-	/* number of segments in this chunk */
-	nsegs = be32_to_cpup(src);
-	*dst++ = *src++;
+	src++;
+	ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
+	if (ret < 0)
+		return -EMSGSIZE;
+	len += ret;
+
+	nsegs = be32_to_cpup(src++);
+	ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
+	if (ret < 0)
+		return -EMSGSIZE;
+	len += ret;
 
 	for (i = nsegs; i; i--) {
-		/* segment's RDMA handle */
-		*dst++ = *src++;
-
-		/* bytes returned in this segment */
-		seg_len = be32_to_cpu(*src);
-		if (remaining >= seg_len) {
-			/* entire segment was consumed */
-			*dst = *src;
-			remaining -= seg_len;
-		} else {
-			/* segment only partly filled */
-			*dst = cpu_to_be32(remaining);
-			remaining = 0;
-		}
-		dst++; src++;
-
-		/* segment's RDMA offset */
-		*dst++ = *src++;
-		*dst++ = *src++;
+		ret = svc_rdma_encode_write_segment(src, sctxt, &remaining);
+		if (ret < 0)
+			return -EMSGSIZE;
+		src += rpcrdma_segment_maxsz;
+		len += ret;
 	}
 
-	return nsegs;
+	return len;
 }
 
-/* The client provided a Write list in the Call message. Fill in
- * the segments in the first Write chunk in the Reply's transport
+/**
+ * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
+ * @length: size in bytes of the payload in the first Write chunk
+ *
+ * The client provides a Write chunk list in the Call message. Fill
+ * in the segments in the first Write chunk in the Reply's transport
  * header with the number of bytes consumed in each segment.
  * Remaining chunks are returned unused.
 *
 * Assumptions:
 * - Client has provided only one Write chunk
+ *
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Reply's Write list
+ *   %-EMSGSIZE on XDR buffer overflow
 */
-static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
-					   unsigned int consumed)
+static ssize_t
+svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
+			   struct svc_rdma_send_ctxt *sctxt,
+			   unsigned int length)
 {
-	unsigned int nsegs;
-	__be32 *p, *q;
+	ssize_t len, ret;
 
-	/* RPC-over-RDMA V1 replies never have a Read list. */
-	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+	ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt, length);
+	if (ret < 0)
+		return ret;
+	len = ret;
 
-	q = wr_ch;
-	while (*q != xdr_zero) {
-		nsegs = xdr_encode_write_chunk(p, q, consumed);
-		q += 2 + nsegs * rpcrdma_segment_maxsz;
-		p += 2 + nsegs * rpcrdma_segment_maxsz;
-		consumed = 0;
-	}
+	/* Terminate the Write list */
+	ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
+	if (ret < 0)
+		return ret;
 
-	/* Terminate Write list */
-	*p++ = xdr_zero;
-
-	/* Reply chunk discriminator; may be replaced later */
-	*p = xdr_zero;
+	return len + ret;
 }
 
-/* The client provided a Reply chunk in the Call message. Fill in
- * the segments in the Reply chunk in the Reply message with the
- * number of bytes consumed in each segment.
+/**
+ * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
+ * @length: size in bytes of the payload in the Reply chunk
 *
 * Assumptions:
- * - Reply can always fit in the provided Reply chunk
- */
-static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
-					    unsigned int consumed)
-{
-	__be32 *p;
-
-	/* Find the Reply chunk in the Reply's xprt header.
-	 * RPC-over-RDMA V1 replies never have a Read list.
-	 */
-	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
-
-	/* Skip past Write list */
-	while (*p++ != xdr_zero)
-		p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
-
-	xdr_encode_write_chunk(p, rp_ch, consumed);
-}
-
-/* Parse the RPC Call's transport header.
- */
-static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
-				      __be32 **write, __be32 **reply)
-{
-	__be32 *p;
-
-	p = rdma_argp + rpcrdma_fixed_maxsz;
-
-	/* Read list */
-	while (*p++ != xdr_zero)
-		p += 5;
-
-	/* Write list */
-	if (*p != xdr_zero) {
-		*write = p;
-		while (*p++ != xdr_zero)
-			p += 1 + be32_to_cpu(*p) * 4;
-	} else {
-		*write = NULL;
-		p++;
-	}
-
-	/* Reply chunk */
-	if (*p != xdr_zero)
-		*reply = p;
-	else
-		*reply = NULL;
-}
-
-/* RPC-over-RDMA Version One private extension: Remote Invalidation.
- * Responder's choice: requester signals it can handle Send With
- * Invalidate, and responder chooses one rkey to invalidate.
+ * - Reply can always fit in the client-provided Reply chunk
 *
- * Find a candidate rkey to invalidate when sending a reply. Picks the
- * first R_key it finds in the chunk lists.
- *
- * Returns zero if RPC's chunk lists are empty.
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Reply's Reply chunk
+ *   %-EMSGSIZE on XDR buffer overflow
 */
-static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
-				 __be32 *wr_lst, __be32 *rp_ch)
+static ssize_t
+svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
+			    struct svc_rdma_send_ctxt *sctxt,
+			    unsigned int length)
 {
-	__be32 *p;
-
-	p = rdma_argp + rpcrdma_fixed_maxsz;
-	if (*p != xdr_zero)
-		p += 2;
-	else if (wr_lst && be32_to_cpup(wr_lst + 1))
-		p = wr_lst + 2;
-	else if (rp_ch && be32_to_cpup(rp_ch + 1))
-		p = rp_ch + 2;
-	else
-		return 0;
-	return be32_to_cpup(p);
+	return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
+					   length);
 }
 
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -522,6 +516,7 @@
 	dma_addr_t dma_addr;
 
 	dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
+	trace_svcrdma_dma_map_page(rdma, dma_addr, len);
 	if (ib_dma_mapping_error(dev, dma_addr))
 		goto out_maperr;
 
@@ -531,7 +526,6 @@
 	return 0;
 
 out_maperr:
-	trace_svcrdma_dma_map_page(rdma, page);
 	return -EIO;
 }
 
@@ -548,38 +542,36 @@
 }
 
 /**
- * svc_rdma_sync_reply_hdr - DMA sync the transport header buffer
+ * svc_rdma_pull_up_needed - Determine whether to use pull-up
 * @rdma: controlling transport
- * @ctxt: send_ctxt for the Send WR
- * @len: length of transport header
+ * @sctxt: send_ctxt for the Send WR
+ * @rctxt: Write and Reply chunks provided by client
+ * @xdr: xdr_buf containing RPC message to transmit
 *
- */
-void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
-			     struct svc_rdma_send_ctxt *ctxt,
-			     unsigned int len)
-{
-	ctxt->sc_sges[0].length = len;
-	ctxt->sc_send_wr.num_sge++;
-	ib_dma_sync_single_for_device(rdma->sc_pd->device,
-				      ctxt->sc_sges[0].addr, len,
-				      DMA_TO_DEVICE);
-}
-
-/* If the xdr_buf has more elements than the device can
- * transmit in a single RDMA Send, then the reply will
- * have to be copied into a bounce buffer.
+ * Returns:
+ *	%true if pull-up must be used
+ *	%false otherwise
 */
 static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
-				    struct xdr_buf *xdr,
-				    __be32 *wr_lst)
+				    struct svc_rdma_send_ctxt *sctxt,
+				    const struct svc_rdma_recv_ctxt *rctxt,
+				    struct xdr_buf *xdr)
 {
 	int elements;
 
+	/* For small messages, copying bytes is cheaper than DMA mapping.
+	 */
+	if (sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
+		return true;
+
+	/* Check whether the xdr_buf has more elements than can
+	 * fit in a single RDMA Send.
+	 */
 	/* xdr->head */
 	elements = 1;
 
 	/* xdr->pages */
-	if (!wr_lst) {
+	if (!rctxt || !rctxt->rc_write_list) {
 		unsigned int remaining;
 		unsigned long pageoff;
 
@@ -601,29 +593,36 @@
 	return elements >= rdma->sc_max_send_sges;
 }
 
-/* The device is not capable of sending the reply directly.
- * Assemble the elements of @xdr into the transport header
- * buffer.
+/**
+ * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer
+ * @rdma: controlling transport
+ * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared
+ * @rctxt: Write and Reply chunks provided by client
+ * @xdr: prepared xdr_buf containing RPC message
+ *
+ * The device is not capable of sending the reply directly.
+ * Assemble the elements of @xdr into the transport header buffer.
+ *
+ * Returns zero on success, or a negative errno on failure.
 */
 static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
-				      struct svc_rdma_send_ctxt *ctxt,
-				      struct xdr_buf *xdr, __be32 *wr_lst)
+				      struct svc_rdma_send_ctxt *sctxt,
+				      const struct svc_rdma_recv_ctxt *rctxt,
+				      const struct xdr_buf *xdr)
 {
 	unsigned char *dst, *tailbase;
 	unsigned int taillen;
 
-	dst = ctxt->sc_xprt_buf;
-	dst += ctxt->sc_sges[0].length;
-
+	dst = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len;
 	memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len);
 	dst += xdr->head[0].iov_len;
 
 	tailbase = xdr->tail[0].iov_base;
 	taillen = xdr->tail[0].iov_len;
-	if (wr_lst) {
+	if (rctxt && rctxt->rc_write_list) {
 		u32 xdrpad;
 
-		xdrpad = xdr_padsize(xdr->page_len);
+		xdrpad = xdr_pad_size(xdr->page_len);
 		if (taillen && xdrpad) {
 			tailbase += xdrpad;
 			taillen -= xdrpad;
@@ -650,29 +649,26 @@
 	if (taillen)
 		memcpy(dst, tailbase, taillen);
 
-	ctxt->sc_sges[0].length += xdr->len;
-	ib_dma_sync_single_for_device(rdma->sc_pd->device,
-				      ctxt->sc_sges[0].addr,
-				      ctxt->sc_sges[0].length,
-				      DMA_TO_DEVICE);
-
+	sctxt->sc_sges[0].length += xdr->len;
+	trace_svcrdma_send_pullup(sctxt->sc_sges[0].length);
 	return 0;
 }
 
-/* svc_rdma_map_reply_msg - Map the buffer holding RPC message
+/* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message
 * @rdma: controlling transport
- * @ctxt: send_ctxt for the Send WR
+ * @sctxt: send_ctxt for the Send WR
+ * @rctxt: Write and Reply chunks provided by client
 * @xdr: prepared xdr_buf containing RPC message
- * @wr_lst: pointer to Call header's Write list, or NULL
 *
 * Load the xdr_buf into the ctxt's sge array, and DMA map each
- * element as it is added.
+ * element as it is added. The Send WR's num_sge field is set.
 *
 * Returns zero on success, or a negative errno on failure.
 */
 int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
-			   struct svc_rdma_send_ctxt *ctxt,
-			   struct xdr_buf *xdr, __be32 *wr_lst)
+			   struct svc_rdma_send_ctxt *sctxt,
+			   const struct svc_rdma_recv_ctxt *rctxt,
+			   struct xdr_buf *xdr)
 {
 	unsigned int len, remaining;
 	unsigned long page_off;
@@ -681,11 +677,24 @@
 	u32 xdr_pad;
 	int ret;
 
-	if (svc_rdma_pull_up_needed(rdma, xdr, wr_lst))
-		return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst);
+	/* Set up the (persistently-mapped) transport header SGE. */
+	sctxt->sc_send_wr.num_sge = 1;
+	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
 
-	++ctxt->sc_cur_sge_no;
-	ret = svc_rdma_dma_map_buf(rdma, ctxt,
+	/* If there is a Reply chunk, nothing follows the transport
+	 * header, and we're done here.
+	 */
+	if (rctxt && rctxt->rc_reply_chunk)
+		return 0;
+
+	/* For pull-up, svc_rdma_send() will sync the transport header.
+	 * No additional DMA mapping is necessary.
+	 */
+	if (svc_rdma_pull_up_needed(rdma, sctxt, rctxt, xdr))
+		return svc_rdma_pull_up_reply_msg(rdma, sctxt, rctxt, xdr);
+
+	++sctxt->sc_cur_sge_no;
+	ret = svc_rdma_dma_map_buf(rdma, sctxt,
 				   xdr->head[0].iov_base,
 				   xdr->head[0].iov_len);
 	if (ret < 0)
@@ -696,10 +705,10 @@
 	 * have added XDR padding in the tail buffer, and that
 	 * should not be included inline.
 	 */
-	if (wr_lst) {
+	if (rctxt && rctxt->rc_write_list) {
 		base = xdr->tail[0].iov_base;
 		len = xdr->tail[0].iov_len;
-		xdr_pad = xdr_padsize(xdr->page_len);
+		xdr_pad = xdr_pad_size(xdr->page_len);
 
 		if (len && xdr_pad) {
 			base += xdr_pad;
@@ -715,8 +724,8 @@
 	while (remaining) {
 		len = min_t(u32, PAGE_SIZE - page_off, remaining);
 
-		++ctxt->sc_cur_sge_no;
-		ret = svc_rdma_dma_map_page(rdma, ctxt, *ppages++,
+		++sctxt->sc_cur_sge_no;
+		ret = svc_rdma_dma_map_page(rdma, sctxt, *ppages++,
 					    page_off, len);
 		if (ret < 0)
 			return ret;
@@ -729,8 +738,8 @@
 	len = xdr->tail[0].iov_len;
 tail:
 	if (len) {
-		++ctxt->sc_cur_sge_no;
-		ret = svc_rdma_dma_map_buf(rdma, ctxt, base, len);
+		++sctxt->sc_cur_sge_no;
+		ret = svc_rdma_dma_map_buf(rdma, sctxt, base, len);
 		if (ret < 0)
 			return ret;
 	}
@@ -768,7 +777,7 @@
 *
 * RDMA Send is the last step of transmitting an RPC reply. Pages
 * involved in the earlier RDMA Writes are here transferred out
- * of the rqstp and into the ctxt's page array. These pages are
+ * of the rqstp and into the sctxt's page array. These pages are
 * DMA unmapped by each Write completion, but the subsequent Send
 * completion finally releases these pages.
 *
@@ -776,69 +785,94 @@
 * - The Reply's transport header will never be larger than a page.
 */
 static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
-				   struct svc_rdma_send_ctxt *ctxt,
-				   __be32 *rdma_argp,
-				   struct svc_rqst *rqstp,
-				   __be32 *wr_lst, __be32 *rp_ch)
-{
-	int ret;
-
-	if (!rp_ch) {
-		ret = svc_rdma_map_reply_msg(rdma, ctxt,
-					     &rqstp->rq_res, wr_lst);
-		if (ret < 0)
-			return ret;
-	}
-
-	svc_rdma_save_io_pages(rqstp, ctxt);
-
-	ctxt->sc_send_wr.opcode = IB_WR_SEND;
-	if (rdma->sc_snd_w_inv) {
-		ctxt->sc_send_wr.ex.invalidate_rkey =
-			svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch);
-		if (ctxt->sc_send_wr.ex.invalidate_rkey)
-			ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
-	}
-	dprintk("svcrdma: posting Send WR with %u sge(s)\n",
-		ctxt->sc_send_wr.num_sge);
-	return svc_rdma_send(rdma, &ctxt->sc_send_wr);
-}
-
-/* Given the client-provided Write and Reply chunks, the server was not
- * able to form a complete reply. Return an RDMA_ERROR message so the
- * client can retire this RPC transaction. As above, the Send completion
- * routine releases payload pages that were part of a previous RDMA Write.
- *
- * Remote Invalidation is skipped for simplicity.
- */
-static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
-				   struct svc_rdma_send_ctxt *ctxt,
+				   struct svc_rdma_send_ctxt *sctxt,
+				   const struct svc_rdma_recv_ctxt *rctxt,
 				   struct svc_rqst *rqstp)
 {
-	__be32 *p;
 	int ret;
 
-	p = ctxt->sc_xprt_buf;
-	trace_svcrdma_err_chunk(*p);
-	p += 3;
-	*p++ = rdma_error;
-	*p = err_chunk;
-	svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR);
-
-	svc_rdma_save_io_pages(rqstp, ctxt);
-
-	ctxt->sc_send_wr.opcode = IB_WR_SEND;
-	ret = svc_rdma_send(rdma, &ctxt->sc_send_wr);
-	if (ret) {
-		svc_rdma_send_ctxt_put(rdma, ctxt);
+	ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqstp->rq_res);
+	if (ret < 0)
 		return ret;
-	}
 
-	return 0;
+	svc_rdma_save_io_pages(rqstp, sctxt);
+
+	if (rctxt->rc_inv_rkey) {
+		sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+		sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey;
+	} else {
+		sctxt->sc_send_wr.opcode = IB_WR_SEND;
+	}
+	return svc_rdma_send(rdma, sctxt);
 }
 
-void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
+/**
+ * svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response
+ * @rdma: controlling transport context
+ * @sctxt: Send context for the response
+ * @rctxt: Receive context for incoming bad message
+ * @status: negative errno indicating error that occurred
+ *
+ * Given the client-provided Read, Write, and Reply chunks, the
+ * server was not able to parse the Call or form a complete Reply.
+ * Return an RDMA_ERROR message so the client can retire the RPC
+ * transaction.
+ *
+ * The caller does not have to release @sctxt. It is released by
+ * Send completion, or by this function on error.
+ */
+void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
+			     struct svc_rdma_send_ctxt *sctxt,
+			     struct svc_rdma_recv_ctxt *rctxt,
+			     int status)
 {
+	__be32 *rdma_argp = rctxt->rc_recv_buf;
+	__be32 *p;
+
+	rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0);
+	xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf,
+			sctxt->sc_xprt_buf, NULL);
+
+	p = xdr_reserve_space(&sctxt->sc_stream,
+			      rpcrdma_fixed_maxsz * sizeof(*p));
+	if (!p)
+		goto put_ctxt;
+
+	*p++ = *rdma_argp;
+	*p++ = *(rdma_argp + 1);
+	*p++ = rdma->sc_fc_credits;
+	*p = rdma_error;
+
+	switch (status) {
+	case -EPROTONOSUPPORT:
+		p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p));
+		if (!p)
+			goto put_ctxt;
+
+		*p++ = err_vers;
+		*p++ = rpcrdma_version;
+		*p = rpcrdma_version;
+		trace_svcrdma_err_vers(*rdma_argp);
+		break;
+	default:
+		p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p));
+		if (!p)
+			goto put_ctxt;
+
+		*p = err_chunk;
+		trace_svcrdma_err_chunk(*rdma_argp);
+	}
+
+	/* Remote Invalidation is skipped for simplicity. */
+	sctxt->sc_send_wr.num_sge = 1;
+	sctxt->sc_send_wr.opcode = IB_WR_SEND;
+	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
+	if (svc_rdma_send(rdma, sctxt))
+		goto put_ctxt;
+	return;
+
+put_ctxt:
+	svc_rdma_send_ctxt_put(rdma, sctxt);
 }
 
 /**
@@ -859,54 +893,68 @@
 	struct svcxprt_rdma *rdma =
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
-	__be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
+	__be32 *rdma_argp = rctxt->rc_recv_buf;
+	__be32 *wr_lst = rctxt->rc_write_list;
+	__be32 *rp_ch = rctxt->rc_reply_chunk;
 	struct xdr_buf *xdr = &rqstp->rq_res;
 	struct svc_rdma_send_ctxt *sctxt;
+	__be32 *p;
 	int ret;
 
-	rdma_argp = rctxt->rc_recv_buf;
-	svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);
+	ret = -ENOTCONN;
+	if (svc_xprt_is_dead(xprt))
+		goto err0;
 
-	/* Create the RDMA response header. xprt->xpt_mutex,
-	 * acquired in svc_send(), serializes RPC replies. The
-	 * code path below that inserts the credit grant value
-	 * into each transport header runs only inside this
-	 * critical section.
-	 */
 	ret = -ENOMEM;
 	sctxt = svc_rdma_send_ctxt_get(rdma);
 	if (!sctxt)
 		goto err0;
-	rdma_resp = sctxt->sc_xprt_buf;
 
-	p = rdma_resp;
+	p = xdr_reserve_space(&sctxt->sc_stream,
+			      rpcrdma_fixed_maxsz * sizeof(*p));
+	if (!p)
+		goto err0;
 	*p++ = *rdma_argp;
 	*p++ = *(rdma_argp + 1);
 	*p++ = rdma->sc_fc_credits;
-	*p++ = rp_ch ? rdma_nomsg : rdma_msg;
+	*p = rp_ch ? rdma_nomsg : rdma_msg;
 
-	/* Start with empty chunks */
-	*p++ = xdr_zero;
-	*p++ = xdr_zero;
-	*p = xdr_zero;
-
+	if (svc_rdma_encode_read_list(sctxt) < 0)
+		goto err0;
 	if (wr_lst) {
 		/* XXX: Presume the client sent only one Write chunk */
-		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
+		unsigned long offset;
+		unsigned int length;
+
+		if (rctxt->rc_read_payload_length) {
+			offset = rctxt->rc_read_payload_offset;
+			length = rctxt->rc_read_payload_length;
+		} else {
+			offset = xdr->head[0].iov_len;
+			length = xdr->page_len;
+		}
+		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset,
+						length);
 		if (ret < 0)
 			goto err2;
-		svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
+		if (svc_rdma_encode_write_list(rctxt, sctxt, length) < 0)
+			goto err0;
+	} else {
+		if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
+			goto err0;
 	}
 	if (rp_ch) {
-		ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
+		ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
 		if (ret < 0)
 			goto err2;
-		svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
+		if (svc_rdma_encode_reply_chunk(rctxt, sctxt, ret) < 0)
+			goto err0;
+	} else {
+		if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
+			goto err0;
 	}
 
-	svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
-	ret = svc_rdma_send_reply_msg(rdma, sctxt, rdma_argp, rqstp,
-				      wr_lst, rp_ch);
+	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
 	if (ret < 0)
 		goto err1;
 	return 0;
@@ -915,15 +963,44 @@
 	if (ret != -E2BIG && ret != -EINVAL)
 		goto err1;
 
-	ret = svc_rdma_send_error_msg(rdma, sctxt, rqstp);
-	if (ret < 0)
-		goto err1;
+	/* Send completion releases payload pages that were part
+	 * of previously posted RDMA Writes.
+	 */
+	svc_rdma_save_io_pages(rqstp, sctxt);
+	svc_rdma_send_error_msg(rdma, sctxt, rctxt, ret);
 	return 0;
 
 err1:
 	svc_rdma_send_ctxt_put(rdma, sctxt);
 err0:
-	trace_svcrdma_send_failed(rqstp, ret);
+	trace_svcrdma_send_err(rqstp, ret);
 	set_bit(XPT_CLOSE, &xprt->xpt_flags);
 	return -ENOTCONN;
 }
+
+/**
+ * svc_rdma_read_payload - special processing for a READ payload
+ * @rqstp: svc_rqst to operate on
+ * @offset: payload's byte offset in @xdr
+ * @length: size of payload, in bytes
+ *
+ * Returns zero on success.
+ *
+ * For the moment, just record the xdr_buf location of the READ
+ * payload. svc_rdma_sendto will use that location later when
+ * we actually send the payload.
+ */
+int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
+			  unsigned int length)
+{
+	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
+
+	/* XXX: Just one READ payload slot for now, since our
+	 * transport implementation currently supports only one
+	 * Write chunk.
+	 */
+	rctxt->rc_read_payload_offset = offset;
+	rctxt->rc_read_payload_length = length;
+
+	return 0;
+}
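
Taken together, the hunks above move transport-header construction onto an xdr_stream (sc_hdrbuf/sc_stream), fold the DMA sync of the header SGE into svc_rdma_send(), and give each Send WR a completion ID (sc_cid) that the svc_rdma_wc_send tracepoint reports. The sketch below is a condensed illustration of the resulting reply path based only on functions visible in this patch; example_send_reply() is a hypothetical name, chunk-list encoding and the full error handling are omitted, and the fragment is not compilable on its own.

/* Illustrative sketch only: condensed from the post-patch code paths above;
 * not a drop-in replacement for the real svc_rdma_sendto().
 */
static int example_send_reply(struct svcxprt_rdma *rdma,
			      struct svc_rdma_recv_ctxt *rctxt,
			      struct svc_rqst *rqstp)
{
	struct svc_rdma_send_ctxt *sctxt;
	__be32 *p;
	int ret;

	/* ctxt_get() now resets sc_hdrbuf and initializes sc_stream;
	 * each ctxt also carries its own rpc_rdma_cid (sc_cid).
	 */
	sctxt = svc_rdma_send_ctxt_get(rdma);
	if (!sctxt)
		return -ENOMEM;

	/* Header fields are reserved through the xdr_stream instead of
	 * raw pointer arithmetic on sc_xprt_buf.
	 */
	p = xdr_reserve_space(&sctxt->sc_stream,
			      rpcrdma_fixed_maxsz * sizeof(*p));
	if (!p)
		goto out_put;
	*p++ = *rctxt->rc_recv_buf;		/* XID */
	*p++ = *(rctxt->rc_recv_buf + 1);	/* version */
	*p++ = rdma->sc_fc_credits;
	*p = rdma_msg;

	/* The Read/Write/Reply lists would be encoded here with the
	 * svc_rdma_encode_*() helpers added by this patch.
	 */

	/* Sets sc_send_wr.num_sge and DMA-maps the RPC message payload. */
	ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqstp->rq_res);
	if (ret < 0)
		goto out_put;

	/* svc_rdma_send() now takes the ctxt, syncs sg_list[0] itself,
	 * and the eventual Send completion is traced with sctxt->sc_cid.
	 */
	return svc_rdma_send(rdma, sctxt);

out_put:
	svc_rdma_send_ctxt_put(rdma, sctxt);
	return -EMSGSIZE;
}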