From 8ac6c7a54ed1b98d142dce24b11c6de6a1e239a5 Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Tue, 22 Oct 2024 10:36:11 +0000
Subject: [PATCH] 修改4g拨号为QMI,需要在系统里后台执行quectel-CM

---
 kernel/net/tipc/socket.c |  866 +++++++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 652 insertions(+), 214 deletions(-)

diff --git a/kernel/net/tipc/socket.c b/kernel/net/tipc/socket.c
index 6c18b45..7cf9b40 100644
--- a/kernel/net/tipc/socket.c
+++ b/kernel/net/tipc/socket.c
@@ -46,13 +46,15 @@
 #include "bcast.h"
 #include "netlink.h"
 #include "group.h"
+#include "trace.h"
 
-#define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */
+#define NAGLE_START_INIT	4
+#define NAGLE_START_MAX		1024
+#define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
 #define CONN_PROBING_INTV	msecs_to_jiffies(3600000)  /* [ms] => 1 h */
-#define TIPC_FWD_MSG		1
 #define TIPC_MAX_PORT		0xffffffff
 #define TIPC_MIN_PORT		1
-#define TIPC_ACK_RATE		4       /* ACK at 1/4 of of rcv window size */
+#define TIPC_ACK_RATE		4       /* ACK at 1/4 of rcv window size */
 
 enum {
 	TIPC_LISTEN = TCP_LISTEN,
@@ -74,13 +76,13 @@
  * @conn_instance: TIPC instance used when connection was established
  * @published: non-zero if port has one or more associated names
  * @max_pkt: maximum packet size "hint" used when building messages sent by port
+ * @maxnagle: maximum size of msg which can be subject to nagle
  * @portid: unique port identity in TIPC socket hash table
  * @phdr: preformatted message header used when sending messages
  * #cong_links: list of congested links
  * @publications: list of publications for port
  * @blocking_link: address of the congested link we are currently sleeping on
  * @pub_count: total # of publications port has made during its lifetime
- * @probing_state:
  * @conn_timeout: the time we can wait for an unresponded setup request
  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
  * @cong_link_cnt: number of congested links
@@ -97,13 +99,14 @@
 	u32 conn_instance;
 	int published;
 	u32 max_pkt;
+	u32 maxnagle;
 	u32 portid;
 	struct tipc_msg phdr;
 	struct list_head cong_links;
 	struct list_head publications;
 	u32 pub_count;
-	uint conn_timeout;
 	atomic_t dupl_rcvcnt;
+	u16 conn_timeout;
 	bool probe_unacked;
 	u16 cong_link_cnt;
 	u16 snt_unacked;
@@ -116,6 +119,13 @@
 	struct tipc_mc_method mc_method;
 	struct rcu_head rcu;
 	struct tipc_group *group;
+	u32 oneway;
+	u32 nagle_start;
+	u16 snd_backlog;
+	u16 msg_acc;
+	u16 pkt_cnt;
+	bool expect_ack;
+	bool nodelay;
 	bool group_is_open;
 };
 
@@ -137,6 +147,8 @@
 static void tipc_sk_remove(struct tipc_sock *tsk);
 static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
+static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack);
+static int tipc_wait_for_connect(struct socket *sock, long *timeo_p);
 
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
@@ -184,17 +196,17 @@
 	return msg_importance(&tsk->phdr);
 }
 
-static int tsk_set_importance(struct tipc_sock *tsk, int imp)
-{
-	if (imp > TIPC_CRITICAL_IMPORTANCE)
-		return -EINVAL;
-	msg_set_importance(&tsk->phdr, (u32)imp);
-	return 0;
-}
-
 static struct tipc_sock *tipc_sk(const struct sock *sk)
 {
 	return container_of(sk, struct tipc_sock, sk);
+}
+
+int tsk_set_importance(struct sock *sk, int imp)
+{
+	if (imp > TIPC_CRITICAL_IMPORTANCE)
+		return -EINVAL;
+	msg_set_importance(&tipc_sk(sk)->phdr, (u32)imp);
+	return 0;
 }
 
 static bool tsk_conn_cong(struct tipc_sock *tsk)
@@ -227,6 +239,26 @@
 	return 1;
 }
 
+/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle
+ */
+static void tsk_set_nagle(struct tipc_sock *tsk)
+{
+	struct sock *sk = &tsk->sk;
+
+	tsk->maxnagle = 0;
+	if (sk->sk_type != SOCK_STREAM)
+		return;
+	if (tsk->nodelay)
+		return;
+	if (!(tsk->peer_caps & TIPC_NAGLE))
+		return;
+	/* Limit node local buffer size to avoid receive queue overflow */
+	if (tsk->max_pkt == MAX_MSG_SIZE)
+		tsk->maxnagle = 1500;
+	else
+		tsk->maxnagle = tsk->max_pkt;
+}
+
 /**
  * tsk_advance_rx_queue - discard first buffer in socket receive queue
  *
@@ -234,6 +266,7 @@
  */
 static void tsk_advance_rx_queue(struct sock *sk)
 {
+	trace_tipc_sk_advance_rx(sk, NULL, TIPC_DUMP_SK_RCVQ, " ");
 	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 }
 
@@ -248,6 +281,7 @@
 	if (!tipc_msg_reverse(onode, &skb, err))
 		return;
 
+	trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, "@sk_respond!");
 	dnode = msg_destnode(buf_msg(skb));
 	selector = msg_origport(buf_msg(skb));
 	tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
@@ -258,17 +292,17 @@
  *
  * Caller must hold socket lock
  */
-static void tsk_rej_rx_queue(struct sock *sk)
+static void tsk_rej_rx_queue(struct sock *sk, int error)
 {
 	struct sk_buff *skb;
 
 	while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
-		tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
+		tipc_sk_respond(sk, skb, error);
 }
 
-static bool tipc_sk_connected(struct sock *sk)
+static bool tipc_sk_connected(const struct sock *sk)
 {
-	return sk->sk_state == TIPC_ESTABLISHED;
+	return READ_ONCE(sk->sk_state) == TIPC_ESTABLISHED;
 }
 
 /* tipc_sk_type_connectionless - check if the socket is datagram socket
@@ -388,7 +422,7 @@
 		rc_ = tipc_sk_sock_err((sock_), timeo_);		       \
 		if (rc_)						       \
 			break;						       \
-		prepare_to_wait(sk_sleep(sk_), &wait_, TASK_INTERRUPTIBLE);    \
+		add_wait_queue(sk_sleep(sk_), &wait_);                         \
 		release_sock(sk_);					       \
 		*(timeo_) = wait_woken(&wait_, TASK_INTERRUPTIBLE, *(timeo_)); \
 		sched_annotate_sleep();				               \
@@ -444,6 +478,8 @@
 
 	tsk = tipc_sk(sk);
 	tsk->max_pkt = MAX_PKT_DEFAULT;
+	tsk->maxnagle = 0;
+	tsk->nagle_start = NAGLE_START_INIT;
 	INIT_LIST_HEAD(&tsk->publications);
 	INIT_LIST_HEAD(&tsk->cong_links);
 	msg = &tsk->phdr;
@@ -453,6 +489,7 @@
 	sock_init_data(sock, sk);
 	tipc_set_sk_state(sk, TIPC_OPEN);
 	if (tipc_sk_insert(tsk)) {
+		sk_free(sk);
 		pr_warn("Socket create failed; port number exhausted\n");
 		return -EINVAL;
 	}
@@ -467,7 +504,7 @@
 	timer_setup(&sk->sk_timer, tipc_sk_timeout, 0);
 	sk->sk_shutdown = 0;
 	sk->sk_backlog_rcv = tipc_sk_backlog_rcv;
-	sk->sk_rcvbuf = sysctl_tipc_rmem[1];
+	sk->sk_rcvbuf = READ_ONCE(sysctl_tipc_rmem[1]);
 	sk->sk_data_ready = tipc_data_ready;
 	sk->sk_write_space = tipc_write_space;
 	sk->sk_destruct = tipc_sock_destruct;
@@ -484,7 +521,8 @@
 		if (sock->type == SOCK_DGRAM)
 			tsk_set_unreliable(tsk, true);
 	}
-
+	__skb_queue_head_init(&tsk->mc_method.deferredq);
+	trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
 	return 0;
 }
 
@@ -509,34 +547,50 @@
 	tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
 					    !tsk_conn_cong(tsk)));
 
-	/* Reject all unreceived messages, except on an active connection
-	 * (which disconnects locally & sends a 'FIN+' to peer).
-	 */
-	while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
-		if (TIPC_SKB_CB(skb)->bytes_read) {
-			kfree_skb(skb);
-			continue;
-		}
-		if (!tipc_sk_type_connectionless(sk) &&
-		    sk->sk_state != TIPC_DISCONNECTING) {
-			tipc_set_sk_state(sk, TIPC_DISCONNECTING);
-			tipc_node_remove_conn(net, dnode, tsk->portid);
-		}
-		tipc_sk_respond(sk, skb, error);
+	/* Push out delayed messages if in Nagle mode */
+	tipc_sk_push_backlog(tsk, false);
+	/* Remove pending SYN */
+	__skb_queue_purge(&sk->sk_write_queue);
+
+	/* Remove partially received buffer if any */
+	skb = skb_peek(&sk->sk_receive_queue);
+	if (skb && TIPC_SKB_CB(skb)->bytes_read) {
+		__skb_unlink(skb, &sk->sk_receive_queue);
+		kfree_skb(skb);
 	}
 
-	if (tipc_sk_type_connectionless(sk))
+	/* Reject all unreceived messages if connectionless */
+	if (tipc_sk_type_connectionless(sk)) {
+		tsk_rej_rx_queue(sk, error);
 		return;
+	}
 
-	if (sk->sk_state != TIPC_DISCONNECTING) {
+	switch (sk->sk_state) {
+	case TIPC_CONNECTING:
+	case TIPC_ESTABLISHED:
+		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
+		tipc_node_remove_conn(net, dnode, tsk->portid);
+		/* Send a FIN+/- to its peer */
+		skb = __skb_dequeue(&sk->sk_receive_queue);
+		if (skb) {
+			__skb_queue_purge(&sk->sk_receive_queue);
+			tipc_sk_respond(sk, skb, error);
+			break;
+		}
 		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
 				      TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
 				      tsk_own_node(tsk), tsk_peer_port(tsk),
 				      tsk->portid, error);
 		if (skb)
 			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
-		tipc_node_remove_conn(net, dnode, tsk->portid);
-		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
+		break;
+	case TIPC_LISTEN:
+		/* Reject all SYN messages */
+		tsk_rej_rx_queue(sk, error);
+		break;
+	default:
+		__skb_queue_purge(&sk->sk_receive_queue);
+		break;
 	}
 }
 
@@ -571,10 +625,12 @@
 	tsk = tipc_sk(sk);
 	lock_sock(sk);
 
+	trace_tipc_sk_release(sk, NULL, TIPC_DUMP_ALL, " ");
 	__tipc_shutdown(sock, TIPC_ERR_NO_PORT);
 	sk->sk_shutdown = SHUTDOWN_MASK;
 	tipc_sk_leave(tsk);
 	tipc_sk_withdraw(tsk, 0, NULL);
+	__skb_queue_purge(&tsk->mc_method.deferredq);
 	sk_stop_timer(sk, &sk->sk_timer);
 	tipc_sk_remove(tsk);
 
@@ -656,7 +712,6 @@
  * tipc_getname - get port ID of socket or peer socket
  * @sock: socket structure
  * @uaddr: area for returned socket address
- * @uaddr_len: area for returned length of socket address
  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
  *
  * Returns 0 on success, errno otherwise
@@ -718,6 +773,7 @@
 	__poll_t revents = 0;
 
 	sock_poll_wait(file, sock, wait);
+	trace_tipc_sk_poll(sk, NULL, TIPC_DUMP_ALL, " ");
 
 	if (sk->sk_shutdown & RCV_SHUTDOWN)
 		revents |= EPOLLRDHUP | EPOLLIN | EPOLLRDNORM;
@@ -728,7 +784,7 @@
 	case TIPC_ESTABLISHED:
 		if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
 			revents |= EPOLLOUT;
-		/* fall thru' */
+		fallthrough;
 	case TIPC_LISTEN:
 	case TIPC_CONNECTING:
 		if (!skb_queue_empty_lockless(&sk->sk_receive_queue))
@@ -804,9 +860,12 @@
 	rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
 
 	/* Send message if build was successful */
-	if (unlikely(rc == dlen))
+	if (unlikely(rc == dlen)) {
+		trace_tipc_sk_sendmcast(sk, skb_peek(&pkts),
+					TIPC_DUMP_SK_SNDQ, " ");
 		rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
 				     &tsk->cong_link_cnt);
+	}
 
 	tipc_nlist_purge(&dsts);
 
@@ -842,7 +901,7 @@
 
 	/* Build message as chain of buffers */
 	__skb_queue_head_init(&pkts);
-	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
+	mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false);
 	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 	if (unlikely(rc != dlen))
 		return rc;
@@ -994,7 +1053,7 @@
 
 /**
  * tipc_send_group_bcast - send message to all members in communication group
- * @sk: socket structure
+ * @sock: socket structure
  * @m: message to send
  * @dlen: total length of message data
  * @timeout: timeout to wait for wakeup
@@ -1199,6 +1258,56 @@
 	tipc_sk_rcv(net, inputq);
 }
 
+/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
+ *                         when socket is in Nagle mode
+ */
+static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack)
+{
+	struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
+	struct sk_buff *skb = skb_peek_tail(txq);
+	struct net *net = sock_net(&tsk->sk);
+	u32 dnode = tsk_peer_node(tsk);
+	int rc;
+
+	if (nagle_ack) {
+		tsk->pkt_cnt += skb_queue_len(txq);
+		if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) {
+			tsk->oneway = 0;
+			if (tsk->nagle_start < NAGLE_START_MAX)
+				tsk->nagle_start *= 2;
+			tsk->expect_ack = false;
+			pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n",
+				 tsk->portid, tsk->msg_acc, tsk->pkt_cnt,
+				 tsk->nagle_start);
+		} else {
+			tsk->nagle_start = NAGLE_START_INIT;
+			if (skb) {
+				msg_set_ack_required(buf_msg(skb));
+				tsk->expect_ack = true;
+			} else {
+				tsk->expect_ack = false;
+			}
+		}
+		tsk->msg_acc = 0;
+		tsk->pkt_cnt = 0;
+	}
+
+	if (!skb || tsk->cong_link_cnt)
+		return;
+
+	/* Do not send SYN again after congestion */
+	if (msg_is_syn(buf_msg(skb)))
+		return;
+
+	if (tsk->msg_acc)
+		tsk->pkt_cnt += skb_queue_len(txq);
+	tsk->snt_unacked += tsk->snd_backlog;
+	tsk->snd_backlog = 0;
+	rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
+	if (rc == -ELINKCONG)
+		tsk->cong_link_cnt = 1;
+}
+
 /**
  * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
@@ -1212,11 +1321,13 @@
 	u32 onode = tsk_own_node(tsk);
 	struct sock *sk = &tsk->sk;
 	int mtyp = msg_type(hdr);
-	bool conn_cong;
+	bool was_cong;
 
 	/* Ignore if connection cannot be validated: */
-	if (!tsk_peer_msg(tsk, hdr))
+	if (!tsk_peer_msg(tsk, hdr)) {
+		trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, "@proto_rcv!");
 		goto exit;
+	}
 
 	if (unlikely(msg_errcode(hdr))) {
 		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
@@ -1243,11 +1354,12 @@
 			__skb_queue_tail(xmitq, skb);
 		return;
 	} else if (mtyp == CONN_ACK) {
-		conn_cong = tsk_conn_cong(tsk);
+		was_cong = tsk_conn_cong(tsk);
+		tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr));
 		tsk->snt_unacked -= msg_conn_ack(hdr);
 		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
 			tsk->snd_win = msg_adv_win(hdr);
-		if (conn_cong)
+		if (was_cong && !tsk_conn_cong(tsk))
 			sk->sk_write_space(sk);
 	} else if (mtyp != CONN_PROBE_REPLY) {
 		pr_warn("Received unknown CONN_PROTO msg\n");
@@ -1295,8 +1407,8 @@
 	struct tipc_msg *hdr = &tsk->phdr;
 	struct tipc_name_seq *seq;
 	struct sk_buff_head pkts;
-	u32 dport, dnode = 0;
-	u32 type, inst;
+	u32 dport = 0, dnode = 0;
+	u32 type = 0, inst = 0;
 	int mtu, rc;
 
 	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
@@ -1338,6 +1450,7 @@
 			tsk->conn_type = dest->addr.name.name.type;
 			tsk->conn_instance = dest->addr.name.name.instance;
 		}
+		msg_set_syn(hdr, 1);
 	}
 
 	seq = &dest->addr.nameseq;
@@ -1348,23 +1461,11 @@
 		type = dest->addr.name.name.type;
 		inst = dest->addr.name.name.instance;
 		dnode = dest->addr.name.domain;
-		msg_set_type(hdr, TIPC_NAMED_MSG);
-		msg_set_hdr_sz(hdr, NAMED_H_SIZE);
-		msg_set_nametype(hdr, type);
-		msg_set_nameinst(hdr, inst);
-		msg_set_lookup_scope(hdr, tipc_node2scope(dnode));
 		dport = tipc_nametbl_translate(net, type, inst, &dnode);
-		msg_set_destnode(hdr, dnode);
-		msg_set_destport(hdr, dport);
 		if (unlikely(!dport && !dnode))
 			return -EHOSTUNREACH;
 	} else if (dest->addrtype == TIPC_ADDR_ID) {
 		dnode = dest->addr.id.node;
-		msg_set_type(hdr, TIPC_DIRECT_MSG);
-		msg_set_lookup_scope(hdr, 0);
-		msg_set_destnode(hdr, dnode);
-		msg_set_destport(hdr, dest->addr.id.ref);
-		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
 	} else {
 		return -EINVAL;
 	}
@@ -1375,12 +1476,33 @@
 	if (unlikely(rc))
 		return rc;
 
+	if (dest->addrtype == TIPC_ADDR_NAME) {
+		msg_set_type(hdr, TIPC_NAMED_MSG);
+		msg_set_hdr_sz(hdr, NAMED_H_SIZE);
+		msg_set_nametype(hdr, type);
+		msg_set_nameinst(hdr, inst);
+		msg_set_lookup_scope(hdr, tipc_node2scope(dnode));
+		msg_set_destnode(hdr, dnode);
+		msg_set_destport(hdr, dport);
+	} else { /* TIPC_ADDR_ID */
+		msg_set_type(hdr, TIPC_DIRECT_MSG);
+		msg_set_lookup_scope(hdr, 0);
+		msg_set_destnode(hdr, dnode);
+		msg_set_destport(hdr, dest->addr.id.ref);
+		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
+	}
+
 	__skb_queue_head_init(&pkts);
-	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
+	mtu = tipc_node_get_mtu(net, dnode, tsk->portid, true);
 	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 	if (unlikely(rc != dlen))
 		return rc;
+	if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) {
+		__skb_queue_purge(&pkts);
+		return -ENOMEM;
+	}
 
+	trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
 	rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
 	if (unlikely(rc == -ELINKCONG)) {
 		tipc_dest_push(clinks, dnode, 0);
@@ -1388,8 +1510,13 @@
 		rc = 0;
 	}
 
-	if (unlikely(syn && !rc))
+	if (unlikely(syn && !rc)) {
 		tipc_set_sk_state(sk, TIPC_CONNECTING);
+		if (dlen && timeout) {
+			timeout = msecs_to_jiffies(timeout);
+			tipc_wait_for_connect(sock, &timeout);
+		}
+	}
 
 	return rc ? rc : dlen;
 }
@@ -1422,21 +1549,22 @@
 	struct sock *sk = sock->sk;
 	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
 	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+	struct sk_buff_head *txq = &sk->sk_write_queue;
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_msg *hdr = &tsk->phdr;
 	struct net *net = sock_net(sk);
-	struct sk_buff_head pkts;
+	struct sk_buff *skb;
 	u32 dnode = tsk_peer_node(tsk);
+	int maxnagle = tsk->maxnagle;
+	int maxpkt = tsk->max_pkt;
 	int send, sent = 0;
-	int rc = 0;
-
-	__skb_queue_head_init(&pkts);
+	int blocks, rc = 0;
 
 	if (unlikely(dlen > INT_MAX))
 		return -EMSGSIZE;
 
 	/* Handle implicit connection setup */
-	if (unlikely(dest)) {
+	if (unlikely(dest && sk->sk_state == TIPC_OPEN)) {
 		rc = __tipc_sendmsg(sock, m, dlen);
 		if (dlen && dlen == rc) {
 			tsk->peer_caps = tipc_node_get_capabilities(net, dnode);
@@ -1452,19 +1580,48 @@
 					 tipc_sk_connected(sk)));
 		if (unlikely(rc))
 			break;
-
 		send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
-		rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
-		if (unlikely(rc != send))
-			break;
-
-		rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+		blocks = tsk->snd_backlog;
+		if (tsk->oneway++ >= tsk->nagle_start && maxnagle &&
+		    send <= maxnagle) {
+			rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
+			if (unlikely(rc < 0))
+				break;
+			blocks += rc;
+			tsk->msg_acc++;
+			if (blocks <= 64 && tsk->expect_ack) {
+				tsk->snd_backlog = blocks;
+				sent += send;
+				break;
+			} else if (blocks > 64) {
+				tsk->pkt_cnt += skb_queue_len(txq);
+			} else {
+				skb = skb_peek_tail(txq);
+				if (skb) {
+					msg_set_ack_required(buf_msg(skb));
+					tsk->expect_ack = true;
+				} else {
+					tsk->expect_ack = false;
+				}
+				tsk->msg_acc = 0;
+				tsk->pkt_cnt = 0;
+			}
+		} else {
+			rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
+			if (unlikely(rc != send))
+				break;
+			blocks += tsk_inc(tsk, send + MIN_H_SIZE);
+		}
+		trace_tipc_sk_sendstream(sk, skb_peek(txq),
+					 TIPC_DUMP_SK_SNDQ, " ");
+		rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
 		if (unlikely(rc == -ELINKCONG)) {
 			tsk->cong_link_cnt = 1;
 			rc = 0;
 		}
 		if (likely(!rc)) {
-			tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
+			tsk->snt_unacked += blocks;
+			tsk->snd_backlog = 0;
 			sent += send;
 		}
 	} while (sent < dlen && !rc);
@@ -1499,6 +1656,7 @@
 	struct net *net = sock_net(sk);
 	struct tipc_msg *msg = &tsk->phdr;
 
+	msg_set_syn(msg, 0);
 	msg_set_destnode(msg, peer_node);
 	msg_set_destport(msg, peer_port);
 	msg_set_type(msg, TIPC_CONN_MSG);
@@ -1508,8 +1666,10 @@
 	sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
 	tipc_set_sk_state(sk, TIPC_ESTABLISHED);
 	tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
-	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
+	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
 	tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
+	tsk_set_nagle(tsk);
+	__skb_queue_purge(&sk->sk_write_queue);
 	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
 		return;
 
@@ -1521,7 +1681,7 @@
 /**
  * tipc_sk_set_orig_addr - capture sender's address for received message
  * @m: descriptor for message info
- * @hdr: received message header
+ * @skb: received message
  *
  * Note: Address is not captured if not requested by receiver.
  */
@@ -1630,22 +1790,21 @@
 	return 0;
 }
 
-static void tipc_sk_send_ack(struct tipc_sock *tsk)
+static struct sk_buff *tipc_sk_build_ack(struct tipc_sock *tsk)
 {
 	struct sock *sk = &tsk->sk;
-	struct net *net = sock_net(sk);
 	struct sk_buff *skb = NULL;
 	struct tipc_msg *msg;
 	u32 peer_port = tsk_peer_port(tsk);
 	u32 dnode = tsk_peer_node(tsk);
 
 	if (!tipc_sk_connected(sk))
-		return;
+		return NULL;
 	skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
 			      dnode, tsk_own_node(tsk), peer_port,
 			      tsk->portid, TIPC_OK);
 	if (!skb)
-		return;
+		return NULL;
 	msg = buf_msg(skb);
 	msg_set_conn_ack(msg, tsk->rcv_unacked);
 	tsk->rcv_unacked = 0;
@@ -1655,13 +1814,25 @@
 		tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
 		msg_set_adv_win(msg, tsk->rcv_win);
 	}
-	tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
+	return skb;
+}
+
+static void tipc_sk_send_ack(struct tipc_sock *tsk)
+{
+	struct sk_buff *skb;
+
+	skb = tipc_sk_build_ack(tsk);
+	if (!skb)
+		return;
+
+	tipc_node_xmit_skb(sock_net(&tsk->sk), skb, tsk_peer_node(tsk),
+			   msg_link_selector(buf_msg(skb)));
 }
 
 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
 {
 	struct sock *sk = sock->sk;
-	DEFINE_WAIT(wait);
+	DEFINE_WAIT_FUNC(wait, woken_wake_function);
 	long timeo = *timeop;
 	int err = sock_error(sk);
 
@@ -1669,15 +1840,17 @@
 		return err;
 
 	for (;;) {
-		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
 		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
 			if (sk->sk_shutdown & RCV_SHUTDOWN) {
 				err = -ENOTCONN;
 				break;
 			}
+			add_wait_queue(sk_sleep(sk), &wait);
 			release_sock(sk);
-			timeo = schedule_timeout(timeo);
+			timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
+			sched_annotate_sleep();
 			lock_sock(sk);
+			remove_wait_queue(sk_sleep(sk), &wait);
 		}
 		err = 0;
 		if (!skb_queue_empty(&sk->sk_receive_queue))
@@ -1693,7 +1866,6 @@
 		if (err)
 			break;
 	}
-	finish_wait(sk_sleep(sk), &wait);
 	*timeop = timeo;
 	return err;
 }
@@ -1919,7 +2091,7 @@
 
 		/* Send connection flow control advertisement when applicable */
 		tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
-		if (unlikely(tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE))
+		if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
 			tipc_sk_send_ack(tsk);
 
 		/* Exit if all requested data or FIN/error received */
@@ -1951,7 +2123,6 @@
 /**
  * tipc_data_ready - wake up threads to indicate messages have been received
  * @sk: socket
- * @len: the length of messages
  */
 static void tipc_data_ready(struct sock *sk)
 {
@@ -1990,6 +2161,7 @@
 		smp_wmb();
 		tsk->cong_link_cnt--;
 		wakeup = true;
+		tipc_sk_push_backlog(tsk, false);
 		break;
 	case GROUP_PROTOCOL:
 		tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
@@ -2009,91 +2181,106 @@
 }
 
 /**
- * tipc_filter_connect - Handle incoming message for a connection-based socket
+ * tipc_sk_filter_connect - check incoming message for a connection-based socket
  * @tsk: TIPC socket
- * @skb: pointer to message buffer. Set to NULL if buffer is consumed
- *
- * Returns true if everything ok, false otherwise
+ * @skb: pointer to message buffer.
+ * @xmitq: for Nagle ACK if any
+ * Returns true if message should be added to receive queue, false otherwise
  */
-static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
+static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
+				   struct sk_buff_head *xmitq)
 {
 	struct sock *sk = &tsk->sk;
 	struct net *net = sock_net(sk);
 	struct tipc_msg *hdr = buf_msg(skb);
-	u32 pport = msg_origport(hdr);
-	u32 pnode = msg_orignode(hdr);
+	bool con_msg = msg_connected(hdr);
+	u32 pport = tsk_peer_port(tsk);
+	u32 pnode = tsk_peer_node(tsk);
+	u32 oport = msg_origport(hdr);
+	u32 onode = msg_orignode(hdr);
+	int err = msg_errcode(hdr);
+	unsigned long delay;
 
 	if (unlikely(msg_mcast(hdr)))
 		return false;
+	tsk->oneway = 0;
 
 	switch (sk->sk_state) {
 	case TIPC_CONNECTING:
-		/* Accept only ACK or NACK message */
-		if (unlikely(!msg_connected(hdr))) {
-			if (pport != tsk_peer_port(tsk) ||
-			    pnode != tsk_peer_node(tsk))
-				return false;
-
-			tipc_set_sk_state(sk, TIPC_DISCONNECTING);
-			sk->sk_err = ECONNREFUSED;
+		/* Setup ACK */
+		if (likely(con_msg)) {
+			if (err)
+				break;
+			tipc_sk_finish_conn(tsk, oport, onode);
+			msg_set_importance(&tsk->phdr, msg_importance(hdr));
+			/* ACK+ message with data is added to receive queue */
+			if (msg_data_sz(hdr))
+				return true;
+			/* Empty ACK-, - wake up sleeping connect() and drop */
 			sk->sk_state_change(sk);
-			return true;
+			msg_set_dest_droppable(hdr, 1);
+			return false;
 		}
-
-		if (unlikely(msg_errcode(hdr))) {
-			tipc_set_sk_state(sk, TIPC_DISCONNECTING);
-			sk->sk_err = ECONNREFUSED;
-			sk->sk_state_change(sk);
-			return true;
-		}
-
-		if (unlikely(!msg_isdata(hdr))) {
-			tipc_set_sk_state(sk, TIPC_DISCONNECTING);
-			sk->sk_err = EINVAL;
-			sk->sk_state_change(sk);
-			return true;
-		}
-
-		tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr));
-		msg_set_importance(&tsk->phdr, msg_importance(hdr));
-
-		/* If 'ACK+' message, add to socket receive queue */
-		if (msg_data_sz(hdr))
-			return true;
-
-		/* If empty 'ACK-' message, wake up sleeping connect() */
-		sk->sk_state_change(sk);
-
-		/* 'ACK-' message is neither accepted nor rejected: */
-		msg_set_dest_droppable(hdr, 1);
-		return false;
-
-	case TIPC_OPEN:
-	case TIPC_DISCONNECTING:
-		break;
-	case TIPC_LISTEN:
-		/* Accept only SYN message */
-		if (!msg_connected(hdr) && !(msg_errcode(hdr)))
-			return true;
-		break;
-	case TIPC_ESTABLISHED:
-		/* Accept only connection-based messages sent by peer */
-		if (unlikely(!tsk_peer_msg(tsk, hdr)))
+		/* Ignore connectionless message if not from listening socket */
+		if (oport != pport || onode != pnode)
 			return false;
 
-		if (unlikely(msg_errcode(hdr))) {
-			tipc_set_sk_state(sk, TIPC_DISCONNECTING);
-			/* Let timer expire on it's own */
-			tipc_node_remove_conn(net, tsk_peer_node(tsk),
-					      tsk->portid);
-			sk->sk_state_change(sk);
+		/* Rejected SYN */
+		if (err != TIPC_ERR_OVERLOAD)
+			break;
+
+		/* Prepare for new setup attempt if we have a SYN clone */
+		if (skb_queue_empty(&sk->sk_write_queue))
+			break;
+		get_random_bytes(&delay, 2);
+		delay %= (tsk->conn_timeout / 4);
+		delay = msecs_to_jiffies(delay + 100);
+		sk_reset_timer(sk, &sk->sk_timer, jiffies + delay);
+		return false;
+	case TIPC_OPEN:
+	case TIPC_DISCONNECTING:
+		return false;
+	case TIPC_LISTEN:
+		/* Accept only SYN message */
+		if (!msg_is_syn(hdr) &&
+		    tipc_node_get_capabilities(net, onode) & TIPC_SYN_BIT)
+			return false;
+		if (!con_msg && !err)
+			return true;
+		return false;
+	case TIPC_ESTABLISHED:
+		if (!skb_queue_empty(&sk->sk_write_queue))
+			tipc_sk_push_backlog(tsk, false);
+		/* Accept only connection-based messages sent by peer */
+		if (likely(con_msg && !err && pport == oport &&
+			   pnode == onode)) {
+			if (msg_ack_required(hdr)) {
+				struct sk_buff *skb;
+
+				skb = tipc_sk_build_ack(tsk);
+				if (skb) {
+					msg_set_nagle_ack(buf_msg(skb));
+					__skb_queue_tail(xmitq, skb);
+				}
+			}
+			return true;
 		}
+		if (!tsk_peer_msg(tsk, hdr))
+			return false;
+		if (!err)
+			return true;
+		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
+		tipc_node_remove_conn(net, pnode, tsk->portid);
+		sk->sk_state_change(sk);
 		return true;
 	default:
 		pr_err("Unknown sk_state %u\n", sk->sk_state);
 	}
-
-	return false;
+	/* Abort connection setup attempt */
+	tipc_set_sk_state(sk, TIPC_DISCONNECTING);
+	sk->sk_err = ECONNREFUSED;
+	sk->sk_state_change(sk);
+	return true;
 }
 
 /**
@@ -2120,13 +2307,13 @@
 	struct tipc_msg *hdr = buf_msg(skb);
 
 	if (unlikely(msg_in_group(hdr)))
-		return sk->sk_rcvbuf;
+		return READ_ONCE(sk->sk_rcvbuf);
 
 	if (unlikely(!msg_connected(hdr)))
-		return sk->sk_rcvbuf << msg_importance(hdr);
+		return READ_ONCE(sk->sk_rcvbuf) << msg_importance(hdr);
 
 	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
-		return sk->sk_rcvbuf;
+		return READ_ONCE(sk->sk_rcvbuf);
 
 	return FLOWCTL_MSG_LIM;
 }
@@ -2151,8 +2338,10 @@
 	struct tipc_msg *hdr = buf_msg(skb);
 	struct net *net = sock_net(sk);
 	struct sk_buff_head inputq;
+	int mtyp = msg_type(hdr);
 	int limit, err = TIPC_OK;
 
+	trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, " ");
 	TIPC_SKB_CB(skb)->bytes_read = 0;
 	__skb_queue_head_init(&inputq);
 	__skb_queue_tail(&inputq, skb);
@@ -2163,26 +2352,37 @@
 	if (unlikely(grp))
 		tipc_group_filter_msg(grp, &inputq, xmitq);
 
+	if (unlikely(!grp) && mtyp == TIPC_MCAST_MSG)
+		tipc_mcast_filter_msg(net, &tsk->mc_method.deferredq, &inputq);
+
 	/* Validate and add to receive buffer if there is space */
 	while ((skb = __skb_dequeue(&inputq))) {
 		hdr = buf_msg(skb);
 		limit = rcvbuf_limit(sk, skb);
-		if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
+		if ((sk_conn && !tipc_sk_filter_connect(tsk, skb, xmitq)) ||
 		    (!sk_conn && msg_connected(hdr)) ||
 		    (!grp && msg_in_group(hdr)))
 			err = TIPC_ERR_NO_PORT;
 		else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) {
+			trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL,
+					   "err_overload2!");
 			atomic_inc(&sk->sk_drops);
 			err = TIPC_ERR_OVERLOAD;
 		}
 
 		if (unlikely(err)) {
-			tipc_skb_reject(net, err, skb, xmitq);
+			if (tipc_msg_reverse(tipc_own_addr(net), &skb, err)) {
+				trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE,
+						      "@filter_rcv!");
+				__skb_queue_tail(xmitq, skb);
+			}
 			err = TIPC_OK;
 			continue;
 		}
 		__skb_queue_tail(&sk->sk_receive_queue, skb);
 		skb_set_owner_r(skb, sk);
+		trace_tipc_sk_overlimit2(sk, skb, TIPC_DUMP_ALL,
+					 "rcvq >90% allocated!");
 		sk->sk_data_ready(sk);
 	}
 }
@@ -2248,14 +2448,21 @@
 		if (!sk->sk_backlog.len)
 			atomic_set(dcnt, 0);
 		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
-		if (likely(!sk_add_backlog(sk, skb, lim)))
+		if (likely(!sk_add_backlog(sk, skb, lim))) {
+			trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL,
+						 "bklg & rcvq >90% allocated!");
 			continue;
+		}
 
+		trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!");
 		/* Overload => reject message back to sender */
 		onode = tipc_own_addr(sock_net(sk));
 		atomic_inc(&sk->sk_drops);
-		if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
+		if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD)) {
+			trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_ALL,
+					      "@sk_enqueue!");
 			__skb_queue_tail(xmitq, skb);
+		}
 		break;
 	}
 }
@@ -2304,6 +2511,8 @@
 		/* Prepare for message rejection */
 		if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
 			continue;
+
+		trace_tipc_sk_rej_msg(NULL, skb, TIPC_DUMP_NONE, "@sk_rcv!");
 xmit:
 		dnode = msg_destnode(buf_msg(skb));
 		tipc_node_xmit_skb(net, skb, dnode, dport);
@@ -2324,10 +2533,12 @@
 			return -ETIMEDOUT;
 		if (signal_pending(current))
 			return sock_intr_errno(*timeo_p);
+		if (sk->sk_state == TIPC_DISCONNECTING)
+			break;
 
 		add_wait_queue(sk_sleep(sk), &wait);
-		done = sk_wait_event(sk, timeo_p,
-				     sk->sk_state != TIPC_CONNECTING, &wait);
+		done = sk_wait_event(sk, timeo_p, tipc_sk_connected(sk),
+				     &wait);
 		remove_wait_queue(sk_sleep(sk), &wait);
 	} while (!done);
 	return 0;
@@ -2415,7 +2626,7 @@
 		 * case is EINPROGRESS, rather than EALREADY.
 		 */
 		res = -EINPROGRESS;
-		/* fall thru' */
+		fallthrough;
 	case TIPC_CONNECTING:
 		if (!timeout) {
 			if (previous == TIPC_CONNECTING)
@@ -2492,7 +2703,7 @@
 /**
  * tipc_accept - wait for connection request
  * @sock: listening socket
- * @newsock: new socket that is to be connected
+ * @new_sock: new socket that is to be connected
  * @flags: file-related flags associated with socket
  *
  * Returns 0 on success, errno otherwise
@@ -2501,9 +2712,10 @@
 		       bool kern)
 {
 	struct sock *new_sk, *sk = sock->sk;
-	struct sk_buff *buf;
 	struct tipc_sock *new_tsock;
+	struct msghdr m = {NULL,};
 	struct tipc_msg *msg;
+	struct sk_buff *buf;
 	long timeo;
 	int res;
 
@@ -2536,31 +2748,29 @@
 	 * Reject any stray messages received by new socket
 	 * before the socket lock was taken (very, very unlikely)
 	 */
-	tsk_rej_rx_queue(new_sk);
+	tsk_rej_rx_queue(new_sk, TIPC_ERR_NO_PORT);
 
 	/* Connect new socket to it's peer */
 	tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
 
-	tsk_set_importance(new_tsock, msg_importance(msg));
+	tsk_set_importance(new_sk, msg_importance(msg));
 	if (msg_named(msg)) {
 		new_tsock->conn_type = msg_nametype(msg);
 		new_tsock->conn_instance = msg_nameinst(msg);
 	}
 
 	/*
-	 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
-	 * Respond to 'SYN+' by queuing it on new socket.
+	 * Respond to 'SYN-' by discarding it & returning 'ACK'.
+	 * Respond to 'SYN+' by queuing it on new socket & returning 'ACK'.
 	 */
 	if (!msg_data_sz(msg)) {
-		struct msghdr m = {NULL,};
-
 		tsk_advance_rx_queue(sk);
-		__tipc_sendstream(new_sock, &m, 0);
 	} else {
 		__skb_dequeue(&sk->sk_receive_queue);
 		__skb_queue_head(&new_sk->sk_receive_queue, buf);
 		skb_set_owner_r(buf, new_sk);
 	}
+	__tipc_sendstream(new_sock, &m, 0);
 	release_sock(new_sk);
 exit:
 	release_sock(sk);
@@ -2586,6 +2796,7 @@
 
 	lock_sock(sk);
 
+	trace_tipc_sk_shutdown(sk, NULL, TIPC_DUMP_ALL, " ");
 	__tipc_shutdown(sock, TIPC_CONN_SHUTDOWN);
 	sk->sk_shutdown = SHUTDOWN_MASK;
 
@@ -2604,43 +2815,80 @@
 	return res;
 }
 
+static void tipc_sk_check_probing_state(struct sock *sk,
+					struct sk_buff_head *list)
+{
+	struct tipc_sock *tsk = tipc_sk(sk);
+	u32 pnode = tsk_peer_node(tsk);
+	u32 pport = tsk_peer_port(tsk);
+	u32 self = tsk_own_node(tsk);
+	u32 oport = tsk->portid;
+	struct sk_buff *skb;
+
+	if (tsk->probe_unacked) {
+		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
+		sk->sk_err = ECONNABORTED;
+		tipc_node_remove_conn(sock_net(sk), pnode, pport);
+		sk->sk_state_change(sk);
+		return;
+	}
+	/* Prepare new probe */
+	skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 0,
+			      pnode, self, pport, oport, TIPC_OK);
+	if (skb)
+		__skb_queue_tail(list, skb);
+	tsk->probe_unacked = true;
+	sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
+}
+
+static void tipc_sk_retry_connect(struct sock *sk, struct sk_buff_head *list)
+{
+	struct tipc_sock *tsk = tipc_sk(sk);
+
+	/* Try again later if dest link is congested */
+	if (tsk->cong_link_cnt) {
+		sk_reset_timer(sk, &sk->sk_timer,
+			       jiffies + msecs_to_jiffies(100));
+		return;
+	}
+	/* Prepare SYN for retransmit */
+	tipc_msg_skb_clone(&sk->sk_write_queue, list);
+}
+
 static void tipc_sk_timeout(struct timer_list *t)
 {
 	struct sock *sk = from_timer(sk, t, sk_timer);
 	struct tipc_sock *tsk = tipc_sk(sk);
-	u32 peer_port = tsk_peer_port(tsk);
-	u32 peer_node = tsk_peer_node(tsk);
-	u32 own_node = tsk_own_node(tsk);
-	u32 own_port = tsk->portid;
-	struct net *net = sock_net(sk);
-	struct sk_buff *skb = NULL;
+	u32 pnode = tsk_peer_node(tsk);
+	struct sk_buff_head list;
+	int rc = 0;
 
+	__skb_queue_head_init(&list);
 	bh_lock_sock(sk);
-	if (!tipc_sk_connected(sk))
-		goto exit;
 
 	/* Try again later if socket is busy */
 	if (sock_owned_by_user(sk)) {
 		sk_reset_timer(sk, &sk->sk_timer, jiffies + HZ / 20);
-		goto exit;
+		bh_unlock_sock(sk);
+		sock_put(sk);
+		return;
 	}
 
-	if (tsk->probe_unacked) {
-		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
-		tipc_node_remove_conn(net, peer_node, peer_port);
-		sk->sk_state_change(sk);
-		goto exit;
-	}
-	/* Send new probe */
-	skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 0,
-			      peer_node, own_node, peer_port, own_port,
-			      TIPC_OK);
-	tsk->probe_unacked = true;
-	sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
-exit:
+	if (sk->sk_state == TIPC_ESTABLISHED)
+		tipc_sk_check_probing_state(sk, &list);
+	else if (sk->sk_state == TIPC_CONNECTING)
+		tipc_sk_retry_connect(sk, &list);
+
 	bh_unlock_sock(sk);
-	if (skb)
-		tipc_node_xmit_skb(net, skb, peer_node, own_port);
+
+	if (!skb_queue_empty(&list))
+		rc = tipc_node_xmit(sock_net(sk), &list, pnode, tsk->portid);
+
+	/* SYN messages may cause link congestion */
+	if (rc == -ELINKCONG) {
+		tipc_dest_push(&tsk->cong_links, pnode, 0);
+		tsk->cong_link_cnt = 1;
+	}
 	sock_put(sk);
 }
 
@@ -2746,7 +2994,7 @@
 	struct tipc_sock *tsk;
 
 	rcu_read_lock();
-	tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params);
+	tsk = rhashtable_lookup(&tn->sk_rht, &portid, tsk_rht_params);
 	if (tsk)
 		sock_hold(&tsk->sk);
 	rcu_read_unlock();
@@ -2883,7 +3131,7 @@
  * Returns 0 on success, errno otherwise
  */
 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
-			   char __user *ov, unsigned int ol)
+			   sockptr_t ov, unsigned int ol)
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
@@ -2901,19 +3149,20 @@
 	case TIPC_SRC_DROPPABLE:
 	case TIPC_DEST_DROPPABLE:
 	case TIPC_CONN_TIMEOUT:
+	case TIPC_NODELAY:
 		if (ol < sizeof(value))
 			return -EINVAL;
-		if (get_user(value, (u32 __user *)ov))
+		if (copy_from_sockptr(&value, ov, sizeof(u32)))
 			return -EFAULT;
 		break;
 	case TIPC_GROUP_JOIN:
 		if (ol < sizeof(mreq))
 			return -EINVAL;
-		if (copy_from_user(&mreq, ov, sizeof(mreq)))
+		if (copy_from_sockptr(&mreq, ov, sizeof(mreq)))
 			return -EFAULT;
 		break;
 	default:
-		if (ov || ol)
+		if (!sockptr_is_null(ov) || ol)
 			return -EINVAL;
 	}
 
@@ -2921,7 +3170,7 @@
 
 	switch (opt) {
 	case TIPC_IMPORTANCE:
-		res = tsk_set_importance(tsk, value);
+		res = tsk_set_importance(sk, value);
 		break;
 	case TIPC_SRC_DROPPABLE:
 		if (sock->type != SOCK_STREAM)
@@ -2948,6 +3197,10 @@
 		break;
 	case TIPC_GROUP_LEAVE:
 		res = tipc_sk_leave(tsk);
+		break;
+	case TIPC_NODELAY:
+		tsk->nodelay = !!value;
+		tsk_set_nagle(tsk);
 		break;
 	default:
 		res = -EINVAL;
@@ -3010,6 +3263,9 @@
 		break;
 	case TIPC_SOCK_RECVQ_DEPTH:
 		value = skb_queue_len(&sk->sk_receive_queue);
+		break;
+	case TIPC_SOCK_RECVQ_USED:
+		value = sk_rmem_alloc_get(sk);
 		break;
 	case TIPC_GROUP_JOIN:
 		seq.type = 0;
@@ -3211,7 +3467,9 @@
 	peer_node = tsk_peer_node(tsk);
 	peer_port = tsk_peer_port(tsk);
 
-	nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
+	nest = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_CON);
+	if (!nest)
+		return -EMSGSIZE;
 
 	if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
 		goto msg_full;
@@ -3268,7 +3526,7 @@
 	if (!hdr)
 		goto msg_cancel;
 
-	attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
+	attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK);
 	if (!attrs)
 		goto genlmsg_cancel;
 
@@ -3373,7 +3631,7 @@
 	if (!(sk_filter_state & (1 << sk->sk_state)))
 		return 0;
 
-	attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
+	attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK);
 	if (!attrs)
 		goto msg_cancel;
 
@@ -3391,7 +3649,7 @@
 			      TIPC_NLA_SOCK_PAD))
 		goto attr_msg_cancel;
 
-	stat = nla_nest_start(skb, TIPC_NLA_SOCK_STAT);
+	stat = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_STAT);
 	if (!stat)
 		goto attr_msg_cancel;
 
@@ -3448,7 +3706,7 @@
 	if (!hdr)
 		goto msg_cancel;
 
-	attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
+	attrs = nla_nest_start_noflag(skb, TIPC_NLA_PUBL);
 	if (!attrs)
 		goto genlmsg_cancel;
 
@@ -3525,19 +3783,15 @@
 	struct tipc_sock *tsk;
 
 	if (!tsk_portid) {
-		struct nlattr **attrs;
+		struct nlattr **attrs = genl_dumpit_info(cb)->attrs;
 		struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
-
-		err = tipc_nlmsg_parse(cb->nlh, &attrs);
-		if (err)
-			return err;
 
 		if (!attrs[TIPC_NLA_SOCK])
 			return -EINVAL;
 
-		err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
-				       attrs[TIPC_NLA_SOCK],
-				       tipc_nl_sock_policy, NULL);
+		err = nla_parse_nested_deprecated(sock, TIPC_NLA_SOCK_MAX,
+						  attrs[TIPC_NLA_SOCK],
+						  tipc_nl_sock_policy, NULL);
 		if (err)
 			return err;
 
@@ -3567,3 +3821,187 @@
 
 	return skb->len;
 }
+
+/**
+ * tipc_sk_filtering - check if a socket should be traced
+ * @sk: the socket to be examined
+ * @sysctl_tipc_sk_filter[]: the socket tuple for filtering,
+ *  (portid, sock type, name type, name lower, name upper)
+ *
+ * Returns true if the socket meets the socket tuple data
+ * (value 0 = 'any') or when there is no tuple set (all = 0),
+ * otherwise false
+ */
+bool tipc_sk_filtering(struct sock *sk)
+{
+	struct tipc_sock *tsk;
+	struct publication *p;
+	u32 _port, _sktype, _type, _lower, _upper;
+	u32 type = 0, lower = 0, upper = 0;
+
+	if (!sk)
+		return true;
+
+	tsk = tipc_sk(sk);
+
+	_port = sysctl_tipc_sk_filter[0];
+	_sktype = sysctl_tipc_sk_filter[1];
+	_type = sysctl_tipc_sk_filter[2];
+	_lower = sysctl_tipc_sk_filter[3];
+	_upper = sysctl_tipc_sk_filter[4];
+
+	if (!_port && !_sktype && !_type && !_lower && !_upper)
+		return true;
+
+	if (_port)
+		return (_port == tsk->portid);
+
+	if (_sktype && _sktype != sk->sk_type)
+		return false;
+
+	if (tsk->published) {
+		p = list_first_entry_or_null(&tsk->publications,
+					     struct publication, binding_sock);
+		if (p) {
+			type = p->type;
+			lower = p->lower;
+			upper = p->upper;
+		}
+	}
+
+	if (!tipc_sk_type_connectionless(sk)) {
+		type = tsk->conn_type;
+		lower = tsk->conn_instance;
+		upper = tsk->conn_instance;
+	}
+
+	if ((_type && _type != type) || (_lower && _lower != lower) ||
+	    (_upper && _upper != upper))
+		return false;
+
+	return true;
+}
+
+u32 tipc_sock_get_portid(struct sock *sk)
+{
+	return (sk) ? (tipc_sk(sk))->portid : 0;
+}
+
+/**
+ * tipc_sk_overlimit1 - check if socket rx queue is about to be overloaded,
+ *			both the rcv and backlog queues are considered
+ * @sk: tipc sk to be checked
+ * @skb: tipc msg to be checked
+ *
+ * Returns true if the socket rx queue allocation is > 90%, otherwise false
+ */
+
+bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb)
+{
+	atomic_t *dcnt = &tipc_sk(sk)->dupl_rcvcnt;
+	unsigned int lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
+	unsigned int qsize = sk->sk_backlog.len + sk_rmem_alloc_get(sk);
+
+	return (qsize > lim * 90 / 100);
+}
+
+/**
+ * tipc_sk_overlimit2 - check if socket rx queue is about to be overloaded,
+ *			only the rcv queue is considered
+ * @sk: tipc sk to be checked
+ * @skb: tipc msg to be checked
+ *
+ * Returns true if the socket rx queue allocation is > 90%, otherwise false
+ */
+
+bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb)
+{
+	unsigned int lim = rcvbuf_limit(sk, skb);
+	unsigned int qsize = sk_rmem_alloc_get(sk);
+
+	return (qsize > lim * 90 / 100);
+}
+
+/**
+ * tipc_sk_dump - dump TIPC socket
+ * @sk: tipc sk to be dumped
+ * @dqueues: bitmask to decide if any socket queue to be dumped?
+ *           - TIPC_DUMP_NONE: don't dump socket queues
+ *           - TIPC_DUMP_SK_SNDQ: dump socket send queue
+ *           - TIPC_DUMP_SK_RCVQ: dump socket rcv queue
+ *           - TIPC_DUMP_SK_BKLGQ: dump socket backlog queue
+ *           - TIPC_DUMP_ALL: dump all the socket queues above
+ * @buf: returned buffer of dump data in format
+ */
+int tipc_sk_dump(struct sock *sk, u16 dqueues, char *buf)
+{
+	int i = 0;
+	size_t sz = (dqueues) ? SK_LMAX : SK_LMIN;
+	struct tipc_sock *tsk;
+	struct publication *p;
+	bool tsk_connected;
+
+	if (!sk) {
+		i += scnprintf(buf, sz, "sk data: (null)\n");
+		return i;
+	}
+
+	tsk = tipc_sk(sk);
+	tsk_connected = !tipc_sk_type_connectionless(sk);
+
+	i += scnprintf(buf, sz, "sk data: %u", sk->sk_type);
+	i += scnprintf(buf + i, sz - i, " %d", sk->sk_state);
+	i += scnprintf(buf + i, sz - i, " %x", tsk_own_node(tsk));
+	i += scnprintf(buf + i, sz - i, " %u", tsk->portid);
+	i += scnprintf(buf + i, sz - i, " | %u", tsk_connected);
+	if (tsk_connected) {
+		i += scnprintf(buf + i, sz - i, " %x", tsk_peer_node(tsk));
+		i += scnprintf(buf + i, sz - i, " %u", tsk_peer_port(tsk));
+		i += scnprintf(buf + i, sz - i, " %u", tsk->conn_type);
+		i += scnprintf(buf + i, sz - i, " %u", tsk->conn_instance);
+	}
+	i += scnprintf(buf + i, sz - i, " | %u", tsk->published);
+	if (tsk->published) {
+		p = list_first_entry_or_null(&tsk->publications,
+					     struct publication, binding_sock);
+		i += scnprintf(buf + i, sz - i, " %u", (p) ? p->type : 0);
+		i += scnprintf(buf + i, sz - i, " %u", (p) ? p->lower : 0);
+		i += scnprintf(buf + i, sz - i, " %u", (p) ? p->upper : 0);
+	}
+	i += scnprintf(buf + i, sz - i, " | %u", tsk->snd_win);
+	i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_win);
+	i += scnprintf(buf + i, sz - i, " %u", tsk->max_pkt);
+	i += scnprintf(buf + i, sz - i, " %x", tsk->peer_caps);
+	i += scnprintf(buf + i, sz - i, " %u", tsk->cong_link_cnt);
+	i += scnprintf(buf + i, sz - i, " %u", tsk->snt_unacked);
+	i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_unacked);
+	i += scnprintf(buf + i, sz - i, " %u", atomic_read(&tsk->dupl_rcvcnt));
+	i += scnprintf(buf + i, sz - i, " %u", sk->sk_shutdown);
+	i += scnprintf(buf + i, sz - i, " | %d", sk_wmem_alloc_get(sk));
+	i += scnprintf(buf + i, sz - i, " %d", sk->sk_sndbuf);
+	i += scnprintf(buf + i, sz - i, " | %d", sk_rmem_alloc_get(sk));
+	i += scnprintf(buf + i, sz - i, " %d", sk->sk_rcvbuf);
+	i += scnprintf(buf + i, sz - i, " | %d\n", READ_ONCE(sk->sk_backlog.len));
+
+	if (dqueues & TIPC_DUMP_SK_SNDQ) {
+		i += scnprintf(buf + i, sz - i, "sk_write_queue: ");
+		i += tipc_list_dump(&sk->sk_write_queue, false, buf + i);
+	}
+
+	if (dqueues & TIPC_DUMP_SK_RCVQ) {
+		i += scnprintf(buf + i, sz - i, "sk_receive_queue: ");
+		i += tipc_list_dump(&sk->sk_receive_queue, false, buf + i);
+	}
+
+	if (dqueues & TIPC_DUMP_SK_BKLGQ) {
+		i += scnprintf(buf + i, sz - i, "sk_backlog:\n  head ");
+		i += tipc_skb_dump(sk->sk_backlog.head, false, buf + i);
+		if (sk->sk_backlog.tail != sk->sk_backlog.head) {
+			i += scnprintf(buf + i, sz - i, "  tail ");
+			i += tipc_skb_dump(sk->sk_backlog.tail, false,
+					   buf + i);
+		}
+	}
+
+	return i;
+}

--
Gitblit v1.6.2