2024-05-10 10ebd8556b7990499c896a550e3d416b444211e6
kernel/net/smc/smc_cdc.c
@@ -21,13 +21,6 @@
 
 /********************************** send *************************************/
 
-struct smc_cdc_tx_pend {
-	struct smc_connection	*conn;		/* socket connection */
-	union smc_host_cursor	cursor;		/* tx sndbuf cursor sent */
-	union smc_host_cursor	p_cursor;	/* rx RMBE cursor produced */
-	u16			ctrl_seq;	/* conn. tx sequence # */
-};
-
 /* handler for send/transmission completion of a CDC msg */
 static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
 			       struct smc_link *link,
@@ -37,10 +30,6 @@
 	struct smc_connection *conn = cdcpend->conn;
 	struct smc_sock *smc;
 	int diff;
-
-	if (!conn)
-		/* already dismissed */
-		return;
 
 	smc = container_of(conn, struct smc_sock, conn);
 	bh_lock_sock(&smc->sk);
@@ -54,23 +43,38 @@
 		/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
 		smp_mb__after_atomic();
 		smc_curs_copy(&conn->tx_curs_fin, &cdcpend->cursor, conn);
+		smc_curs_copy(&conn->local_tx_ctrl_fin, &cdcpend->p_cursor,
+			      conn);
+		conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
 	}
+
+	if (atomic_dec_and_test(&conn->cdc_pend_tx_wr) &&
+	    unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
+		wake_up(&conn->cdc_pend_tx_wq);
+	WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0);
+
 	smc_tx_sndbuf_nonfull(smc);
 	bh_unlock_sock(&smc->sk);
 }
 
 int smc_cdc_get_free_slot(struct smc_connection *conn,
+			  struct smc_link *link,
 			  struct smc_wr_buf **wr_buf,
+			  struct smc_rdma_wr **wr_rdma_buf,
 			  struct smc_cdc_tx_pend **pend)
 {
-	struct smc_link *link = &conn->lgr->lnk[SMC_SINGLE_LINK];
 	int rc;
 
 	rc = smc_wr_tx_get_free_slot(link, smc_cdc_tx_handler, wr_buf,
+				     wr_rdma_buf,
				     (struct smc_wr_tx_pend_priv **)pend);
-	if (!conn->alert_token_local)
+	if (conn->killed) {
 		/* abnormal termination */
+		if (!rc)
+			smc_wr_tx_put_slot(link,
+					   (struct smc_wr_tx_pend_priv *)(*pend));
 		rc = -EPIPE;
+	}
 	return rc;
 }
 
@@ -81,7 +85,7 @@
 		sizeof(struct smc_cdc_msg) > SMC_WR_BUF_SIZE,
 		"must increase SMC_WR_BUF_SIZE to at least sizeof(struct smc_cdc_msg)");
 	BUILD_BUG_ON_MSG(
-		sizeof(struct smc_cdc_msg) != SMC_WR_TX_SIZE,
+		offsetofend(struct smc_cdc_msg, reserved) > SMC_WR_TX_SIZE,
 		"must adapt SMC_WR_TX_SIZE to sizeof(struct smc_cdc_msg); if not all smc_wr upper layer protocols use the same message size any more, must start to set link->wr_tx_sges[i].length on each individual smc_wr_tx_send()");
 	BUILD_BUG_ON_MSG(
 		sizeof(struct smc_cdc_tx_pend) > SMC_WR_TX_PEND_PRIV_SIZE,
@@ -96,20 +100,63 @@
 			    struct smc_wr_buf *wr_buf,
 			    struct smc_cdc_tx_pend *pend)
 {
+	struct smc_link *link = conn->lnk;
 	union smc_host_cursor cfed;
-	struct smc_link *link;
 	int rc;
 
-	link = &conn->lgr->lnk[SMC_SINGLE_LINK];
+	if (unlikely(!READ_ONCE(conn->sndbuf_desc)))
+		return -ENOBUFS;
 
 	smc_cdc_add_pending_send(conn, pend);
 
 	conn->tx_cdc_seq++;
 	conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
 	smc_host_msg_to_cdc((struct smc_cdc_msg *)wr_buf, conn, &cfed);
+
+	atomic_inc(&conn->cdc_pend_tx_wr);
+	smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */
+
 	rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
-	if (!rc)
+	if (!rc) {
 		smc_curs_copy(&conn->rx_curs_confirmed, &cfed, conn);
+		conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
+	} else {
+		conn->tx_cdc_seq--;
+		conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
+		atomic_dec(&conn->cdc_pend_tx_wr);
+	}
+
+	return rc;
+}
+
+/* send a validation msg indicating the move of a conn to an other QP link */
+int smcr_cdc_msg_send_validation(struct smc_connection *conn,
+				 struct smc_cdc_tx_pend *pend,
+				 struct smc_wr_buf *wr_buf)
+{
+	struct smc_host_cdc_msg *local = &conn->local_tx_ctrl;
+	struct smc_link *link = conn->lnk;
+	struct smc_cdc_msg *peer;
+	int rc;
+
+	peer = (struct smc_cdc_msg *)wr_buf;
+	peer->common.type = local->common.type;
+	peer->len = local->len;
+	peer->seqno = htons(conn->tx_cdc_seq_fin); /* seqno last compl. tx */
+	peer->token = htonl(local->token);
+	peer->prod_flags.failover_validation = 1;
+
+	/* We need to set pend->conn here to make sure smc_cdc_tx_handler()
+	 * can handle properly
+	 */
+	smc_cdc_add_pending_send(conn, pend);
+
+	atomic_inc(&conn->cdc_pend_tx_wr);
+	smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */
+
+	rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
+	if (unlikely(rc))
+		atomic_dec(&conn->cdc_pend_tx_wr);
 
 	return rc;
 }
@@ -118,18 +165,43 @@
 {
 	struct smc_cdc_tx_pend *pend;
 	struct smc_wr_buf *wr_buf;
+	struct smc_link *link;
+	bool again = false;
 	int rc;
 
-	rc = smc_cdc_get_free_slot(conn, &wr_buf, &pend);
+again:
+	link = conn->lnk;
+	if (!smc_wr_tx_link_hold(link))
+		return -ENOLINK;
+	rc = smc_cdc_get_free_slot(conn, link, &wr_buf, NULL, &pend);
 	if (rc)
-		return rc;
+		goto put_out;
 
-	return smc_cdc_msg_send(conn, wr_buf, pend);
+	spin_lock_bh(&conn->send_lock);
+	if (link != conn->lnk) {
+		/* link of connection changed, try again one time*/
+		spin_unlock_bh(&conn->send_lock);
+		smc_wr_tx_put_slot(link,
+				   (struct smc_wr_tx_pend_priv *)pend);
+		smc_wr_tx_link_put(link);
+		if (again)
+			return -ENOLINK;
+		again = true;
+		goto again;
+	}
+	rc = smc_cdc_msg_send(conn, wr_buf, pend);
+	spin_unlock_bh(&conn->send_lock);
+put_out:
+	smc_wr_tx_link_put(link);
+	return rc;
 }
 
 int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
 {
 	int rc;
+
+	if (!conn->lgr || (conn->lgr->is_smcd && conn->lgr->peer_shutdown))
+		return -EPIPE;
 
 	if (conn->lgr->is_smcd) {
 		spin_lock_bh(&conn->send_lock);
@@ -142,31 +214,9 @@
 	return rc;
 }
 
-static bool smc_cdc_tx_filter(struct smc_wr_tx_pend_priv *tx_pend,
-			      unsigned long data)
+void smc_cdc_wait_pend_tx_wr(struct smc_connection *conn)
 {
-	struct smc_connection *conn = (struct smc_connection *)data;
-	struct smc_cdc_tx_pend *cdc_pend =
-		(struct smc_cdc_tx_pend *)tx_pend;
-
-	return cdc_pend->conn == conn;
-}
-
-static void smc_cdc_tx_dismisser(struct smc_wr_tx_pend_priv *tx_pend)
-{
-	struct smc_cdc_tx_pend *cdc_pend =
-		(struct smc_cdc_tx_pend *)tx_pend;
-
-	cdc_pend->conn = NULL;
-}
-
-void smc_cdc_tx_dismiss_slots(struct smc_connection *conn)
-{
-	struct smc_link *link = &conn->lgr->lnk[SMC_SINGLE_LINK];
-
-	smc_wr_tx_dismiss_slots(link, SMC_CDC_MSG_TYPE,
-				smc_cdc_tx_filter, smc_cdc_tx_dismisser,
-				(unsigned long)conn);
+	wait_event(conn->cdc_pend_tx_wq, !atomic_read(&conn->cdc_pend_tx_wr));
 }
172222 /* Send a SMC-D CDC header.
....@@ -176,23 +226,25 @@
176226 int smcd_cdc_msg_send(struct smc_connection *conn)
177227 {
178228 struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
229
+ union smc_host_cursor curs;
179230 struct smcd_cdc_msg cdc;
180231 int rc, diff;
181232
182233 memset(&cdc, 0, sizeof(cdc));
183234 cdc.common.type = SMC_CDC_MSG_TYPE;
184
- cdc.prod_wrap = conn->local_tx_ctrl.prod.wrap;
185
- cdc.prod_count = conn->local_tx_ctrl.prod.count;
186
-
187
- cdc.cons_wrap = conn->local_tx_ctrl.cons.wrap;
188
- cdc.cons_count = conn->local_tx_ctrl.cons.count;
189
- cdc.prod_flags = conn->local_tx_ctrl.prod_flags;
190
- cdc.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
235
+ curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.prod.acurs);
236
+ cdc.prod.wrap = curs.wrap;
237
+ cdc.prod.count = curs.count;
238
+ curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.cons.acurs);
239
+ cdc.cons.wrap = curs.wrap;
240
+ cdc.cons.count = curs.count;
241
+ cdc.cons.prod_flags = conn->local_tx_ctrl.prod_flags;
242
+ cdc.cons.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
191243 rc = smcd_tx_ism_write(conn, &cdc, sizeof(cdc), 0, 1);
192244 if (rc)
193245 return rc;
194
- smc_curs_copy(&conn->rx_curs_confirmed, &conn->local_tx_ctrl.cons,
195
- conn);
246
+ smc_curs_copy(&conn->rx_curs_confirmed, &curs, conn);
247
+ conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
196248 /* Calculate transmitted data and increment free send buffer space */
197249 diff = smc_curs_diff(conn->sndbuf_desc->len, &conn->tx_curs_fin,
198250 &conn->tx_curs_sent);
@@ -234,6 +286,28 @@
 		sk_send_sigurg(&smc->sk);
 }
 
+static void smc_cdc_msg_validate(struct smc_sock *smc, struct smc_cdc_msg *cdc,
+				 struct smc_link *link)
+{
+	struct smc_connection *conn = &smc->conn;
+	u16 recv_seq = ntohs(cdc->seqno);
+	s16 diff;
+
+	/* check that seqnum was seen before */
+	diff = conn->local_rx_ctrl.seqno - recv_seq;
+	if (diff < 0) { /* diff larger than 0x7fff */
+		/* drop connection */
+		conn->out_of_sync = 1;	/* prevent any further receives */
+		spin_lock_bh(&conn->send_lock);
+		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
+		conn->lnk = link;
+		spin_unlock_bh(&conn->send_lock);
+		sock_hold(&smc->sk); /* sock_put in abort_work */
+		if (!queue_work(smc_close_wq, &conn->abort_work))
+			sock_put(&smc->sk);
+	}
+}
+
 static void smc_cdc_msg_recv_action(struct smc_sock *smc,
 				    struct smc_cdc_msg *cdc)
 {
@@ -269,26 +343,18 @@
 		smp_mb__after_atomic();
 		smc->sk.sk_data_ready(&smc->sk);
 	} else {
-		if (conn->local_rx_ctrl.prod_flags.write_blocked ||
-		    conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
-		    conn->local_rx_ctrl.prod_flags.urg_data_pending) {
-			if (conn->local_rx_ctrl.prod_flags.urg_data_pending)
-				conn->urg_state = SMC_URG_NOTYET;
-			/* force immediate tx of current consumer cursor, but
-			 * under send_lock to guarantee arrival in seqno-order
-			 */
-			if (smc->sk.sk_state != SMC_INIT)
-				smc_tx_sndbuf_nonempty(conn);
-		}
+		if (conn->local_rx_ctrl.prod_flags.write_blocked)
+			smc->sk.sk_data_ready(&smc->sk);
+		if (conn->local_rx_ctrl.prod_flags.urg_data_pending)
+			conn->urg_state = SMC_URG_NOTYET;
 	}
 
-	/* piggy backed tx info */
 	/* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */
-	if (diff_cons && smc_tx_prepared_sends(conn)) {
+	if ((diff_cons && smc_tx_prepared_sends(conn)) ||
+	    conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
+	    conn->local_rx_ctrl.prod_flags.urg_data_pending)
 		smc_tx_sndbuf_nonempty(conn);
-		/* trigger socket release if connection closed */
-		smc_close_wake_tx_prepared(smc);
-	}
+
 	if (diff_cons && conn->urg_tx_pend &&
 	    atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) {
 		/* urg data confirmed by peer, indicate we're ready for more */
@@ -306,7 +372,7 @@
 		smc->clcsock->sk->sk_shutdown |= RCV_SHUTDOWN;
 		sock_set_flag(&smc->sk, SOCK_DONE);
 		sock_hold(&smc->sk); /* sock_put in close_work */
-		if (!schedule_work(&conn->close_work))
+		if (!queue_work(smc_close_wq, &conn->close_work))
 			sock_put(&smc->sk);
 	}
 }
@@ -330,13 +396,16 @@
 static void smcd_cdc_rx_tsklet(unsigned long data)
 {
 	struct smc_connection *conn = (struct smc_connection *)data;
+	struct smcd_cdc_msg *data_cdc;
 	struct smcd_cdc_msg cdc;
 	struct smc_sock *smc;
 
-	if (!conn)
+	if (!conn || conn->killed)
 		return;
 
-	memcpy(&cdc, conn->rmb_desc->cpu_addr, sizeof(cdc));
+	data_cdc = (struct smcd_cdc_msg *)conn->rmb_desc->cpu_addr;
+	smcd_curs_copy(&cdc.prod, &data_cdc->prod, conn);
+	smcd_curs_copy(&cdc.cons, &data_cdc->cons, conn);
 	smc = container_of(conn, struct smc_sock, conn);
 	smc_cdc_msg_recv(smc, (struct smc_cdc_msg *)&cdc);
 }
@@ -369,16 +438,19 @@
 	read_lock_bh(&lgr->conns_lock);
 	conn = smc_lgr_find_conn(ntohl(cdc->token), lgr);
 	read_unlock_bh(&lgr->conns_lock);
-	if (!conn)
+	if (!conn || conn->out_of_sync)
 		return;
 	smc = container_of(conn, struct smc_sock, conn);
 
-	if (!cdc->prod_flags.failover_validation) {
-		if (smc_cdc_before(ntohs(cdc->seqno),
-				   conn->local_rx_ctrl.seqno))
-			/* received seqno is old */
-			return;
+	if (cdc->prod_flags.failover_validation) {
+		smc_cdc_msg_validate(smc, cdc, link);
+		return;
 	}
+	if (smc_cdc_before(ntohs(cdc->seqno),
+			   conn->local_rx_ctrl.seqno))
+		/* received seqno is old */
+		return;
+
 	smc_cdc_msg_recv(smc, cdc);
 }
 
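
Note on the recurring pattern above: the patch drops the old tag-and-dismiss bookkeeping (smc_cdc_tx_filter/smc_cdc_tx_dismisser clearing pend->conn) in favor of a per-connection in-flight counter. Every send path does atomic_inc(&conn->cdc_pend_tx_wr) with a barrier before posting the work request, the completion handler (or a failed post) decrements it and wakes conn->cdc_pend_tx_wq on the last decrement, and teardown drains via smc_cdc_wait_pend_tx_wr(). The sketch below mirrors that lifecycle in portable C11 with pthreads; it is an illustration only — the names and the mutex/condvar stand-in for the kernel's atomic_t plus wait_queue_head_t are hypothetical, not part of the patch.

/* Userspace analogue of the cdc_pend_tx_wr accounting in this patch:
 * count a send as in flight before posting it, drop the count on
 * completion, and let teardown sleep until the count drains to zero.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

struct conn {
	atomic_int      pend_tx_wr;   /* like conn->cdc_pend_tx_wr */
	pthread_mutex_t lock;         /* protects the condvar below */
	pthread_cond_t  drained;      /* like conn->cdc_pend_tx_wq */
};

/* before posting a send: count it as in flight (patch: atomic_inc +
 * smp_mb__after_atomic() before smc_wr_tx_send())
 */
static void pend_tx_get(struct conn *c)
{
	atomic_fetch_add(&c->pend_tx_wr, 1);
}

/* completion handler (or failed post): drop the count and wake a
 * waiter only when the last pending send finishes, as the tx handler
 * does with atomic_dec_and_test() + wake_up()
 */
static void pend_tx_put(struct conn *c)
{
	if (atomic_fetch_sub(&c->pend_tx_wr, 1) == 1) {
		pthread_mutex_lock(&c->lock);
		pthread_cond_broadcast(&c->drained);
		pthread_mutex_unlock(&c->lock);
	}
}

/* teardown: like smc_cdc_wait_pend_tx_wr(), block until no sends are
 * in flight
 */
static void wait_pend_tx(struct conn *c)
{
	pthread_mutex_lock(&c->lock);
	while (atomic_load(&c->pend_tx_wr) != 0)
		pthread_cond_wait(&c->drained, &c->lock);
	pthread_mutex_unlock(&c->lock);
}

static void *completion(void *arg)
{
	pend_tx_put(arg);  /* simulate the tx completion handler */
	return NULL;
}

int main(void)
{
	struct conn c = { .pend_tx_wr = 0,
			  .lock = PTHREAD_MUTEX_INITIALIZER,
			  .drained = PTHREAD_COND_INITIALIZER };
	pthread_t t[3];

	for (int i = 0; i < 3; i++) {
		pend_tx_get(&c);                     /* post a send */
		pthread_create(&t[i], NULL, completion, &c);
	}
	wait_pend_tx(&c);        /* teardown: drain in-flight sends */
	for (int i = 0; i < 3; i++)
		pthread_join(t[i], NULL);
	puts("all pending sends drained");
	return 0;
}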
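The added smc_cdc_msg_validate() classifies the received failover seqno with serial-number arithmetic: the u16 difference is cast to s16, so a peer value up to 0x7fff ahead of the local seqno reads as negative ("not seen before") and the connection is aborted, and this stays correct across the 0xffff -> 0 wrap. A minimal self-check of that comparison, assuming the same (s16)(seq1 - seq2) < 0 form that smc_cdc_before() uses in the receive path above (names here are illustrative):

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* true if seq1 is strictly before seq2, wrap-safe within a 0x7fff window */
static bool seq_before(uint16_t seq1, uint16_t seq2)
{
	return (int16_t)(seq1 - seq2) < 0;
}

int main(void)
{
	assert(seq_before(1, 2));            /* plain ordering */
	assert(!seq_before(2, 1));
	assert(seq_before(0xfffe, 0x0001));  /* ordering across the wrap */
	assert(!seq_before(0x0001, 0xfffe));
	puts("serial-number comparisons behave as expected");
	return 0;
}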