2024-10-22 8ac6c7a54ed1b98d142dce24b11c6de6a1e239a5
kernel/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -7,14 +7,12 @@
 
 #include <rdma/rw.h>
 
+#include <linux/sunrpc/xdr.h>
 #include <linux/sunrpc/rpc_rdma.h>
 #include <linux/sunrpc/svc_rdma.h>
-#include <linux/sunrpc/debug.h>
 
 #include "xprt_rdma.h"
 #include <trace/events/rpcrdma.h>
-
-#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
 static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
 static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
@@ -39,9 +37,9 @@
 struct svc_rdma_rw_ctxt {
 	struct list_head	rw_list;
 	struct rdma_rw_ctx	rw_ctx;
-	int			rw_nents;
+	unsigned int		rw_nents;
 	struct sg_table		rw_sg_table;
-	struct scatterlist	rw_first_sgl[0];
+	struct scatterlist	rw_first_sgl[];
 };
 
 static inline struct svc_rdma_rw_ctxt *
@@ -64,28 +62,31 @@
 		spin_unlock(&rdma->sc_rw_ctxt_lock);
 	} else {
 		spin_unlock(&rdma->sc_rw_ctxt_lock);
-		ctxt = kmalloc(sizeof(*ctxt) +
-			       SG_CHUNK_SIZE * sizeof(struct scatterlist),
+		ctxt = kmalloc(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE),
 			       GFP_KERNEL);
 		if (!ctxt)
-			goto out;
+			goto out_noctx;
 		INIT_LIST_HEAD(&ctxt->rw_list);
 	}
 
 	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
 	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
-				   ctxt->rw_sg_table.sgl)) {
-		kfree(ctxt);
-		ctxt = NULL;
-	}
-out:
+				   ctxt->rw_sg_table.sgl,
+				   SG_CHUNK_SIZE))
+		goto out_free;
 	return ctxt;
+
+out_free:
+	kfree(ctxt);
+out_noctx:
+	trace_svcrdma_no_rwctx_err(rdma, sges);
+	return NULL;
 }
 
 static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
 				 struct svc_rdma_rw_ctxt *ctxt)
 {
-	sg_free_table_chained(&ctxt->rw_sg_table, true);
+	sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE);
 
 	spin_lock(&rdma->sc_rw_ctxt_lock);
 	list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
@@ -107,8 +108,36 @@
 	}
 }
 
+/**
+ * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
+ * @rdma: controlling transport instance
+ * @ctxt: R/W context to prepare
+ * @offset: RDMA offset
+ * @handle: RDMA tag/handle
+ * @direction: I/O direction
+ *
+ * Returns on success, the number of WQEs that will be needed
+ * on the workqueue, or a negative errno.
+ */
+static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
+				struct svc_rdma_rw_ctxt *ctxt,
+				u64 offset, u32 handle,
+				enum dma_data_direction direction)
+{
+	int ret;
+
+	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
+			       ctxt->rw_sg_table.sgl, ctxt->rw_nents,
+			       0, offset, handle, direction);
+	if (unlikely(ret < 0)) {
+		svc_rdma_put_rw_ctxt(rdma, ctxt);
+		trace_svcrdma_dma_map_rw_err(rdma, ctxt->rw_nents, ret);
+	}
+	return ret;
+}
+
 /* A chunk context tracks all I/O for moving one Read or Write
- * chunk. This is a a set of rdma_rw's that handle data movement
+ * chunk. This is a set of rdma_rw's that handle data movement
  * for all segments of one chunk.
  *
  * These are small, acquired with a single allocator call, and
@@ -116,17 +145,25 @@
  * demand, and not cached.
  */
 struct svc_rdma_chunk_ctxt {
+	struct rpc_rdma_cid	cc_cid;
 	struct ib_cqe		cc_cqe;
 	struct svcxprt_rdma	*cc_rdma;
 	struct list_head	cc_rwctxts;
 	int			cc_sqecount;
 };
 
+static void svc_rdma_cc_cid_init(struct svcxprt_rdma *rdma,
+				 struct rpc_rdma_cid *cid)
+{
+	cid->ci_queue_id = rdma->sc_sq_cq->res.id;
+	cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
+}
+
 static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
 			     struct svc_rdma_chunk_ctxt *cc)
 {
+	svc_rdma_cc_cid_init(rdma, &cc->cc_cid);
 	cc->cc_rdma = rdma;
-	svc_xprt_get(&rdma->sc_xprt);
 
 	INIT_LIST_HEAD(&cc->cc_rwctxts);
 	cc->cc_sqecount = 0;
@@ -146,7 +183,6 @@
 				ctxt->rw_nents, dir);
 		svc_rdma_put_rw_ctxt(rdma, ctxt);
 	}
-	svc_xprt_put(&rdma->sc_xprt);
 }
 
 /* State for sending a Write or Reply chunk.
@@ -208,18 +244,13 @@
 	struct svc_rdma_write_info *info =
 			container_of(cc, struct svc_rdma_write_info, wi_cc);
 
-	trace_svcrdma_wc_write(wc);
+	trace_svcrdma_wc_write(wc, &cc->cc_cid);
 
 	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 	wake_up(&rdma->sc_send_wait);
 
-	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+	if (unlikely(wc->status != IB_WC_SUCCESS))
 		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
-		if (wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
-			       ib_wc_status_msg(wc->status),
-			       wc->status, wc->vendor_err);
-	}
 
 	svc_rdma_write_info_free(info);
 }
@@ -271,25 +302,22 @@
 	struct svc_rdma_read_info *info =
 			container_of(cc, struct svc_rdma_read_info, ri_cc);
 
-	trace_svcrdma_wc_read(wc);
+	trace_svcrdma_wc_read(wc, &cc->cc_cid);
 
 	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 	wake_up(&rdma->sc_send_wait);
 
 	if (unlikely(wc->status != IB_WC_SUCCESS)) {
 		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
-		if (wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
-			       ib_wc_status_msg(wc->status),
-			       wc->status, wc->vendor_err);
 		svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt);
 	} else {
 		spin_lock(&rdma->sc_rq_dto_lock);
 		list_add_tail(&info->ri_readctxt->rc_list,
 			      &rdma->sc_read_complete_q);
+		/* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
+		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
 		spin_unlock(&rdma->sc_rq_dto_lock);
 
-		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
 		svc_xprt_enqueue(&rdma->sc_xprt);
 	}
 
@@ -330,6 +358,7 @@
 	do {
 		if (atomic_sub_return(cc->cc_sqecount,
 				      &rdma->sc_sq_avail) > 0) {
+			trace_svcrdma_post_chunk(&cc->cc_cid, cc->cc_sqecount);
 			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
 			if (ret)
 				break;
@@ -421,35 +450,32 @@
 	seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
 	do {
 		unsigned int write_len;
-		u32 seg_length, seg_handle;
-		u64 seg_offset;
+		u32 handle, length;
+		u64 offset;
 
 		if (info->wi_seg_no >= info->wi_nsegs)
 			goto out_overflow;
 
-		seg_handle = be32_to_cpup(seg);
-		seg_length = be32_to_cpup(seg + 1);
-		xdr_decode_hyper(seg + 2, &seg_offset);
-		seg_offset += info->wi_seg_off;
+		xdr_decode_rdma_segment(seg, &handle, &length, &offset);
+		offset += info->wi_seg_off;
 
-		write_len = min(remaining, seg_length - info->wi_seg_off);
+		write_len = min(remaining, length - info->wi_seg_off);
 		ctxt = svc_rdma_get_rw_ctxt(rdma,
 					    (write_len >> PAGE_SHIFT) + 2);
 		if (!ctxt)
-			goto out_noctx;
+			return -ENOMEM;
 
 		constructor(info, write_len, ctxt);
-		ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
-				       rdma->sc_port_num, ctxt->rw_sg_table.sgl,
-				       ctxt->rw_nents, 0, seg_offset,
-				       seg_handle, DMA_TO_DEVICE);
+		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle,
+					   DMA_TO_DEVICE);
 		if (ret < 0)
-			goto out_initerr;
+			return -EIO;
 
-		trace_svcrdma_encode_wseg(seg_handle, write_len, seg_offset);
+		trace_svcrdma_send_wseg(handle, write_len, offset);
+
 		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 		cc->cc_sqecount += ret;
-		if (write_len == seg_length - info->wi_seg_off) {
+		if (write_len == length - info->wi_seg_off) {
 			seg += 4;
 			info->wi_seg_no++;
 			info->wi_seg_off = 0;
@@ -462,18 +488,9 @@
 	return 0;
 
 out_overflow:
-	dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
-		info->wi_nsegs);
+	trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no,
+				     info->wi_nsegs);
 	return -E2BIG;
-
-out_noctx:
-	dprintk("svcrdma: no R/W ctxs available\n");
-	return -ENOMEM;
-
-out_initerr:
-	svc_rdma_put_rw_ctxt(rdma, ctxt);
-	trace_svcrdma_dma_map_rwctx(rdma, ret);
-	return -EIO;
 }
 
 /* Send one of an xdr_buf's kvecs by itself. To send a Reply
@@ -489,18 +506,19 @@
 				 vec->iov_len);
 }
 
-/* Send an xdr_buf's page list by itself. A Write chunk is
- * just the page list. a Reply chunk is the head, page list,
- * and tail. This function is shared between the two types
- * of chunk.
+/* Send an xdr_buf's page list by itself. A Write chunk is just
+ * the page list. A Reply chunk is @xdr's head, page list, and
+ * tail. This function is shared between the two types of chunk.
  */
 static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
-				      struct xdr_buf *xdr)
+				      struct xdr_buf *xdr,
+				      unsigned int offset,
+				      unsigned long length)
 {
 	info->wi_xdr = xdr;
-	info->wi_next_off = 0;
+	info->wi_next_off = offset - xdr->head[0].iov_len;
 	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
-				     xdr->page_len);
+				     length);
 }
 
 /**
@@ -508,6 +526,8 @@
  * @rdma: controlling RDMA transport
  * @wr_ch: Write chunk provided by client
  * @xdr: xdr_buf containing the data payload
+ * @offset: payload's byte offset in @xdr
+ * @length: size of payload, in bytes
  *
  * Returns a non-negative number of bytes the chunk consumed, or
 * %-E2BIG if the payload was larger than the Write chunk,
@@ -517,19 +537,20 @@
  * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
  */
 int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
-			      struct xdr_buf *xdr)
+			      struct xdr_buf *xdr,
+			      unsigned int offset, unsigned long length)
 {
 	struct svc_rdma_write_info *info;
 	int ret;
 
-	if (!xdr->page_len)
+	if (!length)
 		return 0;
 
 	info = svc_rdma_write_info_alloc(rdma, wr_ch);
 	if (!info)
 		return -ENOMEM;
 
-	ret = svc_rdma_send_xdr_pagelist(info, xdr);
+	ret = svc_rdma_send_xdr_pagelist(info, xdr, offset, length);
 	if (ret < 0)
 		goto out_err;
 
@@ -537,8 +558,8 @@
 	if (ret < 0)
 		goto out_err;
 
-	trace_svcrdma_encode_write(xdr->page_len);
-	return xdr->page_len;
+	trace_svcrdma_send_write_chunk(xdr->page_len);
+	return length;
 
 out_err:
 	svc_rdma_write_info_free(info);
@@ -548,8 +569,7 @@
 /**
  * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
  * @rdma: controlling RDMA transport
- * @rp_ch: Reply chunk provided by client
- * @writelist: true if client provided a Write list
+ * @rctxt: Write and Reply chunks from client
  * @xdr: xdr_buf containing an RPC Reply
  *
  * Returns a non-negative number of bytes the chunk consumed, or
@@ -559,13 +579,14 @@
  * %-ENOTCONN if posting failed (connection is lost),
  * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
  */
-int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
-			      bool writelist, struct xdr_buf *xdr)
+int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
+			      const struct svc_rdma_recv_ctxt *rctxt,
+			      struct xdr_buf *xdr)
 {
 	struct svc_rdma_write_info *info;
 	int consumed, ret;
 
-	info = svc_rdma_write_info_alloc(rdma, rp_ch);
+	info = svc_rdma_write_info_alloc(rdma, rctxt->rc_reply_chunk);
 	if (!info)
 		return -ENOMEM;
 
@@ -577,8 +598,10 @@
 	/* Send the page list in the Reply chunk only if the
 	 * client did not provide Write chunks.
 	 */
-	if (!writelist && xdr->page_len) {
-		ret = svc_rdma_send_xdr_pagelist(info, xdr);
+	if (!rctxt->rc_write_list && xdr->page_len) {
+		ret = svc_rdma_send_xdr_pagelist(info, xdr,
+						 xdr->head[0].iov_len,
+						 xdr->page_len);
 		if (ret < 0)
 			goto out_err;
 		consumed += xdr->page_len;
@@ -595,7 +618,7 @@
 	if (ret < 0)
 		goto out_err;
 
-	trace_svcrdma_encode_reply(consumed);
+	trace_svcrdma_send_reply_chunk(consumed);
 	return consumed;
 
 out_err:
@@ -617,7 +640,7 @@
 	sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
 	ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
 	if (!ctxt)
-		goto out_noctx;
+		return -ENOMEM;
 	ctxt->rw_nents = sge_no;
 
 	sg = ctxt->rw_sg_table.sgl;
@@ -647,29 +670,18 @@
 			goto out_overrun;
 	}
 
-	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, cc->cc_rdma->sc_qp,
-			       cc->cc_rdma->sc_port_num,
-			       ctxt->rw_sg_table.sgl, ctxt->rw_nents,
-			       0, offset, rkey, DMA_FROM_DEVICE);
+	ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, offset, rkey,
+				   DMA_FROM_DEVICE);
 	if (ret < 0)
-		goto out_initerr;
+		return -EIO;
 
 	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 	cc->cc_sqecount += ret;
 	return 0;
 
-out_noctx:
-	dprintk("svcrdma: no R/W ctxs available\n");
-	return -ENOMEM;
-
 out_overrun:
-	dprintk("svcrdma: request overruns rq_pages\n");
+	trace_svcrdma_page_overrun_err(cc->cc_rdma, rqstp, info->ri_pageno);
 	return -EINVAL;
-
-out_initerr:
-	trace_svcrdma_dma_map_rwctx(cc->cc_rdma, ret);
-	svc_rdma_put_rw_ctxt(cc->cc_rdma, ctxt);
-	return -EIO;
 }
 
 /* Walk the segments in the Read chunk starting at @p and construct
@@ -684,21 +696,17 @@
 	ret = -EINVAL;
 	info->ri_chunklen = 0;
 	while (*p++ != xdr_zero && be32_to_cpup(p++) == info->ri_position) {
-		u32 rs_handle, rs_length;
-		u64 rs_offset;
+		u32 handle, length;
+		u64 offset;
 
-		rs_handle = be32_to_cpup(p++);
-		rs_length = be32_to_cpup(p++);
-		p = xdr_decode_hyper(p, &rs_offset);
-
-		ret = svc_rdma_build_read_segment(info, rqstp,
-						  rs_handle, rs_length,
-						  rs_offset);
+		p = xdr_decode_rdma_segment(p, &handle, &length, &offset);
+		ret = svc_rdma_build_read_segment(info, rqstp, handle, length,
+						  offset);
 		if (ret < 0)
 			break;
 
-		trace_svcrdma_encode_rseg(rs_handle, rs_length, rs_offset);
-		info->ri_chunklen += rs_length;
+		trace_svcrdma_send_rseg(handle, length, offset);
+		info->ri_chunklen += length;
 	}
 
 	return ret;
@@ -722,7 +730,7 @@
 	if (ret < 0)
 		goto out;
 
-	trace_svcrdma_encode_read(info->ri_chunklen, info->ri_position);
+	trace_svcrdma_send_read_chunk(info->ri_chunklen, info->ri_position);
 
 	head->rc_hdr_count = 0;
 
@@ -778,7 +786,7 @@
 	if (ret < 0)
 		goto out;
 
-	trace_svcrdma_encode_pzr(info->ri_chunklen);
+	trace_svcrdma_send_pzr(info->ri_chunklen);
 
 	head->rc_arg.len += info->ri_chunklen;
 	head->rc_arg.buflen += info->ri_chunklen;
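
For context, the kmalloc() change in svc_rdma_get_rw_ctxt() above adopts the kernel's
flexible-array-member allocation idiom: the trailing rw_first_sgl[0] array becomes a true
flexible array and the open-coded size arithmetic is replaced by struct_size(), which
includes overflow checking. Below is a minimal, self-contained sketch of that idiom; it is
illustrative only and not part of the patch, and the "demo_" names are hypothetical.

/* Sketch of the flexible-array + struct_size() allocation pattern. */
#include <linux/overflow.h>
#include <linux/scatterlist.h>
#include <linux/slab.h>

struct demo_rw_ctxt {
	unsigned int		nents;
	struct scatterlist	sgl[];	/* flexible array, sized at allocation */
};

static struct demo_rw_ctxt *demo_alloc_rw_ctxt(unsigned int nents)
{
	struct demo_rw_ctxt *ctxt;

	/* struct_size() computes sizeof(*ctxt) + nents * sizeof(ctxt->sgl[0]),
	 * saturating on overflow, which replaces the open-coded arithmetic.
	 */
	ctxt = kmalloc(struct_size(ctxt, sgl, nents), GFP_KERNEL);
	if (ctxt)
		ctxt->nents = nents;
	return ctxt;
}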