hc
2024-12-19 9370bb92b2d16684ee45cf24e879c93c509162da
kernel/net/rxrpc/sendmsg.c
....@@ -1,12 +1,8 @@
1
+// SPDX-License-Identifier: GPL-2.0-or-later
12 /* AF_RXRPC sendmsg() implementation.
23 *
34 * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
45 * Written by David Howells (dhowells@redhat.com)
5
- *
6
- * This program is free software; you can redistribute it and/or
7
- * modify it under the terms of the GNU General Public Licence
8
- * as published by the Free Software Foundation; either version
9
- * 2 of the Licence, or (at your option) any later version.
106 */
117
128 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
....@@ -22,6 +18,21 @@
2218 #include "ar-internal.h"
2319
2420 /*
21
+ * Return true if there's sufficient Tx queue space.
22
+ */
23
+static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
24
+{
25
+ unsigned int win_size =
26
+ min_t(unsigned int, call->tx_winsize,
27
+ call->cong_cwnd + call->cong_extra);
28
+ rxrpc_seq_t tx_win = READ_ONCE(call->tx_hard_ack);
29
+
30
+ if (_tx_win)
31
+ *_tx_win = tx_win;
32
+ return call->tx_top - tx_win < win_size;
33
+}
34
+
35
+/*
2536 * Wait for space to appear in the Tx queue or a signal to occur.
2637 */
2738 static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
....@@ -30,9 +41,7 @@
3041 {
3142 for (;;) {
3243 set_current_state(TASK_INTERRUPTIBLE);
33
- if (call->tx_top - call->tx_hard_ack <
34
- min_t(unsigned int, call->tx_winsize,
35
- call->cong_cwnd + call->cong_extra))
44
+ if (rxrpc_check_tx_space(call, NULL))
3645 return 0;
3746
3847 if (call->state >= RXRPC_CALL_COMPLETE)
....@@ -42,10 +51,7 @@
4251 return sock_intr_errno(*timeo);
4352
4453 trace_rxrpc_transmit(call, rxrpc_transmit_wait);
45
- mutex_unlock(&call->user_mutex);
4654 *timeo = schedule_timeout(*timeo);
47
- if (mutex_lock_interruptible(&call->user_mutex) < 0)
48
- return sock_intr_errno(*timeo);
4955 }
5056 }
5157
....@@ -53,28 +59,25 @@
5359 * Wait for space to appear in the Tx queue uninterruptibly, but with
5460 * a timeout of 2*RTT if no progress was made and a signal occurred.
5561 */
56
-static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
62
+static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
5763 struct rxrpc_call *call)
5864 {
5965 rxrpc_seq_t tx_start, tx_win;
60
- signed long rtt2, timeout;
61
- u64 rtt;
66
+ signed long rtt, timeout;
6267
63
- rtt = READ_ONCE(call->peer->rtt);
64
- rtt2 = nsecs_to_jiffies64(rtt) * 2;
65
- if (rtt2 < 2)
66
- rtt2 = 2;
68
+ rtt = READ_ONCE(call->peer->srtt_us) >> 3;
69
+ rtt = usecs_to_jiffies(rtt) * 2;
70
+ if (rtt < 2)
71
+ rtt = 2;
6772
68
- timeout = rtt2;
73
+ timeout = rtt;
6974 tx_start = READ_ONCE(call->tx_hard_ack);
7075
7176 for (;;) {
7277 set_current_state(TASK_UNINTERRUPTIBLE);
7378
7479 tx_win = READ_ONCE(call->tx_hard_ack);
75
- if (call->tx_top - tx_win <
76
- min_t(unsigned int, call->tx_winsize,
77
- call->cong_cwnd + call->cong_extra))
80
+ if (rxrpc_check_tx_space(call, &tx_win))
7881 return 0;
7982
8083 if (call->state >= RXRPC_CALL_COMPLETE)
....@@ -85,12 +88,32 @@
8588 return -EINTR;
8689
8790 if (tx_win != tx_start) {
88
- timeout = rtt2;
91
+ timeout = rtt;
8992 tx_start = tx_win;
9093 }
9194
9295 trace_rxrpc_transmit(call, rxrpc_transmit_wait);
9396 timeout = schedule_timeout(timeout);
97
+ }
98
+}
99
+
100
+/*
101
+ * Wait for space to appear in the Tx queue uninterruptibly.
102
+ */
103
+static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
104
+ struct rxrpc_call *call,
105
+ long *timeo)
106
+{
107
+ for (;;) {
108
+ set_current_state(TASK_UNINTERRUPTIBLE);
109
+ if (rxrpc_check_tx_space(call, NULL))
110
+ return 0;
111
+
112
+ if (call->state >= RXRPC_CALL_COMPLETE)
113
+ return call->error;
114
+
115
+ trace_rxrpc_transmit(call, rxrpc_transmit_wait);
116
+ *timeo = schedule_timeout(*timeo);
94117 }
95118 }
96119
....@@ -111,10 +134,19 @@
111134
112135 add_wait_queue(&call->waitq, &myself);
113136
114
- if (waitall)
115
- ret = rxrpc_wait_for_tx_window_nonintr(rx, call);
116
- else
117
- ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
137
+ switch (call->interruptibility) {
138
+ case RXRPC_INTERRUPTIBLE:
139
+ if (waitall)
140
+ ret = rxrpc_wait_for_tx_window_waitall(rx, call);
141
+ else
142
+ ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
143
+ break;
144
+ case RXRPC_PREINTERRUPTIBLE:
145
+ case RXRPC_UNINTERRUPTIBLE:
146
+ default:
147
+ ret = rxrpc_wait_for_tx_window_nonintr(rx, call, timeo);
148
+ break;
149
+ }
118150
119151 remove_wait_queue(&call->waitq, &myself);
120152 set_current_state(TASK_RUNNING);
....@@ -152,12 +184,13 @@
152184 }
153185
154186 /*
155
- * Queue a DATA packet for transmission, set the resend timeout and send the
156
- * packet immediately
187
+ * Queue a DATA packet for transmission, set the resend timeout and send
188
+ * the packet immediately. Returns the error from rxrpc_send_data_packet()
189
+ * in case the caller wants to do something with it.
157190 */
158
-static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
159
- struct sk_buff *skb, bool last,
160
- rxrpc_notify_end_tx_t notify_end_tx)
191
+static int rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
192
+ struct sk_buff *skb, bool last,
193
+ rxrpc_notify_end_tx_t notify_end_tx)
161194 {
162195 struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
163196 unsigned long now;
....@@ -169,10 +202,8 @@
169202
170203 ASSERTCMP(seq, ==, call->tx_top + 1);
171204
172
- if (last) {
205
+ if (last)
173206 annotation |= RXRPC_TX_ANNO_LAST;
174
- set_bit(RXRPC_CALL_TX_LASTQ, &call->flags);
175
- }
176207
177208 /* We have to set the timestamp before queueing as the retransmit
178209 * algorithm can see the packet as soon as we queue it.
....@@ -180,7 +211,7 @@
180211 skb->tstamp = ktime_get_real();
181212
182213 ix = seq & RXRPC_RXTX_BUFF_MASK;
183
- rxrpc_get_skb(skb, rxrpc_skb_tx_got);
214
+ rxrpc_get_skb(skb, rxrpc_skb_got);
184215 call->rxtx_annotations[ix] = annotation;
185216 smp_wmb();
186217 call->rxtx_buffer[ix] = skb;
....@@ -207,7 +238,7 @@
207238 trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
208239 if (!last)
209240 break;
210
- /* Fall through */
241
+ fallthrough;
211242 case RXRPC_CALL_SERVER_SEND_REPLY:
212243 call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
213244 rxrpc_notify_end_tx(rx, call, notify_end_tx);
....@@ -227,33 +258,25 @@
227258 case -ENETUNREACH:
228259 case -EHOSTUNREACH:
229260 case -ECONNREFUSED:
230
- rxrpc_set_call_completion(call,
231
- RXRPC_CALL_LOCAL_ERROR,
261
+ rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
232262 0, ret);
233
- rxrpc_notify_socket(call);
234263 goto out;
235264 }
236265 _debug("need instant resend %d", ret);
237266 rxrpc_instant_resend(call, ix);
238267 } else {
239
- unsigned long now = jiffies, resend_at;
268
+ unsigned long now = jiffies;
269
+ unsigned long resend_at = now + call->peer->rto_j;
240270
241
- if (call->peer->rtt_usage > 1)
242
- resend_at = nsecs_to_jiffies(call->peer->rtt * 3 / 2);
243
- else
244
- resend_at = rxrpc_resend_timeout;
245
- if (resend_at < 1)
246
- resend_at = 1;
247
-
248
- resend_at += now;
249271 WRITE_ONCE(call->resend_at, resend_at);
250272 rxrpc_reduce_call_timer(call, resend_at, now,
251273 rxrpc_timer_set_for_send);
252274 }
253275
254276 out:
255
- rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
256
- _leave("");
277
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
278
+ _leave(" = %d", ret);
279
+ return ret;
257280 }
258281
259282 /*
....@@ -264,37 +287,48 @@
264287 static int rxrpc_send_data(struct rxrpc_sock *rx,
265288 struct rxrpc_call *call,
266289 struct msghdr *msg, size_t len,
267
- rxrpc_notify_end_tx_t notify_end_tx)
290
+ rxrpc_notify_end_tx_t notify_end_tx,
291
+ bool *_dropped_lock)
268292 {
269293 struct rxrpc_skb_priv *sp;
270294 struct sk_buff *skb;
271295 struct sock *sk = &rx->sk;
296
+ enum rxrpc_call_state state;
272297 long timeo;
273
- bool more;
274
- int ret, copied;
298
+ bool more = msg->msg_flags & MSG_MORE;
299
+ int ret, copied = 0;
275300
276301 timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
277302
278303 /* this should be in poll */
279304 sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
280305
306
+reload:
307
+ ret = -EPIPE;
281308 if (sk->sk_shutdown & SEND_SHUTDOWN)
282
- return -EPIPE;
309
+ goto maybe_error;
310
+ state = READ_ONCE(call->state);
311
+ ret = -ESHUTDOWN;
312
+ if (state >= RXRPC_CALL_COMPLETE)
313
+ goto maybe_error;
314
+ ret = -EPROTO;
315
+ if (state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
316
+ state != RXRPC_CALL_SERVER_ACK_REQUEST &&
317
+ state != RXRPC_CALL_SERVER_SEND_REPLY)
318
+ goto maybe_error;
283319
284
- more = msg->msg_flags & MSG_MORE;
285
-
320
+ ret = -EMSGSIZE;
286321 if (call->tx_total_len != -1) {
287
- if (len > call->tx_total_len)
288
- return -EMSGSIZE;
289
- if (!more && len != call->tx_total_len)
290
- return -EMSGSIZE;
322
+ if (len - copied > call->tx_total_len)
323
+ goto maybe_error;
324
+ if (!more && len - copied != call->tx_total_len)
325
+ goto maybe_error;
291326 }
292327
293328 skb = call->tx_pending;
294329 call->tx_pending = NULL;
295
- rxrpc_see_skb(skb, rxrpc_skb_tx_seen);
330
+ rxrpc_see_skb(skb, rxrpc_skb_seen);
296331
297
- copied = 0;
298332 do {
299333 /* Check to see if there's a ping ACK to reply to. */
300334 if (call->ackr_reason == RXRPC_ACK_PING_RESPONSE)
....@@ -305,18 +339,8 @@
305339
306340 _debug("alloc");
307341
308
- if (call->tx_top - call->tx_hard_ack >=
309
- min_t(unsigned int, call->tx_winsize,
310
- call->cong_cwnd + call->cong_extra)) {
311
- ret = -EAGAIN;
312
- if (msg->msg_flags & MSG_DONTWAIT)
313
- goto maybe_error;
314
- ret = rxrpc_wait_for_tx_window(rx, call,
315
- &timeo,
316
- msg->msg_flags & MSG_WAITALL);
317
- if (ret < 0)
318
- goto maybe_error;
319
- }
342
+ if (!rxrpc_check_tx_space(call, NULL))
343
+ goto wait_for_space;
320344
321345 max = RXRPC_JUMBO_DATALEN;
322346 max -= call->conn->security_size;
....@@ -339,7 +363,9 @@
339363 if (!skb)
340364 goto maybe_error;
341365
342
- rxrpc_new_skb(skb, rxrpc_skb_tx_new);
366
+ sp = rxrpc_skb(skb);
367
+ sp->rx_flags |= RXRPC_SKB_TX_BUFFER;
368
+ rxrpc_new_skb(skb, rxrpc_skb_new);
343369
344370 _debug("ALLOC SEND %p", skb);
345371
....@@ -349,7 +375,6 @@
349375 skb_reserve(skb, call->conn->security_size);
350376 skb->len += call->conn->security_size;
351377
352
- sp = rxrpc_skb(skb);
353378 sp->remain = chunk;
354379 if (sp->remain > skb_tailroom(skb))
355380 sp->remain = skb_tailroom(skb);
....@@ -387,6 +412,11 @@
387412 call->tx_total_len -= copy;
388413 }
389414
415
+ /* check for the far side aborting the call or a network error
416
+ * occurring */
417
+ if (call->state == RXRPC_CALL_COMPLETE)
418
+ goto call_terminated;
419
+
390420 /* add the packet to the send queue if it's now full */
391421 if (sp->remain <= 0 ||
392422 (msg_data_left(msg) == 0 && !more)) {
....@@ -416,34 +446,36 @@
416446 call->tx_winsize)
417447 sp->hdr.flags |= RXRPC_MORE_PACKETS;
418448
419
- ret = conn->security->secure_packet(
449
+ ret = call->security->secure_packet(
420450 call, skb, skb->mark, skb->head);
421451 if (ret < 0)
422452 goto out;
423453
424
- rxrpc_queue_packet(rx, call, skb,
425
- !msg_data_left(msg) && !more,
426
- notify_end_tx);
454
+ ret = rxrpc_queue_packet(rx, call, skb,
455
+ !msg_data_left(msg) && !more,
456
+ notify_end_tx);
457
+ /* Should check for failure here */
427458 skb = NULL;
428
- }
429
-
430
- /* Check for the far side aborting the call or a network error
431
- * occurring. If this happens, save any packet that was under
432
- * construction so that in the case of a network error, the
433
- * call can be retried or redirected.
434
- */
435
- if (call->state == RXRPC_CALL_COMPLETE) {
436
- ret = call->error;
437
- goto out;
438459 }
439460 } while (msg_data_left(msg) > 0);
440461
441462 success:
442463 ret = copied;
464
+ if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE) {
465
+ read_lock_bh(&call->state_lock);
466
+ if (call->error < 0)
467
+ ret = call->error;
468
+ read_unlock_bh(&call->state_lock);
469
+ }
443470 out:
444471 call->tx_pending = skb;
445472 _leave(" = %d", ret);
446473 return ret;
474
+
475
+call_terminated:
476
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
477
+ _leave(" = %d", call->error);
478
+ return call->error;
447479
448480 maybe_error:
449481 if (copied)
....@@ -453,6 +485,27 @@
453485 efault:
454486 ret = -EFAULT;
455487 goto out;
488
+
489
+wait_for_space:
490
+ ret = -EAGAIN;
491
+ if (msg->msg_flags & MSG_DONTWAIT)
492
+ goto maybe_error;
493
+ mutex_unlock(&call->user_mutex);
494
+ *_dropped_lock = true;
495
+ ret = rxrpc_wait_for_tx_window(rx, call, &timeo,
496
+ msg->msg_flags & MSG_WAITALL);
497
+ if (ret < 0)
498
+ goto maybe_error;
499
+ if (call->interruptibility == RXRPC_INTERRUPTIBLE) {
500
+ if (mutex_lock_interruptible(&call->user_mutex) < 0) {
501
+ ret = sock_intr_errno(timeo);
502
+ goto maybe_error;
503
+ }
504
+ } else {
505
+ mutex_lock(&call->user_mutex);
506
+ }
507
+ *_dropped_lock = false;
508
+ goto reload;
456509 }
457510
458511 /*
....@@ -504,10 +557,10 @@
504557 return -EINVAL;
505558 break;
506559
507
- case RXRPC_ACCEPT:
560
+ case RXRPC_CHARGE_ACCEPT:
508561 if (p->command != RXRPC_CMD_SEND_DATA)
509562 return -EINVAL;
510
- p->command = RXRPC_CMD_ACCEPT;
563
+ p->command = RXRPC_CMD_CHARGE_ACCEPT;
511564 if (len != 0)
512565 return -EINVAL;
513566 break;
....@@ -614,12 +667,14 @@
614667 enum rxrpc_call_state state;
615668 struct rxrpc_call *call;
616669 unsigned long now, j;
670
+ bool dropped_lock = false;
617671 int ret;
618672
619673 struct rxrpc_send_params p = {
620674 .call.tx_total_len = -1,
621675 .call.user_call_ID = 0,
622676 .call.nr_timeouts = 0,
677
+ .call.interruptibility = RXRPC_INTERRUPTIBLE,
623678 .abort_code = 0,
624679 .command = RXRPC_CMD_SEND_DATA,
625680 .exclusive = false,
....@@ -632,16 +687,12 @@
632687 if (ret < 0)
633688 goto error_release_sock;
634689
635
- if (p.command == RXRPC_CMD_ACCEPT) {
690
+ if (p.command == RXRPC_CMD_CHARGE_ACCEPT) {
636691 ret = -EINVAL;
637692 if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
638693 goto error_release_sock;
639
- call = rxrpc_accept_call(rx, p.call.user_call_ID, NULL);
640
- /* The socket is now unlocked. */
641
- if (IS_ERR(call))
642
- return PTR_ERR(call);
643
- ret = 0;
644
- goto out_put_unlock;
694
+ ret = rxrpc_user_charge_accept(rx, p.call.user_call_ID);
695
+ goto error_release_sock;
645696 }
646697
647698 call = rxrpc_find_call_by_user_ID(rx, p.call.user_call_ID);
....@@ -663,7 +714,6 @@
663714 case RXRPC_CALL_CLIENT_AWAIT_CONN:
664715 case RXRPC_CALL_SERVER_PREALLOC:
665716 case RXRPC_CALL_SERVER_SECURING:
666
- case RXRPC_CALL_SERVER_ACCEPTING:
667717 rxrpc_put_call(call, rxrpc_call_put);
668718 ret = -EBUSY;
669719 goto error_release_sock;
....@@ -683,7 +733,7 @@
683733 if (call->tx_total_len != -1 ||
684734 call->tx_pending ||
685735 call->tx_top != 0)
686
- goto error_put;
736
+ goto out_put_unlock;
687737 call->tx_total_len = p.call.tx_total_len;
688738 }
689739 }
....@@ -694,16 +744,16 @@
694744 if (p.call.timeouts.normal > 0 && j == 0)
695745 j = 1;
696746 WRITE_ONCE(call->next_rx_timo, j);
697
- /* Fall through */
747
+ fallthrough;
698748 case 2:
699749 j = msecs_to_jiffies(p.call.timeouts.idle);
700750 if (p.call.timeouts.idle > 0 && j == 0)
701751 j = 1;
702752 WRITE_ONCE(call->next_req_timo, j);
703
- /* Fall through */
753
+ fallthrough;
704754 case 1:
705755 if (p.call.timeouts.hard > 0) {
706
- j = msecs_to_jiffies(p.call.timeouts.hard);
756
+ j = p.call.timeouts.hard * HZ;
707757 now = jiffies;
708758 j += now;
709759 WRITE_ONCE(call->expect_term_by, j);
....@@ -726,21 +776,13 @@
726776 ret = rxrpc_send_abort_packet(call);
727777 } else if (p.command != RXRPC_CMD_SEND_DATA) {
728778 ret = -EINVAL;
729
- } else if (rxrpc_is_client_call(call) &&
730
- state != RXRPC_CALL_CLIENT_SEND_REQUEST) {
731
- /* request phase complete for this client call */
732
- ret = -EPROTO;
733
- } else if (rxrpc_is_service_call(call) &&
734
- state != RXRPC_CALL_SERVER_ACK_REQUEST &&
735
- state != RXRPC_CALL_SERVER_SEND_REPLY) {
736
- /* Reply phase not begun or not complete for service call. */
737
- ret = -EPROTO;
738779 } else {
739
- ret = rxrpc_send_data(rx, call, msg, len, NULL);
780
+ ret = rxrpc_send_data(rx, call, msg, len, NULL, &dropped_lock);
740781 }
741782
742783 out_put_unlock:
743
- mutex_unlock(&call->user_mutex);
784
+ if (!dropped_lock)
785
+ mutex_unlock(&call->user_mutex);
744786 error_put:
745787 rxrpc_put_call(call, rxrpc_call_put);
746788 _leave(" = %d", ret);
....@@ -768,6 +810,7 @@
768810 struct msghdr *msg, size_t len,
769811 rxrpc_notify_end_tx_t notify_end_tx)
770812 {
813
+ bool dropped_lock = false;
771814 int ret;
772815
773816 _enter("{%d,%s},", call->debug_id, rxrpc_call_states[call->state]);
....@@ -785,7 +828,7 @@
785828 case RXRPC_CALL_SERVER_ACK_REQUEST:
786829 case RXRPC_CALL_SERVER_SEND_REPLY:
787830 ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
788
- notify_end_tx);
831
+ notify_end_tx, &dropped_lock);
789832 break;
790833 case RXRPC_CALL_COMPLETE:
791834 read_lock_bh(&call->state_lock);
....@@ -799,7 +842,8 @@
799842 break;
800843 }
801844
802
- mutex_unlock(&call->user_mutex);
845
+ if (!dropped_lock)
846
+ mutex_unlock(&call->user_mutex);
803847 _leave(" = %d", ret);
804848 return ret;
805849 }