hc
2024-10-22 8ac6c7a54ed1b98d142dce24b11c6de6a1e239a5
kernel/net/sunrpc/xprtrdma/frwr_ops.c
@@ -7,70 +7,39 @@
 /* Lightweight memory registration using Fast Registration Work
 * Requests (FRWR).
 *
- * FRWR features ordered asynchronous registration and deregistration
- * of arbitrarily sized memory regions. This is the fastest and safest
+ * FRWR features ordered asynchronous registration and invalidation
+ * of arbitrarily-sized memory regions. This is the fastest and safest
 * but most complex memory registration mode.
 */

 /* Normal operation
 *
- * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
- * Work Request (frwr_op_map). When the RDMA operation is finished, this
+ * A Memory Region is prepared for RDMA Read or Write using a FAST_REG
+ * Work Request (frwr_map). When the RDMA operation is finished, this
 * Memory Region is invalidated using a LOCAL_INV Work Request
- * (frwr_op_unmap_sync).
+ * (frwr_unmap_async and frwr_unmap_sync).
 *
- * Typically these Work Requests are not signaled, and neither are RDMA
- * SEND Work Requests (with the exception of signaling occasionally to
- * prevent provider work queue overflows). This greatly reduces HCA
+ * Typically FAST_REG Work Requests are not signaled, and neither are
+ * RDMA Send Work Requests (with the exception of signaling occasionally
+ * to prevent provider work queue overflows). This greatly reduces HCA
 * interrupt workload.
- *
- * As an optimization, frwr_op_unmap marks MRs INVALID before the
- * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
- * rb_mrs immediately so that no work (like managing a linked list
- * under a spinlock) is needed in the completion upcall.
- *
- * But this means that frwr_op_map() can occasionally encounter an MR
- * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
- * ordering prevents a subsequent FAST_REG WR from executing against
- * that MR while it is still being invalidated.
 */

 /* Transport recovery
 *
- * ->op_map and the transport connect worker cannot run at the same
- * time, but ->op_unmap can fire while the transport connect worker
- * is running. Thus MR recovery is handled in ->op_map, to guarantee
- * that recovered MRs are owned by a sending RPC, and not one where
- * ->op_unmap could fire at the same time transport reconnect is
- * being done.
+ * frwr_map and frwr_unmap_* cannot run at the same time the transport
+ * connect worker is running. The connect worker holds the transport
+ * send lock, just as ->send_request does. This prevents frwr_map and
+ * the connect worker from running concurrently. When a connection is
+ * closed, the Receive completion queue is drained before allowing
+ * the connect worker to get control. This prevents frwr_unmap and the
+ * connect worker from running concurrently.
 *
- * When the underlying transport disconnects, MRs are left in one of
- * four states:
- *
- * INVALID: The MR was not in use before the QP entered ERROR state.
- *
- * VALID: The MR was registered before the QP entered ERROR state.
- *
- * FLUSHED_FR: The MR was being registered when the QP entered ERROR
- * state, and the pending WR was flushed.
- *
- * FLUSHED_LI: The MR was being invalidated when the QP entered ERROR
- * state, and the pending WR was flushed.
- *
- * When frwr_op_map encounters FLUSHED and VALID MRs, they are recovered
- * with ib_dereg_mr and then are re-initialized. Because MR recovery
- * allocates fresh resources, it is deferred to a workqueue, and the
- * recovered MRs are placed back on the rb_mrs list when recovery is
- * complete. frwr_op_map allocates another MR for the current RPC while
- * the broken MR is reset.
- *
- * To ensure that frwr_op_map doesn't encounter an MR that is marked
- * INVALID but that is about to be flushed due to a previous transport
- * disconnect, the transport connect worker attempts to drain all
- * pending send queue WRs before the transport is reconnected.
+ * When the underlying transport disconnects, MRs that are in flight
+ * are flushed and are likely unusable. Thus all MRs are destroyed.
+ * New MRs are created on demand.
 */

-#include <linux/sunrpc/rpc_rdma.h>
 #include <linux/sunrpc/svc_rdma.h>

 #include "xprt_rdma.h"
@@ -80,156 +49,158 @@
 # define RPCDBG_FACILITY RPCDBG_TRANS
 #endif

-bool
-frwr_is_supported(struct rpcrdma_ia *ia)
-{
- struct ib_device_attr *attrs = &ia->ri_device->attrs;
-
- if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
- goto out_not_supported;
- if (attrs->max_fast_reg_page_list_len == 0)
- goto out_not_supported;
- return true;
-
-out_not_supported:
- pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
- ia->ri_device->name);
- return false;
-}
-
-static int
-frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
-{
- unsigned int depth = ia->ri_max_frwr_depth;
- struct rpcrdma_frwr *frwr = &mr->frwr;
- int rc;
-
- frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
- if (IS_ERR(frwr->fr_mr))
- goto out_mr_err;
-
- mr->mr_sg = kcalloc(depth, sizeof(*mr->mr_sg), GFP_KERNEL);
- if (!mr->mr_sg)
- goto out_list_err;
-
- INIT_LIST_HEAD(&mr->mr_list);
- sg_init_table(mr->mr_sg, depth);
- init_completion(&frwr->fr_linv_done);
- return 0;
-
-out_mr_err:
- rc = PTR_ERR(frwr->fr_mr);
- dprintk("RPC: %s: ib_alloc_mr status %i\n",
- __func__, rc);
- return rc;
-
-out_list_err:
- rc = -ENOMEM;
- dprintk("RPC: %s: sg allocation failure\n",
- __func__);
- ib_dereg_mr(frwr->fr_mr);
- return rc;
-}
-
-static void
-frwr_op_release_mr(struct rpcrdma_mr *mr)
+/**
+ * frwr_release_mr - Destroy one MR
+ * @mr: MR allocated by frwr_mr_init
+ *
+ */
+void frwr_release_mr(struct rpcrdma_mr *mr)
 {
 int rc;

 rc = ib_dereg_mr(mr->frwr.fr_mr);
 if (rc)
- pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n",
- mr, rc);
+ trace_xprtrdma_frwr_dereg(mr, rc);
 kfree(mr->mr_sg);
 kfree(mr);
 }

-static int
-__frwr_mr_reset(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
+static void frwr_mr_recycle(struct rpcrdma_mr *mr)
 {
- struct rpcrdma_frwr *frwr = &mr->frwr;
- int rc;
-
- rc = ib_dereg_mr(frwr->fr_mr);
- if (rc) {
- pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n",
- rc, mr);
- return rc;
- }
-
- frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype,
- ia->ri_max_frwr_depth);
- if (IS_ERR(frwr->fr_mr)) {
- pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n",
- PTR_ERR(frwr->fr_mr), mr);
- return PTR_ERR(frwr->fr_mr);
- }
-
- dprintk("RPC: %s: recovered FRWR %p\n", __func__, frwr);
- frwr->fr_state = FRWR_IS_INVALID;
- return 0;
-}
-
-/* Reset of a single FRWR. Generate a fresh rkey by replacing the MR.
- */
-static void
-frwr_op_recover_mr(struct rpcrdma_mr *mr)
-{
- enum rpcrdma_frwr_state state = mr->frwr.fr_state;
 struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- int rc;

- rc = __frwr_mr_reset(ia, mr);
- if (state != FRWR_FLUSHED_LI) {
- trace_xprtrdma_dma_unmap(mr);
- ib_dma_unmap_sg(ia->ri_device,
+ trace_xprtrdma_mr_recycle(mr);
+
+ if (mr->mr_dir != DMA_NONE) {
+ trace_xprtrdma_mr_unmap(mr);
+ ib_dma_unmap_sg(r_xprt->rx_ep->re_id->device,
 mr->mr_sg, mr->mr_nents, mr->mr_dir);
+ mr->mr_dir = DMA_NONE;
 }
- if (rc)
- goto out_release;

- rpcrdma_mr_put(mr);
- r_xprt->rx_stats.mrs_recovered++;
- return;
-
-out_release:
- pr_err("rpcrdma: FRWR reset failed %d, %p released\n", rc, mr);
- r_xprt->rx_stats.mrs_orphaned++;
-
- spin_lock(&r_xprt->rx_buf.rb_mrlock);
+ spin_lock(&r_xprt->rx_buf.rb_lock);
 list_del(&mr->mr_all);
- spin_unlock(&r_xprt->rx_buf.rb_mrlock);
+ r_xprt->rx_stats.mrs_recycled++;
+ spin_unlock(&r_xprt->rx_buf.rb_lock);

- frwr_op_release_mr(mr);
+ frwr_release_mr(mr);
 }

-/* On success, sets:
- * ep->rep_attr.cap.max_send_wr
- * ep->rep_attr.cap.max_recv_wr
- * cdata->max_requests
- * ia->ri_max_segs
+/* frwr_reset - Place MRs back on the free list
+ * @req: request to reset
 *
- * And these FRWR-related fields:
- * ia->ri_max_frwr_depth
- * ia->ri_mrtype
+ * Used after a failed marshal. For FRWR, this means the MRs
+ * don't have to be fully released and recreated.
+ *
+ * NB: This is safe only as long as none of @req's MRs are
+ * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
+ * Work Request.
 */
-static int
-frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
- struct rpcrdma_create_data_internal *cdata)
+void frwr_reset(struct rpcrdma_req *req)
 {
- struct ib_device_attr *attrs = &ia->ri_device->attrs;
+ struct rpcrdma_mr *mr;
+
+ while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
+ rpcrdma_mr_put(mr);
+}
+
+/**
+ * frwr_mr_init - Initialize one MR
+ * @r_xprt: controlling transport instance
+ * @mr: generic MR to prepare for FRWR
+ *
+ * Returns zero if successful. Otherwise a negative errno
+ * is returned.
+ */
+int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
+{
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
+ unsigned int depth = ep->re_max_fr_depth;
+ struct scatterlist *sg;
+ struct ib_mr *frmr;
+ int rc;
+
+ frmr = ib_alloc_mr(ep->re_pd, ep->re_mrtype, depth);
+ if (IS_ERR(frmr))
+ goto out_mr_err;
+
+ sg = kmalloc_array(depth, sizeof(*sg), GFP_NOFS);
+ if (!sg)
+ goto out_list_err;
+
+ mr->mr_xprt = r_xprt;
+ mr->frwr.fr_mr = frmr;
+ mr->mr_dir = DMA_NONE;
+ INIT_LIST_HEAD(&mr->mr_list);
+ init_completion(&mr->frwr.fr_linv_done);
+
+ sg_init_table(sg, depth);
+ mr->mr_sg = sg;
+ return 0;
+
+out_mr_err:
+ rc = PTR_ERR(frmr);
+ trace_xprtrdma_frwr_alloc(mr, rc);
+ return rc;
+
+out_list_err:
+ ib_dereg_mr(frmr);
+ return -ENOMEM;
+}
+
+/**
+ * frwr_query_device - Prepare a transport for use with FRWR
+ * @ep: endpoint to fill in
+ * @device: RDMA device to query
+ *
+ * On success, sets:
+ * ep->re_attr
+ * ep->re_max_requests
+ * ep->re_max_rdma_segs
+ * ep->re_max_fr_depth
+ * ep->re_mrtype
+ *
+ * Return values:
+ * On success, returns zero.
+ * %-EINVAL - the device does not support FRWR memory registration
+ * %-ENOMEM - the device is not sufficiently capable for NFS/RDMA
+ */
+int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device)
+{
+ const struct ib_device_attr *attrs = &device->attrs;
 int max_qp_wr, depth, delta;
+ unsigned int max_sge;

- ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
+ if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) ||
+ attrs->max_fast_reg_page_list_len == 0) {
+ pr_err("rpcrdma: 'frwr' mode is not supported by device %s\n",
+ device->name);
+ return -EINVAL;
+ }
+
+ max_sge = min_t(unsigned int, attrs->max_send_sge,
+ RPCRDMA_MAX_SEND_SGES);
+ if (max_sge < RPCRDMA_MIN_SEND_SGES) {
+ pr_err("rpcrdma: HCA provides only %u send SGEs\n", max_sge);
+ return -ENOMEM;
+ }
+ ep->re_attr.cap.max_send_sge = max_sge;
+ ep->re_attr.cap.max_recv_sge = 1;
+
+ ep->re_mrtype = IB_MR_TYPE_MEM_REG;
 if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
- ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;
+ ep->re_mrtype = IB_MR_TYPE_SG_GAPS;

- ia->ri_max_frwr_depth =
- min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
- attrs->max_fast_reg_page_list_len);
- dprintk("RPC: %s: device's max FR page list len = %u\n",
- __func__, ia->ri_max_frwr_depth);
+ /* Quirk: Some devices advertise a large max_fast_reg_page_list_len
+ * capability, but perform optimally when the MRs are not larger
+ * than a page.
+ */
+ if (attrs->max_sge_rd > RPCRDMA_MAX_HDR_SEGS)
+ ep->re_max_fr_depth = attrs->max_sge_rd;
+ else
+ ep->re_max_fr_depth = attrs->max_fast_reg_page_list_len;
+ if (ep->re_max_fr_depth > RPCRDMA_MAX_DATA_SEGS)
+ ep->re_max_fr_depth = RPCRDMA_MAX_DATA_SEGS;

 /* Add room for frwr register and invalidate WRs.
 * 1. FRWR reg WR for head
@@ -245,155 +216,81 @@
 /* Calculate N if the device max FRWR depth is smaller than
 * RPCRDMA_MAX_DATA_SEGS.
 */
- if (ia->ri_max_frwr_depth < RPCRDMA_MAX_DATA_SEGS) {
- delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frwr_depth;
+ if (ep->re_max_fr_depth < RPCRDMA_MAX_DATA_SEGS) {
+ delta = RPCRDMA_MAX_DATA_SEGS - ep->re_max_fr_depth;
 do {
 depth += 2; /* FRWR reg + invalidate */
- delta -= ia->ri_max_frwr_depth;
+ delta -= ep->re_max_fr_depth;
 } while (delta > 0);
 }

- max_qp_wr = ia->ri_device->attrs.max_qp_wr;
+ max_qp_wr = attrs->max_qp_wr;
 max_qp_wr -= RPCRDMA_BACKWARD_WRS;
 max_qp_wr -= 1;
 if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
 return -ENOMEM;
- if (cdata->max_requests > max_qp_wr)
- cdata->max_requests = max_qp_wr;
- ep->rep_attr.cap.max_send_wr = cdata->max_requests * depth;
- if (ep->rep_attr.cap.max_send_wr > max_qp_wr) {
- cdata->max_requests = max_qp_wr / depth;
- if (!cdata->max_requests)
- return -EINVAL;
- ep->rep_attr.cap.max_send_wr = cdata->max_requests *
- depth;
+ if (ep->re_max_requests > max_qp_wr)
+ ep->re_max_requests = max_qp_wr;
+ ep->re_attr.cap.max_send_wr = ep->re_max_requests * depth;
+ if (ep->re_attr.cap.max_send_wr > max_qp_wr) {
+ ep->re_max_requests = max_qp_wr / depth;
+ if (!ep->re_max_requests)
+ return -ENOMEM;
+ ep->re_attr.cap.max_send_wr = ep->re_max_requests * depth;
 }
- ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
- ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
- ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
- ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
- ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
+ ep->re_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
+ ep->re_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
+ ep->re_attr.cap.max_recv_wr = ep->re_max_requests;
+ ep->re_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
+ ep->re_attr.cap.max_recv_wr += RPCRDMA_MAX_RECV_BATCH;
+ ep->re_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */

- ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
- ia->ri_max_frwr_depth);
+ ep->re_max_rdma_segs =
+ DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ep->re_max_fr_depth);
+ /* Reply chunks require segments for head and tail buffers */
+ ep->re_max_rdma_segs += 2;
+ if (ep->re_max_rdma_segs > RPCRDMA_MAX_HDR_SEGS)
+ ep->re_max_rdma_segs = RPCRDMA_MAX_HDR_SEGS;
+
+ /* Ensure the underlying device is capable of conveying the
+ * largest r/wsize NFS will ask for. This guarantees that
+ * failing over from one RDMA device to another will not
+ * break NFS I/O.
+ */
+ if ((ep->re_max_rdma_segs * ep->re_max_fr_depth) < RPCRDMA_MAX_SEGS)
+ return -ENOMEM;
+
 return 0;
 }

-/* FRWR mode conveys a list of pages per chunk segment. The
- * maximum length of that list is the FRWR page list depth.
- */
-static size_t
-frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
-{
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-
- return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
- RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frwr_depth);
-}
-
-static void
-__frwr_sendcompletion_flush(struct ib_wc *wc, const char *wr)
-{
- if (wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("rpcrdma: %s: %s (%u/0x%x)\n",
- wr, ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
-}
-
 /**
- * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
+ * frwr_map - Register a memory region
+ * @r_xprt: controlling transport
+ * @seg: memory region co-ordinates
+ * @nsegs: number of segments remaining
+ * @writing: true when RDMA Write will be used
+ * @xid: XID of RPC using the registered memory
+ * @mr: MR to fill in
 *
- */
-static void
-frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct rpcrdma_frwr *frwr =
- container_of(cqe, struct rpcrdma_frwr, fr_cqe);
-
- /* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS) {
- frwr->fr_state = FRWR_FLUSHED_FR;
- __frwr_sendcompletion_flush(wc, "fastreg");
- }
- trace_xprtrdma_wc_fastreg(wc, frwr);
-}
-
-/**
- * frwr_wc_localinv - Invoked by RDMA provider for a flushed LocalInv WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
- *
- */
-static void
-frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
- fr_cqe);
-
- /* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS) {
- frwr->fr_state = FRWR_FLUSHED_LI;
- __frwr_sendcompletion_flush(wc, "localinv");
- }
- trace_xprtrdma_wc_li(wc, frwr);
-}
-
-/**
- * frwr_wc_localinv_wake - Invoked by RDMA provider for a signaled LocalInv WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
- *
- * Awaken anyone waiting for an MR to finish being fenced.
- */
-static void
-frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
- fr_cqe);
-
- /* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS) {
- frwr->fr_state = FRWR_FLUSHED_LI;
- __frwr_sendcompletion_flush(wc, "localinv");
- }
- complete(&frwr->fr_linv_done);
- trace_xprtrdma_wc_li_wake(wc, frwr);
-}
-
-/* Post a REG_MR Work Request to register a memory region
+ * Prepare a REG_MR Work Request to register a memory region
 * for remote access via RDMA READ or RDMA WRITE.
+ *
+ * Returns the next segment or a negative errno pointer.
+ * On success, @mr is filled in.
 */
-static struct rpcrdma_mr_seg *
-frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
- int nsegs, bool writing, struct rpcrdma_mr **out)
+struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_mr_seg *seg,
+ int nsegs, bool writing, __be32 xid,
+ struct rpcrdma_mr *mr)
 {
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
- struct rpcrdma_frwr *frwr;
- struct rpcrdma_mr *mr;
- struct ib_mr *ibmr;
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
 struct ib_reg_wr *reg_wr;
- int i, n;
+ int i, n, dma_nents;
+ struct ib_mr *ibmr;
 u8 key;

- mr = NULL;
- do {
- if (mr)
- rpcrdma_mr_defer_recovery(mr);
- mr = rpcrdma_mr_get(r_xprt);
- if (!mr)
- return ERR_PTR(-EAGAIN);
- } while (mr->frwr.fr_state != FRWR_IS_INVALID);
- frwr = &mr->frwr;
- frwr->fr_state = FRWR_IS_VALID;
-
- if (nsegs > ia->ri_max_frwr_depth)
- nsegs = ia->ri_max_frwr_depth;
+ if (nsegs > ep->re_max_fr_depth)
+ nsegs = ep->re_max_fr_depth;
 for (i = 0; i < nsegs;) {
 if (seg->mr_page)
 sg_set_page(&mr->mr_sg[i],
@@ -406,28 +303,31 @@

 ++seg;
 ++i;
- if (holes_ok)
+ if (ep->re_mrtype == IB_MR_TYPE_SG_GAPS)
 continue;
 if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
 break;
 }
 mr->mr_dir = rpcrdma_data_dir(writing);
+ mr->mr_nents = i;

- mr->mr_nents = ib_dma_map_sg(ia->ri_device, mr->mr_sg, i, mr->mr_dir);
- if (!mr->mr_nents)
+ dma_nents = ib_dma_map_sg(ep->re_id->device, mr->mr_sg, mr->mr_nents,
+ mr->mr_dir);
+ if (!dma_nents)
 goto out_dmamap_err;
- trace_xprtrdma_dma_map(mr);

- ibmr = frwr->fr_mr;
- n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
- if (unlikely(n != mr->mr_nents))
+ ibmr = mr->frwr.fr_mr;
+ n = ib_map_mr_sg(ibmr, mr->mr_sg, dma_nents, NULL, PAGE_SIZE);
+ if (n != dma_nents)
 goto out_mapmr_err;

+ ibmr->iova &= 0x00000000ffffffff;
+ ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32;
 key = (u8)(ibmr->rkey & 0x000000FF);
 ib_update_fast_reg_key(ibmr, ++key);

- reg_wr = &frwr->fr_regwr;
+ reg_wr = &mr->frwr.fr_regwr;
 reg_wr->mr = ibmr;
 reg_wr->key = ibmr->rkey;
 reg_wr->access = writing ?
@@ -437,37 +337,59 @@
 mr->mr_handle = ibmr->rkey;
 mr->mr_length = ibmr->length;
 mr->mr_offset = ibmr->iova;
+ trace_xprtrdma_mr_map(mr);

- *out = mr;
 return seg;

 out_dmamap_err:
- pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
- mr->mr_sg, i);
- frwr->fr_state = FRWR_IS_INVALID;
- rpcrdma_mr_put(mr);
+ mr->mr_dir = DMA_NONE;
+ trace_xprtrdma_frwr_sgerr(mr, i);
 return ERR_PTR(-EIO);

 out_mapmr_err:
- pr_err("rpcrdma: failed to map mr %p (%d/%d)\n",
- frwr->fr_mr, n, mr->mr_nents);
- rpcrdma_mr_defer_recovery(mr);
+ trace_xprtrdma_frwr_maperr(mr, n);
 return ERR_PTR(-EIO);
 }

-/* Post Send WR containing the RPC Call message.
+/**
+ * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
+ * @cq: completion queue
+ * @wc: WCE for a completed FastReg WR
 *
- * For FRMR, chain any FastReg WRs to the Send WR. Only a
+ */
+static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_fastreg(wc, frwr);
+ /* The MR will get recycled when the associated req is retransmitted */
+
+ rpcrdma_flush_disconnect(cq->cq_context, wc);
+}
+
+/**
+ * frwr_send - post Send WRs containing the RPC Call message
+ * @r_xprt: controlling transport instance
+ * @req: prepared RPC Call
+ *
+ * For FRWR, chain any FastReg WRs to the Send WR. Only a
 * single ib_post_send call is needed to register memory
 * and then post the Send WR.
+ *
+ * Returns the return code from ib_post_send.
+ *
+ * Caller must hold the transport send lock to ensure that the
+ * pointers to the transport's rdma_cm_id and QP are stable.
 */
-static int
-frwr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+int frwr_send(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
 struct ib_send_wr *post_wr;
 struct rpcrdma_mr *mr;

- post_wr = &req->rl_sendctx->sc_wr;
+ post_wr = &req->rl_wr;
 list_for_each_entry(mr, &req->rl_registered, mr_list) {
 struct rpcrdma_frwr *frwr;

@@ -483,46 +405,96 @@
 post_wr = &frwr->fr_regwr.wr;
 }

- /* If ib_post_send fails, the next ->send_request for
- * @req will queue these MWs for recovery.
- */
- return ib_post_send(ia->ri_id->qp, post_wr, NULL);
+ return ib_post_send(r_xprt->rx_ep->re_id->qp, post_wr, NULL);
 }

-/* Handle a remotely invalidated mr on the @mrs list
+/**
+ * frwr_reminv - handle a remotely invalidated mr on the @mrs list
+ * @rep: Received reply
+ * @mrs: list of MRs to check
+ *
 */
-static void
-frwr_op_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
+void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
 {
 struct rpcrdma_mr *mr;

 list_for_each_entry(mr, mrs, mr_list)
 if (mr->mr_handle == rep->rr_inv_rkey) {
 list_del_init(&mr->mr_list);
- trace_xprtrdma_remoteinv(mr);
- mr->frwr.fr_state = FRWR_IS_INVALID;
- rpcrdma_mr_unmap_and_put(mr);
+ trace_xprtrdma_mr_reminv(mr);
+ rpcrdma_mr_put(mr);
 break; /* only one invalidated MR per RPC */
 }
 }

-/* Invalidate all memory regions that were registered for "req".
+static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
+{
+ if (wc->status != IB_WC_SUCCESS)
+ frwr_mr_recycle(mr);
+ else
+ rpcrdma_mr_put(mr);
+}
+
+/**
+ * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
+ * @cq: completion queue
+ * @wc: WCE for a completed LocalInv WR
 *
- * Sleeps until it is safe for the host CPU to access the
- * previously mapped memory regions.
- *
- * Caller ensures that @mrs is not empty before the call. This
- * function empties the list.
 */
-static void
-frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
+static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+ struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_li(wc, frwr);
+ __frwr_release_mr(wc, mr);
+
+ rpcrdma_flush_disconnect(cq->cq_context, wc);
+}
+
+/**
+ * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
+ * @cq: completion queue
+ * @wc: WCE for a completed LocalInv WR
+ *
+ * Awaken anyone waiting for an MR to finish being fenced.
+ */
+static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+ struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_li_wake(wc, frwr);
+ __frwr_release_mr(wc, mr);
+ complete(&frwr->fr_linv_done);
+
+ rpcrdma_flush_disconnect(cq->cq_context, wc);
+}
+
+/**
+ * frwr_unmap_sync - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req with a non-empty list of MRs to process
+ *
+ * Sleeps until it is safe for the host CPU to access the previously mapped
+ * memory regions. This guarantees that registered MRs are properly fenced
+ * from the server before the RPC consumer accesses the data in them. It
+ * also ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
+ */
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
 struct ib_send_wr *first, **prev, *last;
 const struct ib_send_wr *bad_wr;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 struct rpcrdma_frwr *frwr;
 struct rpcrdma_mr *mr;
- int count, rc;
+ int rc;

 /* ORDER: Invalidate all of the MRs first
 *
@@ -530,86 +502,161 @@
 * a single ib_post_send() call.
 */
 frwr = NULL;
- count = 0;
 prev = &first;
- list_for_each_entry(mr, mrs, mr_list) {
- mr->frwr.fr_state = FRWR_IS_INVALID;
+ while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
+
+ trace_xprtrdma_mr_localinv(mr);
+ r_xprt->rx_stats.local_inv_needed++;

 frwr = &mr->frwr;
- trace_xprtrdma_localinv(mr);
-
 frwr->fr_cqe.done = frwr_wc_localinv;
 last = &frwr->fr_invwr;
- memset(last, 0, sizeof(*last));
+ last->next = NULL;
 last->wr_cqe = &frwr->fr_cqe;
+ last->sg_list = NULL;
+ last->num_sge = 0;
 last->opcode = IB_WR_LOCAL_INV;
+ last->send_flags = IB_SEND_SIGNALED;
 last->ex.invalidate_rkey = mr->mr_handle;
- count++;

 *prev = last;
 prev = &last->next;
 }
- if (!frwr)
- goto unmap;

 /* Strong send queue ordering guarantees that when the
 * last WR in the chain completes, all WRs in the chain
 * are complete.
 */
- last->send_flags = IB_SEND_SIGNALED;
 frwr->fr_cqe.done = frwr_wc_localinv_wake;
 reinit_completion(&frwr->fr_linv_done);

 /* Transport disconnect drains the receive CQ before it
 * replaces the QP. The RPC reply handler won't call us
- * unless ri_id->qp is a valid pointer.
+ * unless re_id->qp is a valid pointer.
 */
- r_xprt->rx_stats.local_inv_needed++;
 bad_wr = NULL;
- rc = ib_post_send(ia->ri_id->qp, first, &bad_wr);
+ rc = ib_post_send(r_xprt->rx_ep->re_id->qp, first, &bad_wr);
+
+ /* The final LOCAL_INV WR in the chain is supposed to
+ * do the wake. If it was never posted, the wake will
+ * not happen, so don't wait in that case.
+ */
 if (bad_wr != first)
 wait_for_completion(&frwr->fr_linv_done);
- if (rc)
- goto reset_mrs;
+ if (!rc)
+ return;

- /* ORDER: Now DMA unmap all of the MRs, and return
- * them to the free MR list.
+ /* Recycle MRs in the LOCAL_INV chain that did not get posted.
 */
-unmap:
- while (!list_empty(mrs)) {
- mr = rpcrdma_mr_pop(mrs);
- rpcrdma_mr_unmap_and_put(mr);
- }
- return;
-
-reset_mrs:
- pr_err("rpcrdma: FRWR invalidate ib_post_send returned %i\n", rc);
-
- /* Find and reset the MRs in the LOCAL_INV WRs that did not
- * get posted.
- */
+ trace_xprtrdma_post_linv(req, rc);
 while (bad_wr) {
 frwr = container_of(bad_wr, struct rpcrdma_frwr,
 fr_invwr);
 mr = container_of(frwr, struct rpcrdma_mr, frwr);
-
- __frwr_mr_reset(ia, mr);
-
 bad_wr = bad_wr->next;
+
+ frwr_mr_recycle(mr);
 }
- goto unmap;
 }

-const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
- .ro_map = frwr_op_map,
- .ro_send = frwr_op_send,
- .ro_reminv = frwr_op_reminv,
- .ro_unmap_sync = frwr_op_unmap_sync,
- .ro_recover_mr = frwr_op_recover_mr,
- .ro_open = frwr_op_open,
- .ro_maxpages = frwr_op_maxpages,
- .ro_init_mr = frwr_op_init_mr,
- .ro_release_mr = frwr_op_release_mr,
- .ro_displayname = "frwr",
- .ro_send_w_inv_ok = RPCRDMA_CMP_F_SND_W_INV_OK,
-};
+/**
+ * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
+ * @cq: completion queue
+ * @wc: WCE for a completed LocalInv WR
+ *
+ */
+static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+ struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+ struct rpcrdma_rep *rep = mr->mr_req->rl_reply;
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_li_done(wc, frwr);
+ __frwr_release_mr(wc, mr);
+
+ /* Ensure @rep is generated before __frwr_release_mr */
+ smp_rmb();
+ rpcrdma_complete_rqst(rep);
+
+ rpcrdma_flush_disconnect(cq->cq_context, wc);
+}
+
+/**
+ * frwr_unmap_async - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req with a non-empty list of MRs to process
+ *
+ * This guarantees that registered MRs are properly fenced from the
+ * server before the RPC consumer accesses the data in them. It also
+ * ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
+ */
+void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+ struct ib_send_wr *first, *last, **prev;
+ const struct ib_send_wr *bad_wr;
+ struct rpcrdma_frwr *frwr;
+ struct rpcrdma_mr *mr;
+ int rc;
+
+ /* Chain the LOCAL_INV Work Requests and post them with
+ * a single ib_post_send() call.
+ */
+ frwr = NULL;
+ prev = &first;
+ while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
+
+ trace_xprtrdma_mr_localinv(mr);
+ r_xprt->rx_stats.local_inv_needed++;
+
+ frwr = &mr->frwr;
+ frwr->fr_cqe.done = frwr_wc_localinv;
+ last = &frwr->fr_invwr;
+ last->next = NULL;
+ last->wr_cqe = &frwr->fr_cqe;
+ last->sg_list = NULL;
+ last->num_sge = 0;
+ last->opcode = IB_WR_LOCAL_INV;
+ last->send_flags = IB_SEND_SIGNALED;
+ last->ex.invalidate_rkey = mr->mr_handle;
+
+ *prev = last;
+ prev = &last->next;
+ }
+
+ /* Strong send queue ordering guarantees that when the
+ * last WR in the chain completes, all WRs in the chain
+ * are complete. The last completion will wake up the
+ * RPC waiter.
+ */
+ frwr->fr_cqe.done = frwr_wc_localinv_done;
+
+ /* Transport disconnect drains the receive CQ before it
+ * replaces the QP. The RPC reply handler won't call us
+ * unless re_id->qp is a valid pointer.
+ */
+ bad_wr = NULL;
+ rc = ib_post_send(r_xprt->rx_ep->re_id->qp, first, &bad_wr);
+ if (!rc)
+ return;
+
+ /* Recycle MRs in the LOCAL_INV chain that did not get posted.
+ */
+ trace_xprtrdma_post_linv(req, rc);
+ while (bad_wr) {
+ frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
+ mr = container_of(frwr, struct rpcrdma_mr, frwr);
+ bad_wr = bad_wr->next;
+
+ frwr_mr_recycle(mr);
+ }
+
+ /* The final LOCAL_INV WR in the chain is supposed to
+ * do the wake. If it was never posted, the wake will
+ * not happen, so wake here in that case.
+ */
+ rpcrdma_complete_rqst(req->rl_reply);
+}
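
Editor's note (not part of the patch): the new frwr_map() folds the RPC XID into the upper 32 bits of the MR's iova ("ibmr->iova &= 0x00000000ffffffff; ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32;"). The standalone C sketch below demonstrates only that bit manipulation in userspace. The helper name pack_xid_into_iova() is invented for illustration, and ntohl() stands in for the kernel's be32_to_cpu().

/* Illustrative sketch only; not kernel code. */
#include <arpa/inet.h>	/* ntohl(), htonl() */
#include <stdint.h>
#include <stdio.h>

static uint64_t pack_xid_into_iova(uint64_t iova, uint32_t xid_be)
{
	iova &= 0x00000000ffffffffULL;		/* keep the low 32 bits of the iova */
	iova |= (uint64_t)ntohl(xid_be) << 32;	/* place the XID in the high 32 bits */
	return iova;
}

int main(void)
{
	uint64_t iova = 0x0000123456789000ULL;	/* example DMA address */
	uint32_t xid_be = htonl(0xdeadbeef);	/* on-the-wire (big-endian) XID */

	/* Prints "packed iova: 0xdeadbeef56789000" */
	printf("packed iova: 0x%016llx\n",
	       (unsigned long long)pack_xid_into_iova(iova, xid_be));
	return 0;
}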