2024-10-22 8ac6c7a54ed1b98d142dce24b11c6de6a1e239a5
kernel/net/sunrpc/xprtrdma/backchannel.c
@@ -5,7 +5,6 @@
  * Support for backward direction RPCs on RPC/RDMA.
  */
 
-#include <linux/module.h>
 #include <linux/sunrpc/xprt.h>
 #include <linux/sunrpc/svc.h>
 #include <linux/sunrpc/svc_xprt.h>
@@ -20,59 +19,6 @@
 
 #undef RPCRDMA_BACKCHANNEL_DEBUG
 
-static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
-				 struct rpc_rqst *rqst)
-{
-	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-
-	spin_lock(&buf->rb_reqslock);
-	list_del(&req->rl_all);
-	spin_unlock(&buf->rb_reqslock);
-
-	rpcrdma_destroy_req(req);
-}
-
-static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
-				 unsigned int count)
-{
-	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-	struct rpc_rqst *rqst;
-	unsigned int i;
-
-	for (i = 0; i < (count << 1); i++) {
-		struct rpcrdma_regbuf *rb;
-		struct rpcrdma_req *req;
-		size_t size;
-
-		req = rpcrdma_create_req(r_xprt);
-		if (IS_ERR(req))
-			return PTR_ERR(req);
-		rqst = &req->rl_slot;
-
-		rqst->rq_xprt = xprt;
-		INIT_LIST_HEAD(&rqst->rq_list);
-		INIT_LIST_HEAD(&rqst->rq_bc_list);
-		__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
-		spin_lock_bh(&xprt->bc_pa_lock);
-		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
-		spin_unlock_bh(&xprt->bc_pa_lock);
-
-		size = r_xprt->rx_data.inline_rsize;
-		rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
-		if (IS_ERR(rb))
-			goto out_fail;
-		req->rl_sendbuf = rb;
-		xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base,
-			     min_t(size_t, size, PAGE_SIZE));
-	}
-	return 0;
-
-out_fail:
-	rpcrdma_bc_free_rqst(r_xprt, rqst);
-	return -ENOMEM;
-}
-
 /**
  * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
  * @xprt: transport associated with these backchannel resources
@@ -83,54 +29,9 @@
 int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
-	int rc;
 
-	/* The backchannel reply path returns each rpc_rqst to the
-	 * bc_pa_list _after_ the reply is sent. If the server is
-	 * faster than the client, it can send another backward
-	 * direction request before the rpc_rqst is returned to the
-	 * list. The client rejects the request in this case.
-	 *
-	 * Twice as many rpc_rqsts are prepared to ensure there is
-	 * always an rpc_rqst available as soon as a reply is sent.
-	 */
-	if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
-		goto out_err;
-
-	rc = rpcrdma_bc_setup_reqs(r_xprt, reqs);
-	if (rc)
-		goto out_free;
-
-	r_xprt->rx_buf.rb_bc_srv_max_requests = reqs;
-	request_module("svcrdma");
+	r_xprt->rx_buf.rb_bc_srv_max_requests = RPCRDMA_BACKWARD_WRS >> 1;
 	trace_xprtrdma_cb_setup(r_xprt, reqs);
-	return 0;
-
-out_free:
-	xprt_rdma_bc_destroy(xprt, reqs);
-
-out_err:
-	pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
-	return -ENOMEM;
-}
-
-/**
- * xprt_rdma_bc_up - Create transport endpoint for backchannel service
- * @serv: server endpoint
- * @net: network namespace
- *
- * The "xprt" is an implied argument: it supplies the name of the
- * backchannel transport class.
- *
- * Returns zero on success, negative errno on failure
- */
-int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
-{
-	int ret;
-
-	ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
-	if (ret < 0)
-		return ret;
 	return 0;
 }
 
@@ -143,12 +44,17 @@
 size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
-	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 	size_t maxmsg;
 
-	maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
+	maxmsg = min_t(unsigned int, ep->re_inline_send, ep->re_inline_recv);
 	maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
 	return maxmsg - RPCRDMA_HDRLEN_MIN;
+}
+
+unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
+{
+	return RPCRDMA_BACKWARD_WRS >> 1;
 }
 
 static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
@@ -159,7 +65,7 @@
 
 	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
 	xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
-			req->rl_rdmabuf->rg_base);
+			rdmab_data(req->rl_rdmabuf), rqst);
 
 	p = xdr_reserve_space(&req->rl_stream, 28);
 	if (unlikely(!p))
@@ -173,7 +79,7 @@
 	*p = xdr_zero;
 
 	if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
-				      &rqst->rq_snd_buf, rpcrdma_noch))
+				      &rqst->rq_snd_buf, rpcrdma_noch_pullup))
 		return -EIO;
 
 	trace_xprtrdma_cb_reply(rqst);
@@ -194,19 +100,22 @@
  */
 int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
 {
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+	struct rpc_xprt *xprt = rqst->rq_xprt;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	int rc;
 
-	if (!xprt_connected(rqst->rq_xprt))
-		goto drop_connection;
+	if (!xprt_connected(xprt))
+		return -ENOTCONN;
+
+	if (!xprt_request_get_cong(xprt, rqst))
+		return -EBADSLT;
 
 	rc = rpcrdma_bc_marshal_reply(rqst);
 	if (rc < 0)
 		goto failed_marshal;
 
-	rpcrdma_post_recvs(r_xprt, true);
-	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
+	if (rpcrdma_post_sends(r_xprt, req))
 		goto drop_connection;
 	return 0;
 
@@ -214,7 +123,7 @@
 	if (rc != -ENOTCONN)
 		return rc;
 drop_connection:
-	xprt_disconnect_done(rqst->rq_xprt);
+	xprt_rdma_close(xprt);
 	return -ENOTCONN;
 }
 
@@ -225,19 +134,18 @@
  */
 void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
 {
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpc_rqst *rqst, *tmp;
 
-	spin_lock_bh(&xprt->bc_pa_lock);
+	spin_lock(&xprt->bc_pa_lock);
 	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
 		list_del(&rqst->rq_bc_pa_list);
-		spin_unlock_bh(&xprt->bc_pa_lock);
+		spin_unlock(&xprt->bc_pa_lock);
 
-		rpcrdma_bc_free_rqst(r_xprt, rqst);
+		rpcrdma_req_destroy(rpcr_to_rdmar(rqst));
 
-		spin_lock_bh(&xprt->bc_pa_lock);
+		spin_lock(&xprt->bc_pa_lock);
 	}
-	spin_unlock_bh(&xprt->bc_pa_lock);
+	spin_unlock(&xprt->bc_pa_lock);
 }
 
 /**
@@ -249,15 +157,54 @@
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 
-	dprintk("RPC: %s: freeing rqst %p (req %p)\n",
-		__func__, rqst, req);
-
 	rpcrdma_recv_buffer_put(req->rl_reply);
 	req->rl_reply = NULL;
 
-	spin_lock_bh(&xprt->bc_pa_lock);
+	spin_lock(&xprt->bc_pa_lock);
 	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
-	spin_unlock_bh(&xprt->bc_pa_lock);
+	spin_unlock(&xprt->bc_pa_lock);
+	xprt_put(xprt);
+}
+
+static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt)
+{
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+	struct rpcrdma_req *req;
+	struct rpc_rqst *rqst;
+	size_t size;
+
+	spin_lock(&xprt->bc_pa_lock);
+	rqst = list_first_entry_or_null(&xprt->bc_pa_list, struct rpc_rqst,
+					rq_bc_pa_list);
+	if (!rqst)
+		goto create_req;
+	list_del(&rqst->rq_bc_pa_list);
+	spin_unlock(&xprt->bc_pa_lock);
+	return rqst;
+
+create_req:
+	spin_unlock(&xprt->bc_pa_lock);
+
+	/* Set a limit to prevent a remote from overrunning our resources.
+	 */
+	if (xprt->bc_alloc_count >= RPCRDMA_BACKWARD_WRS)
+		return NULL;
+
+	size = min_t(size_t, r_xprt->rx_ep->re_inline_recv, PAGE_SIZE);
+	req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL);
+	if (!req)
+		return NULL;
+	if (rpcrdma_req_setup(r_xprt, req)) {
+		rpcrdma_req_destroy(req);
+		return NULL;
+	}
+
+	xprt->bc_alloc_count++;
+	rqst = &req->rl_slot;
+	rqst->rq_xprt = xprt;
+	__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+	xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf), size);
+	return rqst;
 }
 
 /**
@@ -291,20 +238,11 @@
 	pr_info("RPC: %s: %*ph\n", __func__, size, p);
 #endif
 
-	/* Grab a free bc rqst */
-	spin_lock(&xprt->bc_pa_lock);
-	if (list_empty(&xprt->bc_pa_list)) {
-		spin_unlock(&xprt->bc_pa_lock);
+	rqst = rpcrdma_bc_rqst_get(r_xprt);
+	if (!rqst)
 		goto out_overflow;
-	}
-	rqst = list_first_entry(&xprt->bc_pa_list,
-				struct rpc_rqst, rq_bc_pa_list);
-	list_del(&rqst->rq_bc_pa_list);
-	spin_unlock(&xprt->bc_pa_lock);
 
-	/* Prepare rqst */
 	rqst->rq_reply_bytes_recvd = 0;
-	rqst->rq_bytes_sent = 0;
 	rqst->rq_xid = *p;
 
 	rqst->rq_private_buf.len = size;
@@ -326,6 +264,7 @@
 
 	/* Queue rqst for ULP's callback service */
 	bc_serv = xprt->bc_serv;
+	xprt_get(xprt);
 	spin_lock(&bc_serv->sv_cb_lock);
 	list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
 	spin_unlock(&bc_serv->sv_cb_lock);
@@ -337,7 +276,7 @@
 
 out_overflow:
 	pr_warn("RPC/RDMA backchannel overflow\n");
-	xprt_disconnect_done(xprt);
+	xprt_force_disconnect(xprt);
 	/* This receive buffer gets reposted automatically
 	 * when the connection is re-established.
 	 */