forked from ~ljy/RK356X_SDK_RELEASE

hc
2023-12-09 95099d4622f8cb224d94e314c7a8e0df60b13f87
diff --git a/kernel/net/smc/smc_cdc.c b/kernel/net/smc/smc_cdc.c
--- a/kernel/net/smc/smc_cdc.c
+++ b/kernel/net/smc/smc_cdc.c
@@ -21,13 +21,6 @@
 
 /********************************** send *************************************/
 
-struct smc_cdc_tx_pend {
-	struct smc_connection	*conn;		/* socket connection */
-	union smc_host_cursor	cursor;		/* tx sndbuf cursor sent */
-	union smc_host_cursor	p_cursor;	/* rx RMBE cursor produced */
-	u16			ctrl_seq;	/* conn. tx sequence # */
-};
-
 /* handler for send/transmission completion of a CDC msg */
 static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
 			       struct smc_link *link,
@@ -37,10 +30,6 @@
 	struct smc_connection *conn = cdcpend->conn;
 	struct smc_sock *smc;
 	int diff;
-
-	if (!conn)
-		/* already dismissed */
-		return;
 
 	smc = container_of(conn, struct smc_sock, conn);
 	bh_lock_sock(&smc->sk);
@@ -54,23 +43,38 @@
 		/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
 		smp_mb__after_atomic();
 		smc_curs_copy(&conn->tx_curs_fin, &cdcpend->cursor, conn);
+		smc_curs_copy(&conn->local_tx_ctrl_fin, &cdcpend->p_cursor,
+			      conn);
+		conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
 	}
+
+	if (atomic_dec_and_test(&conn->cdc_pend_tx_wr) &&
+	    unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
+		wake_up(&conn->cdc_pend_tx_wq);
+	WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0);
+
 	smc_tx_sndbuf_nonfull(smc);
 	bh_unlock_sock(&smc->sk);
 }
 
 int smc_cdc_get_free_slot(struct smc_connection *conn,
+			  struct smc_link *link,
 			  struct smc_wr_buf **wr_buf,
+			  struct smc_rdma_wr **wr_rdma_buf,
 			  struct smc_cdc_tx_pend **pend)
 {
-	struct smc_link *link = &conn->lgr->lnk[SMC_SINGLE_LINK];
 	int rc;
 
 	rc = smc_wr_tx_get_free_slot(link, smc_cdc_tx_handler, wr_buf,
+				     wr_rdma_buf,
 				     (struct smc_wr_tx_pend_priv **)pend);
-	if (!conn->alert_token_local)
+	if (conn->killed) {
 		/* abnormal termination */
+		if (!rc)
+			smc_wr_tx_put_slot(link,
+					   (struct smc_wr_tx_pend_priv *)(*pend));
 		rc = -EPIPE;
+	}
 	return rc;
 }
 
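Note on the reworked smc_cdc_get_free_slot(): the link is now passed in by the
caller instead of being derived from the formerly single link of the group, and
a connection that was killed while the slot was being reserved must give the
slot back before failing, otherwise the reservation would leak until link
teardown. A minimal userspace sketch of that error path follows; every name in
it (slot_pool, reserve_slot, ...) is an illustrative stand-in, not kernel API.

    #include <errno.h>
    #include <stdbool.h>

    struct slot_pool { int free; };		/* models the wr_tx slot array */

    static int reserve_slot(struct slot_pool *p, int *slot)
    {
    	if (p->free <= 0)
    		return -EBUSY;
    	*slot = --p->free;
    	return 0;
    }

    static void put_slot(struct slot_pool *p)
    {
    	p->free++;
    }

    /* mirrors the conn->killed handling above */
    static int get_free_slot(struct slot_pool *p, bool killed, int *slot)
    {
    	int rc = reserve_slot(p, slot);

    	if (killed) {			/* abnormal termination */
    		if (!rc)
    			put_slot(p);	/* give the reservation back */
    		rc = -EPIPE;
    	}
    	return rc;
    }
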
@@ -81,7 +85,7 @@
 		sizeof(struct smc_cdc_msg) > SMC_WR_BUF_SIZE,
 		"must increase SMC_WR_BUF_SIZE to at least sizeof(struct smc_cdc_msg)");
 	BUILD_BUG_ON_MSG(
-		sizeof(struct smc_cdc_msg) != SMC_WR_TX_SIZE,
+		offsetofend(struct smc_cdc_msg, reserved) > SMC_WR_TX_SIZE,
 		"must adapt SMC_WR_TX_SIZE to sizeof(struct smc_cdc_msg); if not all smc_wr upper layer protocols use the same message size any more, must start to set link->wr_tx_sges[i].length on each individual smc_wr_tx_send()");
 	BUILD_BUG_ON_MSG(
 		sizeof(struct smc_cdc_tx_pend) > SMC_WR_TX_PEND_PRIV_SIZE,
@@ -96,20 +100,60 @@
 			  struct smc_wr_buf *wr_buf,
 			  struct smc_cdc_tx_pend *pend)
 {
+	struct smc_link *link = conn->lnk;
 	union smc_host_cursor cfed;
-	struct smc_link *link;
 	int rc;
-
-	link = &conn->lgr->lnk[SMC_SINGLE_LINK];
 
 	smc_cdc_add_pending_send(conn, pend);
 
 	conn->tx_cdc_seq++;
 	conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
 	smc_host_msg_to_cdc((struct smc_cdc_msg *)wr_buf, conn, &cfed);
+
+	atomic_inc(&conn->cdc_pend_tx_wr);
+	smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */
+
 	rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
-	if (!rc)
+	if (!rc) {
 		smc_curs_copy(&conn->rx_curs_confirmed, &cfed, conn);
+		conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
+	} else {
+		conn->tx_cdc_seq--;
+		conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
+		atomic_dec(&conn->cdc_pend_tx_wr);
+	}
+
+	return rc;
+}
+
+/* send a validation msg indicating the move of a conn to an other QP link */
+int smcr_cdc_msg_send_validation(struct smc_connection *conn,
+				 struct smc_cdc_tx_pend *pend,
+				 struct smc_wr_buf *wr_buf)
+{
+	struct smc_host_cdc_msg *local = &conn->local_tx_ctrl;
+	struct smc_link *link = conn->lnk;
+	struct smc_cdc_msg *peer;
+	int rc;
+
+	peer = (struct smc_cdc_msg *)wr_buf;
+	peer->common.type = local->common.type;
+	peer->len = local->len;
+	peer->seqno = htons(conn->tx_cdc_seq_fin); /* seqno last compl. tx */
+	peer->token = htonl(local->token);
+	peer->prod_flags.failover_validation = 1;
+
+	/* We need to set pend->conn here to make sure smc_cdc_tx_handler()
+	 * can handle properly
+	 */
+	smc_cdc_add_pending_send(conn, pend);
+
+	atomic_inc(&conn->cdc_pend_tx_wr);
+	smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */
+
+	rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
+	if (unlikely(rc))
+		atomic_dec(&conn->cdc_pend_tx_wr);
 
 	return rc;
 }
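
Note on the ordering in smc_cdc_msg_send(): conn->cdc_pend_tx_wr is raised
before the work request is posted, with a barrier in between, so the completion
handler can never observe the counter at zero while a send is in flight; if the
post fails, both the counter and tx_cdc_seq are rolled back. A userspace model
of the pattern, with illustrative names (post_send() stands in for
smc_wr_tx_send()):

    #include <stdatomic.h>
    #include <stdint.h>

    extern int post_send(void *wr);	/* assumed transport hook */

    struct conn_model {
    	atomic_int pend_tx_wr;		/* models conn->cdc_pend_tx_wr */
    	uint16_t tx_seq;		/* models conn->tx_cdc_seq */
    };

    static int msg_send(struct conn_model *c, void *wr)
    {
    	int rc;

    	c->tx_seq++;			/* sequence number of this msg */
    	/* seq_cst RMW plays the role of smp_mb__after_atomic() here */
    	atomic_fetch_add(&c->pend_tx_wr, 1);

    	rc = post_send(wr);
    	if (rc) {			/* post failed: undo both */
    		c->tx_seq--;
    		atomic_fetch_sub(&c->pend_tx_wr, 1);
    	}
    	return rc;
    }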
@@ -118,18 +162,43 @@
 {
 	struct smc_cdc_tx_pend *pend;
 	struct smc_wr_buf *wr_buf;
+	struct smc_link *link;
+	bool again = false;
 	int rc;
 
-	rc = smc_cdc_get_free_slot(conn, &wr_buf, &pend);
+again:
+	link = conn->lnk;
+	if (!smc_wr_tx_link_hold(link))
+		return -ENOLINK;
+	rc = smc_cdc_get_free_slot(conn, link, &wr_buf, NULL, &pend);
 	if (rc)
-		return rc;
+		goto put_out;
 
-	return smc_cdc_msg_send(conn, wr_buf, pend);
+	spin_lock_bh(&conn->send_lock);
+	if (link != conn->lnk) {
+		/* link of connection changed, try again one time*/
+		spin_unlock_bh(&conn->send_lock);
+		smc_wr_tx_put_slot(link,
+				   (struct smc_wr_tx_pend_priv *)pend);
+		smc_wr_tx_link_put(link);
+		if (again)
+			return -ENOLINK;
+		again = true;
+		goto again;
+	}
+	rc = smc_cdc_msg_send(conn, wr_buf, pend);
+	spin_unlock_bh(&conn->send_lock);
+put_out:
+	smc_wr_tx_link_put(link);
+	return rc;
 }
 
 int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
 {
 	int rc;
+
+	if (!conn->lgr || (conn->lgr->is_smcd && conn->lgr->peer_shutdown))
+		return -EPIPE;
 
 	if (conn->lgr->is_smcd) {
 		spin_lock_bh(&conn->send_lock);
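Note on the retry loop in smcr_cdc_get_slot_and_msg_send(): conn->lnk is
sampled without the send lock, resources are reserved against that link, and
only under conn->send_lock is the link re-checked. If a failover moved the
connection in between, everything is released and the sequence is retried
exactly once. A pthread-based sketch of that pattern, where link_hold()/
link_put()/send_on() are illustrative stand-ins for the kernel helpers:

    #include <errno.h>
    #include <pthread.h>
    #include <stdbool.h>

    struct conn_m {
    	pthread_mutex_t send_lock;
    	void *lnk;			/* current link, changes on failover */
    };

    extern bool link_hold(void *link);	/* models smc_wr_tx_link_hold() */
    extern void link_put(void *link);	/* models smc_wr_tx_link_put() */
    extern int send_on(void *link);

    static int send_retry_once(struct conn_m *c)
    {
    	bool again = false;
    	void *link;
    	int rc;

    retry:
    	link = c->lnk;			/* sampled outside the lock */
    	if (!link_hold(link))
    		return -ENOLINK;
    	pthread_mutex_lock(&c->send_lock);
    	if (link != c->lnk) {		/* link changed under us */
    		pthread_mutex_unlock(&c->send_lock);
    		link_put(link);
    		if (again)
    			return -ENOLINK; /* moved twice: give up */
    		again = true;
    		goto retry;
    	}
    	rc = send_on(link);
    	pthread_mutex_unlock(&c->send_lock);
    	link_put(link);
    	return rc;
    }
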
@@ -142,31 +211,9 @@
 	return rc;
 }
 
-static bool smc_cdc_tx_filter(struct smc_wr_tx_pend_priv *tx_pend,
-			      unsigned long data)
+void smc_cdc_wait_pend_tx_wr(struct smc_connection *conn)
 {
-	struct smc_connection *conn = (struct smc_connection *)data;
-	struct smc_cdc_tx_pend *cdc_pend =
-		(struct smc_cdc_tx_pend *)tx_pend;
-
-	return cdc_pend->conn == conn;
-}
-
-static void smc_cdc_tx_dismisser(struct smc_wr_tx_pend_priv *tx_pend)
-{
-	struct smc_cdc_tx_pend *cdc_pend =
-		(struct smc_cdc_tx_pend *)tx_pend;
-
-	cdc_pend->conn = NULL;
-}
-
-void smc_cdc_tx_dismiss_slots(struct smc_connection *conn)
-{
-	struct smc_link *link = &conn->lgr->lnk[SMC_SINGLE_LINK];
-
-	smc_wr_tx_dismiss_slots(link, SMC_CDC_MSG_TYPE,
-				smc_cdc_tx_filter, smc_cdc_tx_dismisser,
-				(unsigned long)conn);
+	wait_event(conn->cdc_pend_tx_wq, !atomic_read(&conn->cdc_pend_tx_wr));
 }
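
This hunk is the heart of the change: the filter/dismisser machinery above
walked the send queue and NULLed each pending conn pointer, which raced with
the completion handler. It is replaced by plain reference counting: every
posted CDC send holds conn->cdc_pend_tx_wr, the completion path drops it and
wakes the wait queue at zero, and teardown simply sleeps in wait_event() until
nothing is in flight. A compact userspace model, with a condition variable
standing in for the kernel wait queue:

    #include <pthread.h>

    struct pend_ctr {
    	pthread_mutex_t lock;
    	pthread_cond_t zero;	/* models conn->cdc_pend_tx_wq */
    	int count;		/* models conn->cdc_pend_tx_wr */
    };

    static void pend_get(struct pend_ctr *p)	/* before posting a send */
    {
    	pthread_mutex_lock(&p->lock);
    	p->count++;
    	pthread_mutex_unlock(&p->lock);
    }

    static void pend_put(struct pend_ctr *p)	/* from the completion path */
    {
    	pthread_mutex_lock(&p->lock);
    	if (--p->count == 0)
    		pthread_cond_broadcast(&p->zero);	/* wake_up() */
    	pthread_mutex_unlock(&p->lock);
    }

    static void pend_wait(struct pend_ctr *p)	/* smc_cdc_wait_pend_tx_wr() */
    {
    	pthread_mutex_lock(&p->lock);
    	while (p->count)
    		pthread_cond_wait(&p->zero, &p->lock);
    	pthread_mutex_unlock(&p->lock);
    }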
 
 /* Send a SMC-D CDC header.
@@ -176,23 +223,25 @@
 int smcd_cdc_msg_send(struct smc_connection *conn)
 {
 	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
+	union smc_host_cursor curs;
 	struct smcd_cdc_msg cdc;
 	int rc, diff;
 
 	memset(&cdc, 0, sizeof(cdc));
 	cdc.common.type = SMC_CDC_MSG_TYPE;
-	cdc.prod_wrap = conn->local_tx_ctrl.prod.wrap;
-	cdc.prod_count = conn->local_tx_ctrl.prod.count;
-
-	cdc.cons_wrap = conn->local_tx_ctrl.cons.wrap;
-	cdc.cons_count = conn->local_tx_ctrl.cons.count;
-	cdc.prod_flags = conn->local_tx_ctrl.prod_flags;
-	cdc.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
+	curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.prod.acurs);
+	cdc.prod.wrap = curs.wrap;
+	cdc.prod.count = curs.count;
+	curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.cons.acurs);
+	cdc.cons.wrap = curs.wrap;
+	cdc.cons.count = curs.count;
+	cdc.cons.prod_flags = conn->local_tx_ctrl.prod_flags;
+	cdc.cons.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
 	rc = smcd_tx_ism_write(conn, &cdc, sizeof(cdc), 0, 1);
 	if (rc)
 		return rc;
-	smc_curs_copy(&conn->rx_curs_confirmed, &conn->local_tx_ctrl.cons,
-		      conn);
+	smc_curs_copy(&conn->rx_curs_confirmed, &curs, conn);
+	conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
 	/* Calculate transmitted data and increment free send buffer space */
 	diff = smc_curs_diff(conn->sndbuf_desc->len, &conn->tx_curs_fin,
 			     &conn->tx_curs_sent);
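Note on the atomic64_read() calls in smcd_cdc_msg_send(): wrap and count of a
cursor live in one 64-bit union, so a single atomic load yields a consistent
(wrap, count) pair even while the tx path updates the cursor concurrently;
copying the two fields separately, as the old code did, could pair a stale
wrap with a fresh count. The same reasoning drives the smcd_curs_copy() calls
in smcd_cdc_rx_tsklet() further down. A C11 model of the tear-free snapshot
(the field layout is illustrative):

    #include <stdatomic.h>
    #include <stdint.h>

    union host_cursor {
    	struct {
    		uint16_t reserved;
    		uint16_t wrap;		/* buffer wrap count */
    		uint32_t count;		/* byte offset in the buffer */
    	};
    	_Atomic uint64_t acurs;		/* whole cursor as one word */
    };

    static union host_cursor snapshot(union host_cursor *src)
    {
    	union host_cursor curs;

    	/* one tear-free load of both fields, as in the patched code */
    	curs.acurs = atomic_load(&src->acurs);
    	return curs;
    }
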
@@ -234,6 +283,28 @@
 		sk_send_sigurg(&smc->sk);
 }
 
+static void smc_cdc_msg_validate(struct smc_sock *smc, struct smc_cdc_msg *cdc,
+				 struct smc_link *link)
+{
+	struct smc_connection *conn = &smc->conn;
+	u16 recv_seq = ntohs(cdc->seqno);
+	s16 diff;
+
+	/* check that seqnum was seen before */
+	diff = conn->local_rx_ctrl.seqno - recv_seq;
+	if (diff < 0) {	/* diff larger than 0x7fff */
+		/* drop connection */
+		conn->out_of_sync = 1;	/* prevent any further receives */
+		spin_lock_bh(&conn->send_lock);
+		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
+		conn->lnk = link;
+		spin_unlock_bh(&conn->send_lock);
+		sock_hold(&smc->sk); /* sock_put in abort_work */
+		if (!queue_work(smc_close_wq, &conn->abort_work))
+			sock_put(&smc->sk);
+	}
+}
+
 static void smc_cdc_msg_recv_action(struct smc_sock *smc,
 				    struct smc_cdc_msg *cdc)
 {
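The diff < 0 test in smc_cdc_msg_validate() is serial-number arithmetic: the
two u16 sequence numbers are subtracted and the result is kept in an s16, so
the comparison stays correct across 16-bit wraparound. A negative diff means
the seqno carried by the failover message lies ahead of anything this side has
seen, i.e. CDC messages were lost on the old link, so the connection must be
aborted. A standalone demonstration:

    #include <stdint.h>
    #include <stdio.h>

    static int16_t seq_diff(uint16_t local, uint16_t recv)
    {
    	return (int16_t)(local - recv);	/* wraps modulo 2^16 */
    }

    int main(void)
    {
    	/* received seqno already seen, even across the wrap: diff >= 0 */
    	printf("%d\n", seq_diff(5, 0xfffe));	/* 7  -> old, fine */
    	/* received seqno ahead of the local state: diff < 0 */
    	printf("%d\n", seq_diff(5, 9));		/* -4 -> out of sync */
    	return 0;
    }
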
@@ -269,26 +340,18 @@
 		smp_mb__after_atomic();
 		smc->sk.sk_data_ready(&smc->sk);
 	} else {
-		if (conn->local_rx_ctrl.prod_flags.write_blocked ||
-		    conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
-		    conn->local_rx_ctrl.prod_flags.urg_data_pending) {
-			if (conn->local_rx_ctrl.prod_flags.urg_data_pending)
-				conn->urg_state = SMC_URG_NOTYET;
-			/* force immediate tx of current consumer cursor, but
-			 * under send_lock to guarantee arrival in seqno-order
-			 */
-			if (smc->sk.sk_state != SMC_INIT)
-				smc_tx_sndbuf_nonempty(conn);
-		}
+		if (conn->local_rx_ctrl.prod_flags.write_blocked)
+			smc->sk.sk_data_ready(&smc->sk);
+		if (conn->local_rx_ctrl.prod_flags.urg_data_pending)
+			conn->urg_state = SMC_URG_NOTYET;
 	}
 
-	/* piggy backed tx info */
 	/* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */
-	if (diff_cons && smc_tx_prepared_sends(conn)) {
+	if ((diff_cons && smc_tx_prepared_sends(conn)) ||
+	    conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
+	    conn->local_rx_ctrl.prod_flags.urg_data_pending)
 		smc_tx_sndbuf_nonempty(conn);
-		/* trigger socket release if connection closed */
-		smc_close_wake_tx_prepared(smc);
-	}
+
 	if (diff_cons && conn->urg_tx_pend &&
 	    atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) {
 		/* urg data confirmed by peer, indicate we're ready for more */
@@ -306,7 +369,7 @@
 		smc->clcsock->sk->sk_shutdown |= RCV_SHUTDOWN;
 		sock_set_flag(&smc->sk, SOCK_DONE);
 		sock_hold(&smc->sk); /* sock_put in close_work */
-		if (!schedule_work(&conn->close_work))
+		if (!queue_work(smc_close_wq, &conn->close_work))
 			sock_put(&smc->sk);
 	}
 }
@@ -330,13 +393,16 @@
 static void smcd_cdc_rx_tsklet(unsigned long data)
 {
 	struct smc_connection *conn = (struct smc_connection *)data;
+	struct smcd_cdc_msg *data_cdc;
 	struct smcd_cdc_msg cdc;
 	struct smc_sock *smc;
 
-	if (!conn)
+	if (!conn || conn->killed)
 		return;
 
-	memcpy(&cdc, conn->rmb_desc->cpu_addr, sizeof(cdc));
+	data_cdc = (struct smcd_cdc_msg *)conn->rmb_desc->cpu_addr;
+	smcd_curs_copy(&cdc.prod, &data_cdc->prod, conn);
+	smcd_curs_copy(&cdc.cons, &data_cdc->cons, conn);
 	smc = container_of(conn, struct smc_sock, conn);
 	smc_cdc_msg_recv(smc, (struct smc_cdc_msg *)&cdc);
 }
@@ -369,16 +435,19 @@
 	read_lock_bh(&lgr->conns_lock);
 	conn = smc_lgr_find_conn(ntohl(cdc->token), lgr);
 	read_unlock_bh(&lgr->conns_lock);
-	if (!conn)
+	if (!conn || conn->out_of_sync)
 		return;
 	smc = container_of(conn, struct smc_sock, conn);
 
-	if (!cdc->prod_flags.failover_validation) {
-		if (smc_cdc_before(ntohs(cdc->seqno),
-				   conn->local_rx_ctrl.seqno))
-			/* received seqno is old */
-			return;
+	if (cdc->prod_flags.failover_validation) {
+		smc_cdc_msg_validate(smc, cdc, link);
+		return;
 	}
+	if (smc_cdc_before(ntohs(cdc->seqno),
+			   conn->local_rx_ctrl.seqno))
+		/* received seqno is old */
+		return;
+
 	smc_cdc_msg_recv(smc, cdc);
 }
 