From 10ebd8556b7990499c896a550e3d416b444211e6 Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Fri, 10 May 2024 02:23:07 +0000
Subject: [PATCH] add led

---
 kernel/net/smc/smc_cdc.c |  228 +++++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 150 insertions(+), 78 deletions(-)

diff --git a/kernel/net/smc/smc_cdc.c b/kernel/net/smc/smc_cdc.c
index 333e435..9125d28 100644
--- a/kernel/net/smc/smc_cdc.c
+++ b/kernel/net/smc/smc_cdc.c
@@ -21,13 +21,6 @@
 
 /********************************** send *************************************/
 
-struct smc_cdc_tx_pend {
-	struct smc_connection	*conn;		/* socket connection */
-	union smc_host_cursor	cursor;	/* tx sndbuf cursor sent */
-	union smc_host_cursor	p_cursor;	/* rx RMBE cursor produced */
-	u16			ctrl_seq;	/* conn. tx sequence # */
-};
-
 /* handler for send/transmission completion of a CDC msg */
 static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
 			       struct smc_link *link,
@@ -37,10 +30,6 @@
 	struct smc_connection *conn = cdcpend->conn;
 	struct smc_sock *smc;
 	int diff;
-
-	if (!conn)
-		/* already dismissed */
-		return;
 
 	smc = container_of(conn, struct smc_sock, conn);
 	bh_lock_sock(&smc->sk);
@@ -54,23 +43,38 @@
 		/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
 		smp_mb__after_atomic();
 		smc_curs_copy(&conn->tx_curs_fin, &cdcpend->cursor, conn);
+		smc_curs_copy(&conn->local_tx_ctrl_fin, &cdcpend->p_cursor,
+			      conn);
+		conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
 	}
+
+	if (atomic_dec_and_test(&conn->cdc_pend_tx_wr) &&
+	    unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
+		wake_up(&conn->cdc_pend_tx_wq);
+	WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0);
+
 	smc_tx_sndbuf_nonfull(smc);
 	bh_unlock_sock(&smc->sk);
 }
 
 int smc_cdc_get_free_slot(struct smc_connection *conn,
+			  struct smc_link *link,
 			  struct smc_wr_buf **wr_buf,
+			  struct smc_rdma_wr **wr_rdma_buf,
 			  struct smc_cdc_tx_pend **pend)
 {
-	struct smc_link *link = &conn->lgr->lnk[SMC_SINGLE_LINK];
 	int rc;
 
 	rc = smc_wr_tx_get_free_slot(link, smc_cdc_tx_handler, wr_buf,
+				     wr_rdma_buf,
 				     (struct smc_wr_tx_pend_priv **)pend);
-	if (!conn->alert_token_local)
+	if (conn->killed) {
 		/* abnormal termination */
+		if (!rc)
+			smc_wr_tx_put_slot(link,
+					   (struct smc_wr_tx_pend_priv *)(*pend));
 		rc = -EPIPE;
+	}
 	return rc;
 }
 
@@ -81,7 +85,7 @@
 		sizeof(struct smc_cdc_msg) > SMC_WR_BUF_SIZE,
 		"must increase SMC_WR_BUF_SIZE to at least sizeof(struct smc_cdc_msg)");
 	BUILD_BUG_ON_MSG(
-		sizeof(struct smc_cdc_msg) != SMC_WR_TX_SIZE,
+		offsetofend(struct smc_cdc_msg, reserved) > SMC_WR_TX_SIZE,
 		"must adapt SMC_WR_TX_SIZE to sizeof(struct smc_cdc_msg); if not all smc_wr upper layer protocols use the same message size any more, must start to set link->wr_tx_sges[i].length on each individual smc_wr_tx_send()");
 	BUILD_BUG_ON_MSG(
 		sizeof(struct smc_cdc_tx_pend) > SMC_WR_TX_PEND_PRIV_SIZE,
@@ -96,20 +100,63 @@
 		     struct smc_wr_buf *wr_buf,
 		     struct smc_cdc_tx_pend *pend)
 {
+	struct smc_link *link = conn->lnk;
 	union smc_host_cursor cfed;
-	struct smc_link *link;
 	int rc;
 
-	link = &conn->lgr->lnk[SMC_SINGLE_LINK];
+	if (unlikely(!READ_ONCE(conn->sndbuf_desc)))
+		return -ENOBUFS;
 
 	smc_cdc_add_pending_send(conn, pend);
 
 	conn->tx_cdc_seq++;
 	conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
 	smc_host_msg_to_cdc((struct smc_cdc_msg *)wr_buf, conn, &cfed);
+
+	atomic_inc(&conn->cdc_pend_tx_wr);
+	smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */
+
 	rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
-	if (!rc)
+	if (!rc) {
 		smc_curs_copy(&conn->rx_curs_confirmed, &cfed, conn);
+		conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
+	} else {
+		conn->tx_cdc_seq--;
+		conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
+		atomic_dec(&conn->cdc_pend_tx_wr);
+	}
+
+	return rc;
+}
+
+/* send a validation msg indicating the move of a conn to an other QP link */
+int smcr_cdc_msg_send_validation(struct smc_connection *conn,
+				 struct smc_cdc_tx_pend *pend,
+				 struct smc_wr_buf *wr_buf)
+{
+	struct smc_host_cdc_msg *local = &conn->local_tx_ctrl;
+	struct smc_link *link = conn->lnk;
+	struct smc_cdc_msg *peer;
+	int rc;
+
+	peer = (struct smc_cdc_msg *)wr_buf;
+	peer->common.type = local->common.type;
+	peer->len = local->len;
+	peer->seqno = htons(conn->tx_cdc_seq_fin); /* seqno last compl. tx */
+	peer->token = htonl(local->token);
+	peer->prod_flags.failover_validation = 1;
+
+	/* We need to set pend->conn here to make sure smc_cdc_tx_handler()
+	 * can handle properly
+	 */
+	smc_cdc_add_pending_send(conn, pend);
+
+	atomic_inc(&conn->cdc_pend_tx_wr);
+	smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */
+
+	rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
+	if (unlikely(rc))
+		atomic_dec(&conn->cdc_pend_tx_wr);
 
 	return rc;
 }
@@ -118,18 +165,43 @@
 {
 	struct smc_cdc_tx_pend *pend;
 	struct smc_wr_buf *wr_buf;
+	struct smc_link *link;
+	bool again = false;
 	int rc;
 
-	rc = smc_cdc_get_free_slot(conn, &wr_buf, &pend);
+again:
+	link = conn->lnk;
+	if (!smc_wr_tx_link_hold(link))
+		return -ENOLINK;
+	rc = smc_cdc_get_free_slot(conn, link, &wr_buf, NULL, &pend);
 	if (rc)
-		return rc;
+		goto put_out;
 
-	return smc_cdc_msg_send(conn, wr_buf, pend);
+	spin_lock_bh(&conn->send_lock);
+	if (link != conn->lnk) {
+		/* link of connection changed, try again one time*/
+		spin_unlock_bh(&conn->send_lock);
+		smc_wr_tx_put_slot(link,
+				   (struct smc_wr_tx_pend_priv *)pend);
+		smc_wr_tx_link_put(link);
+		if (again)
+			return -ENOLINK;
+		again = true;
+		goto again;
+	}
+	rc = smc_cdc_msg_send(conn, wr_buf, pend);
+	spin_unlock_bh(&conn->send_lock);
+put_out:
+	smc_wr_tx_link_put(link);
+	return rc;
 }
 
 int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
 {
 	int rc;
+
+	if (!conn->lgr || (conn->lgr->is_smcd && conn->lgr->peer_shutdown))
+		return -EPIPE;
 
 	if (conn->lgr->is_smcd) {
 		spin_lock_bh(&conn->send_lock);
@@ -142,31 +214,9 @@
 	return rc;
 }
 
-static bool smc_cdc_tx_filter(struct smc_wr_tx_pend_priv *tx_pend,
-			      unsigned long data)
+void smc_cdc_wait_pend_tx_wr(struct smc_connection *conn)
 {
-	struct smc_connection *conn = (struct smc_connection *)data;
-	struct smc_cdc_tx_pend *cdc_pend =
-		(struct smc_cdc_tx_pend *)tx_pend;
-
-	return cdc_pend->conn == conn;
-}
-
-static void smc_cdc_tx_dismisser(struct smc_wr_tx_pend_priv *tx_pend)
-{
-	struct smc_cdc_tx_pend *cdc_pend =
-		(struct smc_cdc_tx_pend *)tx_pend;
-
-	cdc_pend->conn = NULL;
-}
-
-void smc_cdc_tx_dismiss_slots(struct smc_connection *conn)
-{
-	struct smc_link *link = &conn->lgr->lnk[SMC_SINGLE_LINK];
-
-	smc_wr_tx_dismiss_slots(link, SMC_CDC_MSG_TYPE,
-				smc_cdc_tx_filter, smc_cdc_tx_dismisser,
-				(unsigned long)conn);
+	wait_event(conn->cdc_pend_tx_wq, !atomic_read(&conn->cdc_pend_tx_wr));
 }
 
 /* Send a SMC-D CDC header.
@@ -176,23 +226,25 @@
 int smcd_cdc_msg_send(struct smc_connection *conn)
 {
 	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
+	union smc_host_cursor curs;
 	struct smcd_cdc_msg cdc;
 	int rc, diff;
 
 	memset(&cdc, 0, sizeof(cdc));
 	cdc.common.type = SMC_CDC_MSG_TYPE;
-	cdc.prod_wrap = conn->local_tx_ctrl.prod.wrap;
-	cdc.prod_count = conn->local_tx_ctrl.prod.count;
-
-	cdc.cons_wrap = conn->local_tx_ctrl.cons.wrap;
-	cdc.cons_count = conn->local_tx_ctrl.cons.count;
-	cdc.prod_flags = conn->local_tx_ctrl.prod_flags;
-	cdc.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
+	curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.prod.acurs);
+	cdc.prod.wrap = curs.wrap;
+	cdc.prod.count = curs.count;
+	curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.cons.acurs);
+	cdc.cons.wrap = curs.wrap;
+	cdc.cons.count = curs.count;
+	cdc.cons.prod_flags = conn->local_tx_ctrl.prod_flags;
+	cdc.cons.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
 	rc = smcd_tx_ism_write(conn, &cdc, sizeof(cdc), 0, 1);
 	if (rc)
 		return rc;
-	smc_curs_copy(&conn->rx_curs_confirmed, &conn->local_tx_ctrl.cons,
-		      conn);
+	smc_curs_copy(&conn->rx_curs_confirmed, &curs, conn);
+	conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
 	/* Calculate transmitted data and increment free send buffer space */
 	diff = smc_curs_diff(conn->sndbuf_desc->len, &conn->tx_curs_fin,
 			     &conn->tx_curs_sent);
@@ -234,6 +286,28 @@
 	sk_send_sigurg(&smc->sk);
 }
 
+static void smc_cdc_msg_validate(struct smc_sock *smc, struct smc_cdc_msg *cdc,
+				 struct smc_link *link)
+{
+	struct smc_connection *conn = &smc->conn;
+	u16 recv_seq = ntohs(cdc->seqno);
+	s16 diff;
+
+	/* check that seqnum was seen before */
+	diff = conn->local_rx_ctrl.seqno - recv_seq;
+	if (diff < 0) { /* diff larger than 0x7fff */
+		/* drop connection */
+		conn->out_of_sync = 1;	/* prevent any further receives */
+		spin_lock_bh(&conn->send_lock);
+		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
+		conn->lnk = link;
+		spin_unlock_bh(&conn->send_lock);
+		sock_hold(&smc->sk); /* sock_put in abort_work */
+		if (!queue_work(smc_close_wq, &conn->abort_work))
+			sock_put(&smc->sk);
+	}
+}
+
 static void smc_cdc_msg_recv_action(struct smc_sock *smc,
 				    struct smc_cdc_msg *cdc)
 {
@@ -269,26 +343,18 @@
 		smp_mb__after_atomic();
 		smc->sk.sk_data_ready(&smc->sk);
 	} else {
-		if (conn->local_rx_ctrl.prod_flags.write_blocked ||
-		    conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
-		    conn->local_rx_ctrl.prod_flags.urg_data_pending) {
-			if (conn->local_rx_ctrl.prod_flags.urg_data_pending)
-				conn->urg_state = SMC_URG_NOTYET;
-			/* force immediate tx of current consumer cursor, but
-			 * under send_lock to guarantee arrival in seqno-order
-			 */
-			if (smc->sk.sk_state != SMC_INIT)
-				smc_tx_sndbuf_nonempty(conn);
-		}
+		if (conn->local_rx_ctrl.prod_flags.write_blocked)
+			smc->sk.sk_data_ready(&smc->sk);
+		if (conn->local_rx_ctrl.prod_flags.urg_data_pending)
+			conn->urg_state = SMC_URG_NOTYET;
 	}
 
-	/* piggy backed tx info */
 	/* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */
-	if (diff_cons && smc_tx_prepared_sends(conn)) {
+	if ((diff_cons && smc_tx_prepared_sends(conn)) ||
+	    conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
+	    conn->local_rx_ctrl.prod_flags.urg_data_pending)
 		smc_tx_sndbuf_nonempty(conn);
-		/* trigger socket release if connection closed */
-		smc_close_wake_tx_prepared(smc);
-	}
+
 	if (diff_cons && conn->urg_tx_pend &&
 	    atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) {
 		/* urg data confirmed by peer, indicate we're ready for more */
@@ -306,7 +372,7 @@
 			smc->clcsock->sk->sk_shutdown |= RCV_SHUTDOWN;
 		sock_set_flag(&smc->sk, SOCK_DONE);
 		sock_hold(&smc->sk); /* sock_put in close_work */
-		if (!schedule_work(&conn->close_work))
+		if (!queue_work(smc_close_wq, &conn->close_work))
 			sock_put(&smc->sk);
 	}
 }
@@ -330,13 +396,16 @@
 static void smcd_cdc_rx_tsklet(unsigned long data)
 {
 	struct smc_connection *conn = (struct smc_connection *)data;
+	struct smcd_cdc_msg *data_cdc;
 	struct smcd_cdc_msg cdc;
 	struct smc_sock *smc;
 
-	if (!conn)
+	if (!conn || conn->killed)
 		return;
 
-	memcpy(&cdc, conn->rmb_desc->cpu_addr, sizeof(cdc));
+	data_cdc = (struct smcd_cdc_msg *)conn->rmb_desc->cpu_addr;
+	smcd_curs_copy(&cdc.prod, &data_cdc->prod, conn);
+	smcd_curs_copy(&cdc.cons, &data_cdc->cons, conn);
 	smc = container_of(conn, struct smc_sock, conn);
 	smc_cdc_msg_recv(smc, (struct smc_cdc_msg *)&cdc);
 }
@@ -369,16 +438,19 @@
 	read_lock_bh(&lgr->conns_lock);
 	conn = smc_lgr_find_conn(ntohl(cdc->token), lgr);
 	read_unlock_bh(&lgr->conns_lock);
-	if (!conn)
+	if (!conn || conn->out_of_sync)
 		return;
 	smc = container_of(conn, struct smc_sock, conn);
 
-	if (!cdc->prod_flags.failover_validation) {
-		if (smc_cdc_before(ntohs(cdc->seqno),
-				   conn->local_rx_ctrl.seqno))
-			/* received seqno is old */
-			return;
+	if (cdc->prod_flags.failover_validation) {
+		smc_cdc_msg_validate(smc, cdc, link);
+		return;
 	}
+	if (smc_cdc_before(ntohs(cdc->seqno),
+			   conn->local_rx_ctrl.seqno))
+		/* received seqno is old */
+		return;
+
 	smc_cdc_msg_recv(smc, cdc);
 }
 

--
Gitblit v1.6.2