2024-02-20 102a0743326a03cd1a1202ceda21e175b7d3575c
kernel/fs/dlm/lowcomms.c
....@@ -1,12 +1,10 @@
1
+// SPDX-License-Identifier: GPL-2.0-only
12 /******************************************************************************
23 *******************************************************************************
34 **
45 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
56 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
67 **
7
-** This copyrighted material is made available to anyone wishing to use,
8
-** modify, copy, or redistribute it subject to the terms and conditions
9
-** of the GNU General Public License v.2.
108 **
119 *******************************************************************************
1210 ******************************************************************************/
....@@ -65,40 +63,7 @@
6563
6664 /* Number of messages to send before rescheduling */
6765 #define MAX_SEND_MSG_COUNT 25
68
-
69
-struct cbuf {
70
- unsigned int base;
71
- unsigned int len;
72
- unsigned int mask;
73
-};
74
-
75
-static void cbuf_add(struct cbuf *cb, int n)
76
-{
77
- cb->len += n;
78
-}
79
-
80
-static int cbuf_data(struct cbuf *cb)
81
-{
82
- return ((cb->base + cb->len) & cb->mask);
83
-}
84
-
85
-static void cbuf_init(struct cbuf *cb, int size)
86
-{
87
- cb->base = cb->len = 0;
88
- cb->mask = size-1;
89
-}
90
-
91
-static void cbuf_eat(struct cbuf *cb, int n)
92
-{
93
- cb->len -= n;
94
- cb->base += n;
95
- cb->base &= cb->mask;
96
-}
97
-
98
-static bool cbuf_empty(struct cbuf *cb)
99
-{
100
- return cb->len == 0;
101
-}
66
+#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
10267
10368 struct connection {
10469 struct socket *sock; /* NULL if not connected */
....@@ -112,18 +77,23 @@
11277 #define CF_CLOSE 6
11378 #define CF_APP_LIMITED 7
11479 #define CF_CLOSING 8
80
+#define CF_SHUTDOWN 9
11581 struct list_head writequeue; /* List of outgoing writequeue_entries */
11682 spinlock_t writequeue_lock;
11783 int (*rx_action) (struct connection *); /* What to do when active */
11884 void (*connect_action) (struct connection *); /* What to do to connect */
119
- struct page *rx_page;
120
- struct cbuf cb;
85
+ void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
12186 int retries;
12287 #define MAX_CONNECT_RETRIES 3
12388 struct hlist_node list;
12489 struct connection *othercon;
12590 struct work_struct rwork; /* Receive workqueue */
12691 struct work_struct swork; /* Send workqueue */
92
+ wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
93
+ unsigned char *rx_buf;
94
+ int rx_buflen;
95
+ int rx_leftover;
96
+ struct rcu_head rcu;
12797 };
12898 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
12999
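The CF_SHUTDOWN flag and shutdown_wait waitqueue added to struct connection above carry a small handshake between the send side and the receive side for graceful TCP shutdown. A condensed sketch of how the later hunks of this patch use them (illustrative only, hypothetical helper names, error paths trimmed; see shutdown_connection() and receive_from_sock() below for the real code):

/* send side: stop transmitting, then wait for our reader to observe EOF */
static void shutdown_handshake_sketch(struct connection *con)
{
	set_bit(CF_SHUTDOWN, &con->flags);
	kernel_sock_shutdown(con->sock, SHUT_WR);
	if (!wait_event_timeout(con->shutdown_wait,
				!test_bit(CF_SHUTDOWN, &con->flags),
				DLM_SHUTDOWN_WAIT_TIMEOUT))
		close_connection(con, false, true, true); /* timed out, force close */
}

/* receive side: a 0-byte read means the peer finished sending */
static void shutdown_eof_sketch(struct connection *con)
{
	clear_bit(CF_SHUTDOWN, &con->flags);
	wake_up(&con->shutdown_wait);
}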
....@@ -165,8 +135,8 @@
165135 static struct workqueue_struct *send_workqueue;
166136
167137 static struct hlist_head connection_hash[CONN_HASH_SIZE];
168
-static DEFINE_MUTEX(connections_lock);
169
-static struct kmem_cache *con_cache;
138
+static DEFINE_SPINLOCK(connections_lock);
139
+DEFINE_STATIC_SRCU(connections_srcu);
170140
171141 static void process_recv_sockets(struct work_struct *work);
172142 static void process_send_sockets(struct work_struct *work);
....@@ -182,15 +152,20 @@
182152
183153 static struct connection *__find_con(int nodeid)
184154 {
185
- int r;
155
+ int r, idx;
186156 struct connection *con;
187157
188158 r = nodeid_hash(nodeid);
189159
190
- hlist_for_each_entry(con, &connection_hash[r], list) {
191
- if (con->nodeid == nodeid)
160
+ idx = srcu_read_lock(&connections_srcu);
161
+ hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
162
+ if (con->nodeid == nodeid) {
163
+ srcu_read_unlock(&connections_srcu, idx);
192164 return con;
165
+ }
193166 }
167
+ srcu_read_unlock(&connections_srcu, idx);
168
+
194169 return NULL;
195170 }
196171
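__find_con() above now runs without the old global mutex: the hash-chain walk happens inside an SRCU read-side section (which, unlike plain RCU, tolerates sleeping readers) and pairs with the hlist_*_rcu() updates made under connections_lock further down. A minimal reader-side sketch, condensed from the hunk above (hypothetical function name, not a drop-in copy):

static struct connection *lookup_sketch(int nodeid)
{
	struct connection *con, *found = NULL;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	hlist_for_each_entry_rcu(con, &connection_hash[nodeid_hash(nodeid)], list) {
		if (con->nodeid == nodeid) {
			found = con;
			break;
		}
	}
	srcu_read_unlock(&connections_srcu, idx);

	return found;
}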
....@@ -198,21 +173,25 @@
198173 * If 'allocation' is zero then we don't attempt to create a new
199174 * connection structure for this node.
200175 */
201
-static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
176
+static struct connection *nodeid2con(int nodeid, gfp_t alloc)
202177 {
203
- struct connection *con = NULL;
178
+ struct connection *con, *tmp;
204179 int r;
205180
206181 con = __find_con(nodeid);
207182 if (con || !alloc)
208183 return con;
209184
210
- con = kmem_cache_zalloc(con_cache, alloc);
185
+ con = kzalloc(sizeof(*con), alloc);
211186 if (!con)
212187 return NULL;
213188
214
- r = nodeid_hash(nodeid);
215
- hlist_add_head(&con->list, &connection_hash[r]);
189
+ con->rx_buflen = dlm_config.ci_buffer_size;
190
+ con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
191
+ if (!con->rx_buf) {
192
+ kfree(con);
193
+ return NULL;
194
+ }
216195
217196 con->nodeid = nodeid;
218197 mutex_init(&con->sock_mutex);
....@@ -220,6 +199,7 @@
220199 spin_lock_init(&con->writequeue_lock);
221200 INIT_WORK(&con->swork, process_send_sockets);
222201 INIT_WORK(&con->rwork, process_recv_sockets);
202
+ init_waitqueue_head(&con->shutdown_wait);
223203
224204 /* Setup action pointers for child sockets */
225205 if (con->nodeid) {
....@@ -230,31 +210,41 @@
230210 con->rx_action = zerocon->rx_action;
231211 }
232212
213
+ r = nodeid_hash(nodeid);
214
+
215
+ spin_lock(&connections_lock);
216
+ /* Because multiple workqueues/threads call this function it can
217
+ * race on multiple CPUs. Instead of locking the hot path __find_con()
218
+ * we simply re-check for recently added nodes here, under the
219
+ * protection of connections_lock. If an existing connection is found
220
+ * we abort our connection creation and return it instead.
221
+ */
222
+ tmp = __find_con(nodeid);
223
+ if (tmp) {
224
+ spin_unlock(&connections_lock);
225
+ kfree(con->rx_buf);
226
+ kfree(con);
227
+ return tmp;
228
+ }
229
+
230
+ hlist_add_head_rcu(&con->list, &connection_hash[r]);
231
+ spin_unlock(&connections_lock);
232
+
233233 return con;
234234 }
235235
236236 /* Loop round all connections */
237237 static void foreach_conn(void (*conn_func)(struct connection *c))
238238 {
239
- int i;
240
- struct hlist_node *n;
239
+ int i, idx;
241240 struct connection *con;
242241
242
+ idx = srcu_read_lock(&connections_srcu);
243243 for (i = 0; i < CONN_HASH_SIZE; i++) {
244
- hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
244
+ hlist_for_each_entry_rcu(con, &connection_hash[i], list)
245245 conn_func(con);
246246 }
247
-}
248
-
249
-static struct connection *nodeid2con(int nodeid, gfp_t allocation)
250
-{
251
- struct connection *con;
252
-
253
- mutex_lock(&connections_lock);
254
- con = __nodeid2con(nodeid, allocation);
255
- mutex_unlock(&connections_lock);
256
-
257
- return con;
247
+ srcu_read_unlock(&connections_srcu, idx);
258248 }
259249
260250 static struct dlm_node_addr *find_node_addr(int nodeid)
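Since nodeid2con() can now race with itself on different CPUs (the lookup no longer holds a lock), the patch allocates the new connection optimistically and re-checks for a concurrent insertion under connections_lock before publishing it. A condensed sketch of that publish step as the hunk above does it (hypothetical function name, error handling trimmed):

static struct connection *publish_con_sketch(struct connection *con, int nodeid)
{
	struct connection *tmp;

	spin_lock(&connections_lock);
	tmp = __find_con(nodeid);	/* re-check under the lock */
	if (tmp) {
		/* lost the race: drop our copy, reuse the existing one */
		spin_unlock(&connections_lock);
		kfree(con->rx_buf);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[nodeid_hash(nodeid)]);
	spin_unlock(&connections_lock);

	return con;
}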
....@@ -481,8 +471,8 @@
481471 static void lowcomms_error_report(struct sock *sk)
482472 {
483473 struct connection *con;
484
- struct sockaddr_storage saddr;
485474 void (*orig_report)(struct sock *) = NULL;
475
+ struct inet_sock *inet;
486476
487477 read_lock_bh(&sk->sk_callback_lock);
488478 con = sock2con(sk);
....@@ -490,34 +480,33 @@
490480 goto out;
491481
492482 orig_report = listen_sock.sk_error_report;
493
- if (con->sock == NULL ||
494
- kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) {
495
- printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
496
- "sending to node %d, port %d, "
497
- "sk_err=%d/%d\n", dlm_our_nodeid(),
498
- con->nodeid, dlm_config.ci_tcp_port,
499
- sk->sk_err, sk->sk_err_soft);
500
- } else if (saddr.ss_family == AF_INET) {
501
- struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;
502483
484
+ inet = inet_sk(sk);
485
+ switch (sk->sk_family) {
486
+ case AF_INET:
503487 printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
504
- "sending to node %d at %pI4, port %d, "
488
+ "sending to node %d at %pI4, dport %d, "
505489 "sk_err=%d/%d\n", dlm_our_nodeid(),
506
- con->nodeid, &sin4->sin_addr.s_addr,
507
- dlm_config.ci_tcp_port, sk->sk_err,
490
+ con->nodeid, &inet->inet_daddr,
491
+ ntohs(inet->inet_dport), sk->sk_err,
508492 sk->sk_err_soft);
509
- } else {
510
- struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;
511
-
493
+ break;
494
+#if IS_ENABLED(CONFIG_IPV6)
495
+ case AF_INET6:
512496 printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
513
- "sending to node %d at %u.%u.%u.%u, "
514
- "port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
515
- con->nodeid, sin6->sin6_addr.s6_addr32[0],
516
- sin6->sin6_addr.s6_addr32[1],
517
- sin6->sin6_addr.s6_addr32[2],
518
- sin6->sin6_addr.s6_addr32[3],
519
- dlm_config.ci_tcp_port, sk->sk_err,
497
+ "sending to node %d at %pI6c, "
498
+ "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
499
+ con->nodeid, &sk->sk_v6_daddr,
500
+ ntohs(inet->inet_dport), sk->sk_err,
520501 sk->sk_err_soft);
502
+ break;
503
+#endif
504
+ default:
505
+ printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
506
+ "invalid socket family %d set, "
507
+ "sk_err=%d/%d\n", dlm_our_nodeid(),
508
+ sk->sk_family, sk->sk_err, sk->sk_err_soft);
509
+ goto out;
521510 }
522511 out:
523512 read_unlock_bh(&sk->sk_callback_lock);
....@@ -611,26 +600,85 @@
611600 /* Will only re-enter once. */
612601 close_connection(con->othercon, false, tx, rx);
613602 }
614
- if (con->rx_page) {
615
- __free_page(con->rx_page);
616
- con->rx_page = NULL;
617
- }
618603
604
+ con->rx_leftover = 0;
619605 con->retries = 0;
620606 mutex_unlock(&con->sock_mutex);
621607 clear_bit(CF_CLOSING, &con->flags);
622608 }
623609
610
+static void shutdown_connection(struct connection *con)
611
+{
612
+ int ret;
613
+
614
+ flush_work(&con->swork);
615
+
616
+ mutex_lock(&con->sock_mutex);
617
+ /* nothing to shutdown */
618
+ if (!con->sock) {
619
+ mutex_unlock(&con->sock_mutex);
620
+ return;
621
+ }
622
+
623
+ set_bit(CF_SHUTDOWN, &con->flags);
624
+ ret = kernel_sock_shutdown(con->sock, SHUT_WR);
625
+ mutex_unlock(&con->sock_mutex);
626
+ if (ret) {
627
+ log_print("Connection %p failed to shutdown: %d will force close",
628
+ con, ret);
629
+ goto force_close;
630
+ } else {
631
+ ret = wait_event_timeout(con->shutdown_wait,
632
+ !test_bit(CF_SHUTDOWN, &con->flags),
633
+ DLM_SHUTDOWN_WAIT_TIMEOUT);
634
+ if (ret == 0) {
635
+ log_print("Connection %p shutdown timed out, will force close",
636
+ con);
637
+ goto force_close;
638
+ }
639
+ }
640
+
641
+ return;
642
+
643
+force_close:
644
+ clear_bit(CF_SHUTDOWN, &con->flags);
645
+ close_connection(con, false, true, true);
646
+}
647
+
648
+static void dlm_tcp_shutdown(struct connection *con)
649
+{
650
+ if (con->othercon)
651
+ shutdown_connection(con->othercon);
652
+ shutdown_connection(con);
653
+}
654
+
655
+static int con_realloc_receive_buf(struct connection *con, int newlen)
656
+{
657
+ unsigned char *newbuf;
658
+
659
+ newbuf = kmalloc(newlen, GFP_NOFS);
660
+ if (!newbuf)
661
+ return -ENOMEM;
662
+
663
+ /* copy any leftover from the last receive */
664
+ if (con->rx_leftover)
665
+ memmove(newbuf, con->rx_buf, con->rx_leftover);
666
+
667
+ /* swap to new buffer space */
668
+ kfree(con->rx_buf);
669
+ con->rx_buflen = newlen;
670
+ con->rx_buf = newbuf;
671
+
672
+ return 0;
673
+}
674
+
624675 /* Data received from remote end */
625676 static int receive_from_sock(struct connection *con)
626677 {
627
- int ret = 0;
628
- struct msghdr msg = {};
629
- struct kvec iov[2];
630
- unsigned len;
631
- int r;
632678 int call_again_soon = 0;
633
- int nvec;
679
+ struct msghdr msg;
680
+ struct kvec iov;
681
+ int ret, buflen;
634682
635683 mutex_lock(&con->sock_mutex);
636684
....@@ -638,71 +686,55 @@
638686 ret = -EAGAIN;
639687 goto out_close;
640688 }
689
+
641690 if (con->nodeid == 0) {
642691 ret = -EINVAL;
643692 goto out_close;
644693 }
645694
646
- if (con->rx_page == NULL) {
647
- /*
648
- * This doesn't need to be atomic, but I think it should
649
- * improve performance if it is.
650
- */
651
- con->rx_page = alloc_page(GFP_ATOMIC);
652
- if (con->rx_page == NULL)
695
+ /* realloc if we get a new buffer size to read out */
696
+ buflen = dlm_config.ci_buffer_size;
697
+ if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
698
+ ret = con_realloc_receive_buf(con, buflen);
699
+ if (ret < 0)
653700 goto out_resched;
654
- cbuf_init(&con->cb, PAGE_SIZE);
655701 }
656702
657
- /*
658
- * iov[0] is the bit of the circular buffer between the current end
659
- * point (cb.base + cb.len) and the end of the buffer.
703
+ /* calculate new buffer parameters regarding the last receive and
704
+ * possible leftover bytes
660705 */
661
- iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
662
- iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
663
- iov[1].iov_len = 0;
664
- nvec = 1;
706
+ iov.iov_base = con->rx_buf + con->rx_leftover;
707
+ iov.iov_len = con->rx_buflen - con->rx_leftover;
665708
666
- /*
667
- * iov[1] is the bit of the circular buffer between the start of the
668
- * buffer and the start of the currently used section (cb.base)
669
- */
670
- if (cbuf_data(&con->cb) >= con->cb.base) {
671
- iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb);
672
- iov[1].iov_len = con->cb.base;
673
- iov[1].iov_base = page_address(con->rx_page);
674
- nvec = 2;
675
- }
676
- len = iov[0].iov_len + iov[1].iov_len;
677
- iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, iov, nvec, len);
678
-
679
- r = ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
709
+ memset(&msg, 0, sizeof(msg));
710
+ msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
711
+ ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
712
+ msg.msg_flags);
680713 if (ret <= 0)
681714 goto out_close;
682
- else if (ret == len)
715
+ else if (ret == iov.iov_len)
683716 call_again_soon = 1;
684717
685
- cbuf_add(&con->cb, ret);
686
- ret = dlm_process_incoming_buffer(con->nodeid,
687
- page_address(con->rx_page),
688
- con->cb.base, con->cb.len,
689
- PAGE_SIZE);
690
- if (ret == -EBADMSG) {
691
- log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
692
- page_address(con->rx_page), con->cb.base,
693
- con->cb.len, r);
694
- }
718
+ /* new buflen according to the bytes read and the leftover from the last receive */
719
+ buflen = ret + con->rx_leftover;
720
+ ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
695721 if (ret < 0)
696722 goto out_close;
697
- cbuf_eat(&con->cb, ret);
698723
699
- if (cbuf_empty(&con->cb) && !call_again_soon) {
700
- __free_page(con->rx_page);
701
- con->rx_page = NULL;
724
+ /* calculate the leftover bytes from processing and move them to the
725
+ * beginning of the receive buffer, so the next receive finds the full
726
+ * message at the start address of the receive buffer.
727
+ */
728
+ con->rx_leftover = buflen - ret;
729
+ if (con->rx_leftover) {
730
+ memmove(con->rx_buf, con->rx_buf + ret,
731
+ con->rx_leftover);
732
+ call_again_soon = true;
702733 }
703734
704735 if (call_again_soon)
705736 goto out_resched;
737
+
706738 mutex_unlock(&con->sock_mutex);
707739 return 0;
708740
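The rewritten receive path above replaces the rx_page circular buffer with a flat rx_buf plus an rx_leftover count: new data is appended after any unprocessed bytes, whole messages are consumed, and the partial tail is moved back to the front so the next receive always finds a message starting at offset zero. A small self-contained userspace sketch of that invariant (hypothetical names, fixed 4-byte "messages" purely for illustration):

#include <stdio.h>
#include <string.h>

#define BUFLEN 64

static unsigned char rx_buf[BUFLEN];
static int rx_leftover;	/* unprocessed bytes parked at the start of rx_buf */

/* stand-in for dlm_process_incoming_buffer(): consume whole 4-byte records */
static int process(const unsigned char *buf, int len)
{
	return len - (len % 4);
}

/* feed newly received bytes, mimicking the receive_from_sock() flow above */
static void rx_feed(const unsigned char *data, int len)
{
	int buflen, used;

	memcpy(rx_buf + rx_leftover, data, len);	/* append after the leftover */
	buflen = rx_leftover + len;
	used = process(rx_buf, buflen);
	rx_leftover = buflen - used;
	if (rx_leftover)	/* park the partial tail at the front again */
		memmove(rx_buf, rx_buf + used, rx_leftover);
	printf("consumed %d, leftover %d\n", used, rx_leftover);
}

int main(void)
{
	rx_feed((const unsigned char *)"abcdef", 6);	/* consumed 4, leftover 2 */
	rx_feed((const unsigned char *)"ghijkl", 6);	/* consumed 8, leftover 0 */
	return 0;
}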
....@@ -715,18 +747,23 @@
715747 out_close:
716748 mutex_unlock(&con->sock_mutex);
717749 if (ret != -EAGAIN) {
718
- close_connection(con, true, true, false);
719750 /* Reconnect when there is something to send */
751
+ close_connection(con, false, true, false);
752
+ if (ret == 0) {
753
+ log_print("connection %p got EOF from %d",
754
+ con, con->nodeid);
755
+ /* handling for tcp shutdown */
756
+ clear_bit(CF_SHUTDOWN, &con->flags);
757
+ wake_up(&con->shutdown_wait);
758
+ /* signal the receive worker to break out */
759
+ ret = -1;
760
+ }
720761 }
721
- /* Don't return success if we really got EOF */
722
- if (ret == 0)
723
- ret = -EAGAIN;
724
-
725762 return ret;
726763 }
727764
728765 /* Listening socket is busy, accept a connection */
729
-static int tcp_accept_from_sock(struct connection *con)
766
+static int accept_from_sock(struct connection *con)
730767 {
731768 int result;
732769 struct sockaddr_storage peeraddr;
....@@ -735,13 +772,11 @@
735772 int nodeid;
736773 struct connection *newcon;
737774 struct connection *addcon;
775
+ unsigned int mark;
738776
739
- mutex_lock(&connections_lock);
740777 if (!dlm_allow_conn) {
741
- mutex_unlock(&connections_lock);
742778 return -1;
743779 }
744
- mutex_unlock(&connections_lock);
745780
746781 mutex_lock_nested(&con->sock_mutex, 0);
747782
....@@ -774,6 +809,9 @@
774809 return -1;
775810 }
776811
812
+ dlm_comm_mark(nodeid, &mark);
813
+ sock_set_mark(newsock->sk, mark);
814
+
777815 log_print("got connection from %d", nodeid);
778816
779817 /* Check to see if we already have a connection to this node. This
....@@ -791,13 +829,24 @@
791829 struct connection *othercon = newcon->othercon;
792830
793831 if (!othercon) {
794
- othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
832
+ othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
795833 if (!othercon) {
796834 log_print("failed to allocate incoming socket");
797835 mutex_unlock(&newcon->sock_mutex);
798836 result = -ENOMEM;
799837 goto accept_err;
800838 }
839
+
840
+ othercon->rx_buflen = dlm_config.ci_buffer_size;
841
+ othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS);
842
+ if (!othercon->rx_buf) {
843
+ mutex_unlock(&newcon->sock_mutex);
844
+ kfree(othercon);
845
+ log_print("failed to allocate incoming socket receive buffer");
846
+ result = -ENOMEM;
847
+ goto accept_err;
848
+ }
849
+
801850 othercon->nodeid = nodeid;
802851 othercon->rx_action = receive_from_sock;
803852 mutex_init(&othercon->sock_mutex);
....@@ -805,22 +854,18 @@
805854 spin_lock_init(&othercon->writequeue_lock);
806855 INIT_WORK(&othercon->swork, process_send_sockets);
807856 INIT_WORK(&othercon->rwork, process_recv_sockets);
857
+ init_waitqueue_head(&othercon->shutdown_wait);
808858 set_bit(CF_IS_OTHERCON, &othercon->flags);
859
+ } else {
860
+ /* close the other con's socket if we have a new connection */
861
+ close_connection(othercon, false, true, false);
809862 }
863
+
810864 mutex_lock_nested(&othercon->sock_mutex, 2);
811
- if (!othercon->sock) {
812
- newcon->othercon = othercon;
813
- add_sock(newsock, othercon);
814
- addcon = othercon;
815
- mutex_unlock(&othercon->sock_mutex);
816
- }
817
- else {
818
- printk("Extra connection from node %d attempted\n", nodeid);
819
- result = -EAGAIN;
820
- mutex_unlock(&othercon->sock_mutex);
821
- mutex_unlock(&newcon->sock_mutex);
822
- goto accept_err;
823
- }
865
+ newcon->othercon = othercon;
866
+ add_sock(newsock, othercon);
867
+ addcon = othercon;
868
+ mutex_unlock(&othercon->sock_mutex);
824869 }
825870 else {
826871 newcon->rx_action = receive_from_sock;
....@@ -854,123 +899,6 @@
854899 return result;
855900 }
856901
857
-static int sctp_accept_from_sock(struct connection *con)
858
-{
859
- /* Check that the new node is in the lockspace */
860
- struct sctp_prim prim;
861
- int nodeid;
862
- int prim_len, ret;
863
- int addr_len;
864
- struct connection *newcon;
865
- struct connection *addcon;
866
- struct socket *newsock;
867
-
868
- mutex_lock(&connections_lock);
869
- if (!dlm_allow_conn) {
870
- mutex_unlock(&connections_lock);
871
- return -1;
872
- }
873
- mutex_unlock(&connections_lock);
874
-
875
- mutex_lock_nested(&con->sock_mutex, 0);
876
-
877
- ret = kernel_accept(con->sock, &newsock, O_NONBLOCK);
878
- if (ret < 0)
879
- goto accept_err;
880
-
881
- memset(&prim, 0, sizeof(struct sctp_prim));
882
- prim_len = sizeof(struct sctp_prim);
883
-
884
- ret = kernel_getsockopt(newsock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
885
- (char *)&prim, &prim_len);
886
- if (ret < 0) {
887
- log_print("getsockopt/sctp_primary_addr failed: %d", ret);
888
- goto accept_err;
889
- }
890
-
891
- make_sockaddr(&prim.ssp_addr, 0, &addr_len);
892
- ret = addr_to_nodeid(&prim.ssp_addr, &nodeid);
893
- if (ret) {
894
- unsigned char *b = (unsigned char *)&prim.ssp_addr;
895
-
896
- log_print("reject connect from unknown addr");
897
- print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
898
- b, sizeof(struct sockaddr_storage));
899
- goto accept_err;
900
- }
901
-
902
- newcon = nodeid2con(nodeid, GFP_NOFS);
903
- if (!newcon) {
904
- ret = -ENOMEM;
905
- goto accept_err;
906
- }
907
-
908
- mutex_lock_nested(&newcon->sock_mutex, 1);
909
-
910
- if (newcon->sock) {
911
- struct connection *othercon = newcon->othercon;
912
-
913
- if (!othercon) {
914
- othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
915
- if (!othercon) {
916
- log_print("failed to allocate incoming socket");
917
- mutex_unlock(&newcon->sock_mutex);
918
- ret = -ENOMEM;
919
- goto accept_err;
920
- }
921
- othercon->nodeid = nodeid;
922
- othercon->rx_action = receive_from_sock;
923
- mutex_init(&othercon->sock_mutex);
924
- INIT_LIST_HEAD(&othercon->writequeue);
925
- spin_lock_init(&othercon->writequeue_lock);
926
- INIT_WORK(&othercon->swork, process_send_sockets);
927
- INIT_WORK(&othercon->rwork, process_recv_sockets);
928
- set_bit(CF_IS_OTHERCON, &othercon->flags);
929
- }
930
- mutex_lock_nested(&othercon->sock_mutex, 2);
931
- if (!othercon->sock) {
932
- newcon->othercon = othercon;
933
- add_sock(newsock, othercon);
934
- addcon = othercon;
935
- mutex_unlock(&othercon->sock_mutex);
936
- } else {
937
- printk("Extra connection from node %d attempted\n", nodeid);
938
- ret = -EAGAIN;
939
- mutex_unlock(&othercon->sock_mutex);
940
- mutex_unlock(&newcon->sock_mutex);
941
- goto accept_err;
942
- }
943
- } else {
944
- newcon->rx_action = receive_from_sock;
945
- add_sock(newsock, newcon);
946
- addcon = newcon;
947
- }
948
-
949
- log_print("connected to %d", nodeid);
950
-
951
- mutex_unlock(&newcon->sock_mutex);
952
-
953
- /*
954
- * Add it to the active queue in case we got data
955
- * between processing the accept adding the socket
956
- * to the read_sockets list
957
- */
958
- if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
959
- queue_work(recv_workqueue, &addcon->rwork);
960
- mutex_unlock(&con->sock_mutex);
961
-
962
- return 0;
963
-
964
-accept_err:
965
- mutex_unlock(&con->sock_mutex);
966
- if (newsock)
967
- sock_release(newsock);
968
- if (ret != -EAGAIN)
969
- log_print("error accepting connection from node: %d", ret);
970
-
971
- return ret;
972
-}
973
-
974902 static void free_entry(struct writequeue_entry *e)
975903 {
976904 __free_page(e->page);
....@@ -1001,6 +929,7 @@
1001929 static int sctp_bind_addrs(struct connection *con, uint16_t port)
1002930 {
1003931 struct sockaddr_storage localaddr;
932
+ struct sockaddr *addr = (struct sockaddr *)&localaddr;
1004933 int i, addr_len, result = 0;
1005934
1006935 for (i = 0; i < dlm_local_count; i++) {
....@@ -1008,13 +937,9 @@
1008937 make_sockaddr(&localaddr, port, &addr_len);
1009938
1010939 if (!i)
1011
- result = kernel_bind(con->sock,
1012
- (struct sockaddr *)&localaddr,
1013
- addr_len);
940
+ result = kernel_bind(con->sock, addr, addr_len);
1014941 else
1015
- result = kernel_setsockopt(con->sock, SOL_SCTP,
1016
- SCTP_SOCKOPT_BINDX_ADD,
1017
- (char *)&localaddr, addr_len);
942
+ result = sock_bind_add(con->sock->sk, addr, addr_len);
1018943
1019944 if (result < 0) {
1020945 log_print("Can't bind to %d addr number %d, %d.\n",
....@@ -1033,16 +958,17 @@
1033958 static void sctp_connect_to_sock(struct connection *con)
1034959 {
1035960 struct sockaddr_storage daddr;
1036
- int one = 1;
1037961 int result;
1038962 int addr_len;
1039963 struct socket *sock;
1040
- struct timeval tv = { .tv_sec = 5, .tv_usec = 0 };
964
+ unsigned int mark;
1041965
1042966 if (con->nodeid == 0) {
1043967 log_print("attempt to connect sock 0 foiled");
1044968 return;
1045969 }
970
+
971
+ dlm_comm_mark(con->nodeid, &mark);
1046972
1047973 mutex_lock(&con->sock_mutex);
1048974
....@@ -1068,6 +994,8 @@
1068994 if (result < 0)
1069995 goto socket_err;
1070996
997
+ sock_set_mark(sock->sk, mark);
998
+
1071999 con->rx_action = receive_from_sock;
10721000 con->connect_action = sctp_connect_to_sock;
10731001 add_sock(sock, con);
....@@ -1081,21 +1009,17 @@
10811009 log_print("connecting to %d", con->nodeid);
10821010
10831011 /* Turn off Nagle's algorithm */
1084
- kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
1085
- sizeof(one));
1012
+ sctp_sock_set_nodelay(sock->sk);
10861013
10871014 /*
10881015 * Make sock->ops->connect() function return in specified time,
10891016 * since O_NONBLOCK argument in connect() function does not work here,
10901017 * then, we should restore the default value of this attribute.
10911018 */
1092
- kernel_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
1093
- sizeof(tv));
1019
+ sock_set_sndtimeo(sock->sk, 5);
10941020 result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
10951021 0);
1096
- memset(&tv, 0, sizeof(tv));
1097
- kernel_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
1098
- sizeof(tv));
1022
+ sock_set_sndtimeo(sock->sk, 0);
10991023
11001024 if (result == -EINPROGRESS)
11011025 result = 0;
....@@ -1134,13 +1058,15 @@
11341058 struct sockaddr_storage saddr, src_addr;
11351059 int addr_len;
11361060 struct socket *sock = NULL;
1137
- int one = 1;
1061
+ unsigned int mark;
11381062 int result;
11391063
11401064 if (con->nodeid == 0) {
11411065 log_print("attempt to connect sock 0 foiled");
11421066 return;
11431067 }
1068
+
1069
+ dlm_comm_mark(con->nodeid, &mark);
11441070
11451071 mutex_lock(&con->sock_mutex);
11461072 if (con->retries++ > MAX_CONNECT_RETRIES)
....@@ -1156,6 +1082,8 @@
11561082 if (result < 0)
11571083 goto out_err;
11581084
1085
+ sock_set_mark(sock->sk, mark);
1086
+
11591087 memset(&saddr, 0, sizeof(saddr));
11601088 result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
11611089 if (result < 0) {
....@@ -1165,6 +1093,7 @@
11651093
11661094 con->rx_action = receive_from_sock;
11671095 con->connect_action = tcp_connect_to_sock;
1096
+ con->shutdown_action = dlm_tcp_shutdown;
11681097 add_sock(sock, con);
11691098
11701099 /* Bind to our cluster-known address connecting to avoid
....@@ -1183,8 +1112,7 @@
11831112 log_print("connecting to %d", con->nodeid);
11841113
11851114 /* Turn off Nagle's algorithm */
1186
- kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1187
- sizeof(one));
1115
+ tcp_sock_set_nodelay(sock->sk);
11881116
11891117 result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
11901118 O_NONBLOCK);
....@@ -1226,7 +1154,6 @@
12261154 {
12271155 struct socket *sock = NULL;
12281156 int result = 0;
1229
- int one = 1;
12301157 int addr_len;
12311158
12321159 if (dlm_local_addr[0]->ss_family == AF_INET)
....@@ -1242,20 +1169,17 @@
12421169 goto create_out;
12431170 }
12441171
1172
+ sock_set_mark(sock->sk, dlm_config.ci_mark);
1173
+
12451174 /* Turn off Nagle's algorithm */
1246
- kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1247
- sizeof(one));
1175
+ tcp_sock_set_nodelay(sock->sk);
12481176
1249
- result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1250
- (char *)&one, sizeof(one));
1177
+ sock_set_reuseaddr(sock->sk);
12511178
1252
- if (result < 0) {
1253
- log_print("Failed to set SO_REUSEADDR on socket: %d", result);
1254
- }
12551179 write_lock_bh(&sock->sk->sk_callback_lock);
12561180 sock->sk->sk_user_data = con;
12571181 save_listen_callbacks(sock);
1258
- con->rx_action = tcp_accept_from_sock;
1182
+ con->rx_action = accept_from_sock;
12591183 con->connect_action = tcp_connect_to_sock;
12601184 write_unlock_bh(&sock->sk->sk_callback_lock);
12611185
....@@ -1269,11 +1193,7 @@
12691193 con->sock = NULL;
12701194 goto create_out;
12711195 }
1272
- result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
1273
- (char *)&one, sizeof(one));
1274
- if (result < 0) {
1275
- log_print("Set keepalive failed: %d", result);
1276
- }
1196
+ sock_set_keepalive(sock->sk);
12771197
12781198 result = sock->ops->listen(sock, 5);
12791199 if (result < 0) {
....@@ -1305,14 +1225,20 @@
13051225 }
13061226 }
13071227
1228
+static void deinit_local(void)
1229
+{
1230
+ int i;
1231
+
1232
+ for (i = 0; i < dlm_local_count; i++)
1233
+ kfree(dlm_local_addr[i]);
1234
+}
1235
+
13081236 /* Initialise SCTP socket and bind to all interfaces */
13091237 static int sctp_listen_for_all(void)
13101238 {
13111239 struct socket *sock = NULL;
13121240 int result = -EINVAL;
13131241 struct connection *con = nodeid2con(0, GFP_NOFS);
1314
- int bufsize = NEEDED_RMEM;
1315
- int one = 1;
13161242
13171243 if (!con)
13181244 return -ENOMEM;
....@@ -1326,15 +1252,9 @@
13261252 goto out;
13271253 }
13281254
1329
- result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1330
- (char *)&bufsize, sizeof(bufsize));
1331
- if (result)
1332
- log_print("Error increasing buffer space on socket %d", result);
1333
-
1334
- result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
1335
- sizeof(one));
1336
- if (result < 0)
1337
- log_print("Could not set SCTP NODELAY error %d\n", result);
1255
+ sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
1256
+ sock_set_mark(sock->sk, dlm_config.ci_mark);
1257
+ sctp_sock_set_nodelay(sock->sk);
13381258
13391259 write_lock_bh(&sock->sk->sk_callback_lock);
13401260 /* Init con struct */
....@@ -1342,7 +1262,7 @@
13421262 save_listen_callbacks(sock);
13431263 con->sock = sock;
13441264 con->sock->sk->sk_data_ready = lowcomms_data_ready;
1345
- con->rx_action = sctp_accept_from_sock;
1265
+ con->rx_action = accept_from_sock;
13461266 con->connect_action = sctp_connect_to_sock;
13471267
13481268 write_unlock_bh(&sock->sk->sk_callback_lock);
....@@ -1545,7 +1465,7 @@
15451465
15461466 send_error:
15471467 mutex_unlock(&con->sock_mutex);
1548
- close_connection(con, true, false, true);
1468
+ close_connection(con, false, false, true);
15491469 /* Requeue the send work. When the work daemon runs again, it will try
15501470 a new connection, then call this function again. */
15511471 queue_work(send_workqueue, &con->swork);
....@@ -1621,13 +1541,6 @@
16211541 send_to_sock(con);
16221542 }
16231543
1624
-
1625
-/* Discard all entries on the write queues */
1626
-static void clean_writequeues(void)
1627
-{
1628
- foreach_conn(clean_one_writequeue);
1629
-}
1630
-
16311544 static void work_stop(void)
16321545 {
16331546 if (recv_workqueue)
....@@ -1677,26 +1590,40 @@
16771590 _stop_conn(con, true);
16781591 }
16791592
1593
+static void shutdown_conn(struct connection *con)
1594
+{
1595
+ if (con->shutdown_action)
1596
+ con->shutdown_action(con);
1597
+}
1598
+
1599
+static void connection_release(struct rcu_head *rcu)
1600
+{
1601
+ struct connection *con = container_of(rcu, struct connection, rcu);
1602
+
1603
+ kfree(con->rx_buf);
1604
+ kfree(con);
1605
+}
1606
+
16801607 static void free_conn(struct connection *con)
16811608 {
16821609 close_connection(con, true, true, true);
1683
- if (con->othercon)
1684
- kmem_cache_free(con_cache, con->othercon);
1685
- hlist_del(&con->list);
1686
- kmem_cache_free(con_cache, con);
1610
+ spin_lock(&connections_lock);
1611
+ hlist_del_rcu(&con->list);
1612
+ spin_unlock(&connections_lock);
1613
+ if (con->othercon) {
1614
+ clean_one_writequeue(con->othercon);
1615
+ call_rcu(&con->othercon->rcu, connection_release);
1616
+ }
1617
+ clean_one_writequeue(con);
1618
+ call_rcu(&con->rcu, connection_release);
16871619 }
16881620
16891621 static void work_flush(void)
16901622 {
1691
- int ok;
1623
+ int ok, idx;
16921624 int i;
1693
- struct hlist_node *n;
16941625 struct connection *con;
16951626
1696
- if (recv_workqueue)
1697
- flush_workqueue(recv_workqueue);
1698
- if (send_workqueue)
1699
- flush_workqueue(send_workqueue);
17001627 do {
17011628 ok = 1;
17021629 foreach_conn(stop_conn);
....@@ -1704,9 +1631,10 @@
17041631 flush_workqueue(recv_workqueue);
17051632 if (send_workqueue)
17061633 flush_workqueue(send_workqueue);
1634
+ idx = srcu_read_lock(&connections_srcu);
17071635 for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
1708
- hlist_for_each_entry_safe(con, n,
1709
- &connection_hash[i], list) {
1636
+ hlist_for_each_entry_rcu(con, &connection_hash[i],
1637
+ list) {
17101638 ok &= test_bit(CF_READ_PENDING, &con->flags);
17111639 ok &= test_bit(CF_WRITE_PENDING, &con->flags);
17121640 if (con->othercon) {
....@@ -1717,6 +1645,7 @@
17171645 }
17181646 }
17191647 }
1648
+ srcu_read_unlock(&connections_srcu, idx);
17201649 } while (!ok);
17211650 }
17221651
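With lookups running under SRCU, free_conn() above can no longer kfree() a connection right after unhooking it the way the old kmem_cache version did; the patch unlinks the entry with hlist_del_rcu() under connections_lock and defers the actual free to an RCU callback. A condensed sketch of that teardown ordering (hypothetical function names, taken from the hunk above):

/* unlink now, free the memory only after a grace period */
static void release_sketch(struct rcu_head *rcu)
{
	struct connection *con = container_of(rcu, struct connection, rcu);

	kfree(con->rx_buf);
	kfree(con);
}

static void unlink_con_sketch(struct connection *con)
{
	spin_lock(&connections_lock);
	hlist_del_rcu(&con->list);
	spin_unlock(&connections_lock);

	call_rcu(&con->rcu, release_sketch);
}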
....@@ -1725,15 +1654,18 @@
17251654 /* Set all the flags to prevent any
17261655 socket activity.
17271656 */
1728
- mutex_lock(&connections_lock);
17291657 dlm_allow_conn = 0;
1730
- mutex_unlock(&connections_lock);
1658
+
1659
+ if (recv_workqueue)
1660
+ flush_workqueue(recv_workqueue);
1661
+ if (send_workqueue)
1662
+ flush_workqueue(send_workqueue);
1663
+
1664
+ foreach_conn(shutdown_conn);
17311665 work_flush();
1732
- clean_writequeues();
17331666 foreach_conn(free_conn);
17341667 work_stop();
1735
-
1736
- kmem_cache_destroy(con_cache);
1668
+ deinit_local();
17371669 }
17381670
17391671 int dlm_lowcomms_start(void)
....@@ -1752,16 +1684,9 @@
17521684 goto fail;
17531685 }
17541686
1755
- error = -ENOMEM;
1756
- con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1757
- __alignof__(struct connection), 0,
1758
- NULL);
1759
- if (!con_cache)
1760
- goto fail;
1761
-
17621687 error = work_start();
17631688 if (error)
1764
- goto fail_destroy;
1689
+ goto fail;
17651690
17661691 dlm_allow_conn = 1;
17671692
....@@ -1778,12 +1703,8 @@
17781703 fail_unlisten:
17791704 dlm_allow_conn = 0;
17801705 con = nodeid2con(0,0);
1781
- if (con) {
1782
- close_connection(con, false, true, true);
1783
- kmem_cache_free(con_cache, con);
1784
- }
1785
-fail_destroy:
1786
- kmem_cache_destroy(con_cache);
1706
+ if (con)
1707
+ free_conn(con);
17871708 fail:
17881709 return error;
17891710 }