From a36159eec6ca17402b0e146b86efaf76568dc353 Mon Sep 17 00:00:00 2001 From: hc <hc@nodka.com> Date: Fri, 20 Sep 2024 01:41:23 +0000 Subject: [PATCH] 重命名 AX88772C_eeprom/asix.c 为 asix_mac.c --- kernel/fs/dlm/lowcomms.c | 647 +++++++++++++++++++++++++--------------------------------- 1 files changed, 284 insertions(+), 363 deletions(-) diff --git a/kernel/fs/dlm/lowcomms.c b/kernel/fs/dlm/lowcomms.c index f476a90..68b7653 100644 --- a/kernel/fs/dlm/lowcomms.c +++ b/kernel/fs/dlm/lowcomms.c @@ -1,12 +1,10 @@ +// SPDX-License-Identifier: GPL-2.0-only /****************************************************************************** ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. ** -** This copyrighted material is made available to anyone wishing to use, -** modify, copy, or redistribute it subject to the terms and conditions -** of the GNU General Public License v.2. ** ******************************************************************************* ******************************************************************************/ @@ -65,40 +63,7 @@ /* Number of messages to send before rescheduling */ #define MAX_SEND_MSG_COUNT 25 - -struct cbuf { - unsigned int base; - unsigned int len; - unsigned int mask; -}; - -static void cbuf_add(struct cbuf *cb, int n) -{ - cb->len += n; -} - -static int cbuf_data(struct cbuf *cb) -{ - return ((cb->base + cb->len) & cb->mask); -} - -static void cbuf_init(struct cbuf *cb, int size) -{ - cb->base = cb->len = 0; - cb->mask = size-1; -} - -static void cbuf_eat(struct cbuf *cb, int n) -{ - cb->len -= n; - cb->base += n; - cb->base &= cb->mask; -} - -static bool cbuf_empty(struct cbuf *cb) -{ - return cb->len == 0; -} +#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000) struct connection { struct socket *sock; /* NULL if not connected */ @@ -112,18 +77,23 @@ #define CF_CLOSE 6 #define CF_APP_LIMITED 7 #define CF_CLOSING 8 +#define CF_SHUTDOWN 9 struct list_head writequeue; /* List of outgoing writequeue_entries */ spinlock_t writequeue_lock; int (*rx_action) (struct connection *); /* What to do when active */ void (*connect_action) (struct connection *); /* What to do to connect */ - struct page *rx_page; - struct cbuf cb; + void (*shutdown_action)(struct connection *con); /* What to do to shutdown */ int retries; #define MAX_CONNECT_RETRIES 3 struct hlist_node list; struct connection *othercon; struct work_struct rwork; /* Receive workqueue */ struct work_struct swork; /* Send workqueue */ + wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ + unsigned char *rx_buf; + int rx_buflen; + int rx_leftover; + struct rcu_head rcu; }; #define sock2con(x) ((struct connection *)(x)->sk_user_data) @@ -165,8 +135,8 @@ static struct workqueue_struct *send_workqueue; static struct hlist_head connection_hash[CONN_HASH_SIZE]; -static DEFINE_MUTEX(connections_lock); -static struct kmem_cache *con_cache; +static DEFINE_SPINLOCK(connections_lock); +DEFINE_STATIC_SRCU(connections_srcu); static void process_recv_sockets(struct work_struct *work); static void process_send_sockets(struct work_struct *work); @@ -182,15 +152,20 @@ static struct connection *__find_con(int nodeid) { - int r; + int r, idx; struct connection *con; r = nodeid_hash(nodeid); - hlist_for_each_entry(con, &connection_hash[r], list) { - if (con->nodeid == nodeid) + idx = srcu_read_lock(&connections_srcu); + hlist_for_each_entry_rcu(con, &connection_hash[r], list) { + if (con->nodeid == nodeid) { + srcu_read_unlock(&connections_srcu, idx); return con; + } } + srcu_read_unlock(&connections_srcu, idx); + return NULL; } @@ -198,21 +173,25 @@ * If 'allocation' is zero then we don't attempt to create a new * connection structure for this node. */ -static struct connection *__nodeid2con(int nodeid, gfp_t alloc) +static struct connection *nodeid2con(int nodeid, gfp_t alloc) { - struct connection *con = NULL; + struct connection *con, *tmp; int r; con = __find_con(nodeid); if (con || !alloc) return con; - con = kmem_cache_zalloc(con_cache, alloc); + con = kzalloc(sizeof(*con), alloc); if (!con) return NULL; - r = nodeid_hash(nodeid); - hlist_add_head(&con->list, &connection_hash[r]); + con->rx_buflen = dlm_config.ci_buffer_size; + con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS); + if (!con->rx_buf) { + kfree(con); + return NULL; + } con->nodeid = nodeid; mutex_init(&con->sock_mutex); @@ -220,6 +199,7 @@ spin_lock_init(&con->writequeue_lock); INIT_WORK(&con->swork, process_send_sockets); INIT_WORK(&con->rwork, process_recv_sockets); + init_waitqueue_head(&con->shutdown_wait); /* Setup action pointers for child sockets */ if (con->nodeid) { @@ -230,31 +210,41 @@ con->rx_action = zerocon->rx_action; } + r = nodeid_hash(nodeid); + + spin_lock(&connections_lock); + /* Because multiple workqueues/threads calls this function it can + * race on multiple cpu's. Instead of locking hot path __find_con() + * we just check in rare cases of recently added nodes again + * under protection of connections_lock. If this is the case we + * abort our connection creation and return the existing connection. + */ + tmp = __find_con(nodeid); + if (tmp) { + spin_unlock(&connections_lock); + kfree(con->rx_buf); + kfree(con); + return tmp; + } + + hlist_add_head_rcu(&con->list, &connection_hash[r]); + spin_unlock(&connections_lock); + return con; } /* Loop round all connections */ static void foreach_conn(void (*conn_func)(struct connection *c)) { - int i; - struct hlist_node *n; + int i, idx; struct connection *con; + idx = srcu_read_lock(&connections_srcu); for (i = 0; i < CONN_HASH_SIZE; i++) { - hlist_for_each_entry_safe(con, n, &connection_hash[i], list) + hlist_for_each_entry_rcu(con, &connection_hash[i], list) conn_func(con); } -} - -static struct connection *nodeid2con(int nodeid, gfp_t allocation) -{ - struct connection *con; - - mutex_lock(&connections_lock); - con = __nodeid2con(nodeid, allocation); - mutex_unlock(&connections_lock); - - return con; + srcu_read_unlock(&connections_srcu, idx); } static struct dlm_node_addr *find_node_addr(int nodeid) @@ -481,8 +471,8 @@ static void lowcomms_error_report(struct sock *sk) { struct connection *con; - struct sockaddr_storage saddr; void (*orig_report)(struct sock *) = NULL; + struct inet_sock *inet; read_lock_bh(&sk->sk_callback_lock); con = sock2con(sk); @@ -490,34 +480,33 @@ goto out; orig_report = listen_sock.sk_error_report; - if (con->sock == NULL || - kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) { - printk_ratelimited(KERN_ERR "dlm: node %d: socket error " - "sending to node %d, port %d, " - "sk_err=%d/%d\n", dlm_our_nodeid(), - con->nodeid, dlm_config.ci_tcp_port, - sk->sk_err, sk->sk_err_soft); - } else if (saddr.ss_family == AF_INET) { - struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr; + inet = inet_sk(sk); + switch (sk->sk_family) { + case AF_INET: printk_ratelimited(KERN_ERR "dlm: node %d: socket error " - "sending to node %d at %pI4, port %d, " + "sending to node %d at %pI4, dport %d, " "sk_err=%d/%d\n", dlm_our_nodeid(), - con->nodeid, &sin4->sin_addr.s_addr, - dlm_config.ci_tcp_port, sk->sk_err, + con->nodeid, &inet->inet_daddr, + ntohs(inet->inet_dport), sk->sk_err, sk->sk_err_soft); - } else { - struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr; - + break; +#if IS_ENABLED(CONFIG_IPV6) + case AF_INET6: printk_ratelimited(KERN_ERR "dlm: node %d: socket error " - "sending to node %d at %u.%u.%u.%u, " - "port %d, sk_err=%d/%d\n", dlm_our_nodeid(), - con->nodeid, sin6->sin6_addr.s6_addr32[0], - sin6->sin6_addr.s6_addr32[1], - sin6->sin6_addr.s6_addr32[2], - sin6->sin6_addr.s6_addr32[3], - dlm_config.ci_tcp_port, sk->sk_err, + "sending to node %d at %pI6c, " + "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(), + con->nodeid, &sk->sk_v6_daddr, + ntohs(inet->inet_dport), sk->sk_err, sk->sk_err_soft); + break; +#endif + default: + printk_ratelimited(KERN_ERR "dlm: node %d: socket error " + "invalid socket family %d set, " + "sk_err=%d/%d\n", dlm_our_nodeid(), + sk->sk_family, sk->sk_err, sk->sk_err_soft); + goto out; } out: read_unlock_bh(&sk->sk_callback_lock); @@ -611,26 +600,85 @@ /* Will only re-enter once. */ close_connection(con->othercon, false, tx, rx); } - if (con->rx_page) { - __free_page(con->rx_page); - con->rx_page = NULL; - } + con->rx_leftover = 0; con->retries = 0; mutex_unlock(&con->sock_mutex); clear_bit(CF_CLOSING, &con->flags); } +static void shutdown_connection(struct connection *con) +{ + int ret; + + flush_work(&con->swork); + + mutex_lock(&con->sock_mutex); + /* nothing to shutdown */ + if (!con->sock) { + mutex_unlock(&con->sock_mutex); + return; + } + + set_bit(CF_SHUTDOWN, &con->flags); + ret = kernel_sock_shutdown(con->sock, SHUT_WR); + mutex_unlock(&con->sock_mutex); + if (ret) { + log_print("Connection %p failed to shutdown: %d will force close", + con, ret); + goto force_close; + } else { + ret = wait_event_timeout(con->shutdown_wait, + !test_bit(CF_SHUTDOWN, &con->flags), + DLM_SHUTDOWN_WAIT_TIMEOUT); + if (ret == 0) { + log_print("Connection %p shutdown timed out, will force close", + con); + goto force_close; + } + } + + return; + +force_close: + clear_bit(CF_SHUTDOWN, &con->flags); + close_connection(con, false, true, true); +} + +static void dlm_tcp_shutdown(struct connection *con) +{ + if (con->othercon) + shutdown_connection(con->othercon); + shutdown_connection(con); +} + +static int con_realloc_receive_buf(struct connection *con, int newlen) +{ + unsigned char *newbuf; + + newbuf = kmalloc(newlen, GFP_NOFS); + if (!newbuf) + return -ENOMEM; + + /* copy any leftover from last receive */ + if (con->rx_leftover) + memmove(newbuf, con->rx_buf, con->rx_leftover); + + /* swap to new buffer space */ + kfree(con->rx_buf); + con->rx_buflen = newlen; + con->rx_buf = newbuf; + + return 0; +} + /* Data received from remote end */ static int receive_from_sock(struct connection *con) { - int ret = 0; - struct msghdr msg = {}; - struct kvec iov[2]; - unsigned len; - int r; int call_again_soon = 0; - int nvec; + struct msghdr msg; + struct kvec iov; + int ret, buflen; mutex_lock(&con->sock_mutex); @@ -638,71 +686,55 @@ ret = -EAGAIN; goto out_close; } + if (con->nodeid == 0) { ret = -EINVAL; goto out_close; } - if (con->rx_page == NULL) { - /* - * This doesn't need to be atomic, but I think it should - * improve performance if it is. - */ - con->rx_page = alloc_page(GFP_ATOMIC); - if (con->rx_page == NULL) + /* realloc if we get new buffer size to read out */ + buflen = dlm_config.ci_buffer_size; + if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { + ret = con_realloc_receive_buf(con, buflen); + if (ret < 0) goto out_resched; - cbuf_init(&con->cb, PAGE_SIZE); } - /* - * iov[0] is the bit of the circular buffer between the current end - * point (cb.base + cb.len) and the end of the buffer. + /* calculate new buffer parameter regarding last receive and + * possible leftover bytes */ - iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); - iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); - iov[1].iov_len = 0; - nvec = 1; + iov.iov_base = con->rx_buf + con->rx_leftover; + iov.iov_len = con->rx_buflen - con->rx_leftover; - /* - * iov[1] is the bit of the circular buffer between the start of the - * buffer and the start of the currently used section (cb.base) - */ - if (cbuf_data(&con->cb) >= con->cb.base) { - iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb); - iov[1].iov_len = con->cb.base; - iov[1].iov_base = page_address(con->rx_page); - nvec = 2; - } - len = iov[0].iov_len + iov[1].iov_len; - iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, iov, nvec, len); - - r = ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT | MSG_NOSIGNAL); + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; + ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, + msg.msg_flags); if (ret <= 0) goto out_close; - else if (ret == len) + else if (ret == iov.iov_len) call_again_soon = 1; - cbuf_add(&con->cb, ret); - ret = dlm_process_incoming_buffer(con->nodeid, - page_address(con->rx_page), - con->cb.base, con->cb.len, - PAGE_SIZE); - if (ret == -EBADMSG) { - log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d", - page_address(con->rx_page), con->cb.base, - con->cb.len, r); - } + /* new buflen according readed bytes and leftover from last receive */ + buflen = ret + con->rx_leftover; + ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); if (ret < 0) goto out_close; - cbuf_eat(&con->cb, ret); - if (cbuf_empty(&con->cb) && !call_again_soon) { - __free_page(con->rx_page); - con->rx_page = NULL; + /* calculate leftover bytes from process and put it into begin of + * the receive buffer, so next receive we have the full message + * at the start address of the receive buffer. + */ + con->rx_leftover = buflen - ret; + if (con->rx_leftover) { + memmove(con->rx_buf, con->rx_buf + ret, + con->rx_leftover); + call_again_soon = true; } if (call_again_soon) goto out_resched; + mutex_unlock(&con->sock_mutex); return 0; @@ -715,18 +747,23 @@ out_close: mutex_unlock(&con->sock_mutex); if (ret != -EAGAIN) { - close_connection(con, true, true, false); /* Reconnect when there is something to send */ + close_connection(con, false, true, false); + if (ret == 0) { + log_print("connection %p got EOF from %d", + con, con->nodeid); + /* handling for tcp shutdown */ + clear_bit(CF_SHUTDOWN, &con->flags); + wake_up(&con->shutdown_wait); + /* signal to breaking receive worker */ + ret = -1; + } } - /* Don't return success if we really got EOF */ - if (ret == 0) - ret = -EAGAIN; - return ret; } /* Listening socket is busy, accept a connection */ -static int tcp_accept_from_sock(struct connection *con) +static int accept_from_sock(struct connection *con) { int result; struct sockaddr_storage peeraddr; @@ -735,13 +772,11 @@ int nodeid; struct connection *newcon; struct connection *addcon; + unsigned int mark; - mutex_lock(&connections_lock); if (!dlm_allow_conn) { - mutex_unlock(&connections_lock); return -1; } - mutex_unlock(&connections_lock); mutex_lock_nested(&con->sock_mutex, 0); @@ -774,6 +809,9 @@ return -1; } + dlm_comm_mark(nodeid, &mark); + sock_set_mark(newsock->sk, mark); + log_print("got connection from %d", nodeid); /* Check to see if we already have a connection to this node. This @@ -791,13 +829,24 @@ struct connection *othercon = newcon->othercon; if (!othercon) { - othercon = kmem_cache_zalloc(con_cache, GFP_NOFS); + othercon = kzalloc(sizeof(*othercon), GFP_NOFS); if (!othercon) { log_print("failed to allocate incoming socket"); mutex_unlock(&newcon->sock_mutex); result = -ENOMEM; goto accept_err; } + + othercon->rx_buflen = dlm_config.ci_buffer_size; + othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS); + if (!othercon->rx_buf) { + mutex_unlock(&newcon->sock_mutex); + kfree(othercon); + log_print("failed to allocate incoming socket receive buffer"); + result = -ENOMEM; + goto accept_err; + } + othercon->nodeid = nodeid; othercon->rx_action = receive_from_sock; mutex_init(&othercon->sock_mutex); @@ -805,22 +854,18 @@ spin_lock_init(&othercon->writequeue_lock); INIT_WORK(&othercon->swork, process_send_sockets); INIT_WORK(&othercon->rwork, process_recv_sockets); + init_waitqueue_head(&othercon->shutdown_wait); set_bit(CF_IS_OTHERCON, &othercon->flags); + } else { + /* close other sock con if we have something new */ + close_connection(othercon, false, true, false); } + mutex_lock_nested(&othercon->sock_mutex, 2); - if (!othercon->sock) { - newcon->othercon = othercon; - add_sock(newsock, othercon); - addcon = othercon; - mutex_unlock(&othercon->sock_mutex); - } - else { - printk("Extra connection from node %d attempted\n", nodeid); - result = -EAGAIN; - mutex_unlock(&othercon->sock_mutex); - mutex_unlock(&newcon->sock_mutex); - goto accept_err; - } + newcon->othercon = othercon; + add_sock(newsock, othercon); + addcon = othercon; + mutex_unlock(&othercon->sock_mutex); } else { newcon->rx_action = receive_from_sock; @@ -854,123 +899,6 @@ return result; } -static int sctp_accept_from_sock(struct connection *con) -{ - /* Check that the new node is in the lockspace */ - struct sctp_prim prim; - int nodeid; - int prim_len, ret; - int addr_len; - struct connection *newcon; - struct connection *addcon; - struct socket *newsock; - - mutex_lock(&connections_lock); - if (!dlm_allow_conn) { - mutex_unlock(&connections_lock); - return -1; - } - mutex_unlock(&connections_lock); - - mutex_lock_nested(&con->sock_mutex, 0); - - ret = kernel_accept(con->sock, &newsock, O_NONBLOCK); - if (ret < 0) - goto accept_err; - - memset(&prim, 0, sizeof(struct sctp_prim)); - prim_len = sizeof(struct sctp_prim); - - ret = kernel_getsockopt(newsock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR, - (char *)&prim, &prim_len); - if (ret < 0) { - log_print("getsockopt/sctp_primary_addr failed: %d", ret); - goto accept_err; - } - - make_sockaddr(&prim.ssp_addr, 0, &addr_len); - ret = addr_to_nodeid(&prim.ssp_addr, &nodeid); - if (ret) { - unsigned char *b = (unsigned char *)&prim.ssp_addr; - - log_print("reject connect from unknown addr"); - print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, - b, sizeof(struct sockaddr_storage)); - goto accept_err; - } - - newcon = nodeid2con(nodeid, GFP_NOFS); - if (!newcon) { - ret = -ENOMEM; - goto accept_err; - } - - mutex_lock_nested(&newcon->sock_mutex, 1); - - if (newcon->sock) { - struct connection *othercon = newcon->othercon; - - if (!othercon) { - othercon = kmem_cache_zalloc(con_cache, GFP_NOFS); - if (!othercon) { - log_print("failed to allocate incoming socket"); - mutex_unlock(&newcon->sock_mutex); - ret = -ENOMEM; - goto accept_err; - } - othercon->nodeid = nodeid; - othercon->rx_action = receive_from_sock; - mutex_init(&othercon->sock_mutex); - INIT_LIST_HEAD(&othercon->writequeue); - spin_lock_init(&othercon->writequeue_lock); - INIT_WORK(&othercon->swork, process_send_sockets); - INIT_WORK(&othercon->rwork, process_recv_sockets); - set_bit(CF_IS_OTHERCON, &othercon->flags); - } - mutex_lock_nested(&othercon->sock_mutex, 2); - if (!othercon->sock) { - newcon->othercon = othercon; - add_sock(newsock, othercon); - addcon = othercon; - mutex_unlock(&othercon->sock_mutex); - } else { - printk("Extra connection from node %d attempted\n", nodeid); - ret = -EAGAIN; - mutex_unlock(&othercon->sock_mutex); - mutex_unlock(&newcon->sock_mutex); - goto accept_err; - } - } else { - newcon->rx_action = receive_from_sock; - add_sock(newsock, newcon); - addcon = newcon; - } - - log_print("connected to %d", nodeid); - - mutex_unlock(&newcon->sock_mutex); - - /* - * Add it to the active queue in case we got data - * between processing the accept adding the socket - * to the read_sockets list - */ - if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) - queue_work(recv_workqueue, &addcon->rwork); - mutex_unlock(&con->sock_mutex); - - return 0; - -accept_err: - mutex_unlock(&con->sock_mutex); - if (newsock) - sock_release(newsock); - if (ret != -EAGAIN) - log_print("error accepting connection from node: %d", ret); - - return ret; -} - static void free_entry(struct writequeue_entry *e) { __free_page(e->page); @@ -1001,6 +929,7 @@ static int sctp_bind_addrs(struct connection *con, uint16_t port) { struct sockaddr_storage localaddr; + struct sockaddr *addr = (struct sockaddr *)&localaddr; int i, addr_len, result = 0; for (i = 0; i < dlm_local_count; i++) { @@ -1008,13 +937,9 @@ make_sockaddr(&localaddr, port, &addr_len); if (!i) - result = kernel_bind(con->sock, - (struct sockaddr *)&localaddr, - addr_len); + result = kernel_bind(con->sock, addr, addr_len); else - result = kernel_setsockopt(con->sock, SOL_SCTP, - SCTP_SOCKOPT_BINDX_ADD, - (char *)&localaddr, addr_len); + result = sock_bind_add(con->sock->sk, addr, addr_len); if (result < 0) { log_print("Can't bind to %d addr number %d, %d.\n", @@ -1033,16 +958,17 @@ static void sctp_connect_to_sock(struct connection *con) { struct sockaddr_storage daddr; - int one = 1; int result; int addr_len; struct socket *sock; - struct timeval tv = { .tv_sec = 5, .tv_usec = 0 }; + unsigned int mark; if (con->nodeid == 0) { log_print("attempt to connect sock 0 foiled"); return; } + + dlm_comm_mark(con->nodeid, &mark); mutex_lock(&con->sock_mutex); @@ -1068,6 +994,8 @@ if (result < 0) goto socket_err; + sock_set_mark(sock->sk, mark); + con->rx_action = receive_from_sock; con->connect_action = sctp_connect_to_sock; add_sock(sock, con); @@ -1081,21 +1009,17 @@ log_print("connecting to %d", con->nodeid); /* Turn off Nagle's algorithm */ - kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one, - sizeof(one)); + sctp_sock_set_nodelay(sock->sk); /* * Make sock->ops->connect() function return in specified time, * since O_NONBLOCK argument in connect() function does not work here, * then, we should restore the default value of this attribute. */ - kernel_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, - sizeof(tv)); + sock_set_sndtimeo(sock->sk, 5); result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len, 0); - memset(&tv, 0, sizeof(tv)); - kernel_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, - sizeof(tv)); + sock_set_sndtimeo(sock->sk, 0); if (result == -EINPROGRESS) result = 0; @@ -1134,13 +1058,15 @@ struct sockaddr_storage saddr, src_addr; int addr_len; struct socket *sock = NULL; - int one = 1; + unsigned int mark; int result; if (con->nodeid == 0) { log_print("attempt to connect sock 0 foiled"); return; } + + dlm_comm_mark(con->nodeid, &mark); mutex_lock(&con->sock_mutex); if (con->retries++ > MAX_CONNECT_RETRIES) @@ -1156,6 +1082,8 @@ if (result < 0) goto out_err; + sock_set_mark(sock->sk, mark); + memset(&saddr, 0, sizeof(saddr)); result = nodeid_to_addr(con->nodeid, &saddr, NULL, false); if (result < 0) { @@ -1165,6 +1093,7 @@ con->rx_action = receive_from_sock; con->connect_action = tcp_connect_to_sock; + con->shutdown_action = dlm_tcp_shutdown; add_sock(sock, con); /* Bind to our cluster-known address connecting to avoid @@ -1183,8 +1112,7 @@ log_print("connecting to %d", con->nodeid); /* Turn off Nagle's algorithm */ - kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one, - sizeof(one)); + tcp_sock_set_nodelay(sock->sk); result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, O_NONBLOCK); @@ -1226,7 +1154,6 @@ { struct socket *sock = NULL; int result = 0; - int one = 1; int addr_len; if (dlm_local_addr[0]->ss_family == AF_INET) @@ -1242,20 +1169,17 @@ goto create_out; } + sock_set_mark(sock->sk, dlm_config.ci_mark); + /* Turn off Nagle's algorithm */ - kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one, - sizeof(one)); + tcp_sock_set_nodelay(sock->sk); - result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - (char *)&one, sizeof(one)); + sock_set_reuseaddr(sock->sk); - if (result < 0) { - log_print("Failed to set SO_REUSEADDR on socket: %d", result); - } write_lock_bh(&sock->sk->sk_callback_lock); sock->sk->sk_user_data = con; save_listen_callbacks(sock); - con->rx_action = tcp_accept_from_sock; + con->rx_action = accept_from_sock; con->connect_action = tcp_connect_to_sock; write_unlock_bh(&sock->sk->sk_callback_lock); @@ -1269,11 +1193,7 @@ con->sock = NULL; goto create_out; } - result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, - (char *)&one, sizeof(one)); - if (result < 0) { - log_print("Set keepalive failed: %d", result); - } + sock_set_keepalive(sock->sk); result = sock->ops->listen(sock, 5); if (result < 0) { @@ -1305,14 +1225,20 @@ } } +static void deinit_local(void) +{ + int i; + + for (i = 0; i < dlm_local_count; i++) + kfree(dlm_local_addr[i]); +} + /* Initialise SCTP socket and bind to all interfaces */ static int sctp_listen_for_all(void) { struct socket *sock = NULL; int result = -EINVAL; struct connection *con = nodeid2con(0, GFP_NOFS); - int bufsize = NEEDED_RMEM; - int one = 1; if (!con) return -ENOMEM; @@ -1326,15 +1252,9 @@ goto out; } - result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, - (char *)&bufsize, sizeof(bufsize)); - if (result) - log_print("Error increasing buffer space on socket %d", result); - - result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one, - sizeof(one)); - if (result < 0) - log_print("Could not set SCTP NODELAY error %d\n", result); + sock_set_rcvbuf(sock->sk, NEEDED_RMEM); + sock_set_mark(sock->sk, dlm_config.ci_mark); + sctp_sock_set_nodelay(sock->sk); write_lock_bh(&sock->sk->sk_callback_lock); /* Init con struct */ @@ -1342,7 +1262,7 @@ save_listen_callbacks(sock); con->sock = sock; con->sock->sk->sk_data_ready = lowcomms_data_ready; - con->rx_action = sctp_accept_from_sock; + con->rx_action = accept_from_sock; con->connect_action = sctp_connect_to_sock; write_unlock_bh(&sock->sk->sk_callback_lock); @@ -1545,7 +1465,7 @@ send_error: mutex_unlock(&con->sock_mutex); - close_connection(con, true, false, true); + close_connection(con, false, false, true); /* Requeue the send work. When the work daemon runs again, it will try a new connection, then call this function again. */ queue_work(send_workqueue, &con->swork); @@ -1621,13 +1541,6 @@ send_to_sock(con); } - -/* Discard all entries on the write queues */ -static void clean_writequeues(void) -{ - foreach_conn(clean_one_writequeue); -} - static void work_stop(void) { if (recv_workqueue) @@ -1677,26 +1590,40 @@ _stop_conn(con, true); } +static void shutdown_conn(struct connection *con) +{ + if (con->shutdown_action) + con->shutdown_action(con); +} + +static void connection_release(struct rcu_head *rcu) +{ + struct connection *con = container_of(rcu, struct connection, rcu); + + kfree(con->rx_buf); + kfree(con); +} + static void free_conn(struct connection *con) { close_connection(con, true, true, true); - if (con->othercon) - kmem_cache_free(con_cache, con->othercon); - hlist_del(&con->list); - kmem_cache_free(con_cache, con); + spin_lock(&connections_lock); + hlist_del_rcu(&con->list); + spin_unlock(&connections_lock); + if (con->othercon) { + clean_one_writequeue(con->othercon); + call_rcu(&con->othercon->rcu, connection_release); + } + clean_one_writequeue(con); + call_rcu(&con->rcu, connection_release); } static void work_flush(void) { - int ok; + int ok, idx; int i; - struct hlist_node *n; struct connection *con; - if (recv_workqueue) - flush_workqueue(recv_workqueue); - if (send_workqueue) - flush_workqueue(send_workqueue); do { ok = 1; foreach_conn(stop_conn); @@ -1704,9 +1631,10 @@ flush_workqueue(recv_workqueue); if (send_workqueue) flush_workqueue(send_workqueue); + idx = srcu_read_lock(&connections_srcu); for (i = 0; i < CONN_HASH_SIZE && ok; i++) { - hlist_for_each_entry_safe(con, n, - &connection_hash[i], list) { + hlist_for_each_entry_rcu(con, &connection_hash[i], + list) { ok &= test_bit(CF_READ_PENDING, &con->flags); ok &= test_bit(CF_WRITE_PENDING, &con->flags); if (con->othercon) { @@ -1717,6 +1645,7 @@ } } } + srcu_read_unlock(&connections_srcu, idx); } while (!ok); } @@ -1725,15 +1654,18 @@ /* Set all the flags to prevent any socket activity. */ - mutex_lock(&connections_lock); dlm_allow_conn = 0; - mutex_unlock(&connections_lock); + + if (recv_workqueue) + flush_workqueue(recv_workqueue); + if (send_workqueue) + flush_workqueue(send_workqueue); + + foreach_conn(shutdown_conn); work_flush(); - clean_writequeues(); foreach_conn(free_conn); work_stop(); - - kmem_cache_destroy(con_cache); + deinit_local(); } int dlm_lowcomms_start(void) @@ -1752,16 +1684,9 @@ goto fail; } - error = -ENOMEM; - con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), - __alignof__(struct connection), 0, - NULL); - if (!con_cache) - goto fail; - error = work_start(); if (error) - goto fail_destroy; + goto fail; dlm_allow_conn = 1; @@ -1778,12 +1703,8 @@ fail_unlisten: dlm_allow_conn = 0; con = nodeid2con(0,0); - if (con) { - close_connection(con, false, true, true); - kmem_cache_free(con_cache, con); - } -fail_destroy: - kmem_cache_destroy(con_cache); + if (con) + free_conn(con); fail: return error; } -- Gitblit v1.6.2