2024-10-22 8ac6c7a54ed1b98d142dce24b11c6de6a1e239a5
kernel/net/sunrpc/xprtsock.c
@@ -47,12 +47,15 @@
 #include <net/checksum.h>
 #include <net/udp.h>
 #include <net/tcp.h>
+#include <linux/bvec.h>
+#include <linux/highmem.h>
+#include <linux/uio.h>
+#include <linux/sched/mm.h>
 
 #include <trace/events/sunrpc.h>
 
+#include "socklib.h"
 #include "sunrpc.h"
 
-
-#define RPC_TCP_READ_CHUNK_SZ	(3*512*1024)
 
 static void xs_close(struct rpc_xprt *xprt);
 static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
@@ -67,8 +70,6 @@
 
 static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
 static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
-
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 
 #define XS_TCP_LINGER_TO	(15U * HZ)
 static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
@@ -158,8 +159,6 @@
 	},
 	{ },
 };
-
-#endif
 
 /*
  * Wait duration for a reply from the RPC portmapper.
@@ -325,160 +324,452 @@
 		}
 }
 
+static size_t
+xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp)
+{
+	size_t i,n;
+
+	if (!want || !(buf->flags & XDRBUF_SPARSE_PAGES))
+		return want;
+	n = (buf->page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT;
+	for (i = 0; i < n; i++) {
+		if (buf->pages[i])
+			continue;
+		buf->bvec[i].bv_page = buf->pages[i] = alloc_page(gfp);
+		if (!buf->pages[i]) {
+			i *= PAGE_SIZE;
+			return i > buf->page_base ? i - buf->page_base : 0;
+		}
+	}
+	return want;
+}
+
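
Note: xs_alloc_sparse_pages() above fills in only the receive pages that are still missing, and computes how many pages are needed by rounding page_base + want up to a whole number of pages. A standalone sketch of just that rounding (userspace model, not part of the patch; the 4 KiB page size is an assumption):

    #include <stdio.h>
    #include <stddef.h>

    #define PAGE_SHIFT 12
    #define PAGE_SIZE  (1UL << PAGE_SHIFT)	/* assumed 4 KiB pages */

    /* Pages needed for `want` bytes starting `page_base` bytes into
     * the first page; mirrors the rounding in xs_alloc_sparse_pages(). */
    static size_t pages_needed(size_t page_base, size_t want)
    {
    	return (page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT;
    }

    int main(void)
    {
    	printf("%zu\n", pages_needed(4000, 100));	/* spills into page 2 */
    	printf("%zu\n", pages_needed(0, 4096));		/* exactly one page */
    	return 0;
    }
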
+static ssize_t
+xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek)
+{
+	ssize_t ret;
+	if (seek != 0)
+		iov_iter_advance(&msg->msg_iter, seek);
+	ret = sock_recvmsg(sock, msg, flags);
+	return ret > 0 ? ret + seek : ret;
+}
+
+static ssize_t
+xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags,
+		struct kvec *kvec, size_t count, size_t seek)
+{
+	iov_iter_kvec(&msg->msg_iter, READ, kvec, 1, count);
+	return xs_sock_recvmsg(sock, msg, flags, seek);
+}
+
+static ssize_t
+xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags,
+		struct bio_vec *bvec, unsigned long nr, size_t count,
+		size_t seek)
+{
+	iov_iter_bvec(&msg->msg_iter, READ, bvec, nr, count);
+	return xs_sock_recvmsg(sock, msg, flags, seek);
+}
+
+static ssize_t
+xs_read_discard(struct socket *sock, struct msghdr *msg, int flags,
+		size_t count)
+{
+	iov_iter_discard(&msg->msg_iter, READ, count);
+	return sock_recvmsg(sock, msg, flags);
+}
+
+#if ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE
+static void
+xs_flush_bvec(const struct bio_vec *bvec, size_t count, size_t seek)
+{
+	struct bvec_iter bi = {
+		.bi_size = count,
+	};
+	struct bio_vec bv;
+
+	bvec_iter_advance(bvec, &bi, seek & PAGE_MASK);
+	for_each_bvec(bv, bvec, bi, bi)
+		flush_dcache_page(bv.bv_page);
+}
+#else
+static inline void
+xs_flush_bvec(const struct bio_vec *bvec, size_t count, size_t seek)
+{
+}
+#endif
+
+static ssize_t
+xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
+		struct xdr_buf *buf, size_t count, size_t seek, size_t *read)
+{
+	size_t want, seek_init = seek, offset = 0;
+	ssize_t ret;
+
+	want = min_t(size_t, count, buf->head[0].iov_len);
+	if (seek < want) {
+		ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek);
+		if (ret <= 0)
+			goto sock_err;
+		offset += ret;
+		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+			goto out;
+		if (ret != want)
+			goto out;
+		seek = 0;
+	} else {
+		seek -= want;
+		offset += want;
+	}
+
+	want = xs_alloc_sparse_pages(buf,
+			min_t(size_t, count - offset, buf->page_len),
+			GFP_KERNEL);
+	if (seek < want) {
+		ret = xs_read_bvec(sock, msg, flags, buf->bvec,
+				xdr_buf_pagecount(buf),
+				want + buf->page_base,
+				seek + buf->page_base);
+		if (ret <= 0)
+			goto sock_err;
+		xs_flush_bvec(buf->bvec, ret, seek + buf->page_base);
+		ret -= buf->page_base;
+		offset += ret;
+		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+			goto out;
+		if (ret != want)
+			goto out;
+		seek = 0;
+	} else {
+		seek -= want;
+		offset += want;
+	}
+
+	want = min_t(size_t, count - offset, buf->tail[0].iov_len);
+	if (seek < want) {
+		ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek);
+		if (ret <= 0)
+			goto sock_err;
+		offset += ret;
+		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+			goto out;
+		if (ret != want)
+			goto out;
+	} else if (offset < seek_init)
+		offset = seek_init;
+	ret = -EMSGSIZE;
+out:
+	*read = offset - seek_init;
+	return ret;
+sock_err:
+	offset += seek;
+	goto out;
+}
+
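
Note: xs_read_xdr_buf() above reads one xdr_buf as three segments (head kvec, page bvec array, tail kvec). The seek argument is how much of the record was already received on a previous pass, so each segment is either skipped outright or entered part-way, while *read reports only the newly received bytes. A simplified userspace model of that bookkeeping (not part of the patch; segment sizes invented, error and short-read handling omitted):

    #include <stdio.h>
    #include <stddef.h>

    int main(void)
    {
    	size_t seg[3] = { 16, 4096, 8 };	/* head, pages, tail */
    	size_t seek = 20;			/* already received earlier */
    	size_t seek_init = seek, offset = 0;
    	int i;

    	for (i = 0; i < 3; i++) {
    		if (seek < seg[i]) {
    			/* enter this segment `seek` bytes in */
    			printf("segment %d: receive %zu bytes\n",
    			       i, seg[i] - seek);
    			offset += seg[i];
    			seek = 0;
    		} else {
    			/* segment fully consumed on an earlier pass */
    			seek -= seg[i];
    			offset += seg[i];
    		}
    	}
    	printf("newly read: %zu bytes\n", offset - seek_init);
    	return 0;
    }
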
+static void
+xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf)
+{
+	if (!transport->recv.copied) {
+		if (buf->head[0].iov_len >= transport->recv.offset)
+			memcpy(buf->head[0].iov_base,
+					&transport->recv.xid,
+					transport->recv.offset);
+		transport->recv.copied = transport->recv.offset;
+	}
+}
+
+static bool
+xs_read_stream_request_done(struct sock_xprt *transport)
+{
+	return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
+}
+
+static void
+xs_read_stream_check_eor(struct sock_xprt *transport,
+		struct msghdr *msg)
+{
+	if (xs_read_stream_request_done(transport))
+		msg->msg_flags |= MSG_EOR;
+}
+
+static ssize_t
+xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
+		int flags, struct rpc_rqst *req)
+{
+	struct xdr_buf *buf = &req->rq_private_buf;
+	size_t want, read;
+	ssize_t ret;
+
+	xs_read_header(transport, buf);
+
+	want = transport->recv.len - transport->recv.offset;
+	if (want != 0) {
+		ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
+				transport->recv.copied + want,
+				transport->recv.copied,
+				&read);
+		transport->recv.offset += read;
+		transport->recv.copied += read;
+	}
+
+	if (transport->recv.offset == transport->recv.len)
+		xs_read_stream_check_eor(transport, msg);
+
+	if (want == 0)
+		return 0;
+
+	switch (ret) {
+	default:
+		break;
+	case -EFAULT:
+	case -EMSGSIZE:
+		msg->msg_flags |= MSG_TRUNC;
+		return read;
+	case 0:
+		return -ESHUTDOWN;
+	}
+	return ret < 0 ? ret : read;
+}
+
+static size_t
+xs_read_stream_headersize(bool isfrag)
+{
+	if (isfrag)
+		return sizeof(__be32);
+	return 3 * sizeof(__be32);
+}
+
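
Note: xs_read_stream_headersize() above encodes the "peek" size for a stream record: a brand-new message needs the 4-byte record marker plus the XID and the call-direction word (3 * sizeof(__be32) = 12 bytes), while a continuation fragment of a message already in progress only carries a fresh 4-byte marker. A layout sketch (userspace model, not kernel code):

    #include <stdint.h>
    #include <stdio.h>

    /* Model of the 12-byte peek at the head of a new RPC record on a
     * stream socket: record marker, XID, call direction, all big endian.
     * Continuation fragments only re-read the 4-byte marker. */
    struct stream_hdr {
    	uint32_t fraghdr;	/* length + last-fragment bit */
    	uint32_t xid;		/* transaction id */
    	uint32_t calldir;	/* RPC_CALL or RPC_REPLY */
    };

    int main(void)
    {
    	printf("first fragment header: %zu bytes\n", sizeof(struct stream_hdr));
    	printf("later fragment header: %zu bytes\n", sizeof(uint32_t));
    	return 0;
    }
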
+static ssize_t
+xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg,
+		int flags, size_t want, size_t seek)
+{
+	struct kvec kvec = {
+		.iov_base = &transport->recv.fraghdr,
+		.iov_len = want,
+	};
+	return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek);
+}
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static ssize_t
+xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+	struct rpc_xprt *xprt = &transport->xprt;
+	struct rpc_rqst *req;
+	ssize_t ret;
+
+	/* Look up and lock the request corresponding to the given XID */
+	req = xprt_lookup_bc_request(xprt, transport->recv.xid);
+	if (!req) {
+		printk(KERN_WARNING "Callback slot table overflowed\n");
+		return -ESHUTDOWN;
+	}
+	if (transport->recv.copied && !req->rq_private_buf.len)
+		return -ESHUTDOWN;
+
+	ret = xs_read_stream_request(transport, msg, flags, req);
+	if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+		xprt_complete_bc_request(req, transport->recv.copied);
+	else
+		req->rq_private_buf.len = transport->recv.copied;
+
+	return ret;
+}
+#else /* CONFIG_SUNRPC_BACKCHANNEL */
+static ssize_t
+xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+	return -ESHUTDOWN;
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
+static ssize_t
+xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+	struct rpc_xprt *xprt = &transport->xprt;
+	struct rpc_rqst *req;
+	ssize_t ret = 0;
+
+	/* Look up and lock the request corresponding to the given XID */
+	spin_lock(&xprt->queue_lock);
+	req = xprt_lookup_rqst(xprt, transport->recv.xid);
+	if (!req || (transport->recv.copied && !req->rq_private_buf.len)) {
+		msg->msg_flags |= MSG_TRUNC;
+		goto out;
+	}
+	xprt_pin_rqst(req);
+	spin_unlock(&xprt->queue_lock);
+
+	ret = xs_read_stream_request(transport, msg, flags, req);
+
+	spin_lock(&xprt->queue_lock);
+	if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+		xprt_complete_rqst(req->rq_task, transport->recv.copied);
+	else
+		req->rq_private_buf.len = transport->recv.copied;
+	xprt_unpin_rqst(req);
+out:
+	spin_unlock(&xprt->queue_lock);
+	return ret;
+}
+
+static ssize_t
+xs_read_stream(struct sock_xprt *transport, int flags)
+{
+	struct msghdr msg = { 0 };
+	size_t want, read = 0;
+	ssize_t ret = 0;
+
+	if (transport->recv.len == 0) {
+		want = xs_read_stream_headersize(transport->recv.copied != 0);
+		ret = xs_read_stream_header(transport, &msg, flags, want,
+				transport->recv.offset);
+		if (ret <= 0)
+			goto out_err;
+		transport->recv.offset = ret;
+		if (transport->recv.offset != want)
+			return transport->recv.offset;
+		transport->recv.len = be32_to_cpu(transport->recv.fraghdr) &
+			RPC_FRAGMENT_SIZE_MASK;
+		transport->recv.offset -= sizeof(transport->recv.fraghdr);
+		read = ret;
+	}
+
+	switch (be32_to_cpu(transport->recv.calldir)) {
+	default:
+		msg.msg_flags |= MSG_TRUNC;
+		break;
+	case RPC_CALL:
+		ret = xs_read_stream_call(transport, &msg, flags);
+		break;
+	case RPC_REPLY:
+		ret = xs_read_stream_reply(transport, &msg, flags);
+	}
+	if (msg.msg_flags & MSG_TRUNC) {
+		transport->recv.calldir = cpu_to_be32(-1);
+		transport->recv.copied = -1;
+	}
+	if (ret < 0)
+		goto out_err;
+	read += ret;
+	if (transport->recv.offset < transport->recv.len) {
+		if (!(msg.msg_flags & MSG_TRUNC))
+			return read;
+		msg.msg_flags = 0;
+		ret = xs_read_discard(transport->sock, &msg, flags,
+				transport->recv.len - transport->recv.offset);
+		if (ret <= 0)
+			goto out_err;
+		transport->recv.offset += ret;
+		read += ret;
+		if (transport->recv.offset != transport->recv.len)
+			return read;
+	}
+	if (xs_read_stream_request_done(transport)) {
+		trace_xs_stream_read_request(transport);
+		transport->recv.copied = 0;
+	}
+	transport->recv.offset = 0;
+	transport->recv.len = 0;
+	return read;
+out_err:
+	return ret != 0 ? ret : -ESHUTDOWN;
+}
+
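
Note: the fragment header parsed by xs_read_stream() above is standard RPC record marking (RFC 5531): one big-endian 32-bit word whose top bit marks the final fragment and whose low 31 bits give the fragment length. A runnable decode sketch using the same constant values (userspace model, not part of the patch):

    #include <stdint.h>
    #include <stdio.h>
    #include <arpa/inet.h>	/* htonl/ntohl */

    #define RPC_LAST_STREAM_FRAGMENT 0x80000000UL
    #define RPC_FRAGMENT_SIZE_MASK   0x7fffffffUL

    int main(void)
    {
    	/* a final fragment carrying 1420 bytes, as seen on the wire */
    	uint32_t wire = htonl(RPC_LAST_STREAM_FRAGMENT | 1420);
    	uint32_t host = ntohl(wire);

    	printf("len=%lu last=%d\n",
    	       (unsigned long)(host & RPC_FRAGMENT_SIZE_MASK),
    	       (host & RPC_LAST_STREAM_FRAGMENT) != 0);
    	return 0;
    }
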
+static __poll_t xs_poll_socket(struct sock_xprt *transport)
+{
+	return transport->sock->ops->poll(transport->file, transport->sock,
+			NULL);
+}
+
+static bool xs_poll_socket_readable(struct sock_xprt *transport)
+{
+	__poll_t events = xs_poll_socket(transport);
+
+	return (events & (EPOLLIN | EPOLLRDNORM)) && !(events & EPOLLRDHUP);
+}
+
+static void xs_poll_check_readable(struct sock_xprt *transport)
+{
+
+	clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+	if (!xs_poll_socket_readable(transport))
+		return;
+	if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+		queue_work(xprtiod_workqueue, &transport->recv_worker);
+}
+
+static void xs_stream_data_receive(struct sock_xprt *transport)
+{
+	size_t read = 0;
+	ssize_t ret = 0;
+
+	mutex_lock(&transport->recv_mutex);
+	if (transport->sock == NULL)
+		goto out;
+	for (;;) {
+		ret = xs_read_stream(transport, MSG_DONTWAIT);
+		if (ret < 0)
+			break;
+		read += ret;
+		cond_resched();
+	}
+	if (ret == -ESHUTDOWN)
+		kernel_sock_shutdown(transport->sock, SHUT_RDWR);
+	else
+		xs_poll_check_readable(transport);
+out:
+	mutex_unlock(&transport->recv_mutex);
+	trace_xs_stream_read_data(&transport->xprt, ret, read);
+}
+
+static void xs_stream_data_receive_workfn(struct work_struct *work)
+{
+	struct sock_xprt *transport =
+		container_of(work, struct sock_xprt, recv_worker);
+	unsigned int pflags = memalloc_nofs_save();
+
+	xs_stream_data_receive(transport);
+	memalloc_nofs_restore(pflags);
+}
+
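
Note: xs_poll_socket_readable() above counts the socket as readable only if EPOLLIN/EPOLLRDNORM is set and the peer has not already hung up its write side (EPOLLRDHUP). A userspace analogue with poll(2), illustrative only and not part of the patch:

    #define _GNU_SOURCE		/* for POLLRDHUP */
    #include <poll.h>
    #include <stdbool.h>
    #include <stdio.h>

    static bool sock_readable(int fd)
    {
    	struct pollfd pfd = { .fd = fd, .events = POLLIN | POLLRDHUP };

    	if (poll(&pfd, 1, 0) <= 0)
    		return false;
    	return (pfd.revents & (POLLIN | POLLRDNORM)) &&
    	       !(pfd.revents & POLLRDHUP);
    }

    int main(void)
    {
    	printf("fd 0 readable: %d\n", sock_readable(0));
    	return 0;
    }
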
+static void
+xs_stream_reset_connect(struct sock_xprt *transport)
+{
+	transport->recv.offset = 0;
+	transport->recv.len = 0;
+	transport->recv.copied = 0;
+	transport->xmit.offset = 0;
+}
+
+static void
+xs_stream_start_connect(struct sock_xprt *transport)
+{
+	transport->xprt.stat.connect_count++;
+	transport->xprt.stat.connect_start = jiffies;
+}
+
 #define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
 
-static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
-{
-	struct msghdr msg = {
-		.msg_name	= addr,
-		.msg_namelen	= addrlen,
-		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
-	};
-	struct kvec iov = {
-		.iov_base	= vec->iov_base + base,
-		.iov_len	= vec->iov_len - base,
-	};
-
-	if (iov.iov_len != 0)
-		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
-	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
-}
-
-static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy, int *sent_p)
-{
-	ssize_t (*do_sendpage)(struct socket *sock, struct page *page,
-			int offset, size_t size, int flags);
-	struct page **ppage;
-	unsigned int remainder;
-	int err;
-
-	remainder = xdr->page_len - base;
-	base += xdr->page_base;
-	ppage = xdr->pages + (base >> PAGE_SHIFT);
-	base &= ~PAGE_MASK;
-	do_sendpage = sock->ops->sendpage;
-	if (!zerocopy)
-		do_sendpage = sock_no_sendpage;
-	for(;;) {
-		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
-		int flags = XS_SENDMSG_FLAGS;
-
-		remainder -= len;
-		if (more)
-			flags |= MSG_MORE;
-		if (remainder != 0)
-			flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE;
-		err = do_sendpage(sock, *ppage, base, len, flags);
-		if (remainder == 0 || err != len)
-			break;
-		*sent_p += err;
-		ppage++;
-		base = 0;
-	}
-	if (err > 0) {
-		*sent_p += err;
-		err = 0;
-	}
-	return err;
-}
-
 /**
- * xs_sendpages - write pages directly to a socket
- * @sock: socket to send on
- * @addr: UDP only -- address of destination
- * @addrlen: UDP only -- length of destination address
- * @xdr: buffer containing this request
- * @base: starting position in the buffer
- * @zerocopy: true if it is safe to use sendpage()
- * @sent_p: return the total number of bytes successfully queued for sending
+ * xs_nospace - handle transmit was incomplete
+ * @req: pointer to RPC request
+ * @transport: pointer to struct sock_xprt
  *
  */
-static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy, int *sent_p)
+static int xs_nospace(struct rpc_rqst *req, struct sock_xprt *transport)
 {
-	unsigned int remainder = xdr->len - base;
-	int err = 0;
-	int sent = 0;
-
-	if (unlikely(!sock))
-		return -ENOTSOCK;
-
-	if (base != 0) {
-		addr = NULL;
-		addrlen = 0;
-	}
-
-	if (base < xdr->head[0].iov_len || addr != NULL) {
-		unsigned int len = xdr->head[0].iov_len - base;
-		remainder -= len;
-		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
-		if (remainder == 0 || err != len)
-			goto out;
-		*sent_p += err;
-		base = 0;
-	} else
-		base -= xdr->head[0].iov_len;
-
-	if (base < xdr->page_len) {
-		unsigned int len = xdr->page_len - base;
-		remainder -= len;
-		err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy, &sent);
-		*sent_p += sent;
-		if (remainder == 0 || sent != len)
-			goto out;
-		base = 0;
-	} else
-		base -= xdr->page_len;
-
-	if (base >= xdr->tail[0].iov_len)
-		return 0;
-	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
-out:
-	if (err > 0) {
-		*sent_p += err;
-		err = 0;
-	}
-	return err;
-}
-
-static void xs_nospace_callback(struct rpc_task *task)
-{
-	struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
-
-	transport->inet->sk_write_pending--;
-}
-
-/**
- * xs_nospace - place task on wait queue if transmit was incomplete
- * @task: task to put to sleep
- *
- */
-static int xs_nospace(struct rpc_task *task)
-{
-	struct rpc_rqst *req = task->tk_rqstp;
-	struct rpc_xprt *xprt = req->rq_xprt;
-	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+	struct rpc_xprt *xprt = &transport->xprt;
 	struct sock *sk = transport->inet;
 	int ret = -EAGAIN;
 
-	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
-			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
-			req->rq_slen);
+	trace_rpc_socket_nospace(req, transport);
 
 	/* Protect against races with write_space */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 
 	/* Don't race with disconnect */
 	if (xprt_connected(xprt)) {
-		/* wait for more buffer space */
-		sk->sk_write_pending++;
-		xprt_wait_for_buffer_space(task, xs_nospace_callback);
-	} else
-		ret = -ENOTCONN;
-
-	spin_unlock_bh(&xprt->transport_lock);
-
-	/* Race breaker in case memory is freed before above code is called */
-	if (ret == -EAGAIN) {
 		struct socket_wq *wq;
 
 		rcu_read_lock();
@@ -486,24 +777,76 @@
 			set_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags);
 		rcu_read_unlock();
 
-		sk->sk_write_space(sk);
-	}
+		/* wait for more buffer space */
+		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+		sk->sk_write_pending++;
+		xprt_wait_for_buffer_space(xprt);
+	} else
+		ret = -ENOTCONN;
+
+	spin_unlock(&xprt->transport_lock);
 	return ret;
 }
 
-/*
- * Construct a stream transport record marker in @buf.
- */
-static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
+static int xs_sock_nospace(struct rpc_rqst *req)
 {
-	u32 reclen = buf->len - sizeof(rpc_fraghdr);
-	rpc_fraghdr *base = buf->head[0].iov_base;
-	*base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
+	struct sock_xprt *transport =
+		container_of(req->rq_xprt, struct sock_xprt, xprt);
+	struct sock *sk = transport->inet;
+	int ret = -EAGAIN;
+
+	lock_sock(sk);
+	if (!sock_writeable(sk))
+		ret = xs_nospace(req, transport);
+	release_sock(sk);
+	return ret;
+}
+
+static int xs_stream_nospace(struct rpc_rqst *req)
+{
+	struct sock_xprt *transport =
+		container_of(req->rq_xprt, struct sock_xprt, xprt);
+	struct sock *sk = transport->inet;
+	int ret = -EAGAIN;
+
+	lock_sock(sk);
+	if (!sk_stream_memory_free(sk))
+		ret = xs_nospace(req, transport);
+	release_sock(sk);
+	return ret;
+}
+
+static void
+xs_stream_prepare_request(struct rpc_rqst *req)
+{
+	xdr_free_bvec(&req->rq_rcv_buf);
+	req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_KERNEL);
+}
+
+/*
+ * Determine if the previous message in the stream was aborted before it
+ * could complete transmission.
+ */
+static bool
+xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
+{
+	return transport->xmit.offset != 0 && req->rq_bytes_sent == 0;
+}
+
+/*
+ * Return the stream record marker field for a record of length < 2^31-1
+ */
+static rpc_fraghdr
+xs_stream_record_marker(struct xdr_buf *xdr)
+{
+	if (!xdr->len)
+		return 0;
+	return cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | (u32)xdr->len);
 }
 
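
Note: xs_stream_record_marker() above is the sender-side counterpart of the fragment-header decode in xs_read_stream(): every message goes out as one final fragment, so the marker is just the length with the top bit set, converted to network byte order, and a zero-length send gets no marker at all (which is why callers test rm before adding sizeof(rm) to msglen). Encode sketch (userspace model, not part of the patch; 0x80000000 is the real RPC_LAST_STREAM_FRAGMENT value):

    #include <stdint.h>
    #include <stdio.h>
    #include <arpa/inet.h>

    #define RPC_LAST_STREAM_FRAGMENT 0x80000000UL

    static uint32_t record_marker(uint32_t len)
    {
    	if (!len)
    		return 0;	/* nothing to send, no marker */
    	return htonl(RPC_LAST_STREAM_FRAGMENT | len);
    }

    int main(void)
    {
    	printf("marker for 1420-byte message: 0x%08x\n",
    	       (unsigned)ntohl(record_marker(1420)));
    	return 0;
    }
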
 /**
  * xs_local_send_request - write an RPC request to an AF_LOCAL socket
- * @task: RPC task that manages the state of an RPC request
+ * @req: pointer to RPC request
  *
  * Return values:
  *        0:	The request has been sent
@@ -512,35 +855,44 @@
  * ENOTCONN:	Caller needs to invoke connect logic then call again
  *    other:	Some other error occured, the request was not sent
  */
-static int xs_local_send_request(struct rpc_task *task)
+static int xs_local_send_request(struct rpc_rqst *req)
 {
-	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 	struct sock_xprt *transport =
 				container_of(xprt, struct sock_xprt, xprt);
 	struct xdr_buf *xdr = &req->rq_snd_buf;
+	rpc_fraghdr rm = xs_stream_record_marker(xdr);
+	unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen;
+	struct msghdr msg = {
+		.msg_flags	= XS_SENDMSG_FLAGS,
+	};
+	unsigned int sent;
 	int status;
-	int sent = 0;
 
-	xs_encode_stream_record_marker(&req->rq_snd_buf);
+	/* Close the stream if the previous transmission was incomplete */
+	if (xs_send_request_was_aborted(transport, req)) {
+		xprt_force_disconnect(xprt);
+		return -ENOTCONN;
+	}
 
 	xs_pktdump("packet data:",
 			req->rq_svec->iov_base, req->rq_svec->iov_len);
 
 	req->rq_xtime = ktime_get();
-	status = xs_sendpages(transport->sock, NULL, 0, xdr, req->rq_bytes_sent,
-			      true, &sent);
+	status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
+				   transport->xmit.offset, rm, &sent);
 	dprintk("RPC:       %s(%u) = %d\n",
-			__func__, xdr->len - req->rq_bytes_sent, status);
+			__func__, xdr->len - transport->xmit.offset, status);
 
 	if (status == -EAGAIN && sock_writeable(transport->inet))
 		status = -ENOBUFS;
 
 	if (likely(sent > 0) || status == 0) {
-		req->rq_bytes_sent += sent;
-		req->rq_xmit_bytes_sent += sent;
-		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
-			req->rq_bytes_sent = 0;
+		transport->xmit.offset += sent;
+		req->rq_bytes_sent = transport->xmit.offset;
+		if (likely(req->rq_bytes_sent >= msglen)) {
+			req->rq_xmit_bytes_sent += transport->xmit.offset;
+			transport->xmit.offset = 0;
 			return 0;
 		}
 		status = -EAGAIN;
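
Note: with this rework the transmit cursor lives in transport->xmit.offset rather than in each request, and a message is complete once the bytes sent reach msglen = 4-byte record marker + payload. A toy model of how a partially sent message is resumed across calls (not part of the patch; all numbers invented):

    #include <stdio.h>

    int main(void)
    {
    	unsigned int msglen = 4 + 1420;		/* marker + payload */
    	unsigned int xmit_offset = 0;		/* transport->xmit.offset */
    	unsigned int chunks[] = { 512, 512, 400 };	/* partial sends */
    	int i;

    	for (i = 0; i < 3; i++) {
    		xmit_offset += chunks[i];
    		if (xmit_offset >= msglen) {
    			printf("request fully sent (%u bytes)\n", xmit_offset);
    			xmit_offset = 0;	/* reset for next message */
    		} else {
    			printf("partial: %u/%u, return -EAGAIN\n",
    			       xmit_offset, msglen);
    		}
    	}
    	return 0;
    }
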
@@ -550,14 +902,14 @@
 	case -ENOBUFS:
 		break;
 	case -EAGAIN:
-		status = xs_nospace(task);
+		status = xs_stream_nospace(req);
 		break;
 	default:
 		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
 			-status);
-		/* fall through */
+		fallthrough;
 	case -EPIPE:
-		xs_close(xprt);
+		xprt_force_disconnect(xprt);
 		status = -ENOTCONN;
 	}
 
@@ -566,7 +918,7 @@
 
 /**
  * xs_udp_send_request - write an RPC request to a UDP socket
- * @task: address of RPC task that manages the state of an RPC request
+ * @req: pointer to RPC request
  *
  * Return values:
  *        0:	The request has been sent
@@ -575,13 +927,17 @@
  * ENOTCONN:	Caller needs to invoke connect logic then call again
  *    other:	Some other error occurred, the request was not sent
  */
-static int xs_udp_send_request(struct rpc_task *task)
+static int xs_udp_send_request(struct rpc_rqst *req)
 {
-	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 	struct xdr_buf *xdr = &req->rq_snd_buf;
-	int sent = 0;
+	struct msghdr msg = {
+		.msg_name	= xs_addr(xprt),
+		.msg_namelen	= xprt->addrlen,
+		.msg_flags	= XS_SENDMSG_FLAGS,
+	};
+	unsigned int sent;
 	int status;
 
 	xs_pktdump("packet data:",
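
Note: for the unconnected UDP case above, the destination address now travels in msg_name/msg_namelen of the msghdr itself instead of being threaded through xs_sendpages(). Userspace analogue with sendmsg(2) (not part of the patch; loopback address and port 2049 are placeholders):

    #include <stdio.h>
    #include <sys/socket.h>
    #include <sys/uio.h>
    #include <netinet/in.h>

    int main(void)
    {
    	struct sockaddr_in dst = {
    		.sin_family = AF_INET,
    		.sin_port   = htons(2049),
    		.sin_addr   = { htonl(INADDR_LOOPBACK) },
    	};
    	char payload[] = "ping";
    	struct iovec iov = { .iov_base = payload, .iov_len = sizeof(payload) };
    	struct msghdr msg = {
    		.msg_name    = &dst,		/* UDP destination */
    		.msg_namelen = sizeof(dst),
    		.msg_iov     = &iov,
    		.msg_iovlen  = 1,
    	};
    	int fd = socket(AF_INET, SOCK_DGRAM, 0);

    	if (fd < 0 || sendmsg(fd, &msg, MSG_DONTWAIT) < 0)
    		perror("sendmsg");
    	return 0;
    }
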
@@ -590,12 +946,15 @@
 
 	if (!xprt_bound(xprt))
 		return -ENOTCONN;
+
+	if (!xprt_request_get_cong(xprt, req))
+		return -EBADSLT;
+
 	req->rq_xtime = ktime_get();
-	status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
-			      xdr, req->rq_bytes_sent, true, &sent);
+	status = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, 0, &sent);
 
 	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
-			xdr->len - req->rq_bytes_sent, status);
+			xdr->len, status);
 
 	/* firewall is blocking us, don't return -EAGAIN or we end up looping */
 	if (status == -EPERM)
@@ -619,7 +978,7 @@
 		/* Should we call xs_close() here? */
 		break;
 	case -EAGAIN:
-		status = xs_nospace(task);
+		status = xs_sock_nospace(req);
 		break;
 	case -ENETUNREACH:
 	case -ENOBUFS:
@@ -639,7 +998,7 @@
 
 /**
  * xs_tcp_send_request - write an RPC request to a TCP socket
- * @task: address of RPC task that manages the state of an RPC request
+ * @req: pointer to RPC request
  *
  * Return values:
  *        0:	The request has been sent
@@ -651,28 +1010,30 @@
  * XXX: In the case of soft timeouts, should we eventually give up
  *	if sendmsg is not able to make progress?
  */
-static int xs_tcp_send_request(struct rpc_task *task)
+static int xs_tcp_send_request(struct rpc_rqst *req)
 {
-	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 	struct xdr_buf *xdr = &req->rq_snd_buf;
-	bool zerocopy = true;
+	rpc_fraghdr rm = xs_stream_record_marker(xdr);
+	unsigned int msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen;
+	struct msghdr msg = {
+		.msg_flags	= XS_SENDMSG_FLAGS,
+	};
 	bool vm_wait = false;
+	unsigned int sent;
 	int status;
-	int sent;
 
-	xs_encode_stream_record_marker(&req->rq_snd_buf);
+	/* Close the stream if the previous transmission was incomplete */
+	if (xs_send_request_was_aborted(transport, req)) {
+		if (transport->sock != NULL)
+			kernel_sock_shutdown(transport->sock, SHUT_RDWR);
+		return -ENOTCONN;
+	}
 
 	xs_pktdump("packet data:",
 		   req->rq_svec->iov_base,
 		   req->rq_svec->iov_len);
-	/* Don't use zero copy if this is a resend. If the RPC call
-	 * completes while the socket holds a reference to the pages,
-	 * then we may end up resending corrupted data.
-	 */
-	if (task->tk_flags & RPC_TASK_SENT)
-		zerocopy = false;
 
 	if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
 		xs_tcp_set_socket_timeouts(xprt, transport->sock);
@@ -682,19 +1043,19 @@
 	 * called sendmsg(). */
 	req->rq_xtime = ktime_get();
 	while (1) {
-		sent = 0;
-		status = xs_sendpages(transport->sock, NULL, 0, xdr,
-				      req->rq_bytes_sent, zerocopy, &sent);
+		status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
+					   transport->xmit.offset, rm, &sent);
 
 		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
-				xdr->len - req->rq_bytes_sent, status);
+				xdr->len - transport->xmit.offset, status);
 
 		/* If we've sent the entire packet, immediately
 		 * reset the count of bytes sent. */
-		req->rq_bytes_sent += sent;
-		req->rq_xmit_bytes_sent += sent;
-		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
-			req->rq_bytes_sent = 0;
+		transport->xmit.offset += sent;
+		req->rq_bytes_sent = transport->xmit.offset;
+		if (likely(req->rq_bytes_sent >= msglen)) {
+			req->rq_xmit_bytes_sent += transport->xmit.offset;
+			transport->xmit.offset = 0;
 			return 0;
 		}
 
@@ -732,7 +1093,7 @@
 		/* Should we call xs_close() here? */
 		break;
 	case -EAGAIN:
-		status = xs_nospace(task);
+		status = xs_stream_nospace(req);
 		break;
 	case -ECONNRESET:
 	case -ECONNREFUSED:
@@ -747,35 +1108,6 @@
 	}
 
 	return status;
-}
-
-/**
- * xs_tcp_release_xprt - clean up after a tcp transmission
- * @xprt: transport
- * @task: rpc task
- *
- * This cleans up if an error causes us to abort the transmission of a request.
- * In this case, the socket may need to be reset in order to avoid confusing
- * the server.
- */
-static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
-{
-	struct rpc_rqst *req;
-
-	if (task != xprt->snd_task)
-		return;
-	if (task == NULL)
-		goto out_release;
-	req = task->tk_rqstp;
-	if (req == NULL)
-		goto out_release;
-	if (req->rq_bytes_sent == 0)
-		goto out_release;
-	if (req->rq_bytes_sent == req->rq_snd_buf.len)
-		goto out_release;
-	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
-out_release:
-	xprt_release_xprt(xprt, task);
 }
 
 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
@@ -799,6 +1131,15 @@
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 
 	clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+	clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state);
+	clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state);
+	clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state);
+}
+
+static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr)
+{
+	set_bit(nr, &transport->sock_state);
+	queue_work(xprtiod_workqueue, &transport->error_worker);
+}
+
 static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
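
Note: xs_run_error_worker() above is the deferral half of a common pattern: socket callbacks, which run in softirq/callback context, only record what happened as a bit in sock_state and queue error_worker; the worker then acts from process context. A rough userspace model (not part of the patch; the real worker uses test_and_clear_bit() per flag rather than one exchange):

    #include <stdatomic.h>
    #include <stdio.h>

    enum { WAKE_ERROR, WAKE_WRITE, WAKE_DISCONNECT };	/* illustrative bits */

    static atomic_ulong sock_state;

    static void run_error_worker(unsigned int nr)
    {
    	atomic_fetch_or(&sock_state, 1UL << nr);
    	printf("bit %u set, worker queued\n", nr);	/* would queue_work() */
    }

    static void error_worker(void)
    {
    	unsigned long state = atomic_exchange(&sock_state, 0);

    	if (state & (1UL << WAKE_ERROR))
    		printf("worker: handle socket error\n");
    	if (state & (1UL << WAKE_DISCONNECT))
    		printf("worker: handle disconnect\n");
    }

    int main(void)
    {
    	run_error_worker(WAKE_ERROR);
    	error_worker();
    	return 0;
    }
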
@@ -819,20 +1160,24 @@
  */
 static void xs_error_report(struct sock *sk)
 {
+	struct sock_xprt *transport;
 	struct rpc_xprt *xprt;
-	int err;
 
 	read_lock_bh(&sk->sk_callback_lock);
 	if (!(xprt = xprt_from_sock(sk)))
 		goto out;
 
-	err = -sk->sk_err;
-	if (err == 0)
+	transport = container_of(xprt, struct sock_xprt, xprt);
+	transport->xprt_err = -sk->sk_err;
+	if (transport->xprt_err == 0)
 		goto out;
 	dprintk("RPC:       xs_error_report client %p, error=%d...\n",
-			xprt, -err);
-	trace_rpc_socket_error(xprt, sk->sk_socket, err);
-	xprt_wake_pending_tasks(xprt, err);
+			xprt, -transport->xprt_err);
+	trace_rpc_socket_error(xprt, sk->sk_socket, transport->xprt_err);
+
+	/* barrier ensures xprt_err is set before XPRT_SOCK_WAKE_ERROR */
+	smp_mb__before_atomic();
+	xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR);
 out:
 	read_unlock_bh(&sk->sk_callback_lock);
 }
@@ -842,9 +1187,20 @@
 	struct socket *sock = transport->sock;
 	struct sock *sk = transport->inet;
 	struct rpc_xprt *xprt = &transport->xprt;
+	struct file *filp = transport->file;
 
 	if (sk == NULL)
 		return;
+	/*
+	 * Make sure we're calling this in a context from which it is safe
+	 * to call __fput_sync(). In practice that means rpciod and the
+	 * system workqueue.
+	 */
+	if (!(current->flags & PF_WQ_WORKER)) {
+		WARN_ON_ONCE(1);
+		set_bit(XPRT_CLOSE_WAIT, &xprt->state);
+		return;
+	}
 
 	if (atomic_read(&transport->xprt.swapper))
 		sk_clear_memalloc(sk);
@@ -855,6 +1211,7 @@
 	write_lock_bh(&sk->sk_callback_lock);
 	transport->inet = NULL;
 	transport->sock = NULL;
+	transport->file = NULL;
 
 	sk->sk_user_data = NULL;
@@ -862,10 +1219,14 @@
 	xprt_clear_connected(xprt);
 	write_unlock_bh(&sk->sk_callback_lock);
 	xs_sock_reset_connection_flags(xprt);
+	/* Reset stream record info */
+	xs_stream_reset_connect(transport);
 	mutex_unlock(&transport->recv_mutex);
 
 	trace_rpc_socket_close(xprt, sock);
-	sock_release(sock);
+	__fput_sync(filp);
+
+	xprt_disconnect_done(xprt);
 }
 
 /**
@@ -886,8 +1247,6 @@
 
 	xs_reset_transport(transport);
 	xprt->reestablish_timeout = 0;
-
-	xprt_disconnect_done(xprt);
 }
 
 static void xs_inject_disconnect(struct rpc_xprt *xprt)
@@ -917,116 +1276,9 @@
 	cancel_delayed_work_sync(&transport->connect_worker);
 	xs_close(xprt);
 	cancel_work_sync(&transport->recv_worker);
+	cancel_work_sync(&transport->error_worker);
 	xs_xprt_free(xprt);
 	module_put(THIS_MODULE);
-}
-
-static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
-{
-	struct xdr_skb_reader desc = {
-		.skb		= skb,
-		.offset		= sizeof(rpc_fraghdr),
-		.count		= skb->len - sizeof(rpc_fraghdr),
-	};
-
-	if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
-		return -1;
-	if (desc.count)
-		return -1;
-	return 0;
-}
-
-/**
- * xs_local_data_read_skb
- * @xprt: transport
- * @sk: socket
- * @skb: skbuff
- *
- * Currently this assumes we can read the whole reply in a single gulp.
- */
-static void xs_local_data_read_skb(struct rpc_xprt *xprt,
-		struct sock *sk,
-		struct sk_buff *skb)
-{
-	struct rpc_task *task;
-	struct rpc_rqst *rovr;
-	int repsize, copied;
-	u32 _xid;
-	__be32 *xp;
-
-	repsize = skb->len - sizeof(rpc_fraghdr);
-	if (repsize < 4) {
-		dprintk("RPC:       impossible RPC reply size %d\n", repsize);
-		return;
-	}
-
-	/* Copy the XID from the skb... */
-	xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
-	if (xp == NULL)
-		return;
-
-	/* Look up and lock the request corresponding to the given XID */
-	spin_lock(&xprt->recv_lock);
-	rovr = xprt_lookup_rqst(xprt, *xp);
-	if (!rovr)
-		goto out_unlock;
-	xprt_pin_rqst(rovr);
-	spin_unlock(&xprt->recv_lock);
-	task = rovr->rq_task;
-
-	copied = rovr->rq_private_buf.buflen;
-	if (copied > repsize)
-		copied = repsize;
-
-	if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
-		dprintk("RPC:       sk_buff copy failed\n");
-		spin_lock(&xprt->recv_lock);
-		goto out_unpin;
-	}
-
-	spin_lock(&xprt->recv_lock);
-	xprt_complete_rqst(task, copied);
-out_unpin:
-	xprt_unpin_rqst(rovr);
- out_unlock:
-	spin_unlock(&xprt->recv_lock);
-}
-
-static void xs_local_data_receive(struct sock_xprt *transport)
-{
-	struct sk_buff *skb;
-	struct sock *sk;
-	int err;
-
-restart:
-	mutex_lock(&transport->recv_mutex);
-	sk = transport->inet;
-	if (sk == NULL)
-		goto out;
-	for (;;) {
-		skb = skb_recv_datagram(sk, 0, 1, &err);
-		if (skb != NULL) {
-			xs_local_data_read_skb(&transport->xprt, sk, skb);
-			skb_free_datagram(sk, skb);
-			continue;
-		}
-		if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
-			break;
-		if (need_resched()) {
-			mutex_unlock(&transport->recv_mutex);
-			cond_resched();
-			goto restart;
-		}
-	}
-out:
-	mutex_unlock(&transport->recv_mutex);
-}
-
-static void xs_local_data_receive_workfn(struct work_struct *work)
-{
-	struct sock_xprt *transport =
-		container_of(work, struct sock_xprt, recv_worker);
-	xs_local_data_receive(transport);
 }
 
 /**
@@ -1058,13 +1310,13 @@
 		return;
 
 	/* Look up and lock the request corresponding to the given XID */
-	spin_lock(&xprt->recv_lock);
+	spin_lock(&xprt->queue_lock);
 	rovr = xprt_lookup_rqst(xprt, *xp);
 	if (!rovr)
 		goto out_unlock;
 	xprt_pin_rqst(rovr);
 	xprt_update_rtt(rovr->rq_task);
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 	task = rovr->rq_task;
 
 	if ((copied = rovr->rq_private_buf.buflen) > repsize)
@@ -1072,22 +1324,22 @@
 
 	/* Suck it into the iovec, verify checksum if not done by hw. */
 	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
-		spin_lock(&xprt->recv_lock);
+		spin_lock(&xprt->queue_lock);
 		__UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
 		goto out_unpin;
 	}
 
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->transport_lock);
 	xprt_adjust_cwnd(xprt, task, copied);
-	spin_unlock_bh(&xprt->transport_lock);
-	spin_lock(&xprt->recv_lock);
+	spin_unlock(&xprt->transport_lock);
+	spin_lock(&xprt->queue_lock);
 	xprt_complete_rqst(task, copied);
 	__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
 out_unpin:
 	xprt_unpin_rqst(rovr);
 out_unlock:
-	spin_unlock(&xprt->recv_lock);
+	spin_unlock(&xprt->queue_lock);
 }
 
 static void xs_udp_data_receive(struct sock_xprt *transport)
@@ -1096,26 +1348,19 @@
 	struct sock *sk;
 	int err;
 
-restart:
 	mutex_lock(&transport->recv_mutex);
 	sk = transport->inet;
 	if (sk == NULL)
 		goto out;
 	for (;;) {
 		skb = skb_recv_udp(sk, 0, 1, &err);
-		if (skb != NULL) {
-			xs_udp_data_read_skb(&transport->xprt, sk, skb);
-			consume_skb(skb);
-			continue;
-		}
-		if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+		if (skb == NULL)
 			break;
-		if (need_resched()) {
-			mutex_unlock(&transport->recv_mutex);
-			cond_resched();
-			goto restart;
-		}
+		xs_udp_data_read_skb(&transport->xprt, sk, skb);
+		consume_skb(skb);
+		cond_resched();
 	}
+	xs_poll_check_readable(transport);
 out:
 	mutex_unlock(&transport->recv_mutex);
 }
@@ -1124,7 +1369,10 @@
 {
 	struct sock_xprt *transport =
 		container_of(work, struct sock_xprt, recv_worker);
+	unsigned int pflags = memalloc_nofs_save();
+
 	xs_udp_data_receive(transport);
+	memalloc_nofs_restore(pflags);
 }
 
 /**
@@ -1163,416 +1411,12 @@
 	xprt_force_disconnect(xprt);
 }
 
-static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-	size_t len, used;
-	char *p;
-
-	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
-	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
-	used = xdr_skb_read_bits(desc, p, len);
-	transport->tcp_offset += used;
-	if (used != len)
-		return;
-
-	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
-	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
-		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
-	else
-		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
-	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
-
-	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
-	transport->tcp_offset = 0;
-
-	/* Sanity check of the record length */
-	if (unlikely(transport->tcp_reclen < 8)) {
-		dprintk("RPC:       invalid TCP record fragment length\n");
-		xs_tcp_force_close(xprt);
-		return;
-	}
-	dprintk("RPC:       reading TCP record fragment of length %d\n",
-			transport->tcp_reclen);
-}
-
-static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
-{
-	if (transport->tcp_offset == transport->tcp_reclen) {
-		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
-		transport->tcp_offset = 0;
-		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
-			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-			transport->tcp_flags |= TCP_RCV_COPY_XID;
-			transport->tcp_copied = 0;
-		}
-	}
-}
-
-static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
-{
-	size_t len, used;
-	char *p;
-
-	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
-	dprintk("RPC:       reading XID (%zu bytes)\n", len);
-	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
-	used = xdr_skb_read_bits(desc, p, len);
-	transport->tcp_offset += used;
-	if (used != len)
-		return;
-	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
-	transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
-	transport->tcp_copied = 4;
-	dprintk("RPC:       reading %s XID %08x\n",
-			(transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
-							      : "request with",
-			ntohl(transport->tcp_xid));
-	xs_tcp_check_fraghdr(transport);
-}
-
-static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
-				       struct xdr_skb_reader *desc)
-{
-	size_t len, used;
-	u32 offset;
-	char *p;
-
-	/*
-	 * We want transport->tcp_offset to be 8 at the end of this routine
-	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
-	 * When this function is called for the first time,
-	 * transport->tcp_offset is 4 (after having already read the xid).
-	 */
-	offset = transport->tcp_offset - sizeof(transport->tcp_xid);
-	len = sizeof(transport->tcp_calldir) - offset;
-	dprintk("RPC:       reading CALL/REPLY flag (%zu bytes)\n", len);
-	p = ((char *) &transport->tcp_calldir) + offset;
-	used = xdr_skb_read_bits(desc, p, len);
-	transport->tcp_offset += used;
-	if (used != len)
-		return;
-	transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
-	/*
-	 * We don't yet have the XDR buffer, so we will write the calldir
-	 * out after we get the buffer from the 'struct rpc_rqst'
-	 */
-	switch (ntohl(transport->tcp_calldir)) {
-	case RPC_REPLY:
-		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
-		transport->tcp_flags |= TCP_RCV_COPY_DATA;
-		transport->tcp_flags |= TCP_RPC_REPLY;
-		break;
-	case RPC_CALL:
-		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
-		transport->tcp_flags |= TCP_RCV_COPY_DATA;
-		transport->tcp_flags &= ~TCP_RPC_REPLY;
-		break;
-	default:
-		dprintk("RPC:       invalid request message type\n");
-		xs_tcp_force_close(&transport->xprt);
-	}
-	xs_tcp_check_fraghdr(transport);
-}
-
-static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
-				     struct xdr_skb_reader *desc,
-				     struct rpc_rqst *req)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-	struct xdr_buf *rcvbuf;
-	size_t len;
-	ssize_t r;
-
-	rcvbuf = &req->rq_private_buf;
-
-	if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
-		/*
-		 * Save the RPC direction in the XDR buffer
-		 */
-		memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
-			&transport->tcp_calldir,
-			sizeof(transport->tcp_calldir));
-		transport->tcp_copied += sizeof(transport->tcp_calldir);
-		transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
-	}
-
-	len = desc->count;
-	if (len > transport->tcp_reclen - transport->tcp_offset)
-		desc->count = transport->tcp_reclen - transport->tcp_offset;
-	r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
-					  desc, xdr_skb_read_bits);
-
-	if (desc->count) {
-		/* Error when copying to the receive buffer,
-		 * usually because we weren't able to allocate
-		 * additional buffer pages. All we can do now
-		 * is turn off TCP_RCV_COPY_DATA, so the request
-		 * will not receive any additional updates,
-		 * and time out.
-		 * Any remaining data from this record will
-		 * be discarded.
-		 */
-		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-		dprintk("RPC:       XID %08x truncated request\n",
-				ntohl(transport->tcp_xid));
-		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
-				"tcp_offset = %u, tcp_reclen = %u\n",
-				xprt, transport->tcp_copied,
-				transport->tcp_offset, transport->tcp_reclen);
-		return;
-	}
-
-	transport->tcp_copied += r;
-	transport->tcp_offset += r;
-	desc->count = len - r;
-
-	dprintk("RPC:       XID %08x read %zd bytes\n",
-			ntohl(transport->tcp_xid), r);
-	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
-			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
-			transport->tcp_offset, transport->tcp_reclen);
-
-	if (transport->tcp_copied == req->rq_private_buf.buflen)
-		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-	else if (transport->tcp_offset == transport->tcp_reclen) {
-		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
-			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-	}
-}
-
-/*
- * Finds the request corresponding to the RPC xid and invokes the common
- * tcp read code to read the data.
- */
-static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
-				    struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-	struct rpc_rqst *req;
-
-	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
-
-	/* Find and lock the request corresponding to this xid */
-	spin_lock(&xprt->recv_lock);
-	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
-	if (!req) {
-		dprintk("RPC:       XID %08x request not found!\n",
-				ntohl(transport->tcp_xid));
-		spin_unlock(&xprt->recv_lock);
-		return -1;
-	}
-	xprt_pin_rqst(req);
-	spin_unlock(&xprt->recv_lock);
-
-	xs_tcp_read_common(xprt, desc, req);
-
-	spin_lock(&xprt->recv_lock);
-	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
-		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
-	xprt_unpin_rqst(req);
-	spin_unlock(&xprt->recv_lock);
-	return 0;
-}
-
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
-/*
- * Obtains an rpc_rqst previously allocated and invokes the common
- * tcp read code to read the data.  The result is placed in the callback
- * queue.
- * If we're unable to obtain the rpc_rqst we schedule the closing of the
- * connection and return -1.
- */
-static int xs_tcp_read_callback(struct rpc_xprt *xprt,
-				struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-	struct rpc_rqst *req;
-
-	/* Look up the request corresponding to the given XID */
-	req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
-	if (req == NULL) {
-		printk(KERN_WARNING "Callback slot table overflowed\n");
-		xprt_force_disconnect(xprt);
-		return -1;
-	}
-
-	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
-	xs_tcp_read_common(xprt, desc, req);
-
-	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
-		xprt_complete_bc_request(req, transport->tcp_copied);
-
-	return 0;
-}
-
-static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
-					struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-
-	return (transport->tcp_flags & TCP_RPC_REPLY) ?
-		xs_tcp_read_reply(xprt, desc) :
-		xs_tcp_read_callback(xprt, desc);
-}
-
-static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
-{
-	int ret;
-
-	ret = svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0,
-			      SVC_SOCK_ANONYMOUS);
-	if (ret < 0)
-		return ret;
-	return 0;
-}
-
 static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
 {
 	return PAGE_SIZE;
 }
-#else
-static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
-					struct xdr_skb_reader *desc)
-{
-	return xs_tcp_read_reply(xprt, desc);
-}
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
-
-/*
- * Read data off the transport.  This can be either an RPC_CALL or an
- * RPC_REPLY.  Relay the processing to helper functions.
- */
-static void xs_tcp_read_data(struct rpc_xprt *xprt,
-			    struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-
-	if (_xs_tcp_read_data(xprt, desc) == 0)
-		xs_tcp_check_fraghdr(transport);
-	else {
-		/*
-		 * The transport_lock protects the request handling.
-		 * There's no need to hold it to update the tcp_flags.
-		 */
-		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-	}
-}
-
-static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
-{
-	size_t len;
-
-	len = transport->tcp_reclen - transport->tcp_offset;
-	if (len > desc->count)
-		len = desc->count;
-	desc->count -= len;
-	desc->offset += len;
-	transport->tcp_offset += len;
-	dprintk("RPC:       discarded %zu bytes\n", len);
-	xs_tcp_check_fraghdr(transport);
-}
-
-static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
-{
-	struct rpc_xprt *xprt = rd_desc->arg.data;
-	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-	struct xdr_skb_reader desc = {
-		.skb	= skb,
-		.offset	= offset,
-		.count	= len,
-	};
-	size_t ret;
-
-	dprintk("RPC:       xs_tcp_data_recv started\n");
-	do {
-		trace_xs_tcp_data_recv(transport);
-		/* Read in a new fragment marker if necessary */
-		/* Can we ever really expect to get completely empty fragments? */
-		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
-			xs_tcp_read_fraghdr(xprt, &desc);
-			continue;
-		}
-		/* Read in the xid if necessary */
-		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
-			xs_tcp_read_xid(transport, &desc);
-			continue;
-		}
-		/* Read in the call/reply flag */
-		if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
-			xs_tcp_read_calldir(transport, &desc);
-			continue;
-		}
-		/* Read in the request data */
-		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
-			xs_tcp_read_data(xprt, &desc);
-			continue;
-		}
-		/* Skip over any trailing bytes on short reads */
-		xs_tcp_read_discard(transport, &desc);
-	} while (desc.count);
-	ret = len - desc.count;
-	if (ret < rd_desc->count)
-		rd_desc->count -= ret;
-	else
-		rd_desc->count = 0;
-	trace_xs_tcp_data_recv(transport);
-	dprintk("RPC:       xs_tcp_data_recv done\n");
-	return ret;
-}
-
-static void xs_tcp_data_receive(struct sock_xprt *transport)
-{
-	struct rpc_xprt *xprt = &transport->xprt;
-	struct sock *sk;
-	read_descriptor_t rd_desc = {
-		.arg.data = xprt,
-	};
-	unsigned long total = 0;
-	int read = 0;
-
-restart:
-	mutex_lock(&transport->recv_mutex);
-	sk = transport->inet;
-	if (sk == NULL)
-		goto out;
-
-	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
-	for (;;) {
-		rd_desc.count = RPC_TCP_READ_CHUNK_SZ;
-		lock_sock(sk);
-		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
-		if (rd_desc.count != 0 || read < 0) {
-			clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
-			release_sock(sk);
-			break;
-		}
-		release_sock(sk);
-		total += read;
-		if (need_resched()) {
-			mutex_unlock(&transport->recv_mutex);
-			cond_resched();
-			goto restart;
-		}
-	}
-	if (test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
-		queue_work(xprtiod_workqueue, &transport->recv_worker);
-out:
-	mutex_unlock(&transport->recv_mutex);
-	trace_xs_tcp_data_ready(xprt, read, total);
-}
-
-static void xs_tcp_data_receive_workfn(struct work_struct *work)
-{
-	struct sock_xprt *transport =
-		container_of(work, struct sock_xprt, recv_worker);
-	xs_tcp_data_receive(transport);
-}
 
 /**
  * xs_tcp_state_change - callback to handle TCP socket state changes
@@ -1598,15 +1442,7 @@
 	trace_rpc_socket_state_change(xprt, sk->sk_socket);
 	switch (sk->sk_state) {
 	case TCP_ESTABLISHED:
-		spin_lock(&xprt->transport_lock);
 		if (!xprt_test_and_set_connected(xprt)) {
-
-			/* Reset TCP record info */
-			transport->tcp_offset = 0;
-			transport->tcp_reclen = 0;
-			transport->tcp_copied = 0;
-			transport->tcp_flags =
-				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
 			xprt->connect_cookie++;
 			clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
 			xprt_clear_connecting(xprt);
@@ -1614,9 +1450,8 @@
 			xprt->stat.connect_count++;
 			xprt->stat.connect_time += (long)jiffies -
 						   xprt->stat.connect_start;
-			xprt_wake_pending_tasks(xprt, -EAGAIN);
+			xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING);
 		}
-		spin_unlock(&xprt->transport_lock);
 		break;
 	case TCP_FIN_WAIT1:
 		/* The client initiated a shutdown of the socket */
@@ -1632,8 +1467,8 @@
 		/* The server initiated a shutdown of the socket */
 		xprt->connect_cookie++;
 		clear_bit(XPRT_CONNECTED, &xprt->state);
-		xs_tcp_force_close(xprt);
-		/* fall through */
+		xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
+		fallthrough;
 	case TCP_CLOSING:
 		/*
 		 * If the server closed down the connection, make sure that
@@ -1653,10 +1488,8 @@
 					&transport->sock_state))
 			xprt_clear_connecting(xprt);
 		clear_bit(XPRT_CLOSING, &xprt->state);
-		if (sk->sk_err)
-			xprt_wake_pending_tasks(xprt, -sk->sk_err);
 		/* Trigger the socket release */
-		xs_tcp_force_close(xprt);
+		xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
 	}
 out:
 	read_unlock_bh(&sk->sk_callback_lock);
@@ -1665,6 +1498,7 @@
 static void xs_write_space(struct sock *sk)
 {
 	struct socket_wq *wq;
+	struct sock_xprt *transport;
 	struct rpc_xprt *xprt;
 
 	if (!sk->sk_socket)
@@ -1673,12 +1507,14 @@
 
 	if (unlikely(!(xprt = xprt_from_sock(sk))))
 		return;
+	transport = container_of(xprt, struct sock_xprt, xprt);
 	rcu_read_lock();
 	wq = rcu_dereference(sk->sk_wq);
 	if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0)
 		goto out;
 
-	xprt_write_space(xprt);
+	xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE);
+	sk->sk_write_pending--;
 out:
 	rcu_read_unlock();
 }
@@ -1765,15 +1601,16 @@
 
 /**
  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
+ * @xprt: controlling transport
  * @task: task that timed out
 *
 * Adjust the congestion window after a retransmit timeout has occurred.
 */
....@@ -1787,21 +1624,6 @@
17871624 range = max - min + 1;
17881625 rand = (unsigned short) prandom_u32() % range;
17891626 return rand + min;
1790
-}
1791
-
1792
-/**
1793
- * xs_set_reuseaddr_port - set the socket's port and address reuse options
1794
- * @sock: socket
1795
- *
1796
- * Note that this function has to be called on all sockets that share the
1797
- * same port, and it must be called before binding.
1798
- */
1799
-static void xs_sock_set_reuseport(struct socket *sock)
1800
-{
1801
- int opt = 1;
1802
-
1803
- kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEPORT,
1804
- (char *)&opt, sizeof(opt));
18051627 }
18061628
18071629 static unsigned short xs_sock_getport(struct socket *sock)
....@@ -1838,7 +1660,7 @@
18381660
18391661 static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock)
18401662 {
1841
- if (transport->srcport == 0)
1663
+ if (transport->srcport == 0 && transport->xprt.reuseport)
18421664 transport->srcport = xs_sock_getport(sock);
18431665 }
18441666
....@@ -1850,6 +1672,13 @@
18501672 port = xs_get_random_port();
18511673 return port;
18521674 }
1675
+
1676
+unsigned short get_srcport(struct rpc_xprt *xprt)
1677
+{
1678
+ struct sock_xprt *sock = container_of(xprt, struct sock_xprt, xprt);
1679
+ return xs_sock_getport(sock->sock);
1680
+}
1681
+EXPORT_SYMBOL(get_srcport);
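The new get_srcport() export recovers its sock_xprt from the embedded rpc_xprt with container_of(). For reference, a minimal userspace re-creation of that pattern, compilable as-is (the stub types are hypothetical; the kernel macro additionally type-checks the member):

/* Sketch of the container_of() idiom used by get_srcport() above. */
#include <stddef.h>
#include <stdio.h>

#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

struct rpc_xprt_stub { int dummy; };
struct sock_xprt_stub {
    unsigned short srcport;
    struct rpc_xprt_stub xprt;  /* embedded member */
};

int main(void)
{
    struct sock_xprt_stub s = { .srcport = 1021 };
    struct rpc_xprt_stub *inner = &s.xprt;
    /* Walk back from the embedded member to its container. */
    struct sock_xprt_stub *outer =
        container_of(inner, struct sock_xprt_stub, xprt);
    printf("srcport=%hu\n", outer->srcport);
    return 0;
}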
18531682
18541683 static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
18551684 {
....@@ -1892,7 +1721,8 @@
18921721 err = kernel_bind(sock, (struct sockaddr *)&myaddr,
18931722 transport->xprt.addrlen);
18941723 if (err == 0) {
1895
- transport->srcport = port;
1724
+ if (transport->xprt.reuseport)
1725
+ transport->srcport = port;
18961726 break;
18971727 }
18981728 last = port;
....@@ -1983,6 +1813,7 @@
19831813 struct sock_xprt *transport, int family, int type,
19841814 int protocol, bool reuseport)
19851815 {
1816
+ struct file *filp;
19861817 struct socket *sock;
19871818 int err;
19881819
....@@ -1995,13 +1826,18 @@
19951826 xs_reclassify_socket(family, sock);
19961827
19971828 if (reuseport)
1998
- xs_sock_set_reuseport(sock);
1829
+ sock_set_reuseport(sock->sk);
19991830
20001831 err = xs_bind(transport, sock);
20011832 if (err) {
20021833 sock_release(sock);
20031834 goto out;
20041835 }
1836
+
1837
+ filp = sock_alloc_file(sock, O_NONBLOCK, NULL);
1838
+ if (IS_ERR(filp))
1839
+ return ERR_CAST(filp);
1840
+ transport->file = filp;
20051841
20061842 return sock;
20071843 out:
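The open-coded xs_sock_set_reuseport() wrapper removed earlier in this patch is replaced by the typed in-kernel helper sock_set_reuseport(sock->sk). A compilable userspace analogue of what that helper configures (assumes Linux; as the removed comment noted, the option must be set before binding any of the sockets sharing the port):

#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    int one = 1;

    /* Equivalent of the kernel's sock_set_reuseport(sk). */
    if (fd < 0 || setsockopt(fd, SOL_SOCKET, SO_REUSEPORT,
                             &one, sizeof(one)) < 0) {
        perror("SO_REUSEPORT");
        return 1;
    }
    puts("reuseport enabled");
    close(fd);
    return 0;
}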
....@@ -2026,7 +1862,6 @@
20261862 sk->sk_write_space = xs_udp_write_space;
20271863 sock_set_flag(sk, SOCK_FASYNC);
20281864 sk->sk_error_report = xs_error_report;
2029
- sk->sk_allocation = GFP_NOIO;
20301865
20311866 xprt_clear_connected(xprt);
20321867
....@@ -2037,7 +1872,8 @@
20371872 write_unlock_bh(&sk->sk_callback_lock);
20381873 }
20391874
2040
- /* Tell the socket layer to start connecting... */
1875
+ xs_stream_start_connect(transport);
1876
+
20411877 return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
20421878 }
20431879
....@@ -2048,8 +1884,9 @@
20481884 static int xs_local_setup_socket(struct sock_xprt *transport)
20491885 {
20501886 struct rpc_xprt *xprt = &transport->xprt;
1887
+ struct file *filp;
20511888 struct socket *sock;
2052
- int status = -EIO;
1889
+ int status;
20531890
20541891 status = __sock_create(xprt->xprt_net, AF_LOCAL,
20551892 SOCK_STREAM, 0, &sock, 1);
....@@ -2059,6 +1896,13 @@
20591896 goto out;
20601897 }
20611898 xs_reclassify_socket(AF_LOCAL, sock);
1899
+
1900
+ filp = sock_alloc_file(sock, O_NONBLOCK, NULL);
1901
+ if (IS_ERR(filp)) {
1902
+ status = PTR_ERR(filp);
1903
+ goto out;
1904
+ }
1905
+ transport->file = filp;
20621906
20631907 dprintk("RPC: worker connecting xprt %p via AF_LOCAL to %s\n",
20641908 xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
....@@ -2073,6 +1917,7 @@
20731917 xprt->stat.connect_time += (long)jiffies -
20741918 xprt->stat.connect_start;
20751919 xprt_set_connected(xprt);
1920
+ break;
20761921 case -ENOBUFS:
20771922 break;
20781923 case -ENOENT:
....@@ -2110,6 +1955,7 @@
21101955 * we'll need to figure out how to pass a namespace to
21111956 * connect.
21121957 */
1958
+ task->tk_rpc_status = -ENOTCONN;
21131959 rpc_exit(task, -ENOTCONN);
21141960 return;
21151961 }
....@@ -2213,7 +2059,6 @@
22132059 sk->sk_data_ready = xs_data_ready;
22142060 sk->sk_write_space = xs_udp_write_space;
22152061 sock_set_flag(sk, SOCK_FASYNC);
2216
- sk->sk_allocation = GFP_NOIO;
22172062
22182063 xprt_set_connected(xprt);
22192064
....@@ -2291,30 +2136,24 @@
22912136 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
22922137 unsigned int keepidle;
22932138 unsigned int keepcnt;
2294
- unsigned int opt_on = 1;
22952139 unsigned int timeo;
22962140
2297
- spin_lock_bh(&xprt->transport_lock);
2141
+ spin_lock(&xprt->transport_lock);
22982142 keepidle = DIV_ROUND_UP(xprt->timeout->to_initval, HZ);
22992143 keepcnt = xprt->timeout->to_retries + 1;
23002144 timeo = jiffies_to_msecs(xprt->timeout->to_initval) *
23012145 (xprt->timeout->to_retries + 1);
23022146 clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
2303
- spin_unlock_bh(&xprt->transport_lock);
2147
+ spin_unlock(&xprt->transport_lock);
23042148
23052149 /* TCP Keepalive options */
2306
- kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
2307
- (char *)&opt_on, sizeof(opt_on));
2308
- kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE,
2309
- (char *)&keepidle, sizeof(keepidle));
2310
- kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL,
2311
- (char *)&keepidle, sizeof(keepidle));
2312
- kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT,
2313
- (char *)&keepcnt, sizeof(keepcnt));
2150
+ sock_set_keepalive(sock->sk);
2151
+ tcp_sock_set_keepidle(sock->sk, keepidle);
2152
+ tcp_sock_set_keepintvl(sock->sk, keepidle);
2153
+ tcp_sock_set_keepcnt(sock->sk, keepcnt);
23142154
23152155 /* TCP user timeout (see RFC5482) */
2316
- kernel_setsockopt(sock, SOL_TCP, TCP_USER_TIMEOUT,
2317
- (char *)&timeo, sizeof(timeo));
2156
+ tcp_sock_set_user_timeout(sock->sk, timeo);
23182157 }
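The four kernel_setsockopt() calls become typed helpers here; note that the converted code deliberately keeps the old quirk of programming the keepalive interval with the keepidle value. A compilable userspace sketch of the same option set (illustrative values; TCP_USER_TIMEOUT per RFC 5482 is in milliseconds):

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    int on = 1, idle = 60, intvl = 60, cnt = 3;
    unsigned int timeo_ms = 180000;

    if (fd < 0)
        return 1;
    setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on));
    setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle));
    setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &intvl, sizeof(intvl));
    setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &cnt, sizeof(cnt));
    /* RFC 5482 user timeout: abort after unacked data sits this long. */
    setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT,
               &timeo_ms, sizeof(timeo_ms));
    puts("keepalive configured");
    close(fd);
    return 0;
}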
23192158
23202159 static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt,
....@@ -2325,7 +2164,7 @@
23252164 struct rpc_timeout to;
23262165 unsigned long initval;
23272166
2328
- spin_lock_bh(&xprt->transport_lock);
2167
+ spin_lock(&xprt->transport_lock);
23292168 if (reconnect_timeout < xprt->max_reconnect_timeout)
23302169 xprt->max_reconnect_timeout = reconnect_timeout;
23312170 if (connect_timeout < xprt->connect_timeout) {
....@@ -2342,7 +2181,7 @@
23422181 xprt->connect_timeout = connect_timeout;
23432182 }
23442183 set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
2345
- spin_unlock_bh(&xprt->transport_lock);
2184
+ spin_unlock(&xprt->transport_lock);
23462185 }
23472186
23482187 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
....@@ -2352,7 +2191,6 @@
23522191
23532192 if (!transport->inet) {
23542193 struct sock *sk = sock->sk;
2355
- unsigned int addr_pref = IPV6_PREFER_SRC_PUBLIC;
23562194
23572195 /* Avoid temporary addresses; they are bad for long-lived
23582196 * connections such as NFS mounts.
....@@ -2361,8 +2199,10 @@
23612199 * knowledge about the normal duration of connections,
23622200 * MAY override this as appropriate.
23632201 */
2364
- kernel_setsockopt(sock, SOL_IPV6, IPV6_ADDR_PREFERENCES,
2365
- (char *)&addr_pref, sizeof(addr_pref));
2202
+ if (xs_addr(xprt)->sa_family == PF_INET6) {
2203
+ ip6_sock_set_addr_preferences(sk,
2204
+ IPV6_PREFER_SRC_PUBLIC);
2205
+ }
23662206
23672207 xs_tcp_set_socket_timeouts(xprt, sock);
23682208
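ip6_sock_set_addr_preferences(sk, IPV6_PREFER_SRC_PUBLIC) is the typed replacement for the removed IPV6_ADDR_PREFERENCES setsockopt, now correctly gated on an IPv6 peer address. A userspace sketch of the underlying RFC 5014 option; the fallback constants are copied from the Linux UAPI headers in case libc does not define them:

#include <netinet/in.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

#ifndef IPV6_ADDR_PREFERENCES
#define IPV6_ADDR_PREFERENCES  72      /* RFC 5014 */
#endif
#ifndef IPV6_PREFER_SRC_PUBLIC
#define IPV6_PREFER_SRC_PUBLIC 0x0002
#endif

int main(void)
{
    int fd = socket(AF_INET6, SOCK_STREAM, 0);
    int pref = IPV6_PREFER_SRC_PUBLIC;

    if (fd < 0)
        return 1;
    /* Prefer a stable source address over an RFC 4941 temporary one,
     * matching the long-lived NFS mount use case above. */
    if (setsockopt(fd, IPPROTO_IPV6, IPV6_ADDR_PREFERENCES,
                   &pref, sizeof(pref)) < 0)
        perror("IPV6_ADDR_PREFERENCES");
    close(fd);
    return 0;
}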
....@@ -2376,7 +2216,6 @@
23762216 sk->sk_write_space = xs_tcp_write_space;
23772217 sock_set_flag(sk, SOCK_FASYNC);
23782218 sk->sk_error_report = xs_error_report;
2379
- sk->sk_allocation = GFP_NOIO;
23802219
23812220 /* socket options */
23822221 sock_reset_flag(sk, SOCK_LINGER);
....@@ -2396,13 +2235,15 @@
23962235
23972236 xs_set_memalloc(xprt);
23982237
2238
+ xs_stream_start_connect(transport);
2239
+
23992240 /* Tell the socket layer to start connecting... */
24002241 set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
24012242 ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
24022243 switch (ret) {
24032244 case 0:
24042245 xs_set_srcport(transport, sock);
2405
- /* fall through */
2246
+ fallthrough;
24062247 case -EINPROGRESS:
24072248 /* SYN_SENT! */
24082249 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
....@@ -2418,6 +2259,7 @@
24182259
24192260 /**
24202261 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
2262
+ * @work: queued work item
24212263 *
24222264 * Invoked by a work queue tasklet.
24232265 */
....@@ -2429,10 +2271,14 @@
24292271 struct rpc_xprt *xprt = &transport->xprt;
24302272 int status = -EIO;
24312273
2432
- if (!sock) {
2433
- sock = xs_create_sock(xprt, transport,
2434
- xs_addr(xprt)->sa_family, SOCK_STREAM,
2435
- IPPROTO_TCP, true);
2274
+ if (xprt_connected(xprt))
2275
+ goto out;
2276
+ if (test_and_clear_bit(XPRT_SOCK_CONNECT_SENT,
2277
+ &transport->sock_state) ||
2278
+ !sock) {
2279
+ xs_reset_transport(transport);
2280
+ sock = xs_create_sock(xprt, transport, xs_addr(xprt)->sa_family,
2281
+ SOCK_STREAM, IPPROTO_TCP, true);
24362282 if (IS_ERR(sock)) {
24372283 status = PTR_ERR(sock);
24382284 goto out;
....@@ -2454,7 +2300,7 @@
24542300 default:
24552301 printk("%s: connect returned unhandled error %d\n",
24562302 __func__, status);
2457
- /* fall through */
2303
+ fallthrough;
24582304 case -EADDRNOTAVAIL:
24592305 /* We're probably in TIME_WAIT. Get rid of existing socket,
24602306 * and retry
....@@ -2463,6 +2309,8 @@
24632309 break;
24642310 case 0:
24652311 case -EINPROGRESS:
2312
+ set_bit(XPRT_SOCK_CONNECT_SENT, &transport->sock_state);
2313
+ fallthrough;
24662314 case -EALREADY:
24672315 xprt_unlock_connect(xprt, transport);
24682316 return;
....@@ -2493,25 +2341,6 @@
24932341 xprt_wake_pending_tasks(xprt, status);
24942342 }
24952343
2496
-static unsigned long xs_reconnect_delay(const struct rpc_xprt *xprt)
2497
-{
2498
- unsigned long start, now = jiffies;
2499
-
2500
- start = xprt->stat.connect_start + xprt->reestablish_timeout;
2501
- if (time_after(start, now))
2502
- return start - now;
2503
- return 0;
2504
-}
2505
-
2506
-static void xs_reconnect_backoff(struct rpc_xprt *xprt)
2507
-{
2508
- xprt->reestablish_timeout <<= 1;
2509
- if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
2510
- xprt->reestablish_timeout = xprt->max_reconnect_timeout;
2511
- if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2512
- xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2513
-}
2514
-
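The two helpers removed here moved into the generic transport layer as xprt_reconnect_delay() and xprt_reconnect_backoff(), called from the xs_connect() hunk below. Their logic is a doubling delay clamped between a floor and a ceiling; a plain-C sketch with illustrative values:

#include <stdio.h>

static unsigned long backoff(unsigned long cur,
                             unsigned long floor, unsigned long ceil)
{
    cur <<= 1;                  /* double the reestablish timeout */
    if (cur > ceil)
        cur = ceil;
    if (cur < floor)
        cur = floor;
    return cur;
}

int main(void)
{
    unsigned long t = 3;        /* e.g. an initial timeout in seconds */

    for (int i = 0; i < 6; i++) {
        printf("retry %d after %lus\n", i, t);
        t = backoff(t, 3, 300);
    }
    return 0;
}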
25152344 /**
25162345 * xs_connect - connect a socket to a remote endpoint
25172346 * @xprt: pointer to transport structure
....@@ -2535,14 +2364,10 @@
25352364
25362365 if (transport->sock != NULL) {
25372366 dprintk("RPC: xs_connect delayed xprt %p for %lu "
2538
- "seconds\n",
2539
- xprt, xprt->reestablish_timeout / HZ);
2367
+ "seconds\n", xprt, xprt->reestablish_timeout / HZ);
25402368
2541
- /* Start by resetting any existing state */
2542
- xs_reset_transport(transport);
2543
-
2544
- delay = xs_reconnect_delay(xprt);
2545
- xs_reconnect_backoff(xprt);
2369
+ delay = xprt_reconnect_delay(xprt);
2370
+ xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO);
25462371
25472372 } else
25482373 dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);
....@@ -2550,6 +2375,53 @@
25502375 queue_delayed_work(xprtiod_workqueue,
25512376 &transport->connect_worker,
25522377 delay);
2378
+}
2379
+
2380
+static void xs_wake_disconnect(struct sock_xprt *transport)
2381
+{
2382
+ if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state))
2383
+ xs_tcp_force_close(&transport->xprt);
2384
+}
2385
+
2386
+static void xs_wake_write(struct sock_xprt *transport)
2387
+{
2388
+ if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state))
2389
+ xprt_write_space(&transport->xprt);
2390
+}
2391
+
2392
+static void xs_wake_error(struct sock_xprt *transport)
2393
+{
2394
+ int sockerr;
2395
+
2396
+ if (!test_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
2397
+ return;
2398
+ mutex_lock(&transport->recv_mutex);
2399
+ if (transport->sock == NULL)
2400
+ goto out;
2401
+ if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
2402
+ goto out;
2403
+ sockerr = xchg(&transport->xprt_err, 0);
2404
+ if (sockerr < 0)
2405
+ xprt_wake_pending_tasks(&transport->xprt, sockerr);
2406
+out:
2407
+ mutex_unlock(&transport->recv_mutex);
2408
+}
2409
+
2410
+static void xs_wake_pending(struct sock_xprt *transport)
2411
+{
2412
+ if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state))
2413
+ xprt_wake_pending_tasks(&transport->xprt, -EAGAIN);
2414
+}
2415
+
2416
+static void xs_error_handle(struct work_struct *work)
2417
+{
2418
+ struct sock_xprt *transport = container_of(work,
2419
+ struct sock_xprt, error_worker);
2420
+
2421
+ xs_wake_disconnect(transport);
2422
+ xs_wake_write(transport);
2423
+ xs_wake_error(transport);
2424
+ xs_wake_pending(transport);
25532425 }
25542426
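The xs_wake_* helpers added above let the socket state-change and write-space callbacks, which run under sk_callback_lock in softirq context, merely set XPRT_SOCK_WAKE_* bits and defer the actual task wakeups to error_worker. A simplified, runnable C11 model of that pattern (the kernel clears each bit individually with test_and_clear_bit() rather than exchanging the whole word):

#include <stdatomic.h>
#include <stdio.h>

enum { WAKE_DISCONNECT = 1 << 0, WAKE_WRITE = 1 << 1 };

static atomic_uint state;

static void callback(unsigned int event) /* cheap, lock-free path */
{
    atomic_fetch_or(&state, event);
    /* real code would also queue the worker here */
}

static void worker(void)                 /* runs later, may sleep */
{
    unsigned int bits = atomic_exchange(&state, 0);

    if (bits & WAKE_DISCONNECT)
        puts("force close transport");
    if (bits & WAKE_WRITE)
        puts("wake write-space waiters");
}

int main(void)
{
    callback(WAKE_WRITE);
    callback(WAKE_DISCONNECT);
    worker();
    return 0;
}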
25552427 /**
....@@ -2569,7 +2441,7 @@
25692441 "%llu %llu %lu %llu %llu\n",
25702442 xprt->stat.bind_count,
25712443 xprt->stat.connect_count,
2572
- xprt->stat.connect_time,
2444
+ xprt->stat.connect_time / HZ,
25732445 idle_time,
25742446 xprt->stat.sends,
25752447 xprt->stat.recvs,
....@@ -2624,7 +2496,7 @@
26242496 transport->srcport,
26252497 xprt->stat.bind_count,
26262498 xprt->stat.connect_count,
2627
- xprt->stat.connect_time,
2499
+ xprt->stat.connect_time / HZ,
26282500 idle_time,
26292501 xprt->stat.sends,
26302502 xprt->stat.recvs,
....@@ -2678,47 +2550,43 @@
26782550 free_page((unsigned long)buf);
26792551 }
26802552
2681
-/*
2682
- * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex
2683
- * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
2684
- */
26852553 static int bc_sendto(struct rpc_rqst *req)
26862554 {
2687
- int len;
2688
- struct xdr_buf *xbufp = &req->rq_snd_buf;
2689
- struct rpc_xprt *xprt = req->rq_xprt;
2555
+ struct xdr_buf *xdr = &req->rq_snd_buf;
26902556 struct sock_xprt *transport =
2691
- container_of(xprt, struct sock_xprt, xprt);
2692
- struct socket *sock = transport->sock;
2693
- unsigned long headoff;
2694
- unsigned long tailoff;
2557
+ container_of(req->rq_xprt, struct sock_xprt, xprt);
2558
+ struct msghdr msg = {
2559
+ .msg_flags = 0,
2560
+ };
2561
+ rpc_fraghdr marker = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT |
2562
+ (u32)xdr->len);
2563
+ unsigned int sent = 0;
2564
+ int err;
26952565
2696
- xs_encode_stream_record_marker(xbufp);
2697
-
2698
- tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
2699
- headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
2700
- len = svc_send_common(sock, xbufp,
2701
- virt_to_page(xbufp->head[0].iov_base), headoff,
2702
- xbufp->tail[0].iov_base, tailoff);
2703
-
2704
- if (len != xbufp->len) {
2705
- printk(KERN_NOTICE "Error sending entire callback!\n");
2706
- len = -EAGAIN;
2707
- }
2708
-
2709
- return len;
2566
+ req->rq_xtime = ktime_get();
2567
+ err = xprt_sock_sendmsg(transport->sock, &msg, xdr, 0, marker, &sent);
2568
+ xdr_free_bvec(xdr);
2569
+ if (err < 0 || sent != (xdr->len + sizeof(marker)))
2570
+ return -EAGAIN;
2571
+ return sent;
27102572 }
27112573
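The rewritten bc_sendto() no longer borrows the svc_sock send path; it builds the RFC 5531 record-marking word itself, setting the top bit (RPC_LAST_STREAM_FRAGMENT) and placing the fragment length in the low 31 bits, then lets xprt_sock_sendmsg() transmit marker and body together. A runnable sketch of that encoding:

#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>

#define LAST_STREAM_FRAGMENT 0x80000000u  /* top bit: final fragment */

static uint32_t rpc_record_marker(uint32_t len)
{
    /* Low 31 bits carry the fragment length; the word travels in
     * network byte order, hence htonl(). */
    return htonl(LAST_STREAM_FRAGMENT | (len & 0x7fffffffu));
}

int main(void)
{
    printf("marker for 120-byte message: 0x%08x\n",
           ntohl(rpc_record_marker(120)));
    return 0;
}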
2712
-/*
2713
- * The send routine. Borrows from svc_send
2574
+/**
2575
+ * bc_send_request - Send a backchannel Call on a TCP socket
2576
+ * @req: rpc_rqst containing Call message to be sent
2577
+ *
2578
+ * xpt_mutex ensures @req's whole message is written to the socket
2579
+ * without interruption.
2580
+ *
2581
+ * Return values:
2582
+ * %0 if the message was sent successfully
2583
+ * %ENOTCONN if the message was not sent
27142584 */
2715
-static int bc_send_request(struct rpc_task *task)
2585
+static int bc_send_request(struct rpc_rqst *req)
27162586 {
2717
- struct rpc_rqst *req = task->tk_rqstp;
27182587 struct svc_xprt *xprt;
27192588 int len;
27202589
2721
- dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
27222590 /*
27232591 * Get the server socket associated with this callback xprt
27242592 */
....@@ -2728,12 +2596,7 @@
27282596 * Grab the mutex to serialize data as the connection is shared
27292597 * with the fore channel
27302598 */
2731
- if (!mutex_trylock(&xprt->xpt_mutex)) {
2732
- rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
2733
- if (!mutex_trylock(&xprt->xpt_mutex))
2734
- return -EAGAIN;
2735
- rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
2736
- }
2599
+ mutex_lock(&xprt->xpt_mutex);
27372600 if (test_bit(XPT_DEAD, &xprt->xpt_flags))
27382601 len = -ENOTCONN;
27392602 else
....@@ -2752,6 +2615,7 @@
27522615
27532616 static void bc_close(struct rpc_xprt *xprt)
27542617 {
2618
+ xprt_disconnect_done(xprt);
27552619 }
27562620
27572621 /*
....@@ -2769,7 +2633,7 @@
27692633
27702634 static const struct rpc_xprt_ops xs_local_ops = {
27712635 .reserve_xprt = xprt_reserve_xprt,
2772
- .release_xprt = xs_tcp_release_xprt,
2636
+ .release_xprt = xprt_release_xprt,
27732637 .alloc_slot = xprt_alloc_slot,
27742638 .free_slot = xprt_free_slot,
27752639 .rpcbind = xs_local_rpcbind,
....@@ -2777,8 +2641,9 @@
27772641 .connect = xs_local_connect,
27782642 .buf_alloc = rpc_malloc,
27792643 .buf_free = rpc_free,
2644
+ .prepare_request = xs_stream_prepare_request,
27802645 .send_request = xs_local_send_request,
2781
- .set_retrans_timeout = xprt_set_retrans_timeout_def,
2646
+ .wait_for_reply_request = xprt_wait_for_reply_request_def,
27822647 .close = xs_close,
27832648 .destroy = xs_destroy,
27842649 .print_stats = xs_local_print_stats,
....@@ -2798,7 +2663,7 @@
27982663 .buf_alloc = rpc_malloc,
27992664 .buf_free = rpc_free,
28002665 .send_request = xs_udp_send_request,
2801
- .set_retrans_timeout = xprt_set_retrans_timeout_rtt,
2666
+ .wait_for_reply_request = xprt_wait_for_reply_request_rtt,
28022667 .timer = xs_udp_timer,
28032668 .release_request = xprt_release_rqst_cong,
28042669 .close = xs_close,
....@@ -2811,16 +2676,17 @@
28112676
28122677 static const struct rpc_xprt_ops xs_tcp_ops = {
28132678 .reserve_xprt = xprt_reserve_xprt,
2814
- .release_xprt = xs_tcp_release_xprt,
2815
- .alloc_slot = xprt_lock_and_alloc_slot,
2679
+ .release_xprt = xprt_release_xprt,
2680
+ .alloc_slot = xprt_alloc_slot,
28162681 .free_slot = xprt_free_slot,
28172682 .rpcbind = rpcb_getport_async,
28182683 .set_port = xs_set_port,
28192684 .connect = xs_connect,
28202685 .buf_alloc = rpc_malloc,
28212686 .buf_free = rpc_free,
2687
+ .prepare_request = xs_stream_prepare_request,
28222688 .send_request = xs_tcp_send_request,
2823
- .set_retrans_timeout = xprt_set_retrans_timeout_def,
2689
+ .wait_for_reply_request = xprt_wait_for_reply_request_def,
28242690 .close = xs_tcp_shutdown,
28252691 .destroy = xs_destroy,
28262692 .set_connect_timeout = xs_tcp_set_connect_timeout,
....@@ -2830,8 +2696,8 @@
28302696 .inject_disconnect = xs_inject_disconnect,
28312697 #ifdef CONFIG_SUNRPC_BACKCHANNEL
28322698 .bc_setup = xprt_setup_bc,
2833
- .bc_up = xs_tcp_bc_up,
28342699 .bc_maxpayload = xs_tcp_bc_maxpayload,
2700
+ .bc_num_slots = xprt_bc_max_slots,
28352701 .bc_free_rqst = xprt_free_bc_rqst,
28362702 .bc_destroy = xprt_destroy_bc,
28372703 #endif
....@@ -2849,7 +2715,7 @@
28492715 .buf_alloc = bc_malloc,
28502716 .buf_free = bc_free,
28512717 .send_request = bc_send_request,
2852
- .set_retrans_timeout = xprt_set_retrans_timeout_def,
2718
+ .wait_for_reply_request = xprt_wait_for_reply_request_def,
28532719 .close = bc_close,
28542720 .destroy = bc_destroy,
28552721 .print_stats = xs_tcp_print_stats,
....@@ -2950,7 +2816,6 @@
29502816 transport = container_of(xprt, struct sock_xprt, xprt);
29512817
29522818 xprt->prot = 0;
2953
- xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
29542819 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
29552820
29562821 xprt->bind_timeout = XS_BIND_TO;
....@@ -2960,9 +2825,9 @@
29602825 xprt->ops = &xs_local_ops;
29612826 xprt->timeout = &xs_local_default_timeout;
29622827
2963
- INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn);
2964
- INIT_DELAYED_WORK(&transport->connect_worker,
2965
- xs_dummy_setup_socket);
2828
+ INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
2829
+ INIT_WORK(&transport->error_worker, xs_error_handle);
2830
+ INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket);
29662831
29672832 switch (sun->sun_family) {
29682833 case AF_LOCAL:
....@@ -2974,9 +2839,6 @@
29742839 }
29752840 xprt_set_bound(xprt);
29762841 xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL);
2977
- ret = ERR_PTR(xs_local_setup_socket(transport));
2978
- if (ret)
2979
- goto out_err;
29802842 break;
29812843 default:
29822844 ret = ERR_PTR(-EAFNOSUPPORT);
....@@ -3020,7 +2882,6 @@
30202882 transport = container_of(xprt, struct sock_xprt, xprt);
30212883
30222884 xprt->prot = IPPROTO_UDP;
3023
- xprt->tsh_size = 0;
30242885 /* XXX: header size can vary due to auth type, IPv6, etc. */
30252886 xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
30262887
....@@ -3033,6 +2894,7 @@
30332894 xprt->timeout = &xs_udp_default_timeout;
30342895
30352896 INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
2897
+ INIT_WORK(&transport->error_worker, xs_error_handle);
30362898 INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
30372899
30382900 switch (addr->sa_family) {
....@@ -3100,7 +2962,6 @@
31002962 transport = container_of(xprt, struct sock_xprt, xprt);
31012963
31022964 xprt->prot = IPPROTO_TCP;
3103
- xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
31042965 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
31052966
31062967 xprt->bind_timeout = XS_BIND_TO;
....@@ -3114,7 +2975,8 @@
31142975 xprt->connect_timeout = xprt->timeout->to_initval *
31152976 (xprt->timeout->to_retries + 1);
31162977
3117
- INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn);
2978
+ INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
2979
+ INIT_WORK(&transport->error_worker, xs_error_handle);
31182980 INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
31192981
31202982 switch (addr->sa_family) {
....@@ -3173,7 +3035,6 @@
31733035 transport = container_of(xprt, struct sock_xprt, xprt);
31743036
31753037 xprt->prot = IPPROTO_TCP;
3176
- xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
31773038 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
31783039 xprt->timeout = &xs_tcp_default_timeout;
31793040
....@@ -3277,10 +3138,8 @@
32773138 */
32783139 int init_socket_xprt(void)
32793140 {
3280
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
32813141 if (!sunrpc_table_header)
32823142 sunrpc_table_header = register_sysctl_table(sunrpc_table);
3283
-#endif
32843143
32853144 xprt_register_transport(&xs_local_transport);
32863145 xprt_register_transport(&xs_udp_transport);
....@@ -3296,12 +3155,10 @@
32963155 */
32973156 void cleanup_socket_xprt(void)
32983157 {
3299
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
33003158 if (sunrpc_table_header) {
33013159 unregister_sysctl_table(sunrpc_table_header);
33023160 sunrpc_table_header = NULL;
33033161 }
3304
-#endif
33053162
33063163 xprt_unregister_transport(&xs_local_transport);
33073164 xprt_unregister_transport(&xs_udp_transport);