2023-12-06 08f87f769b595151be1afeff53e144f543faa614
kernel/net/sunrpc/xprtrdma/verbs.c
@@ -53,6 +53,7 @@
 #include <linux/slab.h>
 #include <linux/sunrpc/addr.h>
 #include <linux/sunrpc/svc_rdma.h>
+#include <linux/log2.h>

 #include <asm-generic/barrier.h>
 #include <asm/bitops.h>
@@ -73,99 +74,122 @@
 /*
  * internal functions
  */
-static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
+static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
+static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
+static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_sendctx *sc);
+static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
+static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
+static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep);
+static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
-static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
-static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
-static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
+static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
+static void rpcrdma_ep_get(struct rpcrdma_ep *ep);
+static int rpcrdma_ep_put(struct rpcrdma_ep *ep);
+static struct rpcrdma_regbuf *
+rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
+ gfp_t flags);
+static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
+static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);

-struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
-
-int
-rpcrdma_alloc_wq(void)
+/* Wait for outstanding transport work to finish. ib_drain_qp
+ * handles the drains in the wrong order for us, so open code
+ * them here.
+ */
+static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 {
- struct workqueue_struct *recv_wq;
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
+ struct rdma_cm_id *id = ep->re_id;

- recv_wq = alloc_workqueue("xprtrdma_receive",
- WQ_MEM_RECLAIM | WQ_HIGHPRI,
- 0);
- if (!recv_wq)
- return -ENOMEM;
+ /* Flush Receives, then wait for deferred Reply work
+ * to complete.
+ */
+ ib_drain_rq(id->qp);

- rpcrdma_receive_wq = recv_wq;
- return 0;
+ /* Deferred Reply processing might have scheduled
+ * local invalidations.
+ */
+ ib_drain_sq(id->qp);
+
+ rpcrdma_ep_put(ep);
 }

-void
-rpcrdma_destroy_wq(void)
-{
- struct workqueue_struct *wq;
-
- if (rpcrdma_receive_wq) {
- wq = rpcrdma_receive_wq;
- rpcrdma_receive_wq = NULL;
- destroy_workqueue(wq);
- }
-}
-
-static void
-rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
+/**
+ * rpcrdma_qp_event_handler - Handle one QP event (error notification)
+ * @event: details of the event
+ * @context: ep that owns QP where event occurred
+ *
+ * Called from the RDMA provider (device driver) possibly in an interrupt
+ * context. The QP is always destroyed before the ID, so the ID will be
+ * reliably available when this handler is invoked.
+ */
+static void rpcrdma_qp_event_handler(struct ib_event *event, void *context)
 {
 struct rpcrdma_ep *ep = context;
- struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
- rx_ep);

- trace_xprtrdma_qp_error(r_xprt, event);
- pr_err("rpcrdma: %s on device %s ep %p\n",
- ib_event_msg(event->event), event->device->name, context);
+ trace_xprtrdma_qp_event(ep, event);
+}

- if (ep->rep_connected == 1) {
- ep->rep_connected = -EIO;
- rpcrdma_conn_func(ep);
- wake_up_all(&ep->rep_connect_wait);
- }
+/* Ensure xprt_force_disconnect() is invoked exactly once when a
+ * connection is closed or lost. (The important thing is it needs
+ * to be invoked "at least" once).
+ */
+static void rpcrdma_force_disconnect(struct rpcrdma_ep *ep)
+{
+ if (atomic_add_unless(&ep->re_force_disconnect, 1, 1))
+ xprt_force_disconnect(ep->re_xprt);
+}
+
+/**
+ * rpcrdma_flush_disconnect - Disconnect on flushed completion
+ * @r_xprt: transport to disconnect
+ * @wc: work completion entry
+ *
+ * Must be called in process context.
+ */
+void rpcrdma_flush_disconnect(struct rpcrdma_xprt *r_xprt, struct ib_wc *wc)
+{
+ if (wc->status != IB_WC_SUCCESS)
+ rpcrdma_force_disconnect(r_xprt->rx_ep);
 }

 /**
  * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
+ * @cq: completion queue
+ * @wc: WCE for a completed Send WR
  *
  */
-static void
-rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
+static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 {
 struct ib_cqe *cqe = wc->wr_cqe;
 struct rpcrdma_sendctx *sc =
 container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
+ struct rpcrdma_xprt *r_xprt = cq->cq_context;

 /* WARNING: Only wr_cqe and status are reliable at this point */
 trace_xprtrdma_wc_send(sc, wc);
- if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
- pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
- ib_wc_status_msg(wc->status),
- wc->status, wc->vendor_err);
-
- rpcrdma_sendctx_put_locked(sc);
+ rpcrdma_sendctx_put_locked(r_xprt, sc);
+ rpcrdma_flush_disconnect(r_xprt, wc);
 }

 /**
  * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
+ * @cq: completion queue
+ * @wc: WCE for a completed Receive WR
  *
  */
-static void
-rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
+static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 {
 struct ib_cqe *cqe = wc->wr_cqe;
 struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
 rr_cqe);
+ struct rpcrdma_xprt *r_xprt = cq->cq_context;

- /* WARNING: Only wr_id and status are reliable at this point */
+ /* WARNING: Only wr_cqe and status are reliable at this point */
 trace_xprtrdma_wc_receive(wc);
+ --r_xprt->rx_ep->re_receive_count;
 if (wc->status != IB_WC_SUCCESS)
- goto out_fail;
+ goto out_flushed;

 /* status == SUCCESS means all fields in wc are trustworthy */
 rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
@@ -176,179 +200,150 @@
176200 rdmab_addr(rep->rr_rdmabuf),
177201 wc->byte_len, DMA_FROM_DEVICE);
178202
179
-out_schedule:
180203 rpcrdma_reply_handler(rep);
181204 return;
182205
183
-out_fail:
184
- if (wc->status != IB_WC_WR_FLUSH_ERR)
185
- pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
186
- ib_wc_status_msg(wc->status),
187
- wc->status, wc->vendor_err);
188
- rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
189
- goto out_schedule;
206
+out_flushed:
207
+ rpcrdma_flush_disconnect(r_xprt, wc);
208
+ rpcrdma_rep_destroy(rep);
190209 }
191210
192
-static void
193
-rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
194
- struct rdma_conn_param *param)
211
+static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep,
212
+ struct rdma_conn_param *param)
195213 {
196
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
197214 const struct rpcrdma_connect_private *pmsg = param->private_data;
198215 unsigned int rsize, wsize;
199216
200217 /* Default settings for RPC-over-RDMA Version One */
201
- r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
218
+ ep->re_implicit_roundup = xprt_rdma_pad_optimize;
202219 rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
203220 wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
204221
205222 if (pmsg &&
206223 pmsg->cp_magic == rpcrdma_cmp_magic &&
207224 pmsg->cp_version == RPCRDMA_CMP_VERSION) {
208
- r_xprt->rx_ia.ri_implicit_roundup = true;
225
+ ep->re_implicit_roundup = true;
209226 rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
210227 wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
211228 }
212229
213
- if (rsize < cdata->inline_rsize)
214
- cdata->inline_rsize = rsize;
215
- if (wsize < cdata->inline_wsize)
216
- cdata->inline_wsize = wsize;
217
- dprintk("RPC: %s: max send %u, max recv %u\n",
218
- __func__, cdata->inline_wsize, cdata->inline_rsize);
219
- rpcrdma_set_max_header_sizes(r_xprt);
230
+ if (rsize < ep->re_inline_recv)
231
+ ep->re_inline_recv = rsize;
232
+ if (wsize < ep->re_inline_send)
233
+ ep->re_inline_send = wsize;
234
+
235
+ rpcrdma_set_max_header_sizes(ep);
220236 }
221237
238
+/**
239
+ * rpcrdma_cm_event_handler - Handle RDMA CM events
240
+ * @id: rdma_cm_id on which an event has occurred
241
+ * @event: details of the event
242
+ *
243
+ * Called with @id's mutex held. Returns 1 if caller should
244
+ * destroy @id, otherwise 0.
245
+ */
222246 static int
223
-rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
247
+rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
224248 {
225
- struct rpcrdma_xprt *xprt = id->context;
226
- struct rpcrdma_ia *ia = &xprt->rx_ia;
227
- struct rpcrdma_ep *ep = &xprt->rx_ep;
228
- int connstate = 0;
249
+ struct sockaddr *sap = (struct sockaddr *)&id->route.addr.dst_addr;
250
+ struct rpcrdma_ep *ep = id->context;
229251
230
- trace_xprtrdma_conn_upcall(xprt, event);
252
+ might_sleep();
253
+
231254 switch (event->event) {
232255 case RDMA_CM_EVENT_ADDR_RESOLVED:
233256 case RDMA_CM_EVENT_ROUTE_RESOLVED:
234
- ia->ri_async_rc = 0;
235
- complete(&ia->ri_done);
236
- break;
257
+ ep->re_async_rc = 0;
258
+ complete(&ep->re_done);
259
+ return 0;
237260 case RDMA_CM_EVENT_ADDR_ERROR:
238
- ia->ri_async_rc = -EPROTO;
239
- complete(&ia->ri_done);
240
- break;
261
+ ep->re_async_rc = -EPROTO;
262
+ complete(&ep->re_done);
263
+ return 0;
241264 case RDMA_CM_EVENT_ROUTE_ERROR:
242
- ia->ri_async_rc = -ENETUNREACH;
243
- complete(&ia->ri_done);
244
- break;
265
+ ep->re_async_rc = -ENETUNREACH;
266
+ complete(&ep->re_done);
267
+ return 0;
245268 case RDMA_CM_EVENT_DEVICE_REMOVAL:
246
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
247
- pr_info("rpcrdma: removing device %s for %s:%s\n",
248
- ia->ri_device->name,
249
- rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt));
250
-#endif
251
- init_completion(&ia->ri_remove_done);
252
- set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
253
- ep->rep_connected = -ENODEV;
254
- xprt_force_disconnect(&xprt->rx_xprt);
255
- wait_for_completion(&ia->ri_remove_done);
256
-
257
- ia->ri_id = NULL;
258
- ia->ri_device = NULL;
259
- /* Return 1 to ensure the core destroys the id. */
260
- return 1;
269
+ pr_info("rpcrdma: removing device %s for %pISpc\n",
270
+ ep->re_id->device->name, sap);
271
+ fallthrough;
272
+ case RDMA_CM_EVENT_ADDR_CHANGE:
273
+ ep->re_connect_status = -ENODEV;
274
+ goto disconnected;
261275 case RDMA_CM_EVENT_ESTABLISHED:
262
- ++xprt->rx_xprt.connect_cookie;
263
- connstate = 1;
264
- rpcrdma_update_connect_private(xprt, &event->param.conn);
265
- goto connected;
276
+ rpcrdma_ep_get(ep);
277
+ ep->re_connect_status = 1;
278
+ rpcrdma_update_cm_private(ep, &event->param.conn);
279
+ trace_xprtrdma_inline_thresh(ep);
280
+ wake_up_all(&ep->re_connect_wait);
281
+ break;
266282 case RDMA_CM_EVENT_CONNECT_ERROR:
267
- connstate = -ENOTCONN;
268
- goto connected;
283
+ ep->re_connect_status = -ENOTCONN;
284
+ goto wake_connect_worker;
269285 case RDMA_CM_EVENT_UNREACHABLE:
270
- connstate = -ENETUNREACH;
271
- goto connected;
286
+ ep->re_connect_status = -ENETUNREACH;
287
+ goto wake_connect_worker;
272288 case RDMA_CM_EVENT_REJECTED:
273
- dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
274
- rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
275
- rdma_reject_msg(id, event->status));
276
- connstate = -ECONNREFUSED;
289
+ dprintk("rpcrdma: connection to %pISpc rejected: %s\n",
290
+ sap, rdma_reject_msg(id, event->status));
291
+ ep->re_connect_status = -ECONNREFUSED;
277292 if (event->status == IB_CM_REJ_STALE_CONN)
278
- connstate = -EAGAIN;
279
- goto connected;
293
+ ep->re_connect_status = -ENOTCONN;
294
+wake_connect_worker:
295
+ wake_up_all(&ep->re_connect_wait);
296
+ return 0;
280297 case RDMA_CM_EVENT_DISCONNECTED:
281
- ++xprt->rx_xprt.connect_cookie;
282
- connstate = -ECONNABORTED;
283
-connected:
284
- ep->rep_connected = connstate;
285
- rpcrdma_conn_func(ep);
286
- wake_up_all(&ep->rep_connect_wait);
287
- /*FALLTHROUGH*/
298
+ ep->re_connect_status = -ECONNABORTED;
299
+disconnected:
300
+ rpcrdma_force_disconnect(ep);
301
+ return rpcrdma_ep_put(ep);
288302 default:
289
- dprintk("RPC: %s: %s:%s on %s/%s (ep 0x%p): %s\n",
290
- __func__,
291
- rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
292
- ia->ri_device->name, ia->ri_ops->ro_displayname,
293
- ep, rdma_event_msg(event->event));
294303 break;
295304 }
296305
306
+ dprintk("RPC: %s: %pISpc on %s/frwr: %s\n", __func__, sap,
307
+ ep->re_id->device->name, rdma_event_msg(event->event));
297308 return 0;
298309 }
299310
300
-static struct rdma_cm_id *
301
-rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
311
+static struct rdma_cm_id *rpcrdma_create_id(struct rpcrdma_xprt *r_xprt,
312
+ struct rpcrdma_ep *ep)
302313 {
303314 unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
315
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
304316 struct rdma_cm_id *id;
305317 int rc;
306318
307
- trace_xprtrdma_conn_start(xprt);
319
+ init_completion(&ep->re_done);
308320
309
- init_completion(&ia->ri_done);
310
-
311
- id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_conn_upcall,
312
- xprt, RDMA_PS_TCP, IB_QPT_RC);
313
- if (IS_ERR(id)) {
314
- rc = PTR_ERR(id);
315
- dprintk("RPC: %s: rdma_create_id() failed %i\n",
316
- __func__, rc);
321
+ id = rdma_create_id(xprt->xprt_net, rpcrdma_cm_event_handler, ep,
322
+ RDMA_PS_TCP, IB_QPT_RC);
323
+ if (IS_ERR(id))
317324 return id;
318
- }
319325
320
- ia->ri_async_rc = -ETIMEDOUT;
321
- rc = rdma_resolve_addr(id, NULL,
322
- (struct sockaddr *)&xprt->rx_xprt.addr,
326
+ ep->re_async_rc = -ETIMEDOUT;
327
+ rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)&xprt->addr,
323328 RDMA_RESOLVE_TIMEOUT);
324
- if (rc) {
325
- dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
326
- __func__, rc);
329
+ if (rc)
327330 goto out;
328
- }
329
- rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
330
- if (rc < 0) {
331
- trace_xprtrdma_conn_tout(xprt);
331
+ rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
332
+ if (rc < 0)
332333 goto out;
333
- }
334334
335
- rc = ia->ri_async_rc;
335
+ rc = ep->re_async_rc;
336336 if (rc)
337337 goto out;
338338
339
- ia->ri_async_rc = -ETIMEDOUT;
339
+ ep->re_async_rc = -ETIMEDOUT;
340340 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
341
- if (rc) {
342
- dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
343
- __func__, rc);
341
+ if (rc)
344342 goto out;
345
- }
346
- rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
347
- if (rc < 0) {
348
- trace_xprtrdma_conn_tout(xprt);
343
+ rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
344
+ if (rc < 0)
349345 goto out;
350
- }
351
- rc = ia->ri_async_rc;
346
+ rc = ep->re_async_rc;
352347 if (rc)
353348 goto out;
354349
@@ -359,454 +354,256 @@
359354 return ERR_PTR(rc);
360355 }
361356
362
-/*
363
- * Exported functions.
364
- */
365
-
366
-/**
367
- * rpcrdma_ia_open - Open and initialize an Interface Adapter.
368
- * @xprt: transport with IA to (re)initialize
369
- *
370
- * Returns 0 on success, negative errno if an appropriate
371
- * Interface Adapter could not be found and opened.
372
- */
373
-int
374
-rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
357
+static void rpcrdma_ep_destroy(struct kref *kref)
375358 {
376
- struct rpcrdma_ia *ia = &xprt->rx_ia;
359
+ struct rpcrdma_ep *ep = container_of(kref, struct rpcrdma_ep, re_kref);
360
+
361
+ if (ep->re_id->qp) {
362
+ rdma_destroy_qp(ep->re_id);
363
+ ep->re_id->qp = NULL;
364
+ }
365
+
366
+ if (ep->re_attr.recv_cq)
367
+ ib_free_cq(ep->re_attr.recv_cq);
368
+ ep->re_attr.recv_cq = NULL;
369
+ if (ep->re_attr.send_cq)
370
+ ib_free_cq(ep->re_attr.send_cq);
371
+ ep->re_attr.send_cq = NULL;
372
+
373
+ if (ep->re_pd)
374
+ ib_dealloc_pd(ep->re_pd);
375
+ ep->re_pd = NULL;
376
+
377
+ kfree(ep);
378
+ module_put(THIS_MODULE);
379
+}
380
+
381
+static noinline void rpcrdma_ep_get(struct rpcrdma_ep *ep)
382
+{
383
+ kref_get(&ep->re_kref);
384
+}
385
+
386
+/* Returns:
387
+ * %0 if @ep still has a positive kref count, or
388
+ * %1 if @ep was destroyed successfully.
389
+ */
390
+static noinline int rpcrdma_ep_put(struct rpcrdma_ep *ep)
391
+{
392
+ return kref_put(&ep->re_kref, rpcrdma_ep_destroy);
393
+}
394
+
395
+static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
396
+{
397
+ struct rpcrdma_connect_private *pmsg;
398
+ struct ib_device *device;
399
+ struct rdma_cm_id *id;
400
+ struct rpcrdma_ep *ep;
377401 int rc;
378402
379
- ia->ri_id = rpcrdma_create_id(xprt, ia);
380
- if (IS_ERR(ia->ri_id)) {
381
- rc = PTR_ERR(ia->ri_id);
382
- goto out_err;
403
+ ep = kzalloc(sizeof(*ep), GFP_NOFS);
404
+ if (!ep)
405
+ return -ENOTCONN;
406
+ ep->re_xprt = &r_xprt->rx_xprt;
407
+ kref_init(&ep->re_kref);
408
+
409
+ id = rpcrdma_create_id(r_xprt, ep);
410
+ if (IS_ERR(id)) {
411
+ kfree(ep);
412
+ return PTR_ERR(id);
383413 }
384
- ia->ri_device = ia->ri_id->device;
414
+ __module_get(THIS_MODULE);
415
+ device = id->device;
416
+ ep->re_id = id;
385417
386
- ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
387
- if (IS_ERR(ia->ri_pd)) {
388
- rc = PTR_ERR(ia->ri_pd);
389
- pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
390
- goto out_err;
391
- }
392
-
393
- switch (xprt_rdma_memreg_strategy) {
394
- case RPCRDMA_FRWR:
395
- if (frwr_is_supported(ia)) {
396
- ia->ri_ops = &rpcrdma_frwr_memreg_ops;
397
- break;
398
- }
399
- /*FALLTHROUGH*/
400
- case RPCRDMA_MTHCAFMR:
401
- if (fmr_is_supported(ia)) {
402
- ia->ri_ops = &rpcrdma_fmr_memreg_ops;
403
- break;
404
- }
405
- /*FALLTHROUGH*/
406
- default:
407
- pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
408
- ia->ri_device->name, xprt_rdma_memreg_strategy);
409
- rc = -EINVAL;
410
- goto out_err;
411
- }
412
-
413
- return 0;
414
-
415
-out_err:
416
- rpcrdma_ia_close(ia);
417
- return rc;
418
-}
419
-
420
-/**
421
- * rpcrdma_ia_remove - Handle device driver unload
422
- * @ia: interface adapter being removed
423
- *
424
- * Divest transport H/W resources associated with this adapter,
425
- * but allow it to be restored later.
426
- */
427
-void
428
-rpcrdma_ia_remove(struct rpcrdma_ia *ia)
429
-{
430
- struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
431
- rx_ia);
432
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
433
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
434
- struct rpcrdma_req *req;
435
- struct rpcrdma_rep *rep;
436
-
437
- cancel_delayed_work_sync(&buf->rb_refresh_worker);
438
-
439
- /* This is similar to rpcrdma_ep_destroy, but:
440
- * - Don't cancel the connect worker.
441
- * - Don't call rpcrdma_ep_disconnect, which waits
442
- * for another conn upcall, which will deadlock.
443
- * - rdma_disconnect is unneeded, the underlying
444
- * connection is already gone.
445
- */
446
- if (ia->ri_id->qp) {
447
- ib_drain_qp(ia->ri_id->qp);
448
- rdma_destroy_qp(ia->ri_id);
449
- ia->ri_id->qp = NULL;
450
- }
451
- ib_free_cq(ep->rep_attr.recv_cq);
452
- ep->rep_attr.recv_cq = NULL;
453
- ib_free_cq(ep->rep_attr.send_cq);
454
- ep->rep_attr.send_cq = NULL;
455
-
456
- /* The ULP is responsible for ensuring all DMA
457
- * mappings and MRs are gone.
458
- */
459
- list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
460
- rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
461
- list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
462
- rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
463
- rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
464
- rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
465
- }
466
- rpcrdma_mrs_destroy(buf);
467
- ib_dealloc_pd(ia->ri_pd);
468
- ia->ri_pd = NULL;
469
-
470
- /* Allow waiters to continue */
471
- complete(&ia->ri_remove_done);
472
-
473
- trace_xprtrdma_remove(r_xprt);
474
-}
475
-
476
-/**
477
- * rpcrdma_ia_close - Clean up/close an IA.
478
- * @ia: interface adapter to close
479
- *
480
- */
481
-void
482
-rpcrdma_ia_close(struct rpcrdma_ia *ia)
483
-{
484
- if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
485
- if (ia->ri_id->qp)
486
- rdma_destroy_qp(ia->ri_id);
487
- rdma_destroy_id(ia->ri_id);
488
- }
489
- ia->ri_id = NULL;
490
- ia->ri_device = NULL;
491
-
492
- /* If the pd is still busy, xprtrdma missed freeing a resource */
493
- if (ia->ri_pd && !IS_ERR(ia->ri_pd))
494
- ib_dealloc_pd(ia->ri_pd);
495
- ia->ri_pd = NULL;
496
-}
497
-
498
-/*
499
- * Create unconnected endpoint.
500
- */
501
-int
502
-rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
503
- struct rpcrdma_create_data_internal *cdata)
504
-{
505
- struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
506
- struct ib_cq *sendcq, *recvcq;
507
- unsigned int max_sge;
508
- int rc;
509
-
510
- max_sge = min_t(unsigned int, ia->ri_device->attrs.max_send_sge,
511
- RPCRDMA_MAX_SEND_SGES);
512
- if (max_sge < RPCRDMA_MIN_SEND_SGES) {
513
- pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
514
- return -ENOMEM;
515
- }
516
- ia->ri_max_send_sges = max_sge;
517
-
518
- rc = ia->ri_ops->ro_open(ia, ep, cdata);
418
+ ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
419
+ ep->re_inline_send = xprt_rdma_max_inline_write;
420
+ ep->re_inline_recv = xprt_rdma_max_inline_read;
421
+ rc = frwr_query_device(ep, device);
519422 if (rc)
520
- return rc;
423
+ goto out_destroy;
521424
522
- ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
523
- ep->rep_attr.qp_context = ep;
524
- ep->rep_attr.srq = NULL;
525
- ep->rep_attr.cap.max_send_sge = max_sge;
526
- ep->rep_attr.cap.max_recv_sge = 1;
527
- ep->rep_attr.cap.max_inline_data = 0;
528
- ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
529
- ep->rep_attr.qp_type = IB_QPT_RC;
530
- ep->rep_attr.port_num = ~0;
425
+ r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests);
426
+
427
+ ep->re_attr.event_handler = rpcrdma_qp_event_handler;
428
+ ep->re_attr.qp_context = ep;
429
+ ep->re_attr.srq = NULL;
430
+ ep->re_attr.cap.max_inline_data = 0;
431
+ ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
432
+ ep->re_attr.qp_type = IB_QPT_RC;
433
+ ep->re_attr.port_num = ~0;
531434
532435 dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
533436 "iovs: send %d recv %d\n",
534437 __func__,
535
- ep->rep_attr.cap.max_send_wr,
536
- ep->rep_attr.cap.max_recv_wr,
537
- ep->rep_attr.cap.max_send_sge,
538
- ep->rep_attr.cap.max_recv_sge);
438
+ ep->re_attr.cap.max_send_wr,
439
+ ep->re_attr.cap.max_recv_wr,
440
+ ep->re_attr.cap.max_send_sge,
441
+ ep->re_attr.cap.max_recv_sge);
539442
540
- /* set trigger for requesting send completion */
541
- ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
542
- cdata->max_requests >> 2);
543
- ep->rep_send_count = ep->rep_send_batch;
544
- init_waitqueue_head(&ep->rep_connect_wait);
545
- INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
443
+ ep->re_send_batch = ep->re_max_requests >> 3;
444
+ ep->re_send_count = ep->re_send_batch;
445
+ init_waitqueue_head(&ep->re_connect_wait);
546446
547
- sendcq = ib_alloc_cq(ia->ri_device, NULL,
548
- ep->rep_attr.cap.max_send_wr + 1,
549
- ia->ri_device->num_comp_vectors > 1 ? 1 : 0,
550
- IB_POLL_WORKQUEUE);
551
- if (IS_ERR(sendcq)) {
552
- rc = PTR_ERR(sendcq);
553
- dprintk("RPC: %s: failed to create send CQ: %i\n",
554
- __func__, rc);
555
- goto out1;
447
+ ep->re_attr.send_cq = ib_alloc_cq_any(device, r_xprt,
448
+ ep->re_attr.cap.max_send_wr,
449
+ IB_POLL_WORKQUEUE);
450
+ if (IS_ERR(ep->re_attr.send_cq)) {
451
+ rc = PTR_ERR(ep->re_attr.send_cq);
452
+ ep->re_attr.send_cq = NULL;
453
+ goto out_destroy;
556454 }
557455
558
- recvcq = ib_alloc_cq(ia->ri_device, NULL,
559
- ep->rep_attr.cap.max_recv_wr + 1,
560
- 0, IB_POLL_WORKQUEUE);
561
- if (IS_ERR(recvcq)) {
562
- rc = PTR_ERR(recvcq);
563
- dprintk("RPC: %s: failed to create recv CQ: %i\n",
564
- __func__, rc);
565
- goto out2;
456
+ ep->re_attr.recv_cq = ib_alloc_cq_any(device, r_xprt,
457
+ ep->re_attr.cap.max_recv_wr,
458
+ IB_POLL_WORKQUEUE);
459
+ if (IS_ERR(ep->re_attr.recv_cq)) {
460
+ rc = PTR_ERR(ep->re_attr.recv_cq);
461
+ ep->re_attr.recv_cq = NULL;
462
+ goto out_destroy;
566463 }
567
-
568
- ep->rep_attr.send_cq = sendcq;
569
- ep->rep_attr.recv_cq = recvcq;
464
+ ep->re_receive_count = 0;
570465
571466 /* Initialize cma parameters */
572
- memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
467
+ memset(&ep->re_remote_cma, 0, sizeof(ep->re_remote_cma));
573468
574469 /* Prepare RDMA-CM private message */
470
+ pmsg = &ep->re_cm_private;
575471 pmsg->cp_magic = rpcrdma_cmp_magic;
576472 pmsg->cp_version = RPCRDMA_CMP_VERSION;
577
- pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
578
- pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
579
- pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
580
- ep->rep_remote_cma.private_data = pmsg;
581
- ep->rep_remote_cma.private_data_len = sizeof(*pmsg);
473
+ pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
474
+ pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->re_inline_send);
475
+ pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->re_inline_recv);
476
+ ep->re_remote_cma.private_data = pmsg;
477
+ ep->re_remote_cma.private_data_len = sizeof(*pmsg);
582478
583479 /* Client offers RDMA Read but does not initiate */
584
- ep->rep_remote_cma.initiator_depth = 0;
585
- ep->rep_remote_cma.responder_resources =
586
- min_t(int, U8_MAX, ia->ri_device->attrs.max_qp_rd_atom);
480
+ ep->re_remote_cma.initiator_depth = 0;
481
+ ep->re_remote_cma.responder_resources =
482
+ min_t(int, U8_MAX, device->attrs.max_qp_rd_atom);
587483
588484 /* Limit transport retries so client can detect server
589485 * GID changes quickly. RPC layer handles re-establishing
590486 * transport connection and retransmission.
591487 */
592
- ep->rep_remote_cma.retry_count = 6;
488
+ ep->re_remote_cma.retry_count = 6;
593489
594490 /* RPC-over-RDMA handles its own flow control. In addition,
595491 * make all RNR NAKs visible so we know that RPC-over-RDMA
596492 * flow control is working correctly (no NAKs should be seen).
597493 */
598
- ep->rep_remote_cma.flow_control = 0;
599
- ep->rep_remote_cma.rnr_retry_count = 0;
494
+ ep->re_remote_cma.flow_control = 0;
495
+ ep->re_remote_cma.rnr_retry_count = 0;
600496
601
- return 0;
602
-
603
-out2:
604
- ib_free_cq(sendcq);
605
-out1:
606
- return rc;
607
-}
608
-
609
-/*
610
- * rpcrdma_ep_destroy
611
- *
612
- * Disconnect and destroy endpoint. After this, the only
613
- * valid operations on the ep are to free it (if dynamically
614
- * allocated) or re-create it.
615
- */
616
-void
617
-rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
618
-{
619
- cancel_delayed_work_sync(&ep->rep_connect_worker);
620
-
621
- if (ia->ri_id && ia->ri_id->qp) {
622
- rpcrdma_ep_disconnect(ep, ia);
623
- rdma_destroy_qp(ia->ri_id);
624
- ia->ri_id->qp = NULL;
625
- }
626
-
627
- if (ep->rep_attr.recv_cq)
628
- ib_free_cq(ep->rep_attr.recv_cq);
629
- if (ep->rep_attr.send_cq)
630
- ib_free_cq(ep->rep_attr.send_cq);
631
-}
632
-
633
-/* Re-establish a connection after a device removal event.
634
- * Unlike a normal reconnection, a fresh PD and a new set
635
- * of MRs and buffers is needed.
636
- */
637
-static int
638
-rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
639
- struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
640
-{
641
- int rc, err;
642
-
643
- trace_xprtrdma_reinsert(r_xprt);
644
-
645
- rc = -EHOSTUNREACH;
646
- if (rpcrdma_ia_open(r_xprt))
647
- goto out1;
648
-
649
- rc = -ENOMEM;
650
- err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
651
- if (err) {
652
- pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
653
- goto out2;
654
- }
655
-
656
- rc = -ENETUNREACH;
657
- err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
658
- if (err) {
659
- pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
660
- goto out3;
661
- }
662
-
663
- rpcrdma_mrs_create(r_xprt);
664
- return 0;
665
-
666
-out3:
667
- rpcrdma_ep_destroy(ep, ia);
668
-out2:
669
- rpcrdma_ia_close(ia);
670
-out1:
671
- return rc;
672
-}
673
-
674
-static int
675
-rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
676
- struct rpcrdma_ia *ia)
677
-{
678
- struct rdma_cm_id *id, *old;
679
- int err, rc;
680
-
681
- trace_xprtrdma_reconnect(r_xprt);
682
-
683
- rpcrdma_ep_disconnect(ep, ia);
684
-
685
- rc = -EHOSTUNREACH;
686
- id = rpcrdma_create_id(r_xprt, ia);
687
- if (IS_ERR(id))
688
- goto out;
689
-
690
- /* As long as the new ID points to the same device as the
691
- * old ID, we can reuse the transport's existing PD and all
692
- * previously allocated MRs. Also, the same device means
693
- * the transport's previous DMA mappings are still valid.
694
- *
695
- * This is a sanity check only. There should be no way these
696
- * point to two different devices here.
697
- */
698
- old = id;
699
- rc = -ENETUNREACH;
700
- if (ia->ri_device != id->device) {
701
- pr_err("rpcrdma: can't reconnect on different device!\n");
497
+ ep->re_pd = ib_alloc_pd(device, 0);
498
+ if (IS_ERR(ep->re_pd)) {
499
+ rc = PTR_ERR(ep->re_pd);
500
+ ep->re_pd = NULL;
702501 goto out_destroy;
703502 }
704503
705
- err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
706
- if (err) {
707
- dprintk("RPC: %s: rdma_create_qp returned %d\n",
708
- __func__, err);
504
+ rc = rdma_create_qp(id, ep->re_pd, &ep->re_attr);
505
+ if (rc)
709506 goto out_destroy;
710
- }
711507
712
- /* Atomically replace the transport's ID and QP. */
713
- rc = 0;
714
- old = ia->ri_id;
715
- ia->ri_id = id;
716
- rdma_destroy_qp(old);
508
+ r_xprt->rx_ep = ep;
509
+ return 0;
717510
718511 out_destroy:
719
- rdma_destroy_id(old);
720
-out:
512
+ rpcrdma_ep_put(ep);
513
+ rdma_destroy_id(id);
721514 return rc;
722515 }
723516
724
-/*
725
- * Connect unconnected endpoint.
517
+/**
518
+ * rpcrdma_xprt_connect - Connect an unconnected transport
519
+ * @r_xprt: controlling transport instance
520
+ *
521
+ * Returns 0 on success or a negative errno.
726522 */
727
-int
728
-rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
523
+int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
729524 {
730
- struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
731
- rx_ia);
525
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
526
+ struct rpcrdma_ep *ep;
732527 int rc;
733528
734
-retry:
735
- switch (ep->rep_connected) {
736
- case 0:
737
- dprintk("RPC: %s: connecting...\n", __func__);
738
- rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
739
- if (rc) {
740
- dprintk("RPC: %s: rdma_create_qp failed %i\n",
741
- __func__, rc);
742
- rc = -ENETUNREACH;
743
- goto out_noupdate;
744
- }
745
- break;
746
- case -ENODEV:
747
- rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
748
- if (rc)
749
- goto out_noupdate;
750
- break;
751
- default:
752
- rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
753
- if (rc)
754
- goto out;
755
- }
756
-
757
- ep->rep_connected = 0;
758
- rpcrdma_post_recvs(r_xprt, true);
759
-
760
- rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
761
- if (rc) {
762
- dprintk("RPC: %s: rdma_connect() failed with %i\n",
763
- __func__, rc);
764
- goto out;
765
- }
766
-
767
- wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
768
- if (ep->rep_connected <= 0) {
769
- if (ep->rep_connected == -EAGAIN)
770
- goto retry;
771
- rc = ep->rep_connected;
772
- goto out;
773
- }
774
-
775
- dprintk("RPC: %s: connected\n", __func__);
776
-
777
-out:
529
+ rc = rpcrdma_ep_create(r_xprt);
778530 if (rc)
779
- ep->rep_connected = rc;
531
+ return rc;
532
+ ep = r_xprt->rx_ep;
780533
781
-out_noupdate:
534
+ xprt_clear_connected(xprt);
535
+ rpcrdma_reset_cwnd(r_xprt);
536
+
537
+ /* Bump the ep's reference count while there are
538
+ * outstanding Receives.
539
+ */
540
+ rpcrdma_ep_get(ep);
541
+ rpcrdma_post_recvs(r_xprt, 1, true);
542
+
543
+ rc = rdma_connect(ep->re_id, &ep->re_remote_cma);
544
+ if (rc)
545
+ goto out;
546
+
547
+ if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
548
+ xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
549
+ wait_event_interruptible(ep->re_connect_wait,
550
+ ep->re_connect_status != 0);
551
+ if (ep->re_connect_status <= 0) {
552
+ rc = ep->re_connect_status;
553
+ goto out;
554
+ }
555
+
556
+ rc = rpcrdma_sendctxs_create(r_xprt);
557
+ if (rc) {
558
+ rc = -ENOTCONN;
559
+ goto out;
560
+ }
561
+
562
+ rc = rpcrdma_reqs_setup(r_xprt);
563
+ if (rc) {
564
+ rc = -ENOTCONN;
565
+ goto out;
566
+ }
567
+ rpcrdma_mrs_create(r_xprt);
568
+
569
+out:
570
+ trace_xprtrdma_connect(r_xprt, rc);
782571 return rc;
783572 }
784573
785
-/*
786
- * rpcrdma_ep_disconnect
574
+/**
575
+ * rpcrdma_xprt_disconnect - Disconnect underlying transport
576
+ * @r_xprt: controlling transport instance
787577 *
788
- * This is separate from destroy to facilitate the ability
789
- * to reconnect without recreating the endpoint.
578
+ * Caller serializes. Either the transport send lock is held,
579
+ * or we're being called to destroy the transport.
790580 *
791
- * This call is not reentrant, and must not be made in parallel
792
- * on the same endpoint.
581
+ * On return, @r_xprt is completely divested of all hardware
582
+ * resources and prepared for the next ->connect operation.
793583 */
794
-void
795
-rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
584
+void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt)
796585 {
586
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
587
+ struct rdma_cm_id *id;
797588 int rc;
798589
799
- rc = rdma_disconnect(ia->ri_id);
800
- if (!rc)
801
- /* returns without wait if not connected */
802
- wait_event_interruptible(ep->rep_connect_wait,
803
- ep->rep_connected != 1);
804
- else
805
- ep->rep_connected = rc;
806
- trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
807
- rx_ep), rc);
590
+ if (!ep)
591
+ return;
808592
809
- ib_drain_qp(ia->ri_id->qp);
593
+ id = ep->re_id;
594
+ rc = rdma_disconnect(id);
595
+ trace_xprtrdma_disconnect(r_xprt, rc);
596
+
597
+ rpcrdma_xprt_drain(r_xprt);
598
+ rpcrdma_reps_unmap(r_xprt);
599
+ rpcrdma_reqs_reset(r_xprt);
600
+ rpcrdma_mrs_destroy(r_xprt);
601
+ rpcrdma_sendctxs_destroy(r_xprt);
602
+
603
+ if (rpcrdma_ep_put(ep))
604
+ rdma_destroy_id(id);
605
+
606
+ r_xprt->rx_ep = NULL;
810607 }
811608
812609 /* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -823,31 +620,31 @@
  */

 /* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
- * queue activity, and ib_drain_qp has flushed all remaining Send
- * requests.
+ * queue activity, and rpcrdma_xprt_drain has flushed all remaining
+ * Send requests.
  */
-static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
+static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
 {
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 unsigned long i;

+ if (!buf->rb_sc_ctxs)
+ return;
 for (i = 0; i <= buf->rb_sc_last; i++)
 kfree(buf->rb_sc_ctxs[i]);
 kfree(buf->rb_sc_ctxs);
+ buf->rb_sc_ctxs = NULL;
 }

-static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
+static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
 {
 struct rpcrdma_sendctx *sc;

- sc = kzalloc(sizeof(*sc) +
- ia->ri_max_send_sges * sizeof(struct ib_sge),
+ sc = kzalloc(struct_size(sc, sc_sges, ep->re_attr.cap.max_send_sge),
 GFP_KERNEL);
 if (!sc)
 return NULL;

- sc->sc_wr.wr_cqe = &sc->sc_cqe;
- sc->sc_wr.sg_list = sc->sc_sges;
- sc->sc_wr.opcode = IB_WR_SEND;
 sc->sc_cqe.done = rpcrdma_wc_send;
 return sc;
 }
@@ -863,23 +660,22 @@
  * the ->send_request call to fail temporarily before too many
  * Sends are posted.
  */
- i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
- dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i);
+ i = r_xprt->rx_ep->re_max_requests + RPCRDMA_MAX_BC_REQUESTS;
 buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
 if (!buf->rb_sc_ctxs)
 return -ENOMEM;

 buf->rb_sc_last = i - 1;
 for (i = 0; i <= buf->rb_sc_last; i++) {
- sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
+ sc = rpcrdma_sendctx_create(r_xprt->rx_ep);
 if (!sc)
 return -ENOMEM;

- sc->sc_xprt = r_xprt;
 buf->rb_sc_ctxs[i] = sc;
 }
- buf->rb_flags = 0;

+ buf->rb_sc_head = 0;
+ buf->rb_sc_tail = 0;
 return 0;
 }

@@ -895,20 +691,20 @@

 /**
  * rpcrdma_sendctx_get_locked - Acquire a send context
- * @buf: transport buffers from which to acquire an unused context
+ * @r_xprt: controlling transport instance
  *
  * Returns pointer to a free send completion context; or NULL if
  * the queue is empty.
  *
  * Usage: Called to acquire an SGE array before preparing a Send WR.
  *
- * The caller serializes calls to this function (per rpcrdma_buffer),
- * and provides an effective memory barrier that flushes the new value
+ * The caller serializes calls to this function (per transport), and
+ * provides an effective memory barrier that flushes the new value
  * of rb_sc_head.
  */
-struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
 {
- struct rpcrdma_xprt *r_xprt;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 struct rpcrdma_sendctx *sc;
 unsigned long next_head;

@@ -932,28 +728,28 @@
  * completions recently. This is a sign the Send Queue is
  * backing up. Cause the caller to pause and try again.
  */
- set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
- r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
+ xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
 r_xprt->rx_stats.empty_sendctx_q++;
 return NULL;
 }

 /**
  * rpcrdma_sendctx_put_locked - Release a send context
+ * @r_xprt: controlling transport instance
  * @sc: send context to release
  *
  * Usage: Called from Send completion to return a sendctxt
  * to the queue.
  *
- * The caller serializes calls to this function (per rpcrdma_buffer).
+ * The caller serializes calls to this function (per transport).
  */
-static void
-rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
+static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_sendctx *sc)
 {
- struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 unsigned long next_tail;

- /* Unmap SGEs of previously completed by unsignaled
+ /* Unmap SGEs of previously completed but unsignaled
  * Sends by walking up the queue until @sc is found.
  */
 next_tail = buf->rb_sc_tail;
@@ -961,218 +757,323 @@
961757 next_tail = rpcrdma_sendctx_next(buf, next_tail);
962758
963759 /* ORDER: item must be accessed _before_ tail is updated */
964
- rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
760
+ rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
965761
966762 } while (buf->rb_sc_ctxs[next_tail] != sc);
967763
968764 /* Paired with READ_ONCE */
969765 smp_store_release(&buf->rb_sc_tail, next_tail);
970766
971
- if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) {
972
- smp_mb__after_atomic();
973
- xprt_write_space(&sc->sc_xprt->rx_xprt);
974
- }
975
-}
976
-
977
-static void
978
-rpcrdma_mr_recovery_worker(struct work_struct *work)
979
-{
980
- struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
981
- rb_recovery_worker.work);
982
- struct rpcrdma_mr *mr;
983
-
984
- spin_lock(&buf->rb_recovery_lock);
985
- while (!list_empty(&buf->rb_stale_mrs)) {
986
- mr = rpcrdma_mr_pop(&buf->rb_stale_mrs);
987
- spin_unlock(&buf->rb_recovery_lock);
988
-
989
- trace_xprtrdma_recover_mr(mr);
990
- mr->mr_xprt->rx_ia.ri_ops->ro_recover_mr(mr);
991
-
992
- spin_lock(&buf->rb_recovery_lock);
993
- }
994
- spin_unlock(&buf->rb_recovery_lock);
995
-}
996
-
997
-void
998
-rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr)
999
-{
1000
- struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
1001
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1002
-
1003
- spin_lock(&buf->rb_recovery_lock);
1004
- rpcrdma_mr_push(mr, &buf->rb_stale_mrs);
1005
- spin_unlock(&buf->rb_recovery_lock);
1006
-
1007
- schedule_delayed_work(&buf->rb_recovery_worker, 0);
767
+ xprt_write_space(&r_xprt->rx_xprt);
1008768 }
1009769
1010770 static void
1011771 rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
1012772 {
1013773 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1014
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
774
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
1015775 unsigned int count;
1016
- LIST_HEAD(free);
1017
- LIST_HEAD(all);
1018776
1019
- for (count = 0; count < 3; count++) {
777
+ for (count = 0; count < ep->re_max_rdma_segs; count++) {
1020778 struct rpcrdma_mr *mr;
1021779 int rc;
1022780
1023
- mr = kzalloc(sizeof(*mr), GFP_KERNEL);
781
+ mr = kzalloc(sizeof(*mr), GFP_NOFS);
1024782 if (!mr)
1025783 break;
1026784
1027
- rc = ia->ri_ops->ro_init_mr(ia, mr);
785
+ rc = frwr_mr_init(r_xprt, mr);
1028786 if (rc) {
1029787 kfree(mr);
1030788 break;
1031789 }
1032790
1033
- mr->mr_xprt = r_xprt;
1034
-
1035
- list_add(&mr->mr_list, &free);
1036
- list_add(&mr->mr_all, &all);
791
+ spin_lock(&buf->rb_lock);
792
+ rpcrdma_mr_push(mr, &buf->rb_mrs);
793
+ list_add(&mr->mr_all, &buf->rb_all_mrs);
794
+ spin_unlock(&buf->rb_lock);
1037795 }
1038796
1039
- spin_lock(&buf->rb_mrlock);
1040
- list_splice(&free, &buf->rb_mrs);
1041
- list_splice(&all, &buf->rb_all);
1042797 r_xprt->rx_stats.mrs_allocated += count;
1043
- spin_unlock(&buf->rb_mrlock);
1044798 trace_xprtrdma_createmrs(r_xprt, count);
1045
-
1046
- xprt_write_space(&r_xprt->rx_xprt);
1047799 }
1048800
1049801 static void
1050802 rpcrdma_mr_refresh_worker(struct work_struct *work)
1051803 {
1052804 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
1053
- rb_refresh_worker.work);
805
+ rb_refresh_worker);
1054806 struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
1055807 rx_buf);
1056808
1057809 rpcrdma_mrs_create(r_xprt);
810
+ xprt_write_space(&r_xprt->rx_xprt);
1058811 }
1059812
1060
-struct rpcrdma_req *
1061
-rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
813
+/**
814
+ * rpcrdma_mrs_refresh - Wake the MR refresh worker
815
+ * @r_xprt: controlling transport instance
816
+ *
817
+ */
818
+void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
819
+{
820
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
821
+ struct rpcrdma_ep *ep = r_xprt->rx_ep;
822
+
823
+ /* If there is no underlying connection, it's no use
824
+ * to wake the refresh worker.
825
+ */
826
+ if (ep->re_connect_status == 1) {
827
+ /* The work is scheduled on a WQ_MEM_RECLAIM
828
+ * workqueue in order to prevent MR allocation
829
+ * from recursing into NFS during direct reclaim.
830
+ */
831
+ queue_work(xprtiod_workqueue, &buf->rb_refresh_worker);
832
+ }
833
+}
834
+
835
+/**
836
+ * rpcrdma_req_create - Allocate an rpcrdma_req object
837
+ * @r_xprt: controlling r_xprt
838
+ * @size: initial size, in bytes, of send and receive buffers
839
+ * @flags: GFP flags passed to memory allocators
840
+ *
841
+ * Returns an allocated and fully initialized rpcrdma_req or NULL.
842
+ */
843
+struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
844
+ gfp_t flags)
1062845 {
1063846 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
1064
- struct rpcrdma_regbuf *rb;
1065847 struct rpcrdma_req *req;
1066848
1067
- req = kzalloc(sizeof(*req), GFP_KERNEL);
849
+ req = kzalloc(sizeof(*req), flags);
1068850 if (req == NULL)
1069
- return ERR_PTR(-ENOMEM);
851
+ goto out1;
1070852
1071
- rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
1072
- DMA_TO_DEVICE, GFP_KERNEL);
1073
- if (IS_ERR(rb)) {
1074
- kfree(req);
1075
- return ERR_PTR(-ENOMEM);
1076
- }
1077
- req->rl_rdmabuf = rb;
1078
- xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
1079
- req->rl_buffer = buffer;
853
+ req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
854
+ if (!req->rl_sendbuf)
855
+ goto out2;
856
+
857
+ req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
858
+ if (!req->rl_recvbuf)
859
+ goto out3;
860
+
861
+ INIT_LIST_HEAD(&req->rl_free_mrs);
1080862 INIT_LIST_HEAD(&req->rl_registered);
1081
-
1082
- spin_lock(&buffer->rb_reqslock);
863
+ spin_lock(&buffer->rb_lock);
1083864 list_add(&req->rl_all, &buffer->rb_allreqs);
1084
- spin_unlock(&buffer->rb_reqslock);
865
+ spin_unlock(&buffer->rb_lock);
1085866 return req;
867
+
868
+out3:
869
+ kfree(req->rl_sendbuf);
870
+out2:
871
+ kfree(req);
872
+out1:
873
+ return NULL;
1086874 }
1087875
1088
-static int
1089
-rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp)
876
+/**
877
+ * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
878
+ * @r_xprt: controlling transport instance
879
+ * @req: rpcrdma_req object to set up
880
+ *
881
+ * Returns zero on success, and a negative errno on failure.
882
+ */
883
+int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
1090884 {
1091
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
885
+ struct rpcrdma_regbuf *rb;
886
+ size_t maxhdrsize;
887
+
888
+ /* Compute maximum header buffer size in bytes */
889
+ maxhdrsize = rpcrdma_fixed_maxsz + 3 +
890
+ r_xprt->rx_ep->re_max_rdma_segs * rpcrdma_readchunk_maxsz;
891
+ maxhdrsize *= sizeof(__be32);
892
+ rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
893
+ DMA_TO_DEVICE, GFP_KERNEL);
894
+ if (!rb)
895
+ goto out;
896
+
897
+ if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
898
+ goto out_free;
899
+
900
+ req->rl_rdmabuf = rb;
901
+ xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
902
+ return 0;
903
+
904
+out_free:
905
+ rpcrdma_regbuf_free(rb);
906
+out:
907
+ return -ENOMEM;
908
+}
909
+
910
+/* ASSUMPTION: the rb_allreqs list is stable for the duration,
911
+ * and thus can be walked without holding rb_lock. Eg. the
912
+ * caller is holding the transport send lock to exclude
913
+ * device removal or disconnection.
914
+ */
915
+static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
916
+{
1092917 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1093
- struct rpcrdma_rep *rep;
918
+ struct rpcrdma_req *req;
1094919 int rc;
1095920
1096
- rc = -ENOMEM;
921
+ list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
922
+ rc = rpcrdma_req_setup(r_xprt, req);
923
+ if (rc)
924
+ return rc;
925
+ }
926
+ return 0;
927
+}
928
+
929
+static void rpcrdma_req_reset(struct rpcrdma_req *req)
930
+{
931
+ /* Credits are valid for only one connection */
932
+ req->rl_slot.rq_cong = 0;
933
+
934
+ rpcrdma_regbuf_free(req->rl_rdmabuf);
935
+ req->rl_rdmabuf = NULL;
936
+
937
+ rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
938
+ rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
939
+
940
+ frwr_reset(req);
941
+}
942
+
943
+/* ASSUMPTION: the rb_allreqs list is stable for the duration,
944
+ * and thus can be walked without holding rb_lock. Eg. the
945
+ * caller is holding the transport send lock to exclude
946
+ * device removal or disconnection.
947
+ */
948
+static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
949
+{
950
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
951
+ struct rpcrdma_req *req;
952
+
953
+ list_for_each_entry(req, &buf->rb_allreqs, rl_all)
954
+ rpcrdma_req_reset(req);
955
+}
956
+
957
+/* No locking needed here. This function is called only by the
958
+ * Receive completion handler.
959
+ */
960
+static noinline
961
+struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
962
+ bool temp)
963
+{
964
+ struct rpcrdma_rep *rep;
965
+
1097966 rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1098967 if (rep == NULL)
1099968 goto out;
1100969
1101
- rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
970
+ rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep->re_inline_recv,
1102971 DMA_FROM_DEVICE, GFP_KERNEL);
1103
- if (IS_ERR(rep->rr_rdmabuf)) {
1104
- rc = PTR_ERR(rep->rr_rdmabuf);
972
+ if (!rep->rr_rdmabuf)
1105973 goto out_free;
1106
- }
1107
- xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
1108
- rdmab_length(rep->rr_rdmabuf));
1109974
975
+ if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
976
+ goto out_free_regbuf;
977
+
978
+ xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
979
+ rdmab_length(rep->rr_rdmabuf));
1110980 rep->rr_cqe.done = rpcrdma_wc_receive;
1111981 rep->rr_rxprt = r_xprt;
1112
- INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
1113982 rep->rr_recv_wr.next = NULL;
1114983 rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
1115984 rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1116985 rep->rr_recv_wr.num_sge = 1;
1117986 rep->rr_temp = temp;
987
+ list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps);
988
+ return rep;
1118989
1119
- spin_lock(&buf->rb_lock);
1120
- list_add(&rep->rr_list, &buf->rb_recv_bufs);
1121
- spin_unlock(&buf->rb_lock);
1122
- return 0;
1123
-
990
+out_free_regbuf:
991
+ rpcrdma_regbuf_free(rep->rr_rdmabuf);
1124992 out_free:
1125993 kfree(rep);
1126994 out:
1127
- dprintk("RPC: %s: reply buffer %d alloc failed\n",
1128
- __func__, rc);
1129
- return rc;
995
+ return NULL;
1130996 }
1131997
1132
-int
1133
-rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
998
+/* No locking needed here. This function is invoked only by the
999
+ * Receive completion handler, or during transport shutdown.
1000
+ */
1001
+static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
1002
+{
1003
+ list_del(&rep->rr_all);
1004
+ rpcrdma_regbuf_free(rep->rr_rdmabuf);
1005
+ kfree(rep);
1006
+}
1007
+
1008
+static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
1009
+{
1010
+ struct llist_node *node;
1011
+
1012
+ /* Calls to llist_del_first are required to be serialized */
1013
+ node = llist_del_first(&buf->rb_free_reps);
1014
+ if (!node)
1015
+ return NULL;
1016
+ return llist_entry(node, struct rpcrdma_rep, rr_node);
1017
+}
1018
+
1019
+static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
1020
+ struct rpcrdma_rep *rep)
1021
+{
1022
+ llist_add(&rep->rr_node, &buf->rb_free_reps);
1023
+}
1024
+
1025
+static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
1026
+{
1027
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1028
+ struct rpcrdma_rep *rep;
1029
+
1030
+ list_for_each_entry(rep, &buf->rb_all_reps, rr_all) {
1031
+ rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
1032
+ rep->rr_temp = true;
1033
+ }
1034
+}
1035
+
1036
+static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
1037
+{
1038
+ struct rpcrdma_rep *rep;
1039
+
1040
+ while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
1041
+ rpcrdma_rep_destroy(rep);
1042
+}
1043
+
1044
+/**
1045
+ * rpcrdma_buffer_create - Create initial set of req/rep objects
1046
+ * @r_xprt: transport instance to (re)initialize
1047
+ *
1048
+ * Returns zero on success, otherwise a negative errno.
1049
+ */
1050
+int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
11341051 {
11351052 struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
11361053 int i, rc;
11371054
1138
- buf->rb_max_requests = r_xprt->rx_data.max_requests;
11391055 buf->rb_bc_srv_max_requests = 0;
1140
- spin_lock_init(&buf->rb_mrlock);
11411056 spin_lock_init(&buf->rb_lock);
1142
- spin_lock_init(&buf->rb_recovery_lock);
11431057 INIT_LIST_HEAD(&buf->rb_mrs);
1144
- INIT_LIST_HEAD(&buf->rb_all);
1145
- INIT_LIST_HEAD(&buf->rb_stale_mrs);
1146
- INIT_DELAYED_WORK(&buf->rb_refresh_worker,
1147
- rpcrdma_mr_refresh_worker);
1148
- INIT_DELAYED_WORK(&buf->rb_recovery_worker,
1149
- rpcrdma_mr_recovery_worker);
1150
-
1151
- rpcrdma_mrs_create(r_xprt);
1058
+ INIT_LIST_HEAD(&buf->rb_all_mrs);
1059
+ INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
11521060
11531061 INIT_LIST_HEAD(&buf->rb_send_bufs);
11541062 INIT_LIST_HEAD(&buf->rb_allreqs);
1155
- spin_lock_init(&buf->rb_reqslock);
1156
- for (i = 0; i < buf->rb_max_requests; i++) {
1063
+ INIT_LIST_HEAD(&buf->rb_all_reps);
1064
+
1065
+ rc = -ENOMEM;
1066
+ for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
11571067 struct rpcrdma_req *req;
11581068
1159
- req = rpcrdma_create_req(r_xprt);
1160
- if (IS_ERR(req)) {
1161
- dprintk("RPC: %s: request buffer %d alloc"
1162
- " failed\n", __func__, i);
1163
- rc = PTR_ERR(req);
1069
+ req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
1070
+ GFP_KERNEL);
1071
+ if (!req)
11641072 goto out;
1165
- }
11661073 list_add(&req->rl_list, &buf->rb_send_bufs);
11671074 }
11681075
1169
- buf->rb_credits = 1;
1170
- buf->rb_posted_receives = 0;
1171
- INIT_LIST_HEAD(&buf->rb_recv_bufs);
1172
-
1173
- rc = rpcrdma_sendctxs_create(r_xprt);
1174
- if (rc)
1175
- goto out;
1076
+ init_llist_head(&buf->rb_free_reps);
11761077
11771078 return 0;
11781079 out:
@@ -1180,85 +1081,85 @@
11801081 return rc;
11811082 }
11821083
1183
-static void
1184
-rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
1084
+/**
1085
+ * rpcrdma_req_destroy - Destroy an rpcrdma_req object
1086
+ * @req: unused object to be destroyed
1087
+ *
1088
+ * Relies on caller holding the transport send lock to protect
1089
+ * removing req->rl_all from buf->rb_all_reqs safely.
1090
+ */
1091
+void rpcrdma_req_destroy(struct rpcrdma_req *req)
11851092 {
1186
- rpcrdma_free_regbuf(rep->rr_rdmabuf);
1187
- kfree(rep);
1188
-}
1093
+ struct rpcrdma_mr *mr;
11891094
1190
-void
1191
-rpcrdma_destroy_req(struct rpcrdma_req *req)
1192
-{
1193
- rpcrdma_free_regbuf(req->rl_recvbuf);
1194
- rpcrdma_free_regbuf(req->rl_sendbuf);
1195
- rpcrdma_free_regbuf(req->rl_rdmabuf);
1095
+ list_del(&req->rl_all);
1096
+
1097
+ while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
1098
+ struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
1099
+
1100
+ spin_lock(&buf->rb_lock);
1101
+ list_del(&mr->mr_all);
1102
+ spin_unlock(&buf->rb_lock);
1103
+
1104
+ frwr_release_mr(mr);
1105
+ }
1106
+
1107
+ rpcrdma_regbuf_free(req->rl_recvbuf);
1108
+ rpcrdma_regbuf_free(req->rl_sendbuf);
1109
+ rpcrdma_regbuf_free(req->rl_rdmabuf);
11961110 kfree(req);
11971111 }
11981112
1199
-static void
1200
-rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
1113
+/**
1114
+ * rpcrdma_mrs_destroy - Release all of a transport's MRs
1115
+ * @r_xprt: controlling transport instance
1116
+ *
1117
+ * Relies on caller holding the transport send lock to protect
1118
+ * removing mr->mr_list from req->rl_free_mrs safely.
1119
+ */
1120
+static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
12011121 {
1202
- struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
1203
- rx_buf);
1204
- struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1122
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
12051123 struct rpcrdma_mr *mr;
1206
- unsigned int count;
12071124
1208
- count = 0;
1209
- spin_lock(&buf->rb_mrlock);
1210
- while (!list_empty(&buf->rb_all)) {
1211
- mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
1125
+ cancel_work_sync(&buf->rb_refresh_worker);
1126
+
1127
+ spin_lock(&buf->rb_lock);
1128
+ while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
1129
+ struct rpcrdma_mr,
1130
+ mr_all)) != NULL) {
1131
+ list_del(&mr->mr_list);
12121132 list_del(&mr->mr_all);
1133
+ spin_unlock(&buf->rb_lock);
12131134
1214
- spin_unlock(&buf->rb_mrlock);
1135
+ frwr_release_mr(mr);
12151136
1216
- /* Ensure MW is not on any rl_registered list */
1217
- if (!list_empty(&mr->mr_list))
1218
- list_del(&mr->mr_list);
1219
-
1220
- ia->ri_ops->ro_release_mr(mr);
1221
- count++;
1222
- spin_lock(&buf->rb_mrlock);
1137
+ spin_lock(&buf->rb_lock);
12231138 }
1224
- spin_unlock(&buf->rb_mrlock);
1225
- r_xprt->rx_stats.mrs_allocated = 0;
1226
-
1227
- dprintk("RPC: %s: released %u MRs\n", __func__, count);
1139
+ spin_unlock(&buf->rb_lock);
12281140 }
12291141
1142
+/**
1143
+ * rpcrdma_buffer_destroy - Release all hw resources
1144
+ * @buf: root control block for resources
1145
+ *
1146
+ * ORDERING: relies on a prior rpcrdma_xprt_drain :
1147
+ * - No more Send or Receive completions can occur
1148
+ * - All MRs, reps, and reqs are returned to their free lists
1149
+ */
12301150 void
12311151 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
12321152 {
1233
- cancel_delayed_work_sync(&buf->rb_recovery_worker);
1234
- cancel_delayed_work_sync(&buf->rb_refresh_worker);
1153
+ rpcrdma_reps_destroy(buf);
12351154
1236
- rpcrdma_sendctxs_destroy(buf);
1237
-
1238
- while (!list_empty(&buf->rb_recv_bufs)) {
1239
- struct rpcrdma_rep *rep;
1240
-
1241
- rep = list_first_entry(&buf->rb_recv_bufs,
1242
- struct rpcrdma_rep, rr_list);
1243
- list_del(&rep->rr_list);
1244
- rpcrdma_destroy_rep(rep);
1245
- }
1246
-
1247
- spin_lock(&buf->rb_reqslock);
1248
- while (!list_empty(&buf->rb_allreqs)) {
1155
+ while (!list_empty(&buf->rb_send_bufs)) {
12491156 struct rpcrdma_req *req;
12501157
1251
- req = list_first_entry(&buf->rb_allreqs,
1252
- struct rpcrdma_req, rl_all);
1253
- list_del(&req->rl_all);
1254
-
1255
- spin_unlock(&buf->rb_reqslock);
1256
- rpcrdma_destroy_req(req);
1257
- spin_lock(&buf->rb_reqslock);
1158
+ req = list_first_entry(&buf->rb_send_bufs,
1159
+ struct rpcrdma_req, rl_list);
1160
+ list_del(&req->rl_list);
1161
+ rpcrdma_req_destroy(req);
12581162 }
1259
- spin_unlock(&buf->rb_reqslock);
1260
-
1261
- rpcrdma_mrs_destroy(buf);
12621163 }
12631164
12641165 /**
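The rewritten rpcrdma_mrs_destroy() above walks rb_all_mrs under rb_lock, but drops the lock around each frwr_release_mr() call because releasing an MR can sleep, then re-acquires it before looking at the list again. A minimal user-space sketch of that drain pattern, using a pthread mutex and a plain linked list in place of the kernel spinlock and MR list (all names below are illustrative, not from the kernel):

	#include <pthread.h>
	#include <stdlib.h>

	struct node { struct node *next; };

	static struct node *all_items;	/* stands in for rb_all_mrs */
	static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;

	/* Stands in for frwr_release_mr(); may block in the real code. */
	static void release_item(struct node *n)
	{
		free(n);
	}

	static void drain_items(void)
	{
		struct node *n;

		pthread_mutex_lock(&list_lock);
		while ((n = all_items) != NULL) {
			all_items = n->next;	/* detach under the lock */
			pthread_mutex_unlock(&list_lock);

			release_item(n);	/* blocking work, lock not held */

			pthread_mutex_lock(&list_lock);
		}
		pthread_mutex_unlock(&list_lock);
	}

	int main(void)
	{
		for (int i = 0; i < 3; i++) {
			struct node *n = malloc(sizeof(*n));

			n->next = all_items;
			all_items = n;
		}
		drain_items();
		return 0;
	}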
@@ -1272,61 +1173,45 @@
 rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-	struct rpcrdma_mr *mr = NULL;
+	struct rpcrdma_mr *mr;
 
-	spin_lock(&buf->rb_mrlock);
-	if (!list_empty(&buf->rb_mrs))
-		mr = rpcrdma_mr_pop(&buf->rb_mrs);
-	spin_unlock(&buf->rb_mrlock);
-
-	if (!mr)
-		goto out_nomrs;
+	spin_lock(&buf->rb_lock);
+	mr = rpcrdma_mr_pop(&buf->rb_mrs);
+	spin_unlock(&buf->rb_lock);
 	return mr;
-
-out_nomrs:
-	trace_xprtrdma_nomrs(r_xprt);
-	if (r_xprt->rx_ep.rep_connected != -ENODEV)
-		schedule_delayed_work(&buf->rb_refresh_worker, 0);
-
-	/* Allow the reply handler and refresh worker to run */
-	cond_resched();
-
-	return NULL;
-}
-
-static void
-__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
-{
-	spin_lock(&buf->rb_mrlock);
-	rpcrdma_mr_push(mr, &buf->rb_mrs);
-	spin_unlock(&buf->rb_mrlock);
 }
 
 /**
- * rpcrdma_mr_put - Release an rpcrdma_mr object
- * @mr: object to release
+ * rpcrdma_mr_put - DMA unmap an MR and release it
+ * @mr: MR to release
  *
  */
-void
-rpcrdma_mr_put(struct rpcrdma_mr *mr)
-{
-	__rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
-}
-
-/**
- * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
- * @mr: object to release
- *
- */
-void
-rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
+void rpcrdma_mr_put(struct rpcrdma_mr *mr)
 {
 	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
 
-	trace_xprtrdma_dma_unmap(mr);
-	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
-			mr->mr_sg, mr->mr_nents, mr->mr_dir);
-	__rpcrdma_mr_put(&r_xprt->rx_buf, mr);
+	if (mr->mr_dir != DMA_NONE) {
+		trace_xprtrdma_mr_unmap(mr);
+		ib_dma_unmap_sg(r_xprt->rx_ep->re_id->device,
+				mr->mr_sg, mr->mr_nents, mr->mr_dir);
+		mr->mr_dir = DMA_NONE;
+	}
+
+	rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
+}
+
+/**
+ * rpcrdma_reply_put - Put reply buffers back into pool
+ * @buffers: buffer pool
+ * @req: object to return
+ *
+ */
+void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+	if (req->rl_reply) {
+		rpcrdma_rep_put(buffers, req->rl_reply);
+		req->rl_reply = NULL;
+	}
 }
 
 /**
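rpcrdma_mr_put() above records the DMA state in mr->mr_dir and resets it to DMA_NONE after unmapping, so a second put cannot unmap the same scatterlist twice. A small stand-alone C model of that unmap-once guard (the struct and names are invented for illustration, and ib_dma_unmap_sg() is replaced by a printf):

	#include <stdio.h>

	enum dma_dir { DIR_NONE, DIR_FROM_DEVICE, DIR_TO_DEVICE };

	struct mapping {
		enum dma_dir dir;	/* DIR_NONE means "not currently mapped" */
	};

	/* Stands in for rpcrdma_mr_put(): unmap at most once, then recycle. */
	static void mapping_put(struct mapping *m)
	{
		if (m->dir != DIR_NONE) {
			printf("unmapping (dir=%d)\n", m->dir);	/* ib_dma_unmap_sg() here */
			m->dir = DIR_NONE;
		}
		/* then push back onto the owning request's free list */
	}

	int main(void)
	{
		struct mapping m = { .dir = DIR_FROM_DEVICE };

		mapping_put(&m);	/* unmaps */
		mapping_put(&m);	/* second put is a no-op for the DMA state */
		return 0;
	}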
@@ -1351,105 +1236,110 @@
 
 /**
  * rpcrdma_buffer_put - Put request/reply buffers back into pool
+ * @buffers: buffer pool
  * @req: object to return
  *
  */
-void
-rpcrdma_buffer_put(struct rpcrdma_req *req)
+void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
 {
-	struct rpcrdma_buffer *buffers = req->rl_buffer;
-	struct rpcrdma_rep *rep = req->rl_reply;
-
-	req->rl_reply = NULL;
+	rpcrdma_reply_put(buffers, req);
 
 	spin_lock(&buffers->rb_lock);
 	list_add(&req->rl_list, &buffers->rb_send_bufs);
-	if (rep) {
-		if (!rep->rr_temp) {
-			list_add(&rep->rr_list, &buffers->rb_recv_bufs);
-			rep = NULL;
-		}
-	}
 	spin_unlock(&buffers->rb_lock);
-	if (rep)
-		rpcrdma_destroy_rep(rep);
-}
-
-/*
- * Put reply buffers back into pool when not attached to
- * request. This happens in error conditions.
- */
-void
-rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
-{
-	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
-
-	if (!rep->rr_temp) {
-		spin_lock(&buffers->rb_lock);
-		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
-		spin_unlock(&buffers->rb_lock);
-	} else {
-		rpcrdma_destroy_rep(rep);
-	}
 }
 
 /**
- * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
- * @size: size of buffer to be allocated, in bytes
- * @direction: direction of data movement
- * @flags: GFP flags
+ * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
+ * @rep: rep to release
  *
- * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
- * can be persistently DMA-mapped for I/O.
+ * Used after error conditions.
+ */
+void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
+{
+	rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
+}
+
+/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
  *
  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
  * receiving the payload of RDMA RECV operations. During Long Calls
- * or Replies they may be registered externally via ro_map.
+ * or Replies they may be registered externally via frwr_map.
  */
-struct rpcrdma_regbuf *
-rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
+static struct rpcrdma_regbuf *
+rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
 		     gfp_t flags)
 {
 	struct rpcrdma_regbuf *rb;
 
-	rb = kmalloc(sizeof(*rb) + size, flags);
-	if (rb == NULL)
-		return ERR_PTR(-ENOMEM);
+	rb = kmalloc(sizeof(*rb), flags);
+	if (!rb)
+		return NULL;
+	rb->rg_data = kmalloc(size, flags);
+	if (!rb->rg_data) {
+		kfree(rb);
+		return NULL;
+	}
 
 	rb->rg_device = NULL;
 	rb->rg_direction = direction;
 	rb->rg_iov.length = size;
-
 	return rb;
 }
 
 /**
- * __rpcrdma_map_regbuf - DMA-map a regbuf
- * @ia: controlling rpcrdma_ia
- * @rb: regbuf to be mapped
+ * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
+ * @rb: regbuf to reallocate
+ * @size: size of buffer to be allocated, in bytes
+ * @flags: GFP flags
+ *
+ * Returns true if reallocation was successful. If false is
+ * returned, @rb is left untouched.
  */
-bool
-__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
 {
-	struct ib_device *device = ia->ri_device;
+	void *buf;
+
+	buf = kmalloc(size, flags);
+	if (!buf)
+		return false;
+
+	rpcrdma_regbuf_dma_unmap(rb);
+	kfree(rb->rg_data);
+
+	rb->rg_data = buf;
+	rb->rg_iov.length = size;
+	return true;
+}
+
+/**
+ * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
+ * @r_xprt: controlling transport instance
+ * @rb: regbuf to be mapped
+ *
+ * Returns true if the buffer is now DMA mapped to @r_xprt's device
+ */
+bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
+			      struct rpcrdma_regbuf *rb)
+{
+	struct ib_device *device = r_xprt->rx_ep->re_id->device;
 
 	if (rb->rg_direction == DMA_NONE)
 		return false;
 
-	rb->rg_iov.addr = ib_dma_map_single(device,
-					    (void *)rb->rg_base,
-					    rdmab_length(rb),
-					    rb->rg_direction);
-	if (ib_dma_mapping_error(device, rdmab_addr(rb)))
+	rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
+					    rdmab_length(rb), rb->rg_direction);
+	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
+		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
 		return false;
+	}
 
 	rb->rg_device = device;
-	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
+	rb->rg_iov.lkey = r_xprt->rx_ep->re_pd->local_dma_lkey;
 	return true;
 }
 
-static void
-rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
+static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
 {
 	if (!rb)
 		return;
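rpcrdma_regbuf_realloc() above allocates the replacement data buffer before touching @rb, so a failed allocation leaves the existing (possibly DMA-mapped) buffer untouched and still usable. A stand-alone sketch of the same commit-on-success pattern, with malloc/free standing in for kmalloc and the DMA unmap (the struct and function names are illustrative):

	#include <stdbool.h>
	#include <stdlib.h>
	#include <string.h>

	struct regbuf {
		void *data;
		size_t len;
	};

	/* Mirrors the rpcrdma_regbuf_realloc() contract: on failure the old
	 * buffer is left untouched; on success the old data is discarded.
	 */
	static bool regbuf_realloc(struct regbuf *rb, size_t size)
	{
		void *buf = malloc(size);

		if (!buf)
			return false;	/* rb->data and rb->len unchanged */

		free(rb->data);		/* the kernel code also unmaps here */
		rb->data = buf;
		rb->len = size;
		return true;
	}

	int main(void)
	{
		struct regbuf rb = { .data = malloc(16), .len = 16 };

		if (regbuf_realloc(&rb, 64))
			memset(rb.data, 0, rb.len);
		free(rb.data);
		return 0;
	}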
@@ -1457,106 +1347,98 @@
 	if (!rpcrdma_regbuf_is_mapped(rb))
 		return;
 
-	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
-			    rdmab_length(rb), rb->rg_direction);
+	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
+			    rb->rg_direction);
 	rb->rg_device = NULL;
 }
 
-/**
- * rpcrdma_free_regbuf - deregister and free registered buffer
- * @rb: regbuf to be deregistered and freed
- */
-void
-rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
+static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
 {
-	rpcrdma_dma_unmap_regbuf(rb);
+	rpcrdma_regbuf_dma_unmap(rb);
+	if (rb)
+		kfree(rb->rg_data);
 	kfree(rb);
 }
 
-/*
- * Prepost any receive buffer, then post send.
+/**
+ * rpcrdma_post_sends - Post WRs to a transport's Send Queue
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req containing the Send WR to post
  *
- * Receive buffer is donated to hardware, reclaimed upon recv completion.
+ * Returns 0 if the post was successful, otherwise -ENOTCONN
+ * is returned.
  */
-int
-rpcrdma_ep_post(struct rpcrdma_ia *ia,
-		struct rpcrdma_ep *ep,
-		struct rpcrdma_req *req)
+int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
-	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
+	struct ib_send_wr *send_wr = &req->rl_wr;
+	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 	int rc;
 
-	if (!ep->rep_send_count ||
-	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
+	if (!ep->re_send_count || kref_read(&req->rl_kref) > 1) {
 		send_wr->send_flags |= IB_SEND_SIGNALED;
-		ep->rep_send_count = ep->rep_send_batch;
+		ep->re_send_count = ep->re_send_batch;
 	} else {
 		send_wr->send_flags &= ~IB_SEND_SIGNALED;
-		--ep->rep_send_count;
+		--ep->re_send_count;
 	}
 
-	rc = ia->ri_ops->ro_send(ia, req);
-	trace_xprtrdma_post_send(req, rc);
+	trace_xprtrdma_post_send(req);
+	rc = frwr_send(r_xprt, req);
 	if (rc)
 		return -ENOTCONN;
 	return 0;
 }
 
 /**
- * rpcrdma_post_recvs - Maybe post some Receive buffers
- * @r_xprt: controlling transport
- * @temp: when true, allocate temp rpcrdma_rep objects
+ * rpcrdma_post_recvs - Refill the Receive Queue
+ * @r_xprt: controlling transport instance
+ * @needed: current credit grant
+ * @temp: mark Receive buffers to be deleted after one use
  *
  */
-void
-rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
+void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 	struct ib_recv_wr *wr, *bad_wr;
-	int needed, count, rc;
+	struct rpcrdma_rep *rep;
+	int count, rc;
 
-	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
-	if (buf->rb_posted_receives > needed)
-		return;
-	needed -= buf->rb_posted_receives;
-
+	rc = 0;
 	count = 0;
+
+	if (likely(ep->re_receive_count > needed))
+		goto out;
+	needed -= ep->re_receive_count;
+	if (!temp)
+		needed += RPCRDMA_MAX_RECV_BATCH;
+
+	/* fast path: all needed reps can be found on the free list */
 	wr = NULL;
 	while (needed) {
-		struct rpcrdma_regbuf *rb;
-		struct rpcrdma_rep *rep;
-
-		spin_lock(&buf->rb_lock);
-		rep = list_first_entry_or_null(&buf->rb_recv_bufs,
-					       struct rpcrdma_rep, rr_list);
-		if (likely(rep))
-			list_del(&rep->rr_list);
-		spin_unlock(&buf->rb_lock);
-		if (!rep) {
-			if (rpcrdma_create_rep(r_xprt, temp))
-				break;
+		rep = rpcrdma_rep_get_locked(buf);
+		if (rep && rep->rr_temp) {
+			rpcrdma_rep_destroy(rep);
 			continue;
 		}
+		if (!rep)
+			rep = rpcrdma_rep_create(r_xprt, temp);
+		if (!rep)
+			break;
 
-		rb = rep->rr_rdmabuf;
-		if (!rpcrdma_regbuf_is_mapped(rb)) {
-			if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb)) {
-				rpcrdma_recv_buffer_put(rep);
-				break;
-			}
-		}
-
-		trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
+		trace_xprtrdma_post_recv(rep);
 		rep->rr_recv_wr.next = wr;
 		wr = &rep->rr_recv_wr;
-		++count;
 		--needed;
+		++count;
 	}
-	if (!count)
-		return;
+	if (!wr)
+		goto out;
 
-	rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
+	rc = ib_post_recv(ep->re_id->qp, wr,
 			  (const struct ib_recv_wr **)&bad_wr);
+out:
+	trace_xprtrdma_post_recvs(r_xprt, count, rc);
 	if (rc) {
 		for (wr = bad_wr; wr;) {
 			struct rpcrdma_rep *rep;
@@ -1567,6 +1449,6 @@
 			--count;
 		}
 	}
-	buf->rb_posted_receives += count;
-	trace_xprtrdma_post_recvs(r_xprt, count, rc);
+	ep->re_receive_count += count;
+	return;
 }
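rpcrdma_post_sends() above signals only every re_send_batch-th Send (or any Send whose request is still referenced elsewhere), which bounds completion interrupts while still flushing the Send Queue regularly. The counter logic can be modeled in stand-alone C as below; send_batch/send_count mirror re_send_batch/re_send_count, and must_signal stands in for the kref_read(&req->rl_kref) > 1 test (all names in the sketch are illustrative):

	#include <stdbool.h>
	#include <stdio.h>

	struct send_queue {
		unsigned int send_batch;	/* like ep->re_send_batch */
		unsigned int send_count;	/* Sends left before the next signal */
	};

	/* Returns true when this Send should be posted with IB_SEND_SIGNALED. */
	static bool send_is_signaled(struct send_queue *sq, bool must_signal)
	{
		if (!sq->send_count || must_signal) {
			sq->send_count = sq->send_batch;
			return true;
		}
		--sq->send_count;
		return false;
	}

	int main(void)
	{
		struct send_queue sq = { .send_batch = 8, .send_count = 0 };

		for (int i = 0; i < 20; i++)
			printf("send %2d: %s\n", i,
			       send_is_signaled(&sq, false) ? "signaled" : "unsignaled");
		return 0;
	}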