From 95099d4622f8cb224d94e314c7a8e0df60b13f87 Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Sat, 09 Dec 2023 08:38:01 +0000
Subject: [PATCH] enable docker ppp
---
kernel/fs/dlm/lowcomms.c | 647 +++++++++++++++++++++++++---------------------------------
1 files changed, 284 insertions(+), 363 deletions(-)
diff --git a/kernel/fs/dlm/lowcomms.c b/kernel/fs/dlm/lowcomms.c
index f476a90..68b7653 100644
--- a/kernel/fs/dlm/lowcomms.c
+++ b/kernel/fs/dlm/lowcomms.c
@@ -1,12 +1,10 @@
+// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
**
-** This copyrighted material is made available to anyone wishing to use,
-** modify, copy, or redistribute it subject to the terms and conditions
-** of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
@@ -65,40 +63,7 @@
/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
-
-struct cbuf {
- unsigned int base;
- unsigned int len;
- unsigned int mask;
-};
-
-static void cbuf_add(struct cbuf *cb, int n)
-{
- cb->len += n;
-}
-
-static int cbuf_data(struct cbuf *cb)
-{
- return ((cb->base + cb->len) & cb->mask);
-}
-
-static void cbuf_init(struct cbuf *cb, int size)
-{
- cb->base = cb->len = 0;
- cb->mask = size-1;
-}
-
-static void cbuf_eat(struct cbuf *cb, int n)
-{
- cb->len -= n;
- cb->base += n;
- cb->base &= cb->mask;
-}
-
-static bool cbuf_empty(struct cbuf *cb)
-{
- return cb->len == 0;
-}
+#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
struct connection {
struct socket *sock; /* NULL if not connected */
@@ -112,18 +77,23 @@
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
+#define CF_SHUTDOWN 9
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
int (*rx_action) (struct connection *); /* What to do when active */
void (*connect_action) (struct connection *); /* What to do to connect */
- struct page *rx_page;
- struct cbuf cb;
+ void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
struct connection *othercon;
struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */
+ wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
+ unsigned char *rx_buf;
+ int rx_buflen;
+ int rx_leftover;
+ struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
@@ -165,8 +135,8 @@
static struct workqueue_struct *send_workqueue;
static struct hlist_head connection_hash[CONN_HASH_SIZE];
-static DEFINE_MUTEX(connections_lock);
-static struct kmem_cache *con_cache;
+static DEFINE_SPINLOCK(connections_lock);
+DEFINE_STATIC_SRCU(connections_srcu);
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
@@ -182,15 +152,20 @@
static struct connection *__find_con(int nodeid)
{
- int r;
+ int r, idx;
struct connection *con;
r = nodeid_hash(nodeid);
- hlist_for_each_entry(con, &connection_hash[r], list) {
- if (con->nodeid == nodeid)
+ idx = srcu_read_lock(&connections_srcu);
+ hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
+ if (con->nodeid == nodeid) {
+ srcu_read_unlock(&connections_srcu, idx);
return con;
+ }
}
+ srcu_read_unlock(&connections_srcu, idx);
+
return NULL;
}
@@ -198,21 +173,25 @@
* If 'allocation' is zero then we don't attempt to create a new
* connection structure for this node.
*/
-static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
+static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
- struct connection *con = NULL;
+ struct connection *con, *tmp;
int r;
con = __find_con(nodeid);
if (con || !alloc)
return con;
- con = kmem_cache_zalloc(con_cache, alloc);
+ con = kzalloc(sizeof(*con), alloc);
if (!con)
return NULL;
- r = nodeid_hash(nodeid);
- hlist_add_head(&con->list, &connection_hash[r]);
+ con->rx_buflen = dlm_config.ci_buffer_size;
+ con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
+ if (!con->rx_buf) {
+ kfree(con);
+ return NULL;
+ }
con->nodeid = nodeid;
mutex_init(&con->sock_mutex);
@@ -220,6 +199,7 @@
spin_lock_init(&con->writequeue_lock);
INIT_WORK(&con->swork, process_send_sockets);
INIT_WORK(&con->rwork, process_recv_sockets);
+ init_waitqueue_head(&con->shutdown_wait);
/* Setup action pointers for child sockets */
if (con->nodeid) {
@@ -230,31 +210,41 @@
con->rx_action = zerocon->rx_action;
}
+ r = nodeid_hash(nodeid);
+
+ spin_lock(&connections_lock);
+ /* Because multiple workqueues/threads calls this function it can
+ * race on multiple cpu's. Instead of locking hot path __find_con()
+ * we just check in rare cases of recently added nodes again
+ * under protection of connections_lock. If this is the case we
+ * abort our connection creation and return the existing connection.
+ */
+ tmp = __find_con(nodeid);
+ if (tmp) {
+ spin_unlock(&connections_lock);
+ kfree(con->rx_buf);
+ kfree(con);
+ return tmp;
+ }
+
+ hlist_add_head_rcu(&con->list, &connection_hash[r]);
+ spin_unlock(&connections_lock);
+
return con;
}
/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
- int i;
- struct hlist_node *n;
+ int i, idx;
struct connection *con;
+ idx = srcu_read_lock(&connections_srcu);
for (i = 0; i < CONN_HASH_SIZE; i++) {
- hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
+ hlist_for_each_entry_rcu(con, &connection_hash[i], list)
conn_func(con);
}
-}
-
-static struct connection *nodeid2con(int nodeid, gfp_t allocation)
-{
- struct connection *con;
-
- mutex_lock(&connections_lock);
- con = __nodeid2con(nodeid, allocation);
- mutex_unlock(&connections_lock);
-
- return con;
+ srcu_read_unlock(&connections_srcu, idx);
}
static struct dlm_node_addr *find_node_addr(int nodeid)
@@ -481,8 +471,8 @@
static void lowcomms_error_report(struct sock *sk)
{
struct connection *con;
- struct sockaddr_storage saddr;
void (*orig_report)(struct sock *) = NULL;
+ struct inet_sock *inet;
read_lock_bh(&sk->sk_callback_lock);
con = sock2con(sk);
@@ -490,34 +480,33 @@
goto out;
orig_report = listen_sock.sk_error_report;
- if (con->sock == NULL ||
- kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) {
- printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
- "sending to node %d, port %d, "
- "sk_err=%d/%d\n", dlm_our_nodeid(),
- con->nodeid, dlm_config.ci_tcp_port,
- sk->sk_err, sk->sk_err_soft);
- } else if (saddr.ss_family == AF_INET) {
- struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;
+ inet = inet_sk(sk);
+ switch (sk->sk_family) {
+ case AF_INET:
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
- "sending to node %d at %pI4, port %d, "
+ "sending to node %d at %pI4, dport %d, "
"sk_err=%d/%d\n", dlm_our_nodeid(),
- con->nodeid, &sin4->sin_addr.s_addr,
- dlm_config.ci_tcp_port, sk->sk_err,
+ con->nodeid, &inet->inet_daddr,
+ ntohs(inet->inet_dport), sk->sk_err,
sk->sk_err_soft);
- } else {
- struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;
-
+ break;
+#if IS_ENABLED(CONFIG_IPV6)
+ case AF_INET6:
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
- "sending to node %d at %u.%u.%u.%u, "
- "port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
- con->nodeid, sin6->sin6_addr.s6_addr32[0],
- sin6->sin6_addr.s6_addr32[1],
- sin6->sin6_addr.s6_addr32[2],
- sin6->sin6_addr.s6_addr32[3],
- dlm_config.ci_tcp_port, sk->sk_err,
+ "sending to node %d at %pI6c, "
+ "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
+ con->nodeid, &sk->sk_v6_daddr,
+ ntohs(inet->inet_dport), sk->sk_err,
sk->sk_err_soft);
+ break;
+#endif
+ default:
+ printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
+ "invalid socket family %d set, "
+ "sk_err=%d/%d\n", dlm_our_nodeid(),
+ sk->sk_family, sk->sk_err, sk->sk_err_soft);
+ goto out;
}
out:
read_unlock_bh(&sk->sk_callback_lock);
@@ -611,26 +600,85 @@
/* Will only re-enter once. */
close_connection(con->othercon, false, tx, rx);
}
- if (con->rx_page) {
- __free_page(con->rx_page);
- con->rx_page = NULL;
- }
+ con->rx_leftover = 0;
con->retries = 0;
mutex_unlock(&con->sock_mutex);
clear_bit(CF_CLOSING, &con->flags);
}
+static void shutdown_connection(struct connection *con)
+{
+ int ret;
+
+ flush_work(&con->swork);
+
+ mutex_lock(&con->sock_mutex);
+ /* nothing to shutdown */
+ if (!con->sock) {
+ mutex_unlock(&con->sock_mutex);
+ return;
+ }
+
+ set_bit(CF_SHUTDOWN, &con->flags);
+ ret = kernel_sock_shutdown(con->sock, SHUT_WR);
+ mutex_unlock(&con->sock_mutex);
+ if (ret) {
+ log_print("Connection %p failed to shutdown: %d will force close",
+ con, ret);
+ goto force_close;
+ } else {
+ ret = wait_event_timeout(con->shutdown_wait,
+ !test_bit(CF_SHUTDOWN, &con->flags),
+ DLM_SHUTDOWN_WAIT_TIMEOUT);
+ if (ret == 0) {
+ log_print("Connection %p shutdown timed out, will force close",
+ con);
+ goto force_close;
+ }
+ }
+
+ return;
+
+force_close:
+ clear_bit(CF_SHUTDOWN, &con->flags);
+ close_connection(con, false, true, true);
+}
+
+static void dlm_tcp_shutdown(struct connection *con)
+{
+ if (con->othercon)
+ shutdown_connection(con->othercon);
+ shutdown_connection(con);
+}
+
+static int con_realloc_receive_buf(struct connection *con, int newlen)
+{
+ unsigned char *newbuf;
+
+ newbuf = kmalloc(newlen, GFP_NOFS);
+ if (!newbuf)
+ return -ENOMEM;
+
+ /* copy any leftover from last receive */
+ if (con->rx_leftover)
+ memmove(newbuf, con->rx_buf, con->rx_leftover);
+
+ /* swap to new buffer space */
+ kfree(con->rx_buf);
+ con->rx_buflen = newlen;
+ con->rx_buf = newbuf;
+
+ return 0;
+}
+
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
- int ret = 0;
- struct msghdr msg = {};
- struct kvec iov[2];
- unsigned len;
- int r;
int call_again_soon = 0;
- int nvec;
+ struct msghdr msg;
+ struct kvec iov;
+ int ret, buflen;
mutex_lock(&con->sock_mutex);
@@ -638,71 +686,55 @@
ret = -EAGAIN;
goto out_close;
}
+
if (con->nodeid == 0) {
ret = -EINVAL;
goto out_close;
}
- if (con->rx_page == NULL) {
- /*
- * This doesn't need to be atomic, but I think it should
- * improve performance if it is.
- */
- con->rx_page = alloc_page(GFP_ATOMIC);
- if (con->rx_page == NULL)
+ /* realloc if we get new buffer size to read out */
+ buflen = dlm_config.ci_buffer_size;
+ if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
+ ret = con_realloc_receive_buf(con, buflen);
+ if (ret < 0)
goto out_resched;
- cbuf_init(&con->cb, PAGE_SIZE);
}
- /*
- * iov[0] is the bit of the circular buffer between the current end
- * point (cb.base + cb.len) and the end of the buffer.
+ /* calculate new buffer parameter regarding last receive and
+ * possible leftover bytes
*/
- iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
- iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
- iov[1].iov_len = 0;
- nvec = 1;
+ iov.iov_base = con->rx_buf + con->rx_leftover;
+ iov.iov_len = con->rx_buflen - con->rx_leftover;
- /*
- * iov[1] is the bit of the circular buffer between the start of the
- * buffer and the start of the currently used section (cb.base)
- */
- if (cbuf_data(&con->cb) >= con->cb.base) {
- iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb);
- iov[1].iov_len = con->cb.base;
- iov[1].iov_base = page_address(con->rx_page);
- nvec = 2;
- }
- len = iov[0].iov_len + iov[1].iov_len;
- iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, iov, nvec, len);
-
- r = ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+ ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
+ msg.msg_flags);
if (ret <= 0)
goto out_close;
- else if (ret == len)
+ else if (ret == iov.iov_len)
call_again_soon = 1;
- cbuf_add(&con->cb, ret);
- ret = dlm_process_incoming_buffer(con->nodeid,
- page_address(con->rx_page),
- con->cb.base, con->cb.len,
- PAGE_SIZE);
- if (ret == -EBADMSG) {
- log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
- page_address(con->rx_page), con->cb.base,
- con->cb.len, r);
- }
+ /* new buflen according readed bytes and leftover from last receive */
+ buflen = ret + con->rx_leftover;
+ ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
if (ret < 0)
goto out_close;
- cbuf_eat(&con->cb, ret);
- if (cbuf_empty(&con->cb) && !call_again_soon) {
- __free_page(con->rx_page);
- con->rx_page = NULL;
+ /* calculate leftover bytes from process and put it into begin of
+ * the receive buffer, so next receive we have the full message
+ * at the start address of the receive buffer.
+ */
+ con->rx_leftover = buflen - ret;
+ if (con->rx_leftover) {
+ memmove(con->rx_buf, con->rx_buf + ret,
+ con->rx_leftover);
+ call_again_soon = true;
}
if (call_again_soon)
goto out_resched;
+
mutex_unlock(&con->sock_mutex);
return 0;
@@ -715,18 +747,23 @@
out_close:
mutex_unlock(&con->sock_mutex);
if (ret != -EAGAIN) {
- close_connection(con, true, true, false);
/* Reconnect when there is something to send */
+ close_connection(con, false, true, false);
+ if (ret == 0) {
+ log_print("connection %p got EOF from %d",
+ con, con->nodeid);
+ /* handling for tcp shutdown */
+ clear_bit(CF_SHUTDOWN, &con->flags);
+ wake_up(&con->shutdown_wait);
+ /* signal to breaking receive worker */
+ ret = -1;
+ }
}
- /* Don't return success if we really got EOF */
- if (ret == 0)
- ret = -EAGAIN;
-
return ret;
}
/* Listening socket is busy, accept a connection */
-static int tcp_accept_from_sock(struct connection *con)
+static int accept_from_sock(struct connection *con)
{
int result;
struct sockaddr_storage peeraddr;
@@ -735,13 +772,11 @@
int nodeid;
struct connection *newcon;
struct connection *addcon;
+ unsigned int mark;
- mutex_lock(&connections_lock);
if (!dlm_allow_conn) {
- mutex_unlock(&connections_lock);
return -1;
}
- mutex_unlock(&connections_lock);
mutex_lock_nested(&con->sock_mutex, 0);
@@ -774,6 +809,9 @@
return -1;
}
+ dlm_comm_mark(nodeid, &mark);
+ sock_set_mark(newsock->sk, mark);
+
log_print("got connection from %d", nodeid);
/* Check to see if we already have a connection to this node. This
@@ -791,13 +829,24 @@
struct connection *othercon = newcon->othercon;
if (!othercon) {
- othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
+ othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
if (!othercon) {
log_print("failed to allocate incoming socket");
mutex_unlock(&newcon->sock_mutex);
result = -ENOMEM;
goto accept_err;
}
+
+ othercon->rx_buflen = dlm_config.ci_buffer_size;
+ othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS);
+ if (!othercon->rx_buf) {
+ mutex_unlock(&newcon->sock_mutex);
+ kfree(othercon);
+ log_print("failed to allocate incoming socket receive buffer");
+ result = -ENOMEM;
+ goto accept_err;
+ }
+
othercon->nodeid = nodeid;
othercon->rx_action = receive_from_sock;
mutex_init(&othercon->sock_mutex);
@@ -805,22 +854,18 @@
spin_lock_init(&othercon->writequeue_lock);
INIT_WORK(&othercon->swork, process_send_sockets);
INIT_WORK(&othercon->rwork, process_recv_sockets);
+ init_waitqueue_head(&othercon->shutdown_wait);
set_bit(CF_IS_OTHERCON, &othercon->flags);
+ } else {
+ /* close other sock con if we have something new */
+ close_connection(othercon, false, true, false);
}
+
mutex_lock_nested(&othercon->sock_mutex, 2);
- if (!othercon->sock) {
- newcon->othercon = othercon;
- add_sock(newsock, othercon);
- addcon = othercon;
- mutex_unlock(&othercon->sock_mutex);
- }
- else {
- printk("Extra connection from node %d attempted\n", nodeid);
- result = -EAGAIN;
- mutex_unlock(&othercon->sock_mutex);
- mutex_unlock(&newcon->sock_mutex);
- goto accept_err;
- }
+ newcon->othercon = othercon;
+ add_sock(newsock, othercon);
+ addcon = othercon;
+ mutex_unlock(&othercon->sock_mutex);
}
else {
newcon->rx_action = receive_from_sock;
@@ -854,123 +899,6 @@
return result;
}
-static int sctp_accept_from_sock(struct connection *con)
-{
- /* Check that the new node is in the lockspace */
- struct sctp_prim prim;
- int nodeid;
- int prim_len, ret;
- int addr_len;
- struct connection *newcon;
- struct connection *addcon;
- struct socket *newsock;
-
- mutex_lock(&connections_lock);
- if (!dlm_allow_conn) {
- mutex_unlock(&connections_lock);
- return -1;
- }
- mutex_unlock(&connections_lock);
-
- mutex_lock_nested(&con->sock_mutex, 0);
-
- ret = kernel_accept(con->sock, &newsock, O_NONBLOCK);
- if (ret < 0)
- goto accept_err;
-
- memset(&prim, 0, sizeof(struct sctp_prim));
- prim_len = sizeof(struct sctp_prim);
-
- ret = kernel_getsockopt(newsock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
- (char *)&prim, &prim_len);
- if (ret < 0) {
- log_print("getsockopt/sctp_primary_addr failed: %d", ret);
- goto accept_err;
- }
-
- make_sockaddr(&prim.ssp_addr, 0, &addr_len);
- ret = addr_to_nodeid(&prim.ssp_addr, &nodeid);
- if (ret) {
- unsigned char *b = (unsigned char *)&prim.ssp_addr;
-
- log_print("reject connect from unknown addr");
- print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
- b, sizeof(struct sockaddr_storage));
- goto accept_err;
- }
-
- newcon = nodeid2con(nodeid, GFP_NOFS);
- if (!newcon) {
- ret = -ENOMEM;
- goto accept_err;
- }
-
- mutex_lock_nested(&newcon->sock_mutex, 1);
-
- if (newcon->sock) {
- struct connection *othercon = newcon->othercon;
-
- if (!othercon) {
- othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
- if (!othercon) {
- log_print("failed to allocate incoming socket");
- mutex_unlock(&newcon->sock_mutex);
- ret = -ENOMEM;
- goto accept_err;
- }
- othercon->nodeid = nodeid;
- othercon->rx_action = receive_from_sock;
- mutex_init(&othercon->sock_mutex);
- INIT_LIST_HEAD(&othercon->writequeue);
- spin_lock_init(&othercon->writequeue_lock);
- INIT_WORK(&othercon->swork, process_send_sockets);
- INIT_WORK(&othercon->rwork, process_recv_sockets);
- set_bit(CF_IS_OTHERCON, &othercon->flags);
- }
- mutex_lock_nested(&othercon->sock_mutex, 2);
- if (!othercon->sock) {
- newcon->othercon = othercon;
- add_sock(newsock, othercon);
- addcon = othercon;
- mutex_unlock(&othercon->sock_mutex);
- } else {
- printk("Extra connection from node %d attempted\n", nodeid);
- ret = -EAGAIN;
- mutex_unlock(&othercon->sock_mutex);
- mutex_unlock(&newcon->sock_mutex);
- goto accept_err;
- }
- } else {
- newcon->rx_action = receive_from_sock;
- add_sock(newsock, newcon);
- addcon = newcon;
- }
-
- log_print("connected to %d", nodeid);
-
- mutex_unlock(&newcon->sock_mutex);
-
- /*
- * Add it to the active queue in case we got data
- * between processing the accept adding the socket
- * to the read_sockets list
- */
- if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
- queue_work(recv_workqueue, &addcon->rwork);
- mutex_unlock(&con->sock_mutex);
-
- return 0;
-
-accept_err:
- mutex_unlock(&con->sock_mutex);
- if (newsock)
- sock_release(newsock);
- if (ret != -EAGAIN)
- log_print("error accepting connection from node: %d", ret);
-
- return ret;
-}
-
static void free_entry(struct writequeue_entry *e)
{
__free_page(e->page);
@@ -1001,6 +929,7 @@
static int sctp_bind_addrs(struct connection *con, uint16_t port)
{
struct sockaddr_storage localaddr;
+ struct sockaddr *addr = (struct sockaddr *)&localaddr;
int i, addr_len, result = 0;
for (i = 0; i < dlm_local_count; i++) {
@@ -1008,13 +937,9 @@
make_sockaddr(&localaddr, port, &addr_len);
if (!i)
- result = kernel_bind(con->sock,
- (struct sockaddr *)&localaddr,
- addr_len);
+ result = kernel_bind(con->sock, addr, addr_len);
else
- result = kernel_setsockopt(con->sock, SOL_SCTP,
- SCTP_SOCKOPT_BINDX_ADD,
- (char *)&localaddr, addr_len);
+ result = sock_bind_add(con->sock->sk, addr, addr_len);
if (result < 0) {
log_print("Can't bind to %d addr number %d, %d.\n",
@@ -1033,16 +958,17 @@
static void sctp_connect_to_sock(struct connection *con)
{
struct sockaddr_storage daddr;
- int one = 1;
int result;
int addr_len;
struct socket *sock;
- struct timeval tv = { .tv_sec = 5, .tv_usec = 0 };
+ unsigned int mark;
if (con->nodeid == 0) {
log_print("attempt to connect sock 0 foiled");
return;
}
+
+ dlm_comm_mark(con->nodeid, &mark);
mutex_lock(&con->sock_mutex);
@@ -1068,6 +994,8 @@
if (result < 0)
goto socket_err;
+ sock_set_mark(sock->sk, mark);
+
con->rx_action = receive_from_sock;
con->connect_action = sctp_connect_to_sock;
add_sock(sock, con);
@@ -1081,21 +1009,17 @@
log_print("connecting to %d", con->nodeid);
/* Turn off Nagle's algorithm */
- kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
- sizeof(one));
+ sctp_sock_set_nodelay(sock->sk);
/*
* Make sock->ops->connect() function return in specified time,
* since O_NONBLOCK argument in connect() function does not work here,
* then, we should restore the default value of this attribute.
*/
- kernel_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
- sizeof(tv));
+ sock_set_sndtimeo(sock->sk, 5);
result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
0);
- memset(&tv, 0, sizeof(tv));
- kernel_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
- sizeof(tv));
+ sock_set_sndtimeo(sock->sk, 0);
if (result == -EINPROGRESS)
result = 0;
@@ -1134,13 +1058,15 @@
struct sockaddr_storage saddr, src_addr;
int addr_len;
struct socket *sock = NULL;
- int one = 1;
+ unsigned int mark;
int result;
if (con->nodeid == 0) {
log_print("attempt to connect sock 0 foiled");
return;
}
+
+ dlm_comm_mark(con->nodeid, &mark);
mutex_lock(&con->sock_mutex);
if (con->retries++ > MAX_CONNECT_RETRIES)
@@ -1156,6 +1082,8 @@
if (result < 0)
goto out_err;
+ sock_set_mark(sock->sk, mark);
+
memset(&saddr, 0, sizeof(saddr));
result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
if (result < 0) {
@@ -1165,6 +1093,7 @@
con->rx_action = receive_from_sock;
con->connect_action = tcp_connect_to_sock;
+ con->shutdown_action = dlm_tcp_shutdown;
add_sock(sock, con);
/* Bind to our cluster-known address connecting to avoid
@@ -1183,8 +1112,7 @@
log_print("connecting to %d", con->nodeid);
/* Turn off Nagle's algorithm */
- kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
- sizeof(one));
+ tcp_sock_set_nodelay(sock->sk);
result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
O_NONBLOCK);
@@ -1226,7 +1154,6 @@
{
struct socket *sock = NULL;
int result = 0;
- int one = 1;
int addr_len;
if (dlm_local_addr[0]->ss_family == AF_INET)
@@ -1242,20 +1169,17 @@
goto create_out;
}
+ sock_set_mark(sock->sk, dlm_config.ci_mark);
+
/* Turn off Nagle's algorithm */
- kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
- sizeof(one));
+ tcp_sock_set_nodelay(sock->sk);
- result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
- (char *)&one, sizeof(one));
+ sock_set_reuseaddr(sock->sk);
- if (result < 0) {
- log_print("Failed to set SO_REUSEADDR on socket: %d", result);
- }
write_lock_bh(&sock->sk->sk_callback_lock);
sock->sk->sk_user_data = con;
save_listen_callbacks(sock);
- con->rx_action = tcp_accept_from_sock;
+ con->rx_action = accept_from_sock;
con->connect_action = tcp_connect_to_sock;
write_unlock_bh(&sock->sk->sk_callback_lock);
@@ -1269,11 +1193,7 @@
con->sock = NULL;
goto create_out;
}
- result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
- (char *)&one, sizeof(one));
- if (result < 0) {
- log_print("Set keepalive failed: %d", result);
- }
+ sock_set_keepalive(sock->sk);
result = sock->ops->listen(sock, 5);
if (result < 0) {
@@ -1305,14 +1225,20 @@
}
}
+static void deinit_local(void)
+{
+ int i;
+
+ for (i = 0; i < dlm_local_count; i++)
+ kfree(dlm_local_addr[i]);
+}
+
/* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void)
{
struct socket *sock = NULL;
int result = -EINVAL;
struct connection *con = nodeid2con(0, GFP_NOFS);
- int bufsize = NEEDED_RMEM;
- int one = 1;
if (!con)
return -ENOMEM;
@@ -1326,15 +1252,9 @@
goto out;
}
- result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
- (char *)&bufsize, sizeof(bufsize));
- if (result)
- log_print("Error increasing buffer space on socket %d", result);
-
- result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
- sizeof(one));
- if (result < 0)
- log_print("Could not set SCTP NODELAY error %d\n", result);
+ sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
+ sock_set_mark(sock->sk, dlm_config.ci_mark);
+ sctp_sock_set_nodelay(sock->sk);
write_lock_bh(&sock->sk->sk_callback_lock);
/* Init con struct */
@@ -1342,7 +1262,7 @@
save_listen_callbacks(sock);
con->sock = sock;
con->sock->sk->sk_data_ready = lowcomms_data_ready;
- con->rx_action = sctp_accept_from_sock;
+ con->rx_action = accept_from_sock;
con->connect_action = sctp_connect_to_sock;
write_unlock_bh(&sock->sk->sk_callback_lock);
@@ -1545,7 +1465,7 @@
send_error:
mutex_unlock(&con->sock_mutex);
- close_connection(con, true, false, true);
+ close_connection(con, false, false, true);
/* Requeue the send work. When the work daemon runs again, it will try
a new connection, then call this function again. */
queue_work(send_workqueue, &con->swork);
@@ -1621,13 +1541,6 @@
send_to_sock(con);
}
-
-/* Discard all entries on the write queues */
-static void clean_writequeues(void)
-{
- foreach_conn(clean_one_writequeue);
-}
-
static void work_stop(void)
{
if (recv_workqueue)
@@ -1677,26 +1590,40 @@
_stop_conn(con, true);
}
+static void shutdown_conn(struct connection *con)
+{
+ if (con->shutdown_action)
+ con->shutdown_action(con);
+}
+
+static void connection_release(struct rcu_head *rcu)
+{
+ struct connection *con = container_of(rcu, struct connection, rcu);
+
+ kfree(con->rx_buf);
+ kfree(con);
+}
+
static void free_conn(struct connection *con)
{
close_connection(con, true, true, true);
- if (con->othercon)
- kmem_cache_free(con_cache, con->othercon);
- hlist_del(&con->list);
- kmem_cache_free(con_cache, con);
+ spin_lock(&connections_lock);
+ hlist_del_rcu(&con->list);
+ spin_unlock(&connections_lock);
+ if (con->othercon) {
+ clean_one_writequeue(con->othercon);
+ call_rcu(&con->othercon->rcu, connection_release);
+ }
+ clean_one_writequeue(con);
+ call_rcu(&con->rcu, connection_release);
}
static void work_flush(void)
{
- int ok;
+ int ok, idx;
int i;
- struct hlist_node *n;
struct connection *con;
- if (recv_workqueue)
- flush_workqueue(recv_workqueue);
- if (send_workqueue)
- flush_workqueue(send_workqueue);
do {
ok = 1;
foreach_conn(stop_conn);
@@ -1704,9 +1631,10 @@
flush_workqueue(recv_workqueue);
if (send_workqueue)
flush_workqueue(send_workqueue);
+ idx = srcu_read_lock(&connections_srcu);
for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
- hlist_for_each_entry_safe(con, n,
- &connection_hash[i], list) {
+ hlist_for_each_entry_rcu(con, &connection_hash[i],
+ list) {
ok &= test_bit(CF_READ_PENDING, &con->flags);
ok &= test_bit(CF_WRITE_PENDING, &con->flags);
if (con->othercon) {
@@ -1717,6 +1645,7 @@
}
}
}
+ srcu_read_unlock(&connections_srcu, idx);
} while (!ok);
}
@@ -1725,15 +1654,18 @@
/* Set all the flags to prevent any
socket activity.
*/
- mutex_lock(&connections_lock);
dlm_allow_conn = 0;
- mutex_unlock(&connections_lock);
+
+ if (recv_workqueue)
+ flush_workqueue(recv_workqueue);
+ if (send_workqueue)
+ flush_workqueue(send_workqueue);
+
+ foreach_conn(shutdown_conn);
work_flush();
- clean_writequeues();
foreach_conn(free_conn);
work_stop();
-
- kmem_cache_destroy(con_cache);
+ deinit_local();
}
int dlm_lowcomms_start(void)
@@ -1752,16 +1684,9 @@
goto fail;
}
- error = -ENOMEM;
- con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
- __alignof__(struct connection), 0,
- NULL);
- if (!con_cache)
- goto fail;
-
error = work_start();
if (error)
- goto fail_destroy;
+ goto fail;
dlm_allow_conn = 1;
@@ -1778,12 +1703,8 @@
fail_unlisten:
dlm_allow_conn = 0;
con = nodeid2con(0,0);
- if (con) {
- close_connection(con, false, true, true);
- kmem_cache_free(con_cache, con);
- }
-fail_destroy:
- kmem_cache_destroy(con_cache);
+ if (con)
+ free_conn(con);
fail:
return error;
}
--
Gitblit v1.6.2