2024-02-20 102a0743326a03cd1a1202ceda21e175b7d3575c
kernel/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -71,16 +71,13 @@
 	size = RPCRDMA_HDRLEN_MIN;
 
 	/* Maximum Read list size */
-	maxsegs += 2; /* segment for head and tail buffers */
-	size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
+	size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
 
 	/* Minimal Read chunk size */
 	size += sizeof(__be32); /* segment count */
 	size += rpcrdma_segment_maxsz * sizeof(__be32);
 	size += sizeof(__be32); /* list discriminator */
 
-	dprintk("RPC: %s: max call header size = %u\n",
-		__func__, size);
 	return size;
 }
 
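As a quick check of the arithmetic above, a minimal user-space sketch (the constant values here are assumed for illustration, not quoted from the RPC/RDMA headers):

/* Illustrative sketch only -- not part of the patch. */
#include <stdio.h>
#include <stdint.h>

static unsigned int max_call_header_size(unsigned int maxsegs)
{
	const unsigned int hdrlen_min = 28;	/* assumed fixed header bytes */
	const unsigned int readchunk_words = 6;	/* assumed XDR words per Read chunk */
	const unsigned int segment_words = 4;	/* assumed XDR words per segment */
	unsigned int size = hdrlen_min;

	/* Maximum Read list size */
	size += maxsegs * readchunk_words * sizeof(uint32_t);

	/* Minimal Read chunk size */
	size += sizeof(uint32_t);		/* segment count */
	size += segment_words * sizeof(uint32_t);
	size += sizeof(uint32_t);		/* list discriminator */
	return size;
}

int main(void)
{
	unsigned int inline_send = 4096;	/* assumed inline threshold */
	unsigned int maxsegs = 8;		/* assumed max RDMA segments */

	printf("max inline send payload = %u bytes\n",
	       inline_send - max_call_header_size(maxsegs));
	return 0;
}

The patch caches the resulting threshold in ep->re_max_inline_send so it is computed once per connection rather than for every RPC.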
@@ -97,26 +94,29 @@
 	size = RPCRDMA_HDRLEN_MIN;
 
 	/* Maximum Write list size */
-	maxsegs += 2; /* segment for head and tail buffers */
-	size = sizeof(__be32); /* segment count */
+	size += sizeof(__be32); /* segment count */
 	size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
 	size += sizeof(__be32); /* list discriminator */
 
-	dprintk("RPC: %s: max reply header size = %u\n",
-		__func__, size);
 	return size;
 }
 
-void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
+/**
+ * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
+ * @ep: endpoint to initialize
+ *
+ * The max_inline fields contain the maximum size of an RPC message
+ * so the marshaling code doesn't have to repeat this calculation
+ * for every RPC.
+ */
+void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep)
 {
-	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	unsigned int maxsegs = ia->ri_max_segs;
+	unsigned int maxsegs = ep->re_max_rdma_segs;
 
-	ia->ri_max_inline_write = cdata->inline_wsize -
-		rpcrdma_max_call_header_size(maxsegs);
-	ia->ri_max_inline_read = cdata->inline_rsize -
-		rpcrdma_max_reply_header_size(maxsegs);
+	ep->re_max_inline_send =
+		ep->re_inline_send - rpcrdma_max_call_header_size(maxsegs);
+	ep->re_max_inline_recv =
+		ep->re_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
 }
 
 /* The client can send a request inline as long as the RPCRDMA header
@@ -131,9 +131,10 @@
 				struct rpc_rqst *rqst)
 {
 	struct xdr_buf *xdr = &rqst->rq_snd_buf;
+	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 	unsigned int count, remaining, offset;
 
-	if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
+	if (xdr->len > ep->re_max_inline_send)
 		return false;
 
 	if (xdr->page_len) {
@@ -144,7 +145,7 @@
 			remaining -= min_t(unsigned int,
 					   PAGE_SIZE - offset, remaining);
 			offset = 0;
-			if (++count > r_xprt->rx_ia.ri_max_send_sges)
+			if (++count > ep->re_attr.cap.max_send_sge)
 				return false;
 		}
 	}
@@ -161,9 +162,46 @@
 static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 				   struct rpc_rqst *rqst)
 {
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv;
+}
 
-	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
+/* The client is required to provide a Reply chunk if the maximum
+ * size of the non-payload part of the RPC Reply is larger than
+ * the inline threshold.
+ */
+static bool
+rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
+			  const struct rpc_rqst *rqst)
+{
+	const struct xdr_buf *buf = &rqst->rq_rcv_buf;
+
+	return (buf->head[0].iov_len + buf->tail[0].iov_len) <
+		r_xprt->rx_ep->re_max_inline_recv;
+}
+
+/* ACL likes to be lazy in allocating pages. For TCP, these
+ * pages can be allocated during receive processing. Not true
+ * for RDMA, which must always provision receive buffers
+ * up front.
+ */
+static noinline int
+rpcrdma_alloc_sparse_pages(struct xdr_buf *buf)
+{
+	struct page **ppages;
+	int len;
+
+	len = buf->page_len;
+	ppages = buf->pages + (buf->page_base >> PAGE_SHIFT);
+	while (len > 0) {
+		if (!*ppages)
+			*ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
+		if (!*ppages)
+			return -ENOBUFS;
+		ppages++;
+		len -= PAGE_SIZE;
+	}
+
+	return 0;
 }
 
 /* Split @vec on page boundaries into SGEs. FMR registers pages, not
@@ -220,14 +258,6 @@
 	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
 	page_base = offset_in_page(xdrbuf->page_base);
 	while (len) {
-		if (unlikely(!*ppages)) {
-			/* XXX: Certain upper layer operations do
-			 * not provide receive buffer pages.
-			 */
-			*ppages = alloc_page(GFP_ATOMIC);
-			if (!*ppages)
-				return -ENOBUFS;
-		}
 		seg->mr_page = *ppages;
 		seg->mr_offset = (char *)page_base;
 		seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
@@ -241,7 +271,7 @@
 	/* When encoding a Read chunk, the tail iovec contains an
 	 * XDR pad and may be omitted.
 	 */
-	if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
+	if (type == rpcrdma_readch && r_xprt->rx_ep->re_implicit_roundup)
 		goto out;
 
 	/* When encoding a Write chunk, some servers need to see an
@@ -249,7 +279,7 @@
 	 * layer provides space in the tail iovec that may be used
 	 * for this purpose.
 	 */
-	if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
+	if (type == rpcrdma_writech && r_xprt->rx_ep->re_implicit_roundup)
 		goto out;
 
 	if (xdrbuf->tail[0].iov_len)
@@ -261,40 +291,6 @@
 	return n;
 }
 
-static inline int
-encode_item_present(struct xdr_stream *xdr)
-{
-	__be32 *p;
-
-	p = xdr_reserve_space(xdr, sizeof(*p));
-	if (unlikely(!p))
-		return -EMSGSIZE;
-
-	*p = xdr_one;
-	return 0;
-}
-
-static inline int
-encode_item_not_present(struct xdr_stream *xdr)
-{
-	__be32 *p;
-
-	p = xdr_reserve_space(xdr, sizeof(*p));
-	if (unlikely(!p))
-		return -EMSGSIZE;
-
-	*p = xdr_zero;
-	return 0;
-}
-
-static void
-xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr)
-{
-	*iptr++ = cpu_to_be32(mr->mr_handle);
-	*iptr++ = cpu_to_be32(mr->mr_length);
-	xdr_encode_hyper(iptr, mr->mr_offset);
-}
-
 static int
 encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
 {
@@ -304,7 +300,7 @@
 	if (unlikely(!p))
 		return -EMSGSIZE;
 
-	xdr_encode_rdma_segment(p, mr);
+	xdr_encode_rdma_segment(p, mr->mr_handle, mr->mr_length, mr->mr_offset);
 	return 0;
 }
 
@@ -319,9 +315,34 @@
 		return -EMSGSIZE;
 
 	*p++ = xdr_one; /* Item present */
-	*p++ = cpu_to_be32(position);
-	xdr_encode_rdma_segment(p, mr);
+	xdr_encode_read_segment(p, position, mr->mr_handle, mr->mr_length,
+				mr->mr_offset);
 	return 0;
+}
+
+static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
+						 struct rpcrdma_req *req,
+						 struct rpcrdma_mr_seg *seg,
+						 int nsegs, bool writing,
+						 struct rpcrdma_mr **mr)
+{
+	*mr = rpcrdma_mr_pop(&req->rl_free_mrs);
+	if (!*mr) {
+		*mr = rpcrdma_mr_get(r_xprt);
+		if (!*mr)
+			goto out_getmr_err;
+		trace_xprtrdma_mr_get(req);
+		(*mr)->mr_req = req;
+	}
+
+	rpcrdma_mr_push(*mr, &req->rl_registered);
+	return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);
+
+out_getmr_err:
+	trace_xprtrdma_nomrs(req);
+	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
+	rpcrdma_mrs_refresh(r_xprt);
+	return ERR_PTR(-EAGAIN);
 }
 
 /* Register and XDR encode the Read list. Supports encoding a list of read
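The rpcrdma_mr_prepare() helper added above takes an MR from the request's free list first, falls back to the transport-wide pool, and otherwise asks the RPC layer to wait while the pool is refreshed. A minimal user-space sketch of that fallback shape (the types and helpers below are invented for illustration, not kernel APIs):

/* Illustrative sketch only -- not part of the patch. */
#include <stdio.h>

struct mr {
	struct mr *next;
};

static struct mr *pool_pop(struct mr **list)
{
	struct mr *mr = *list;

	if (mr)
		*list = mr->next;
	return mr;
}

/* Prefer the request's cached entries; fall back to the shared pool;
 * a NULL return models the "wait and refresh the pool" path.
 */
static struct mr *mr_prepare(struct mr **req_free, struct mr **xprt_pool)
{
	struct mr *mr = pool_pop(req_free);

	if (!mr)
		mr = pool_pop(xprt_pool);
	return mr;
}

int main(void)
{
	struct mr a = { NULL }, b = { NULL };
	struct mr *req_free = &a, *xprt_pool = &b;

	printf("first:  %p\n", (void *)mr_prepare(&req_free, &xprt_pool));
	printf("second: %p\n", (void *)mr_prepare(&req_free, &xprt_pool));
	printf("third:  %p\n", (void *)mr_prepare(&req_free, &xprt_pool));
	return 0;
}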
....@@ -338,15 +359,19 @@
338359 *
339360 * Only a single @pos value is currently supported.
340361 */
341
-static noinline int
342
-rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
343
- struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
362
+static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
363
+ struct rpcrdma_req *req,
364
+ struct rpc_rqst *rqst,
365
+ enum rpcrdma_chunktype rtype)
344366 {
345367 struct xdr_stream *xdr = &req->rl_stream;
346368 struct rpcrdma_mr_seg *seg;
347369 struct rpcrdma_mr *mr;
348370 unsigned int pos;
349371 int nsegs;
372
+
373
+ if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
374
+ goto done;
350375
351376 pos = rqst->rq_snd_buf.head[0].iov_len;
352377 if (rtype == rpcrdma_areadch)
....@@ -358,20 +383,21 @@
358383 return nsegs;
359384
360385 do {
361
- seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
362
- false, &mr);
386
+ seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
363387 if (IS_ERR(seg))
364388 return PTR_ERR(seg);
365
- rpcrdma_mr_push(mr, &req->rl_registered);
366389
367390 if (encode_read_segment(xdr, mr, pos) < 0)
368391 return -EMSGSIZE;
369392
370
- trace_xprtrdma_read_chunk(rqst->rq_task, pos, mr, nsegs);
393
+ trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
371394 r_xprt->rx_stats.read_chunk_count++;
372395 nsegs -= mr->mr_nents;
373396 } while (nsegs);
374397
398
+done:
399
+ if (xdr_stream_encode_item_absent(xdr) < 0)
400
+ return -EMSGSIZE;
375401 return 0;
376402 }
377403
....@@ -390,15 +416,19 @@
390416 *
391417 * Only a single Write chunk is currently supported.
392418 */
393
-static noinline int
394
-rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
395
- struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
419
+static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
420
+ struct rpcrdma_req *req,
421
+ struct rpc_rqst *rqst,
422
+ enum rpcrdma_chunktype wtype)
396423 {
397424 struct xdr_stream *xdr = &req->rl_stream;
398425 struct rpcrdma_mr_seg *seg;
399426 struct rpcrdma_mr *mr;
400427 int nsegs, nchunks;
401428 __be32 *segcount;
429
+
430
+ if (wtype != rpcrdma_writech)
431
+ goto done;
402432
403433 seg = req->rl_segments;
404434 nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
....@@ -407,7 +437,7 @@
407437 if (nsegs < 0)
408438 return nsegs;
409439
410
- if (encode_item_present(xdr) < 0)
440
+ if (xdr_stream_encode_item_present(xdr) < 0)
411441 return -EMSGSIZE;
412442 segcount = xdr_reserve_space(xdr, sizeof(*segcount));
413443 if (unlikely(!segcount))
....@@ -416,16 +446,14 @@
416446
417447 nchunks = 0;
418448 do {
419
- seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
420
- true, &mr);
449
+ seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
421450 if (IS_ERR(seg))
422451 return PTR_ERR(seg);
423
- rpcrdma_mr_push(mr, &req->rl_registered);
424452
425453 if (encode_rdma_segment(xdr, mr) < 0)
426454 return -EMSGSIZE;
427455
428
- trace_xprtrdma_write_chunk(rqst->rq_task, mr, nsegs);
456
+ trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
429457 r_xprt->rx_stats.write_chunk_count++;
430458 r_xprt->rx_stats.total_rdma_request += mr->mr_length;
431459 nchunks++;
....@@ -435,6 +463,9 @@
435463 /* Update count of segments in this Write chunk */
436464 *segcount = cpu_to_be32(nchunks);
437465
466
+done:
467
+ if (xdr_stream_encode_item_absent(xdr) < 0)
468
+ return -EMSGSIZE;
438469 return 0;
439470 }
440471
....@@ -450,9 +481,10 @@
450481 * Returns zero on success, or a negative errno if a failure occurred.
451482 * @xdr is advanced to the next position in the stream.
452483 */
453
-static noinline int
454
-rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
455
- struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
484
+static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
485
+ struct rpcrdma_req *req,
486
+ struct rpc_rqst *rqst,
487
+ enum rpcrdma_chunktype wtype)
456488 {
457489 struct xdr_stream *xdr = &req->rl_stream;
458490 struct rpcrdma_mr_seg *seg;
....@@ -460,12 +492,18 @@
460492 int nsegs, nchunks;
461493 __be32 *segcount;
462494
495
+ if (wtype != rpcrdma_replych) {
496
+ if (xdr_stream_encode_item_absent(xdr) < 0)
497
+ return -EMSGSIZE;
498
+ return 0;
499
+ }
500
+
463501 seg = req->rl_segments;
464502 nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
465503 if (nsegs < 0)
466504 return nsegs;
467505
468
- if (encode_item_present(xdr) < 0)
506
+ if (xdr_stream_encode_item_present(xdr) < 0)
469507 return -EMSGSIZE;
470508 segcount = xdr_reserve_space(xdr, sizeof(*segcount));
471509 if (unlikely(!segcount))
....@@ -474,16 +512,14 @@
474512
475513 nchunks = 0;
476514 do {
477
- seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
478
- true, &mr);
515
+ seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
479516 if (IS_ERR(seg))
480517 return PTR_ERR(seg);
481
- rpcrdma_mr_push(mr, &req->rl_registered);
482518
483519 if (encode_rdma_segment(xdr, mr) < 0)
484520 return -EMSGSIZE;
485521
486
- trace_xprtrdma_reply_chunk(rqst->rq_task, mr, nsegs);
522
+ trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
487523 r_xprt->rx_stats.reply_chunk_count++;
488524 r_xprt->rx_stats.total_rdma_request += mr->mr_length;
489525 nchunks++;
....@@ -496,181 +532,265 @@
496532 return 0;
497533 }
498534
535
+static void rpcrdma_sendctx_done(struct kref *kref)
536
+{
537
+ struct rpcrdma_req *req =
538
+ container_of(kref, struct rpcrdma_req, rl_kref);
539
+ struct rpcrdma_rep *rep = req->rl_reply;
540
+
541
+ rpcrdma_complete_rqst(rep);
542
+ rep->rr_rxprt->rx_stats.reply_waits_for_send++;
543
+}
544
+
499545 /**
500
- * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
546
+ * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
501547 * @sc: sendctx containing SGEs to unmap
502548 *
503549 */
504
-void
505
-rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
550
+void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
506551 {
507
- struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
552
+ struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
508553 struct ib_sge *sge;
509
- unsigned int count;
554
+
555
+ if (!sc->sc_unmap_count)
556
+ return;
510557
511558 /* The first two SGEs contain the transport header and
512559 * the inline buffer. These are always left mapped so
513560 * they can be cheaply re-used.
514561 */
515
- sge = &sc->sc_sges[2];
516
- for (count = sc->sc_unmap_count; count; ++sge, --count)
517
- ib_dma_unmap_page(ia->ri_device,
518
- sge->addr, sge->length, DMA_TO_DEVICE);
562
+ for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
563
+ ++sge, --sc->sc_unmap_count)
564
+ ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length,
565
+ DMA_TO_DEVICE);
519566
520
- if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
521
- smp_mb__after_atomic();
522
- wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
523
- }
567
+ kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
524568 }
525569
526570 /* Prepare an SGE for the RPC-over-RDMA transport header.
527571 */
528
-static bool
529
-rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
530
- u32 len)
572
+static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
573
+ struct rpcrdma_req *req, u32 len)
531574 {
532575 struct rpcrdma_sendctx *sc = req->rl_sendctx;
533576 struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
534
- struct ib_sge *sge = sc->sc_sges;
577
+ struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
535578
536
- if (!rpcrdma_dma_map_regbuf(ia, rb))
537
- goto out_regbuf;
538579 sge->addr = rdmab_addr(rb);
539580 sge->length = len;
540581 sge->lkey = rdmab_lkey(rb);
541582
542
- ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
543
- sge->length, DMA_TO_DEVICE);
544
- sc->sc_wr.num_sge++;
583
+ ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
584
+ DMA_TO_DEVICE);
585
+}
586
+
587
+/* The head iovec is straightforward, as it is usually already
588
+ * DMA-mapped. Sync the content that has changed.
589
+ */
590
+static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt,
591
+ struct rpcrdma_req *req, unsigned int len)
592
+{
593
+ struct rpcrdma_sendctx *sc = req->rl_sendctx;
594
+ struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
595
+ struct rpcrdma_regbuf *rb = req->rl_sendbuf;
596
+
597
+ if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
598
+ return false;
599
+
600
+ sge->addr = rdmab_addr(rb);
601
+ sge->length = len;
602
+ sge->lkey = rdmab_lkey(rb);
603
+
604
+ ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
605
+ DMA_TO_DEVICE);
606
+ return true;
607
+}
608
+
609
+/* If there is a page list present, DMA map and prepare an
610
+ * SGE for each page to be sent.
611
+ */
612
+static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req,
613
+ struct xdr_buf *xdr)
614
+{
615
+ struct rpcrdma_sendctx *sc = req->rl_sendctx;
616
+ struct rpcrdma_regbuf *rb = req->rl_sendbuf;
617
+ unsigned int page_base, len, remaining;
618
+ struct page **ppages;
619
+ struct ib_sge *sge;
620
+
621
+ ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
622
+ page_base = offset_in_page(xdr->page_base);
623
+ remaining = xdr->page_len;
624
+ while (remaining) {
625
+ sge = &sc->sc_sges[req->rl_wr.num_sge++];
626
+ len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
627
+ sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages,
628
+ page_base, len, DMA_TO_DEVICE);
629
+ if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
630
+ goto out_mapping_err;
631
+
632
+ sge->length = len;
633
+ sge->lkey = rdmab_lkey(rb);
634
+
635
+ sc->sc_unmap_count++;
636
+ ppages++;
637
+ remaining -= len;
638
+ page_base = 0;
639
+ }
640
+
545641 return true;
546642
547
-out_regbuf:
548
- pr_err("rpcrdma: failed to DMA map a Send buffer\n");
643
+out_mapping_err:
644
+ trace_xprtrdma_dma_maperr(sge->addr);
549645 return false;
550646 }
551647
552
-/* Prepare the Send SGEs. The head and tail iovec, and each entry
553
- * in the page list, gets its own SGE.
648
+/* The tail iovec may include an XDR pad for the page list,
649
+ * as well as additional content, and may not reside in the
650
+ * same page as the head iovec.
554651 */
555
-static bool
556
-rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
557
- struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
652
+static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
653
+ struct xdr_buf *xdr,
654
+ unsigned int page_base, unsigned int len)
558655 {
559656 struct rpcrdma_sendctx *sc = req->rl_sendctx;
560
- unsigned int sge_no, page_base, len, remaining;
657
+ struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
561658 struct rpcrdma_regbuf *rb = req->rl_sendbuf;
562
- struct ib_device *device = ia->ri_device;
563
- struct ib_sge *sge = sc->sc_sges;
564
- u32 lkey = ia->ri_pd->local_dma_lkey;
565
- struct page *page, **ppages;
659
+ struct page *page = virt_to_page(xdr->tail[0].iov_base);
566660
567
- /* The head iovec is straightforward, as it is already
568
- * DMA-mapped. Sync the content that has changed.
569
- */
570
- if (!rpcrdma_dma_map_regbuf(ia, rb))
571
- goto out_regbuf;
572
- sge_no = 1;
573
- sge[sge_no].addr = rdmab_addr(rb);
574
- sge[sge_no].length = xdr->head[0].iov_len;
575
- sge[sge_no].lkey = rdmab_lkey(rb);
576
- ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
577
- sge[sge_no].length, DMA_TO_DEVICE);
661
+ sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len,
662
+ DMA_TO_DEVICE);
663
+ if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
664
+ goto out_mapping_err;
578665
579
- /* If there is a Read chunk, the page list is being handled
580
- * via explicit RDMA, and thus is skipped here. However, the
581
- * tail iovec may include an XDR pad for the page list, as
582
- * well as additional content, and may not reside in the
583
- * same page as the head iovec.
584
- */
585
- if (rtype == rpcrdma_readch) {
586
- len = xdr->tail[0].iov_len;
587
-
588
- /* Do not include the tail if it is only an XDR pad */
589
- if (len < 4)
590
- goto out;
591
-
592
- page = virt_to_page(xdr->tail[0].iov_base);
593
- page_base = offset_in_page(xdr->tail[0].iov_base);
594
-
595
- /* If the content in the page list is an odd length,
596
- * xdr_write_pages() has added a pad at the beginning
597
- * of the tail iovec. Force the tail's non-pad content
598
- * to land at the next XDR position in the Send message.
599
- */
600
- page_base += len & 3;
601
- len -= len & 3;
602
- goto map_tail;
603
- }
604
-
605
- /* If there is a page list present, temporarily DMA map
606
- * and prepare an SGE for each page to be sent.
607
- */
608
- if (xdr->page_len) {
609
- ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
610
- page_base = offset_in_page(xdr->page_base);
611
- remaining = xdr->page_len;
612
- while (remaining) {
613
- sge_no++;
614
- if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
615
- goto out_mapping_overflow;
616
-
617
- len = min_t(u32, PAGE_SIZE - page_base, remaining);
618
- sge[sge_no].addr = ib_dma_map_page(device, *ppages,
619
- page_base, len,
620
- DMA_TO_DEVICE);
621
- if (ib_dma_mapping_error(device, sge[sge_no].addr))
622
- goto out_mapping_err;
623
- sge[sge_no].length = len;
624
- sge[sge_no].lkey = lkey;
625
-
626
- sc->sc_unmap_count++;
627
- ppages++;
628
- remaining -= len;
629
- page_base = 0;
630
- }
631
- }
632
-
633
- /* The tail iovec is not always constructed in the same
634
- * page where the head iovec resides (see, for example,
635
- * gss_wrap_req_priv). To neatly accommodate that case,
636
- * DMA map it separately.
637
- */
638
- if (xdr->tail[0].iov_len) {
639
- page = virt_to_page(xdr->tail[0].iov_base);
640
- page_base = offset_in_page(xdr->tail[0].iov_base);
641
- len = xdr->tail[0].iov_len;
642
-
643
-map_tail:
644
- sge_no++;
645
- sge[sge_no].addr = ib_dma_map_page(device, page,
646
- page_base, len,
647
- DMA_TO_DEVICE);
648
- if (ib_dma_mapping_error(device, sge[sge_no].addr))
649
- goto out_mapping_err;
650
- sge[sge_no].length = len;
651
- sge[sge_no].lkey = lkey;
652
- sc->sc_unmap_count++;
653
- }
654
-
655
-out:
656
- sc->sc_wr.num_sge += sge_no;
657
- if (sc->sc_unmap_count)
658
- __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
666
+ sge->length = len;
667
+ sge->lkey = rdmab_lkey(rb);
668
+ ++sc->sc_unmap_count;
659669 return true;
660670
661
-out_regbuf:
662
- pr_err("rpcrdma: failed to DMA map a Send buffer\n");
663
- return false;
664
-
665
-out_mapping_overflow:
666
- rpcrdma_unmap_sendctx(sc);
667
- pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
668
- return false;
669
-
670671 out_mapping_err:
671
- rpcrdma_unmap_sendctx(sc);
672
- pr_err("rpcrdma: Send mapping error\n");
672
+ trace_xprtrdma_dma_maperr(sge->addr);
673673 return false;
674
+}
675
+
676
+/* Copy the tail to the end of the head buffer.
677
+ */
678
+static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
679
+ struct rpcrdma_req *req,
680
+ struct xdr_buf *xdr)
681
+{
682
+ unsigned char *dst;
683
+
684
+ dst = (unsigned char *)xdr->head[0].iov_base;
685
+ dst += xdr->head[0].iov_len + xdr->page_len;
686
+ memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
687
+ r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len;
688
+}
689
+
690
+/* Copy pagelist content into the head buffer.
691
+ */
692
+static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt,
693
+ struct rpcrdma_req *req,
694
+ struct xdr_buf *xdr)
695
+{
696
+ unsigned int len, page_base, remaining;
697
+ struct page **ppages;
698
+ unsigned char *src, *dst;
699
+
700
+ dst = (unsigned char *)xdr->head[0].iov_base;
701
+ dst += xdr->head[0].iov_len;
702
+ ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
703
+ page_base = offset_in_page(xdr->page_base);
704
+ remaining = xdr->page_len;
705
+ while (remaining) {
706
+ src = page_address(*ppages);
707
+ src += page_base;
708
+ len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
709
+ memcpy(dst, src, len);
710
+ r_xprt->rx_stats.pullup_copy_count += len;
711
+
712
+ ppages++;
713
+ dst += len;
714
+ remaining -= len;
715
+ page_base = 0;
716
+ }
717
+}
718
+
719
+/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
720
+ * When the head, pagelist, and tail are small, a pull-up copy
721
+ * is considerably less costly than DMA mapping the components
722
+ * of @xdr.
723
+ *
724
+ * Assumptions:
725
+ * - the caller has already verified that the total length
726
+ * of the RPC Call body will fit into @rl_sendbuf.
727
+ */
728
+static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
729
+ struct rpcrdma_req *req,
730
+ struct xdr_buf *xdr)
731
+{
732
+ if (unlikely(xdr->tail[0].iov_len))
733
+ rpcrdma_pullup_tail_iov(r_xprt, req, xdr);
734
+
735
+ if (unlikely(xdr->page_len))
736
+ rpcrdma_pullup_pagelist(r_xprt, req, xdr);
737
+
738
+ /* The whole RPC message resides in the head iovec now */
739
+ return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
740
+}
741
+
742
+static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
743
+ struct rpcrdma_req *req,
744
+ struct xdr_buf *xdr)
745
+{
746
+ struct kvec *tail = &xdr->tail[0];
747
+
748
+ if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
749
+ return false;
750
+ if (xdr->page_len)
751
+ if (!rpcrdma_prepare_pagelist(req, xdr))
752
+ return false;
753
+ if (tail->iov_len)
754
+ if (!rpcrdma_prepare_tail_iov(req, xdr,
755
+ offset_in_page(tail->iov_base),
756
+ tail->iov_len))
757
+ return false;
758
+
759
+ if (req->rl_sendctx->sc_unmap_count)
760
+ kref_get(&req->rl_kref);
761
+ return true;
762
+}
763
+
764
+static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt,
765
+ struct rpcrdma_req *req,
766
+ struct xdr_buf *xdr)
767
+{
768
+ if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
769
+ return false;
770
+
771
+ /* If there is a Read chunk, the page list is being handled
772
+ * via explicit RDMA, and thus is skipped here.
773
+ */
774
+
775
+ /* Do not include the tail if it is only an XDR pad */
776
+ if (xdr->tail[0].iov_len > 3) {
777
+ unsigned int page_base, len;
778
+
779
+ /* If the content in the page list is an odd length,
780
+ * xdr_write_pages() adds a pad at the beginning of
781
+ * the tail iovec. Force the tail's non-pad content to
782
+ * land at the next XDR position in the Send message.
783
+ */
784
+ page_base = offset_in_page(xdr->tail[0].iov_base);
785
+ len = xdr->tail[0].iov_len;
786
+ page_base += len & 3;
787
+ len -= len & 3;
788
+ if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
789
+ return false;
790
+ kref_get(&req->rl_kref);
791
+ }
792
+
793
+ return true;
674794 }
675795
676796 /**
....@@ -683,27 +803,54 @@
683803 *
684804 * Returns 0 on success; otherwise a negative errno is returned.
685805 */
686
-int
687
-rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
688
- struct rpcrdma_req *req, u32 hdrlen,
689
- struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
806
+inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
807
+ struct rpcrdma_req *req, u32 hdrlen,
808
+ struct xdr_buf *xdr,
809
+ enum rpcrdma_chunktype rtype)
690810 {
691
- req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
811
+ int ret;
812
+
813
+ ret = -EAGAIN;
814
+ req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
692815 if (!req->rl_sendctx)
693
- return -EAGAIN;
694
- req->rl_sendctx->sc_wr.num_sge = 0;
816
+ goto out_nosc;
695817 req->rl_sendctx->sc_unmap_count = 0;
696818 req->rl_sendctx->sc_req = req;
697
- __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
819
+ kref_init(&req->rl_kref);
820
+ req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
821
+ req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
822
+ req->rl_wr.num_sge = 0;
823
+ req->rl_wr.opcode = IB_WR_SEND;
698824
699
- if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
700
- return -EIO;
825
+ rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen);
701826
702
- if (rtype != rpcrdma_areadch)
703
- if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
704
- return -EIO;
827
+ ret = -EIO;
828
+ switch (rtype) {
829
+ case rpcrdma_noch_pullup:
830
+ if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
831
+ goto out_unmap;
832
+ break;
833
+ case rpcrdma_noch_mapped:
834
+ if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
835
+ goto out_unmap;
836
+ break;
837
+ case rpcrdma_readch:
838
+ if (!rpcrdma_prepare_readch(r_xprt, req, xdr))
839
+ goto out_unmap;
840
+ break;
841
+ case rpcrdma_areadch:
842
+ break;
843
+ default:
844
+ goto out_unmap;
845
+ }
705846
706847 return 0;
848
+
849
+out_unmap:
850
+ rpcrdma_sendctx_unmap(req->rl_sendctx);
851
+out_nosc:
852
+ trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
853
+ return ret;
707854 }
708855
709856 /**
....@@ -731,13 +878,20 @@
731878 struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
732879 struct xdr_stream *xdr = &req->rl_stream;
733880 enum rpcrdma_chunktype rtype, wtype;
881
+ struct xdr_buf *buf = &rqst->rq_snd_buf;
734882 bool ddp_allowed;
735883 __be32 *p;
736884 int ret;
737885
886
+ if (unlikely(rqst->rq_rcv_buf.flags & XDRBUF_SPARSE_PAGES)) {
887
+ ret = rpcrdma_alloc_sparse_pages(&rqst->rq_rcv_buf);
888
+ if (ret)
889
+ return ret;
890
+ }
891
+
738892 rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
739
- xdr_init_encode(xdr, &req->rl_hdrbuf,
740
- req->rl_rdmabuf->rg_base);
893
+ xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
894
+ rqst);
741895
742896 /* Fixed header fields */
743897 ret = -EMSGSIZE;
....@@ -746,14 +900,14 @@
746900 goto out_err;
747901 *p++ = rqst->rq_xid;
748902 *p++ = rpcrdma_version;
749
- *p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
903
+ *p++ = r_xprt->rx_buf.rb_max_requests;
750904
751905 /* When the ULP employs a GSS flavor that guarantees integrity
752906 * or privacy, direct data placement of individual data items
753907 * is not allowed.
754908 */
755
- ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
756
- RPCAUTH_AUTH_DATATOUCH);
909
+ ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH,
910
+ &rqst->rq_cred->cr_auth->au_flags);
757911
758912 /*
759913 * Chunks needed for results?
....@@ -766,7 +920,8 @@
766920 */
767921 if (rpcrdma_results_inline(r_xprt, rqst))
768922 wtype = rpcrdma_noch;
769
- else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
923
+ else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
924
+ rpcrdma_nonpayload_inline(r_xprt, rqst))
770925 wtype = rpcrdma_writech;
771926 else
772927 wtype = rpcrdma_replych;
....@@ -787,25 +942,15 @@
787942 */
788943 if (rpcrdma_args_inline(r_xprt, rqst)) {
789944 *p++ = rdma_msg;
790
- rtype = rpcrdma_noch;
791
- } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
945
+ rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
946
+ rpcrdma_noch_pullup : rpcrdma_noch_mapped;
947
+ } else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
792948 *p++ = rdma_msg;
793949 rtype = rpcrdma_readch;
794950 } else {
795951 r_xprt->rx_stats.nomsg_call_count++;
796952 *p++ = rdma_nomsg;
797953 rtype = rpcrdma_areadch;
798
- }
799
-
800
- /* If this is a retransmit, discard previously registered
801
- * chunks. Very likely the connection has been replaced,
802
- * so these registrations are invalid and unusable.
803
- */
804
- while (unlikely(!list_empty(&req->rl_registered))) {
805
- struct rpcrdma_mr *mr;
806
-
807
- mr = rpcrdma_mr_pop(&req->rl_registered);
808
- rpcrdma_mr_defer_recovery(mr);
809954 }
810955
811956 /* This implementation supports the following combinations
....@@ -830,50 +975,63 @@
830975 * send a Call message with a Position Zero Read chunk and a
831976 * regular Read chunk at the same time.
832977 */
833
- if (rtype != rpcrdma_noch) {
834
- ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
835
- if (ret)
836
- goto out_err;
837
- }
838
- ret = encode_item_not_present(xdr);
978
+ ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
979
+ if (ret)
980
+ goto out_err;
981
+ ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
982
+ if (ret)
983
+ goto out_err;
984
+ ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
839985 if (ret)
840986 goto out_err;
841987
842
- if (wtype == rpcrdma_writech) {
843
- ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
844
- if (ret)
845
- goto out_err;
846
- }
847
- ret = encode_item_not_present(xdr);
988
+ ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
989
+ buf, rtype);
848990 if (ret)
849991 goto out_err;
850992
851
- if (wtype != rpcrdma_replych)
852
- ret = encode_item_not_present(xdr);
853
- else
854
- ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
855
- if (ret)
856
- goto out_err;
857
-
858
- trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
859
-
860
- ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
861
- &rqst->rq_snd_buf, rtype);
862
- if (ret)
863
- goto out_err;
993
+ trace_xprtrdma_marshal(req, rtype, wtype);
864994 return 0;
865995
866996 out_err:
867
- switch (ret) {
868
- case -EAGAIN:
869
- xprt_wait_for_buffer_space(rqst->rq_task, NULL);
870
- break;
871
- case -ENOBUFS:
872
- break;
873
- default:
874
- r_xprt->rx_stats.failed_marshal_count++;
875
- }
997
+ trace_xprtrdma_marshal_failed(rqst, ret);
998
+ r_xprt->rx_stats.failed_marshal_count++;
999
+ frwr_reset(req);
8761000 return ret;
1001
+}
1002
+
1003
+static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt,
1004
+ struct rpcrdma_buffer *buf,
1005
+ u32 grant)
1006
+{
1007
+ buf->rb_credits = grant;
1008
+ xprt->cwnd = grant << RPC_CWNDSHIFT;
1009
+}
1010
+
1011
+static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant)
1012
+{
1013
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1014
+
1015
+ spin_lock(&xprt->transport_lock);
1016
+ __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant);
1017
+ spin_unlock(&xprt->transport_lock);
1018
+}
1019
+
1020
+/**
1021
+ * rpcrdma_reset_cwnd - Reset the xprt's congestion window
1022
+ * @r_xprt: controlling transport instance
1023
+ *
1024
+ * Prepare @r_xprt for the next connection by reinitializing
1025
+ * its credit grant to one (see RFC 8166, Section 3.3.3).
1026
+ */
1027
+void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt)
1028
+{
1029
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1030
+
1031
+ spin_lock(&xprt->transport_lock);
1032
+ xprt->cong = 0;
1033
+ __rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1);
1034
+ spin_unlock(&xprt->transport_lock);
8771035 }
8781036
8791037 /**
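The helpers above convert the server's credit grant straight into xprt->cwnd units and reset the grant to one credit on reconnect. A small user-space sketch of that conversion (the RPC_CWNDSHIFT value below is assumed for the example, not quoted from the headers):

/* Illustrative sketch only -- not part of the patch. */
#include <stdio.h>

#define RPC_CWNDSHIFT 8U	/* assumed scale factor for the example */

/* Grant -> cwnd conversion: a zero grant is clamped to one credit so
 * the transport cannot deadlock, and each new connection starts from
 * a single credit (RFC 8166, Section 3.3.3).
 */
static unsigned long cwnd_from_grant(unsigned int grant)
{
	if (grant == 0)
		grant = 1;
	return (unsigned long)grant << RPC_CWNDSHIFT;
}

int main(void)
{
	printf("reconnect grant  1 -> cwnd %lu\n", cwnd_from_grant(1));
	printf("server grant    32 -> cwnd %lu\n", cwnd_from_grant(32));
	printf("bogus grant      0 -> cwnd %lu\n", cwnd_from_grant(0));
	return 0;
}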
....@@ -915,7 +1073,6 @@
9151073 curlen = rqst->rq_rcv_buf.head[0].iov_len;
9161074 if (curlen > copy_len)
9171075 curlen = copy_len;
918
- trace_xprtrdma_fixup(rqst, copy_len, curlen);
9191076 srcp += curlen;
9201077 copy_len -= curlen;
9211078
....@@ -935,8 +1092,6 @@
9351092 if (curlen > pagelist_len)
9361093 curlen = pagelist_len;
9371094
938
- trace_xprtrdma_fixup_pg(rqst, i, srcp,
939
- copy_len, curlen);
9401095 destp = kmap_atomic(ppages[i]);
9411096 memcpy(destp + page_base, srcp, curlen);
9421097 flush_dcache_page(ppages[i]);
....@@ -968,6 +1123,8 @@
9681123 rqst->rq_private_buf.tail[0].iov_base = srcp;
9691124 }
9701125
1126
+ if (fixup_copy_count)
1127
+ trace_xprtrdma_fixup(rqst, fixup_copy_count);
9711128 return fixup_copy_count;
9721129 }
9731130
....@@ -980,6 +1137,7 @@
9801137 rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
9811138 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
9821139 {
1140
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
9831141 struct xdr_stream *xdr = &rep->rr_stream;
9841142 __be32 *p;
9851143
....@@ -990,17 +1148,21 @@
9901148 p = xdr_inline_decode(xdr, 0);
9911149
9921150 /* Chunk lists */
993
- if (*p++ != xdr_zero)
1151
+ if (xdr_item_is_present(p++))
9941152 return false;
995
- if (*p++ != xdr_zero)
1153
+ if (xdr_item_is_present(p++))
9961154 return false;
997
- if (*p++ != xdr_zero)
1155
+ if (xdr_item_is_present(p++))
9981156 return false;
9991157
10001158 /* RPC header */
10011159 if (*p++ != rep->rr_xid)
10021160 return false;
10031161 if (*p != cpu_to_be32(RPC_CALL))
1162
+ return false;
1163
+
1164
+ /* No bc service. */
1165
+ if (xprt->bc_serv == NULL)
10041166 return false;
10051167
10061168 /* Now that we are sure this is a backchannel call,
....@@ -1033,10 +1195,7 @@
10331195 if (unlikely(!p))
10341196 return -EIO;
10351197
1036
- handle = be32_to_cpup(p++);
1037
- *length = be32_to_cpup(p++);
1038
- xdr_decode_hyper(p, &offset);
1039
-
1198
+ xdr_decode_rdma_segment(p, &handle, length, &offset);
10401199 trace_xprtrdma_decode_seg(handle, *length, offset);
10411200 return 0;
10421201 }
....@@ -1072,7 +1231,7 @@
10721231 p = xdr_inline_decode(xdr, sizeof(*p));
10731232 if (unlikely(!p))
10741233 return -EIO;
1075
- if (unlikely(*p != xdr_zero))
1234
+ if (unlikely(xdr_item_is_present(p)))
10761235 return -EIO;
10771236 return 0;
10781237 }
....@@ -1091,7 +1250,7 @@
10911250 p = xdr_inline_decode(xdr, sizeof(*p));
10921251 if (unlikely(!p))
10931252 return -EIO;
1094
- if (*p == xdr_zero)
1253
+ if (xdr_item_is_absent(p))
10951254 break;
10961255 if (!first)
10971256 return -EIO;
....@@ -1113,7 +1272,7 @@
11131272 return -EIO;
11141273
11151274 *length = 0;
1116
- if (*p != xdr_zero)
1275
+ if (xdr_item_is_present(p))
11171276 if (decode_write_chunk(xdr, length))
11181277 return -EIO;
11191278 return 0;
....@@ -1190,21 +1349,23 @@
11901349 p = xdr_inline_decode(xdr, 2 * sizeof(*p));
11911350 if (!p)
11921351 break;
1193
- dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
1194
- rqst->rq_task->tk_pid, __func__,
1195
- be32_to_cpup(p), be32_to_cpu(*(p + 1)));
1352
+ dprintk("RPC: %s: server reports "
1353
+ "version error (%u-%u), xid %08x\n", __func__,
1354
+ be32_to_cpup(p), be32_to_cpu(*(p + 1)),
1355
+ be32_to_cpu(rep->rr_xid));
11961356 break;
11971357 case err_chunk:
1198
- dprintk("RPC: %5u: %s: server reports header decoding error\n",
1199
- rqst->rq_task->tk_pid, __func__);
1358
+ dprintk("RPC: %s: server reports "
1359
+ "header decoding error, xid %08x\n", __func__,
1360
+ be32_to_cpu(rep->rr_xid));
12001361 break;
12011362 default:
1202
- dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
1203
- rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
1363
+ dprintk("RPC: %s: server reports "
1364
+ "unrecognized error %d, xid %08x\n", __func__,
1365
+ be32_to_cpup(p), be32_to_cpu(rep->rr_xid));
12041366 }
12051367
1206
- r_xprt->rx_stats.bad_reply_count++;
1207
- return -EREMOTEIO;
1368
+ return -EIO;
12081369 }
12091370
12101371 /* Perform XID lookup, reconstruction of the RPC reply, and
....@@ -1216,10 +1377,7 @@
12161377 struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
12171378 struct rpc_xprt *xprt = &r_xprt->rx_xprt;
12181379 struct rpc_rqst *rqst = rep->rr_rqst;
1219
- unsigned long cwnd;
12201380 int status;
1221
-
1222
- xprt->reestablish_timeout = 0;
12231381
12241382 switch (rep->rr_proc) {
12251383 case rdma_msg:
....@@ -1238,74 +1396,31 @@
12381396 goto out_badheader;
12391397
12401398 out:
1241
- spin_lock(&xprt->recv_lock);
1242
- cwnd = xprt->cwnd;
1243
- xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT;
1244
- if (xprt->cwnd > cwnd)
1245
- xprt_release_rqst_cong(rqst->rq_task);
1246
-
1399
+ spin_lock(&xprt->queue_lock);
12471400 xprt_complete_rqst(rqst->rq_task, status);
12481401 xprt_unpin_rqst(rqst);
1249
- spin_unlock(&xprt->recv_lock);
1402
+ spin_unlock(&xprt->queue_lock);
12501403 return;
12511404
1252
-/* If the incoming reply terminated a pending RPC, the next
1253
- * RPC call will post a replacement receive buffer as it is
1254
- * being marshaled.
1255
- */
12561405 out_badheader:
12571406 trace_xprtrdma_reply_hdr(rep);
12581407 r_xprt->rx_stats.bad_reply_count++;
1259
- status = -EIO;
1408
+ rqst->rq_task->tk_status = status;
1409
+ status = 0;
12601410 goto out;
12611411 }
12621412
1263
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
1413
+static void rpcrdma_reply_done(struct kref *kref)
12641414 {
1265
- /* Invalidate and unmap the data payloads before waking
1266
- * the waiting application. This guarantees the memory
1267
- * regions are properly fenced from the server before the
1268
- * application accesses the data. It also ensures proper
1269
- * send flow control: waking the next RPC waits until this
1270
- * RPC has relinquished all its Send Queue entries.
1271
- */
1272
- if (!list_empty(&req->rl_registered))
1273
- r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
1274
- &req->rl_registered);
1415
+ struct rpcrdma_req *req =
1416
+ container_of(kref, struct rpcrdma_req, rl_kref);
12751417
1276
- /* Ensure that any DMA mapped pages associated with
1277
- * the Send of the RPC Call have been unmapped before
1278
- * allowing the RPC to complete. This protects argument
1279
- * memory not controlled by the RPC client from being
1280
- * re-used before we're done with it.
1281
- */
1282
- if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
1283
- r_xprt->rx_stats.reply_waits_for_send++;
1284
- out_of_line_wait_on_bit(&req->rl_flags,
1285
- RPCRDMA_REQ_F_TX_RESOURCES,
1286
- bit_wait,
1287
- TASK_UNINTERRUPTIBLE);
1288
- }
1418
+ rpcrdma_complete_rqst(req->rl_reply);
12891419 }
12901420
1291
-/* Reply handling runs in the poll worker thread. Anything that
1292
- * might wait is deferred to a separate workqueue.
1293
- */
1294
-void rpcrdma_deferred_completion(struct work_struct *work)
1295
-{
1296
- struct rpcrdma_rep *rep =
1297
- container_of(work, struct rpcrdma_rep, rr_work);
1298
- struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
1299
- struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1300
-
1301
- trace_xprtrdma_defer_cmp(rep);
1302
- if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
1303
- r_xprt->rx_ia.ri_ops->ro_reminv(rep, &req->rl_registered);
1304
- rpcrdma_release_rqst(r_xprt, req);
1305
- rpcrdma_complete_rqst(rep);
1306
-}
1307
-
1308
-/* Process received RPC/RDMA messages.
1421
+/**
1422
+ * rpcrdma_reply_handler - Process received RPC/RDMA messages
1423
+ * @rep: Incoming rpcrdma_rep object to process
13091424 *
13101425 * Errors must result in the RPC task either being awakened, or
13111426 * allowed to timeout, to discover the errors at that time.
....@@ -1320,14 +1435,15 @@
13201435 u32 credits;
13211436 __be32 *p;
13221437
1323
- --buf->rb_posted_receives;
1324
-
1325
- if (rep->rr_hdrbuf.head[0].iov_len == 0)
1326
- goto out_badstatus;
1438
+ /* Any data means we had a useful conversation, so
1439
+ * then we don't need to delay the next reconnect.
1440
+ */
1441
+ if (xprt->reestablish_timeout)
1442
+ xprt->reestablish_timeout = 0;
13271443
13281444 /* Fixed transport header fields */
13291445 xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
1330
- rep->rr_hdrbuf.head[0].iov_base);
1446
+ rep->rr_hdrbuf.head[0].iov_base, NULL);
13311447 p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
13321448 if (unlikely(!p))
13331449 goto out_shortreply;
....@@ -1345,19 +1461,21 @@
13451461 /* Match incoming rpcrdma_rep to an rpcrdma_req to
13461462 * get context for handling any incoming chunks.
13471463 */
1348
- spin_lock(&xprt->recv_lock);
1464
+ spin_lock(&xprt->queue_lock);
13491465 rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
13501466 if (!rqst)
13511467 goto out_norqst;
13521468 xprt_pin_rqst(rqst);
1469
+ spin_unlock(&xprt->queue_lock);
13531470
13541471 if (credits == 0)
13551472 credits = 1; /* don't deadlock */
1356
- else if (credits > buf->rb_max_requests)
1357
- credits = buf->rb_max_requests;
1358
- buf->rb_credits = credits;
1359
-
1360
- spin_unlock(&xprt->recv_lock);
1473
+ else if (credits > r_xprt->rx_ep->re_max_requests)
1474
+ credits = r_xprt->rx_ep->re_max_requests;
1475
+ rpcrdma_post_recvs(r_xprt, credits + (buf->rb_bc_srv_max_requests << 1),
1476
+ false);
1477
+ if (buf->rb_credits != credits)
1478
+ rpcrdma_update_cwnd(r_xprt, credits);
13611479
13621480 req = rpcr_to_rdmar(rqst);
13631481 if (req->rl_reply) {
....@@ -1366,34 +1484,30 @@
13661484 }
13671485 req->rl_reply = rep;
13681486 rep->rr_rqst = rqst;
1369
- clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
13701487
13711488 trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
13721489
1373
- rpcrdma_post_recvs(r_xprt, false);
1374
- queue_work(rpcrdma_receive_wq, &rep->rr_work);
1490
+ if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
1491
+ frwr_reminv(rep, &req->rl_registered);
1492
+ if (!list_empty(&req->rl_registered))
1493
+ frwr_unmap_async(r_xprt, req);
1494
+ /* LocalInv completion will complete the RPC */
1495
+ else
1496
+ kref_put(&req->rl_kref, rpcrdma_reply_done);
13751497 return;
13761498
13771499 out_badversion:
13781500 trace_xprtrdma_reply_vers(rep);
1379
- goto repost;
1501
+ goto out;
13801502
1381
-/* The RPC transaction has already been terminated, or the header
1382
- * is corrupt.
1383
- */
13841503 out_norqst:
1385
- spin_unlock(&xprt->recv_lock);
1504
+ spin_unlock(&xprt->queue_lock);
13861505 trace_xprtrdma_reply_rqst(rep);
1387
- goto repost;
1506
+ goto out;
13881507
13891508 out_shortreply:
13901509 trace_xprtrdma_reply_short(rep);
13911510
1392
-/* If no pending RPC transaction was matched, post a replacement
1393
- * receive buffer before returning.
1394
- */
1395
-repost:
1396
- rpcrdma_post_recvs(r_xprt, false);
1397
-out_badstatus:
1511
+out:
13981512 rpcrdma_recv_buffer_put(rep);
13991513 }