2024-10-22 8ac6c7a54ed1b98d142dce24b11c6de6a1e239a5
kernel/net/sunrpc/xprt.c
....@@ -1,3 +1,4 @@
1
+// SPDX-License-Identifier: GPL-2.0-only
12 /*
23 * linux/net/sunrpc/xprt.c
34 *
....@@ -49,6 +50,7 @@
4950 #include <linux/sunrpc/metrics.h>
5051 #include <linux/sunrpc/bc_xprt.h>
5152 #include <linux/rcupdate.h>
53
+#include <linux/sched/mm.h>
5254
5355 #include <trace/events/sunrpc.h>
5456
....@@ -67,13 +69,20 @@
6769 */
6870 static void xprt_init(struct rpc_xprt *xprt, struct net *net);
6971 static __be32 xprt_alloc_xid(struct rpc_xprt *xprt);
70
-static void xprt_connect_status(struct rpc_task *task);
71
-static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
72
-static void __xprt_put_cong(struct rpc_xprt *, struct rpc_rqst *);
7372 static void xprt_destroy(struct rpc_xprt *xprt);
73
+static void xprt_request_init(struct rpc_task *task);
7474
7575 static DEFINE_SPINLOCK(xprt_list_lock);
7676 static LIST_HEAD(xprt_list);
77
+
78
+static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
79
+{
80
+ unsigned long timeout = jiffies + req->rq_timeout;
81
+
82
+ if (time_before(timeout, req->rq_majortimeo))
83
+ return timeout;
84
+ return req->rq_majortimeo;
85
+}
7786
7887 /**
7988 * xprt_register_transport - register a transport implementation
....@@ -204,6 +213,17 @@
204213 }
205214 EXPORT_SYMBOL_GPL(xprt_load_transport);
206215
216
+static void xprt_clear_locked(struct rpc_xprt *xprt)
217
+{
218
+ xprt->snd_task = NULL;
219
+ if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
220
+ smp_mb__before_atomic();
221
+ clear_bit(XPRT_LOCKED, &xprt->state);
222
+ smp_mb__after_atomic();
223
+ } else
224
+ queue_work(xprtiod_workqueue, &xprt->task_cleanup);
225
+}
226
+
207227 /**
208228 * xprt_reserve_xprt - serialize write access to transports
209229 * @task: task that is requesting access to the transport
....@@ -216,44 +236,56 @@
216236 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
217237 {
218238 struct rpc_rqst *req = task->tk_rqstp;
219
- int priority;
220239
221240 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
222241 if (task == xprt->snd_task)
223
- return 1;
242
+ goto out_locked;
224243 goto out_sleep;
225244 }
245
+ if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
246
+ goto out_unlock;
226247 xprt->snd_task = task;
227
- if (req != NULL)
228
- req->rq_ntrans++;
229248
249
+out_locked:
250
+ trace_xprt_reserve_xprt(xprt, task);
230251 return 1;
231252
253
+out_unlock:
254
+ xprt_clear_locked(xprt);
232255 out_sleep:
233
- dprintk("RPC: %5u failed to lock transport %p\n",
234
- task->tk_pid, xprt);
235
- task->tk_timeout = 0;
236256 task->tk_status = -EAGAIN;
237
- if (req == NULL)
238
- priority = RPC_PRIORITY_LOW;
239
- else if (!req->rq_ntrans)
240
- priority = RPC_PRIORITY_NORMAL;
257
+ if (RPC_IS_SOFT(task))
258
+ rpc_sleep_on_timeout(&xprt->sending, task, NULL,
259
+ xprt_request_timeout(req));
241260 else
242
- priority = RPC_PRIORITY_HIGH;
243
- rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
261
+ rpc_sleep_on(&xprt->sending, task, NULL);
244262 return 0;
245263 }
246264 EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
247265
248
-static void xprt_clear_locked(struct rpc_xprt *xprt)
266
+static bool
267
+xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
249268 {
250
- xprt->snd_task = NULL;
251
- if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
252
- smp_mb__before_atomic();
253
- clear_bit(XPRT_LOCKED, &xprt->state);
254
- smp_mb__after_atomic();
255
- } else
256
- queue_work(xprtiod_workqueue, &xprt->task_cleanup);
269
+ return test_bit(XPRT_CWND_WAIT, &xprt->state);
270
+}
271
+
272
+static void
273
+xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
274
+{
275
+ if (!list_empty(&xprt->xmit_queue)) {
276
+ /* Peek at head of queue to see if it can make progress */
277
+ if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
278
+ rq_xmit)->rq_cong)
279
+ return;
280
+ }
281
+ set_bit(XPRT_CWND_WAIT, &xprt->state);
282
+}
283
+
284
+static void
285
+xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
286
+{
287
+ if (!RPCXPRT_CONGESTED(xprt))
288
+ clear_bit(XPRT_CWND_WAIT, &xprt->state);
257289 }
258290
259291 /*
....@@ -263,41 +295,40 @@
263295 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
264296 * integrated into the decision of whether a request is allowed to be
265297 * woken up and given access to the transport.
298
+ * Note that the lock is only granted if we know there are free slots.
266299 */
267300 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
268301 {
269302 struct rpc_rqst *req = task->tk_rqstp;
270
- int priority;
271303
272304 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
273305 if (task == xprt->snd_task)
274
- return 1;
306
+ goto out_locked;
275307 goto out_sleep;
276308 }
277309 if (req == NULL) {
278310 xprt->snd_task = task;
279
- return 1;
311
+ goto out_locked;
280312 }
281
- if (__xprt_get_cong(xprt, task)) {
313
+ if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
314
+ goto out_unlock;
315
+ if (!xprt_need_congestion_window_wait(xprt)) {
282316 xprt->snd_task = task;
283
- req->rq_ntrans++;
284
- return 1;
317
+ goto out_locked;
285318 }
319
+out_unlock:
286320 xprt_clear_locked(xprt);
287321 out_sleep:
288
- if (req)
289
- __xprt_put_cong(xprt, req);
290
- dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
291
- task->tk_timeout = 0;
292322 task->tk_status = -EAGAIN;
293
- if (req == NULL)
294
- priority = RPC_PRIORITY_LOW;
295
- else if (!req->rq_ntrans)
296
- priority = RPC_PRIORITY_NORMAL;
323
+ if (RPC_IS_SOFT(task))
324
+ rpc_sleep_on_timeout(&xprt->sending, task, NULL,
325
+ xprt_request_timeout(req));
297326 else
298
- priority = RPC_PRIORITY_HIGH;
299
- rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
327
+ rpc_sleep_on(&xprt->sending, task, NULL);
300328 return 0;
329
+out_locked:
330
+ trace_xprt_reserve_cong(xprt, task);
331
+ return 1;
301332 }
302333 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
303334
....@@ -305,21 +336,19 @@
305336 {
306337 int retval;
307338
308
- spin_lock_bh(&xprt->transport_lock);
339
+ if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
340
+ return 1;
341
+ spin_lock(&xprt->transport_lock);
309342 retval = xprt->ops->reserve_xprt(xprt, task);
310
- spin_unlock_bh(&xprt->transport_lock);
343
+ spin_unlock(&xprt->transport_lock);
311344 return retval;
312345 }
313346
314347 static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
315348 {
316349 struct rpc_xprt *xprt = data;
317
- struct rpc_rqst *req;
318350
319
- req = task->tk_rqstp;
320351 xprt->snd_task = task;
321
- if (req)
322
- req->rq_ntrans++;
323352 return true;
324353 }
325354
....@@ -327,51 +356,28 @@
327356 {
328357 if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
329358 return;
330
-
359
+ if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
360
+ goto out_unlock;
331361 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
332362 __xprt_lock_write_func, xprt))
333363 return;
364
+out_unlock:
334365 xprt_clear_locked(xprt);
335
-}
336
-
337
-static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data)
338
-{
339
- struct rpc_xprt *xprt = data;
340
- struct rpc_rqst *req;
341
-
342
- req = task->tk_rqstp;
343
- if (req == NULL) {
344
- xprt->snd_task = task;
345
- return true;
346
- }
347
- if (__xprt_get_cong(xprt, task)) {
348
- xprt->snd_task = task;
349
- req->rq_ntrans++;
350
- return true;
351
- }
352
- return false;
353366 }
354367
355368 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
356369 {
357370 if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
358371 return;
359
- if (RPCXPRT_CONGESTED(xprt))
372
+ if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
373
+ goto out_unlock;
374
+ if (xprt_need_congestion_window_wait(xprt))
360375 goto out_unlock;
361376 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
362
- __xprt_lock_write_cong_func, xprt))
377
+ __xprt_lock_write_func, xprt))
363378 return;
364379 out_unlock:
365380 xprt_clear_locked(xprt);
366
-}
367
-
368
-static void xprt_task_clear_bytes_sent(struct rpc_task *task)
369
-{
370
- if (task != NULL) {
371
- struct rpc_rqst *req = task->tk_rqstp;
372
- if (req != NULL)
373
- req->rq_bytes_sent = 0;
374
- }
375381 }
376382
377383 /**
....@@ -384,10 +390,10 @@
384390 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
385391 {
386392 if (xprt->snd_task == task) {
387
- xprt_task_clear_bytes_sent(task);
388393 xprt_clear_locked(xprt);
389394 __xprt_lock_write_next(xprt);
390395 }
396
+ trace_xprt_release_xprt(xprt, task);
391397 }
392398 EXPORT_SYMBOL_GPL(xprt_release_xprt);
393399
....@@ -402,18 +408,20 @@
402408 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
403409 {
404410 if (xprt->snd_task == task) {
405
- xprt_task_clear_bytes_sent(task);
406411 xprt_clear_locked(xprt);
407412 __xprt_lock_write_next_cong(xprt);
408413 }
414
+ trace_xprt_release_cong(xprt, task);
409415 }
410416 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
411417
412418 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
413419 {
414
- spin_lock_bh(&xprt->transport_lock);
420
+ if (xprt->snd_task != task)
421
+ return;
422
+ spin_lock(&xprt->transport_lock);
415423 xprt->ops->release_xprt(xprt, task);
416
- spin_unlock_bh(&xprt->transport_lock);
424
+ spin_unlock(&xprt->transport_lock);
417425 }
418426
419427 /*
....@@ -421,16 +429,15 @@
421429 * overflowed. Put the task to sleep if this is the case.
422430 */
423431 static int
424
-__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
432
+__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
425433 {
426
- struct rpc_rqst *req = task->tk_rqstp;
427
-
428434 if (req->rq_cong)
429435 return 1;
430
- dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
431
- task->tk_pid, xprt->cong, xprt->cwnd);
432
- if (RPCXPRT_CONGESTED(xprt))
436
+ trace_xprt_get_cong(xprt, req->rq_task);
437
+ if (RPCXPRT_CONGESTED(xprt)) {
438
+ xprt_set_congestion_window_wait(xprt);
433439 return 0;
440
+ }
434441 req->rq_cong = 1;
435442 xprt->cong += RPC_CWNDSCALE;
436443 return 1;
....@@ -447,8 +454,31 @@
447454 return;
448455 req->rq_cong = 0;
449456 xprt->cong -= RPC_CWNDSCALE;
457
+ xprt_test_and_clear_congestion_window_wait(xprt);
458
+ trace_xprt_put_cong(xprt, req->rq_task);
450459 __xprt_lock_write_next_cong(xprt);
451460 }
461
+
462
+/**
463
+ * xprt_request_get_cong - Request congestion control credits
464
+ * @xprt: pointer to transport
465
+ * @req: pointer to RPC request
466
+ *
467
+ * Useful for transports that require congestion control.
468
+ */
469
+bool
470
+xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
471
+{
472
+ bool ret = false;
473
+
474
+ if (req->rq_cong)
475
+ return true;
476
+ spin_lock(&xprt->transport_lock);
477
+ ret = __xprt_get_cong(xprt, req) != 0;
478
+ spin_unlock(&xprt->transport_lock);
479
+ return ret;
480
+}
481
+EXPORT_SYMBOL_GPL(xprt_request_get_cong);
452482
453483 /**
454484 * xprt_release_rqst_cong - housekeeping when request is complete
....@@ -463,6 +493,26 @@
463493 __xprt_put_cong(req->rq_xprt, req);
464494 }
465495 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
496
+
497
+static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
498
+{
499
+ if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
500
+ __xprt_lock_write_next_cong(xprt);
501
+}
502
+
503
+/*
504
+ * Clear the congestion window wait flag and wake up the next
505
+ * entry on xprt->sending
506
+ */
507
+static void
508
+xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
509
+{
510
+ if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
511
+ spin_lock(&xprt->transport_lock);
512
+ __xprt_lock_write_next_cong(xprt);
513
+ spin_unlock(&xprt->transport_lock);
514
+ }
515
+}
466516
467517 /**
468518 * xprt_adjust_cwnd - adjust transport congestion window
....@@ -521,22 +571,29 @@
521571
522572 /**
523573 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
524
- * @task: task to be put to sleep
525
- * @action: function pointer to be executed after wait
574
+ * @xprt: transport
526575 *
527576 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
528577 * we don't in general want to force a socket disconnection due to
529578 * an incomplete RPC call transmission.
530579 */
531
-void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
580
+void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
532581 {
533
- struct rpc_rqst *req = task->tk_rqstp;
534
- struct rpc_xprt *xprt = req->rq_xprt;
535
-
536
- task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
537
- rpc_sleep_on(&xprt->pending, task, action);
582
+ set_bit(XPRT_WRITE_SPACE, &xprt->state);
538583 }
539584 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
585
+
586
+static bool
587
+xprt_clear_write_space_locked(struct rpc_xprt *xprt)
588
+{
589
+ if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
590
+ __xprt_lock_write_next(xprt);
591
+ dprintk("RPC: write space: waking waiting task on "
592
+ "xprt %p\n", xprt);
593
+ return true;
594
+ }
595
+ return false;
596
+}
540597
541598 /**
542599 * xprt_write_space - wake the task waiting for transport output buffer space
....@@ -544,66 +601,63 @@
544601 *
545602 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
546603 */
547
-void xprt_write_space(struct rpc_xprt *xprt)
604
+bool xprt_write_space(struct rpc_xprt *xprt)
548605 {
549
- spin_lock_bh(&xprt->transport_lock);
550
- if (xprt->snd_task) {
551
- dprintk("RPC: write space: waking waiting task on "
552
- "xprt %p\n", xprt);
553
- rpc_wake_up_queued_task_on_wq(xprtiod_workqueue,
554
- &xprt->pending, xprt->snd_task);
555
- }
556
- spin_unlock_bh(&xprt->transport_lock);
606
+ bool ret;
607
+
608
+ if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
609
+ return false;
610
+ spin_lock(&xprt->transport_lock);
611
+ ret = xprt_clear_write_space_locked(xprt);
612
+ spin_unlock(&xprt->transport_lock);
613
+ return ret;
557614 }
558615 EXPORT_SYMBOL_GPL(xprt_write_space);
559616
560
-/**
561
- * xprt_set_retrans_timeout_def - set a request's retransmit timeout
562
- * @task: task whose timeout is to be set
563
- *
564
- * Set a request's retransmit timeout based on the transport's
565
- * default timeout parameters. Used by transports that don't adjust
566
- * the retransmit timeout based on round-trip time estimation.
567
- */
568
-void xprt_set_retrans_timeout_def(struct rpc_task *task)
617
+static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
569618 {
570
- task->tk_timeout = task->tk_rqstp->rq_timeout;
619
+ s64 delta = ktime_to_ns(ktime_get() - abstime);
620
+ return likely(delta >= 0) ?
621
+ jiffies - nsecs_to_jiffies(delta) :
622
+ jiffies + nsecs_to_jiffies(-delta);
571623 }
572
-EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);
573624
574
-/**
575
- * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
576
- * @task: task whose timeout is to be set
577
- *
578
- * Set a request's retransmit timeout using the RTT estimator.
579
- */
580
-void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
625
+static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req)
581626 {
582
- int timer = task->tk_msg.rpc_proc->p_timer;
583
- struct rpc_clnt *clnt = task->tk_client;
584
- struct rpc_rtt *rtt = clnt->cl_rtt;
585
- struct rpc_rqst *req = task->tk_rqstp;
586
- unsigned long max_timeout = clnt->cl_timeout->to_maxval;
627
+ const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
628
+ unsigned long majortimeo = req->rq_timeout;
587629
588
- task->tk_timeout = rpc_calc_rto(rtt, timer);
589
- task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
590
- if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
591
- task->tk_timeout = max_timeout;
630
+ if (to->to_exponential)
631
+ majortimeo <<= to->to_retries;
632
+ else
633
+ majortimeo += to->to_increment * to->to_retries;
634
+ if (majortimeo > to->to_maxval || majortimeo == 0)
635
+ majortimeo = to->to_maxval;
636
+ return majortimeo;
592637 }
593
-EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);
594638
595639 static void xprt_reset_majortimeo(struct rpc_rqst *req)
596640 {
597
- const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
641
+ req->rq_majortimeo += xprt_calc_majortimeo(req);
642
+}
598643
599
- req->rq_majortimeo = req->rq_timeout;
600
- if (to->to_exponential)
601
- req->rq_majortimeo <<= to->to_retries;
644
+static void xprt_reset_minortimeo(struct rpc_rqst *req)
645
+{
646
+ req->rq_minortimeo += req->rq_timeout;
647
+}
648
+
649
+static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req)
650
+{
651
+ unsigned long time_init;
652
+ struct rpc_xprt *xprt = req->rq_xprt;
653
+
654
+ if (likely(xprt && xprt_connected(xprt)))
655
+ time_init = jiffies;
602656 else
603
- req->rq_majortimeo += to->to_increment * to->to_retries;
604
- if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
605
- req->rq_majortimeo = to->to_maxval;
606
- req->rq_majortimeo += jiffies;
657
+ time_init = xprt_abs_ktime_to_jiffies(task->tk_start);
658
+ req->rq_timeout = task->tk_client->cl_timeout->to_initval;
659
+ req->rq_majortimeo = time_init + xprt_calc_majortimeo(req);
660
+ req->rq_minortimeo = time_init + req->rq_timeout;
607661 }
608662
609663 /**
....@@ -618,6 +672,8 @@
618672 int status = 0;
619673
620674 if (time_before(jiffies, req->rq_majortimeo)) {
675
+ if (time_before(jiffies, req->rq_minortimeo))
676
+ return status;
621677 if (to->to_exponential)
622678 req->rq_timeout <<= 1;
623679 else
....@@ -630,11 +686,12 @@
630686 req->rq_retries = 0;
631687 xprt_reset_majortimeo(req);
632688 /* Reset the RTT counters == "slow start" */
633
- spin_lock_bh(&xprt->transport_lock);
689
+ spin_lock(&xprt->transport_lock);
634690 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
635
- spin_unlock_bh(&xprt->transport_lock);
691
+ spin_unlock(&xprt->transport_lock);
636692 status = -ETIMEDOUT;
637693 }
694
+ xprt_reset_minortimeo(req);
638695
639696 if (req->rq_timeout == 0) {
640697 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
....@@ -647,11 +704,14 @@
647704 {
648705 struct rpc_xprt *xprt =
649706 container_of(work, struct rpc_xprt, task_cleanup);
707
+ unsigned int pflags = memalloc_nofs_save();
650708
709
+ trace_xprt_disconnect_auto(xprt);
651710 clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
652711 xprt->ops->close(xprt);
653712 xprt_release_write(xprt, NULL);
654713 wake_up_bit(&xprt->state, XPRT_LOCKED);
714
+ memalloc_nofs_restore(pflags);
655715 }
656716
657717 /**
....@@ -661,13 +721,30 @@
661721 */
662722 void xprt_disconnect_done(struct rpc_xprt *xprt)
663723 {
664
- dprintk("RPC: disconnected transport %p\n", xprt);
665
- spin_lock_bh(&xprt->transport_lock);
724
+ trace_xprt_disconnect_done(xprt);
725
+ spin_lock(&xprt->transport_lock);
666726 xprt_clear_connected(xprt);
667
- xprt_wake_pending_tasks(xprt, -EAGAIN);
668
- spin_unlock_bh(&xprt->transport_lock);
727
+ xprt_clear_write_space_locked(xprt);
728
+ xprt_clear_congestion_window_wait_locked(xprt);
729
+ xprt_wake_pending_tasks(xprt, -ENOTCONN);
730
+ spin_unlock(&xprt->transport_lock);
669731 }
670732 EXPORT_SYMBOL_GPL(xprt_disconnect_done);
733
+
734
+/**
735
+ * xprt_schedule_autoclose_locked - Try to schedule an autoclose RPC call
736
+ * @xprt: transport to disconnect
737
+ */
738
+static void xprt_schedule_autoclose_locked(struct rpc_xprt *xprt)
739
+{
740
+ if (test_and_set_bit(XPRT_CLOSE_WAIT, &xprt->state))
741
+ return;
742
+ if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
743
+ queue_work(xprtiod_workqueue, &xprt->task_cleanup);
744
+ else if (xprt->snd_task && !test_bit(XPRT_SND_IS_COOKIE, &xprt->state))
745
+ rpc_wake_up_queued_task_set_status(&xprt->pending,
746
+ xprt->snd_task, -ENOTCONN);
747
+}
671748
672749 /**
673750 * xprt_force_disconnect - force a transport to disconnect
....@@ -676,16 +753,30 @@
676753 */
677754 void xprt_force_disconnect(struct rpc_xprt *xprt)
678755 {
756
+ trace_xprt_disconnect_force(xprt);
757
+
679758 /* Don't race with the test_bit() in xprt_clear_locked() */
680
- spin_lock_bh(&xprt->transport_lock);
681
- set_bit(XPRT_CLOSE_WAIT, &xprt->state);
682
- /* Try to schedule an autoclose RPC call */
683
- if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
684
- queue_work(xprtiod_workqueue, &xprt->task_cleanup);
685
- xprt_wake_pending_tasks(xprt, -EAGAIN);
686
- spin_unlock_bh(&xprt->transport_lock);
759
+ spin_lock(&xprt->transport_lock);
760
+ xprt_schedule_autoclose_locked(xprt);
761
+ spin_unlock(&xprt->transport_lock);
687762 }
688763 EXPORT_SYMBOL_GPL(xprt_force_disconnect);
764
+
765
+static unsigned int
766
+xprt_connect_cookie(struct rpc_xprt *xprt)
767
+{
768
+ return READ_ONCE(xprt->connect_cookie);
769
+}
770
+
771
+static bool
772
+xprt_request_retransmit_after_disconnect(struct rpc_task *task)
773
+{
774
+ struct rpc_rqst *req = task->tk_rqstp;
775
+ struct rpc_xprt *xprt = req->rq_xprt;
776
+
777
+ return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
778
+ !xprt_connected(xprt);
779
+}
689780
690781 /**
691782 * xprt_conditional_disconnect - force a transport to disconnect
....@@ -701,18 +792,14 @@
701792 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
702793 {
703794 /* Don't race with the test_bit() in xprt_clear_locked() */
704
- spin_lock_bh(&xprt->transport_lock);
795
+ spin_lock(&xprt->transport_lock);
705796 if (cookie != xprt->connect_cookie)
706797 goto out;
707798 if (test_bit(XPRT_CLOSING, &xprt->state))
708799 goto out;
709
- set_bit(XPRT_CLOSE_WAIT, &xprt->state);
710
- /* Try to schedule an autoclose RPC call */
711
- if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
712
- queue_work(xprtiod_workqueue, &xprt->task_cleanup);
713
- xprt_wake_pending_tasks(xprt, -EAGAIN);
800
+ xprt_schedule_autoclose_locked(xprt);
714801 out:
715
- spin_unlock_bh(&xprt->transport_lock);
802
+ spin_unlock(&xprt->transport_lock);
716803 }
717804
718805 static bool
....@@ -725,7 +812,8 @@
725812 xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
726813 __must_hold(&xprt->transport_lock)
727814 {
728
- if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
815
+ xprt->last_used = jiffies;
816
+ if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
729817 mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
730818 }
731819
....@@ -734,18 +822,13 @@
734822 {
735823 struct rpc_xprt *xprt = from_timer(xprt, t, timer);
736824
737
- spin_lock(&xprt->transport_lock);
738
- if (!list_empty(&xprt->recv))
739
- goto out_abort;
825
+ if (!RB_EMPTY_ROOT(&xprt->recv_queue))
826
+ return;
740827 /* Reset xprt->last_used to avoid connect/autodisconnect cycling */
741828 xprt->last_used = jiffies;
742829 if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
743
- goto out_abort;
744
- spin_unlock(&xprt->transport_lock);
830
+ return;
745831 queue_work(xprtiod_workqueue, &xprt->task_cleanup);
746
- return;
747
-out_abort:
748
- spin_unlock(&xprt->transport_lock);
749832 }
750833
751834 bool xprt_lock_connect(struct rpc_xprt *xprt,
....@@ -754,33 +837,36 @@
754837 {
755838 bool ret = false;
756839
757
- spin_lock_bh(&xprt->transport_lock);
840
+ spin_lock(&xprt->transport_lock);
758841 if (!test_bit(XPRT_LOCKED, &xprt->state))
759842 goto out;
760843 if (xprt->snd_task != task)
761844 goto out;
762
- xprt_task_clear_bytes_sent(task);
845
+ set_bit(XPRT_SND_IS_COOKIE, &xprt->state);
763846 xprt->snd_task = cookie;
764847 ret = true;
765848 out:
766
- spin_unlock_bh(&xprt->transport_lock);
849
+ spin_unlock(&xprt->transport_lock);
767850 return ret;
768851 }
852
+EXPORT_SYMBOL_GPL(xprt_lock_connect);
769853
770854 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
771855 {
772
- spin_lock_bh(&xprt->transport_lock);
856
+ spin_lock(&xprt->transport_lock);
773857 if (xprt->snd_task != cookie)
774858 goto out;
775859 if (!test_bit(XPRT_LOCKED, &xprt->state))
776860 goto out;
777861 	xprt->snd_task = NULL;
862
+ clear_bit(XPRT_SND_IS_COOKIE, &xprt->state);
778863 xprt->ops->release_xprt(xprt, NULL);
779864 xprt_schedule_autodisconnect(xprt);
780865 out:
781
- spin_unlock_bh(&xprt->transport_lock);
866
+ spin_unlock(&xprt->transport_lock);
782867 wake_up_bit(&xprt->state, XPRT_LOCKED);
783868 }
869
+EXPORT_SYMBOL_GPL(xprt_unlock_connect);
784870
785871 /**
786872 * xprt_connect - schedule a transport connect operation
....@@ -791,8 +877,7 @@
791877 {
792878 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
793879
794
- dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
795
- xprt, (xprt_connected(xprt) ? "is" : "is not"));
880
+ trace_xprt_connect(xprt);
796881
797882 if (!xprt_bound(xprt)) {
798883 task->tk_status = -EAGAIN;
....@@ -801,14 +886,10 @@
801886 if (!xprt_lock_write(xprt, task))
802887 return;
803888
804
- if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state))
805
- xprt->ops->close(xprt);
806
-
807
- if (!xprt_connected(xprt)) {
808
- task->tk_rqstp->rq_bytes_sent = 0;
809
- task->tk_timeout = task->tk_rqstp->rq_timeout;
889
+ if (!xprt_connected(xprt) && !test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
810890 task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
811
- rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
891
+ rpc_sleep_on_timeout(&xprt->pending, task, NULL,
892
+ xprt_request_timeout(task->tk_rqstp));
812893
813894 if (test_bit(XPRT_CLOSING, &xprt->state))
814895 return;
....@@ -827,32 +908,105 @@
827908 xprt_release_write(xprt, task);
828909 }
829910
830
-static void xprt_connect_status(struct rpc_task *task)
911
+/**
912
+ * xprt_reconnect_delay - compute the wait before scheduling a connect
913
+ * @xprt: transport instance
914
+ *
915
+ */
916
+unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
831917 {
832
- switch (task->tk_status) {
833
- case 0:
834
- dprintk("RPC: %5u xprt_connect_status: connection established\n",
835
- task->tk_pid);
836
- break;
837
- case -ECONNREFUSED:
838
- case -ECONNRESET:
839
- case -ECONNABORTED:
840
- case -ENETUNREACH:
841
- case -EHOSTUNREACH:
842
- case -EPIPE:
843
- case -EAGAIN:
844
- dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);
845
- break;
846
- case -ETIMEDOUT:
847
- dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
848
- "out\n", task->tk_pid);
849
- break;
850
- default:
851
- dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
852
- "server %s\n", task->tk_pid, -task->tk_status,
853
- task->tk_rqstp->rq_xprt->servername);
854
- task->tk_status = -EIO;
918
+ unsigned long start, now = jiffies;
919
+
920
+ start = xprt->stat.connect_start + xprt->reestablish_timeout;
921
+ if (time_after(start, now))
922
+ return start - now;
923
+ return 0;
924
+}
925
+EXPORT_SYMBOL_GPL(xprt_reconnect_delay);
926
+
927
+/**
928
+ * xprt_reconnect_backoff - compute the new re-establish timeout
929
+ * @xprt: transport instance
930
+ * @init_to: initial reestablish timeout
931
+ *
932
+ */
933
+void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
934
+{
935
+ xprt->reestablish_timeout <<= 1;
936
+ if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
937
+ xprt->reestablish_timeout = xprt->max_reconnect_timeout;
938
+ if (xprt->reestablish_timeout < init_to)
939
+ xprt->reestablish_timeout = init_to;
940
+}
941
+EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);
942
+
943
+enum xprt_xid_rb_cmp {
944
+ XID_RB_EQUAL,
945
+ XID_RB_LEFT,
946
+ XID_RB_RIGHT,
947
+};
948
+static enum xprt_xid_rb_cmp
949
+xprt_xid_cmp(__be32 xid1, __be32 xid2)
950
+{
951
+ if (xid1 == xid2)
952
+ return XID_RB_EQUAL;
953
+ if ((__force u32)xid1 < (__force u32)xid2)
954
+ return XID_RB_LEFT;
955
+ return XID_RB_RIGHT;
956
+}
957
+
958
+static struct rpc_rqst *
959
+xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
960
+{
961
+ struct rb_node *n = xprt->recv_queue.rb_node;
962
+ struct rpc_rqst *req;
963
+
964
+ while (n != NULL) {
965
+ req = rb_entry(n, struct rpc_rqst, rq_recv);
966
+ switch (xprt_xid_cmp(xid, req->rq_xid)) {
967
+ case XID_RB_LEFT:
968
+ n = n->rb_left;
969
+ break;
970
+ case XID_RB_RIGHT:
971
+ n = n->rb_right;
972
+ break;
973
+ case XID_RB_EQUAL:
974
+ return req;
975
+ }
855976 }
977
+ return NULL;
978
+}
979
+
980
+static void
981
+xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
982
+{
983
+ struct rb_node **p = &xprt->recv_queue.rb_node;
984
+ struct rb_node *n = NULL;
985
+ struct rpc_rqst *req;
986
+
987
+ while (*p != NULL) {
988
+ n = *p;
989
+ req = rb_entry(n, struct rpc_rqst, rq_recv);
990
+ switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
991
+ case XID_RB_LEFT:
992
+ p = &n->rb_left;
993
+ break;
994
+ case XID_RB_RIGHT:
995
+ p = &n->rb_right;
996
+ break;
997
+ case XID_RB_EQUAL:
998
+ WARN_ON_ONCE(new != req);
999
+ return;
1000
+ }
1001
+ }
1002
+ rb_link_node(&new->rq_recv, n, p);
1003
+ rb_insert_color(&new->rq_recv, &xprt->recv_queue);
1004
+}
1005
+
1006
+static void
1007
+xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
1008
+{
1009
+ rb_erase(&req->rq_recv, &xprt->recv_queue);
8561010 }
8571011
8581012 /**
....@@ -860,18 +1014,18 @@
8601014 * @xprt: transport on which the original request was transmitted
8611015 * @xid: RPC XID of incoming reply
8621016 *
863
- * Caller holds xprt->recv_lock.
1017
+ * Caller holds xprt->queue_lock.
8641018 */
8651019 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
8661020 {
8671021 struct rpc_rqst *entry;
8681022
869
- list_for_each_entry(entry, &xprt->recv, rq_list)
870
- if (entry->rq_xid == xid) {
871
- trace_xprt_lookup_rqst(xprt, xid, 0);
872
- entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
873
- return entry;
874
- }
1023
+ entry = xprt_request_rb_find(xprt, xid);
1024
+ if (entry != NULL) {
1025
+ trace_xprt_lookup_rqst(xprt, xid, 0);
1026
+ entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
1027
+ return entry;
1028
+ }
8751029
8761030 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
8771031 ntohl(xid));
....@@ -881,16 +1035,22 @@
8811035 }
8821036 EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
8831037
1038
+static bool
1039
+xprt_is_pinned_rqst(struct rpc_rqst *req)
1040
+{
1041
+ return atomic_read(&req->rq_pin) != 0;
1042
+}
1043
+
8841044 /**
8851045 * xprt_pin_rqst - Pin a request on the transport receive list
8861046 * @req: Request to pin
8871047 *
8881048 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
889
- * so should be holding the xprt transport lock.
1049
+ * so should be holding xprt->queue_lock.
8901050 */
8911051 void xprt_pin_rqst(struct rpc_rqst *req)
8921052 {
893
- set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
1053
+ atomic_inc(&req->rq_pin);
8941054 }
8951055 EXPORT_SYMBOL_GPL(xprt_pin_rqst);
8961056
....@@ -898,38 +1058,88 @@
8981058 * xprt_unpin_rqst - Unpin a request on the transport receive list
8991059 * @req: Request to unpin
9001060 *
901
- * Caller should be holding the xprt transport lock.
1061
+ * Caller should be holding xprt->queue_lock.
9021062 */
9031063 void xprt_unpin_rqst(struct rpc_rqst *req)
9041064 {
905
- struct rpc_task *task = req->rq_task;
906
-
907
- clear_bit(RPC_TASK_MSG_RECV, &task->tk_runstate);
908
- if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
909
- wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
1065
+ if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
1066
+ atomic_dec(&req->rq_pin);
1067
+ return;
1068
+ }
1069
+ if (atomic_dec_and_test(&req->rq_pin))
1070
+ wake_up_var(&req->rq_pin);
9101071 }
9111072 EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
9121073
9131074 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
914
-__must_hold(&req->rq_xprt->recv_lock)
9151075 {
916
- struct rpc_task *task = req->rq_task;
1076
+ wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
1077
+}
9171078
918
- if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
919
- spin_unlock(&req->rq_xprt->recv_lock);
920
- set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
921
- wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
922
- TASK_UNINTERRUPTIBLE);
923
- clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
924
- spin_lock(&req->rq_xprt->recv_lock);
925
- }
1079
+static bool
1080
+xprt_request_data_received(struct rpc_task *task)
1081
+{
1082
+ return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
1083
+ READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
1084
+}
1085
+
1086
+static bool
1087
+xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
1088
+{
1089
+ return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
1090
+ READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
1091
+}
1092
+
1093
+/**
1094
+ * xprt_request_enqueue_receive - Add a request to the receive queue
1095
+ * @task: RPC task
1096
+ *
1097
+ */
1098
+void
1099
+xprt_request_enqueue_receive(struct rpc_task *task)
1100
+{
1101
+ struct rpc_rqst *req = task->tk_rqstp;
1102
+ struct rpc_xprt *xprt = req->rq_xprt;
1103
+
1104
+ if (!xprt_request_need_enqueue_receive(task, req))
1105
+ return;
1106
+
1107
+ xprt_request_prepare(task->tk_rqstp);
1108
+ spin_lock(&xprt->queue_lock);
1109
+
1110
+ /* Update the softirq receive buffer */
1111
+ memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
1112
+ sizeof(req->rq_private_buf));
1113
+
1114
+ /* Add request to the receive list */
1115
+ xprt_request_rb_insert(xprt, req);
1116
+ set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
1117
+ spin_unlock(&xprt->queue_lock);
1118
+
1119
+ /* Turn off autodisconnect */
1120
+ del_singleshot_timer_sync(&xprt->timer);
1121
+}
1122
+
1123
+/**
1124
+ * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
1125
+ * @task: RPC task
1126
+ *
1127
+ * Caller must hold xprt->queue_lock.
1128
+ */
1129
+static void
1130
+xprt_request_dequeue_receive_locked(struct rpc_task *task)
1131
+{
1132
+ struct rpc_rqst *req = task->tk_rqstp;
1133
+
1134
+ if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1135
+ xprt_request_rb_remove(req->rq_xprt, req);
9261136 }
9271137
9281138 /**
9291139 * xprt_update_rtt - Update RPC RTT statistics
9301140 * @task: RPC request that recently completed
9311141 *
932
- * Caller holds xprt->recv_lock.
1142
+ * Caller holds xprt->queue_lock.
9331143 */
9341144 void xprt_update_rtt(struct rpc_task *task)
9351145 {
....@@ -951,25 +1161,21 @@
9511161 * @task: RPC request that recently completed
9521162 * @copied: actual number of bytes received from the transport
9531163 *
954
- * Caller holds xprt->recv_lock.
1164
+ * Caller holds xprt->queue_lock.
9551165 */
9561166 void xprt_complete_rqst(struct rpc_task *task, int copied)
9571167 {
9581168 struct rpc_rqst *req = task->tk_rqstp;
9591169 struct rpc_xprt *xprt = req->rq_xprt;
9601170
961
- dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
962
- task->tk_pid, ntohl(req->rq_xid), copied);
963
- trace_xprt_complete_rqst(xprt, req->rq_xid, copied);
964
-
9651171 xprt->stat.recvs++;
9661172
967
- list_del_init(&req->rq_list);
9681173 req->rq_private_buf.len = copied;
9691174 /* Ensure all writes are done before we update */
9701175 /* req->rq_reply_bytes_recvd */
9711176 smp_wmb();
9721177 req->rq_reply_bytes_recvd = copied;
1178
+ xprt_request_dequeue_receive_locked(task);
9731179 rpc_wake_up_queued_task(&xprt->pending, task);
9741180 }
9751181 EXPORT_SYMBOL_GPL(xprt_complete_rqst);
....@@ -991,6 +1197,234 @@
9911197 }
9921198
9931199 /**
1200
+ * xprt_wait_for_reply_request_def - wait for reply
1201
+ * @task: pointer to rpc_task
1202
+ *
1203
+ * Set a request's retransmit timeout based on the transport's
1204
+ * default timeout parameters. Used by transports that don't adjust
1205
+ * the retransmit timeout based on round-trip time estimation,
1206
+ * and put the task to sleep on the pending queue.
1207
+ */
1208
+void xprt_wait_for_reply_request_def(struct rpc_task *task)
1209
+{
1210
+ struct rpc_rqst *req = task->tk_rqstp;
1211
+
1212
+ rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1213
+ xprt_request_timeout(req));
1214
+}
1215
+EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);
1216
+
1217
+/**
1218
+ * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
1219
+ * @task: pointer to rpc_task
1220
+ *
1221
+ * Set a request's retransmit timeout using the RTT estimator,
1222
+ * and put the task to sleep on the pending queue.
1223
+ */
1224
+void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
1225
+{
1226
+ int timer = task->tk_msg.rpc_proc->p_timer;
1227
+ struct rpc_clnt *clnt = task->tk_client;
1228
+ struct rpc_rtt *rtt = clnt->cl_rtt;
1229
+ struct rpc_rqst *req = task->tk_rqstp;
1230
+ unsigned long max_timeout = clnt->cl_timeout->to_maxval;
1231
+ unsigned long timeout;
1232
+
1233
+ timeout = rpc_calc_rto(rtt, timer);
1234
+ timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
1235
+ if (timeout > max_timeout || timeout == 0)
1236
+ timeout = max_timeout;
1237
+ rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1238
+ jiffies + timeout);
1239
+}
1240
+EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);
1241
+
1242
+/**
1243
+ * xprt_request_wait_receive - wait for the reply to an RPC request
1244
+ * @task: RPC task about to send a request
1245
+ *
1246
+ */
1247
+void xprt_request_wait_receive(struct rpc_task *task)
1248
+{
1249
+ struct rpc_rqst *req = task->tk_rqstp;
1250
+ struct rpc_xprt *xprt = req->rq_xprt;
1251
+
1252
+ if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1253
+ return;
1254
+ /*
1255
+ * Sleep on the pending queue if we're expecting a reply.
1256
+ * The spinlock ensures atomicity between the test of
1257
+ * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
1258
+ */
1259
+ spin_lock(&xprt->queue_lock);
1260
+ if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
1261
+ xprt->ops->wait_for_reply_request(task);
1262
+ /*
1263
+ * Send an extra queue wakeup call if the
1264
+ * connection was dropped in case the call to
1265
+ * rpc_sleep_on() raced.
1266
+ */
1267
+ if (xprt_request_retransmit_after_disconnect(task))
1268
+ rpc_wake_up_queued_task_set_status(&xprt->pending,
1269
+ task, -ENOTCONN);
1270
+ }
1271
+ spin_unlock(&xprt->queue_lock);
1272
+}
1273
+
1274
+static bool
1275
+xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
1276
+{
1277
+ return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1278
+}
1279
+
1280
+/**
1281
+ * xprt_request_enqueue_transmit - queue a task for transmission
1282
+ * @task: pointer to rpc_task
1283
+ *
1284
+ * Add a task to the transmission queue.
1285
+ */
1286
+void
1287
+xprt_request_enqueue_transmit(struct rpc_task *task)
1288
+{
1289
+ struct rpc_rqst *pos, *req = task->tk_rqstp;
1290
+ struct rpc_xprt *xprt = req->rq_xprt;
1291
+
1292
+ if (xprt_request_need_enqueue_transmit(task, req)) {
1293
+ req->rq_bytes_sent = 0;
1294
+ spin_lock(&xprt->queue_lock);
1295
+ /*
1296
+ * Requests that carry congestion control credits are added
1297
+ * to the head of the list to avoid starvation issues.
1298
+ */
1299
+ if (req->rq_cong) {
1300
+ xprt_clear_congestion_window_wait(xprt);
1301
+ list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1302
+ if (pos->rq_cong)
1303
+ continue;
1304
+ /* Note: req is added _before_ pos */
1305
+ list_add_tail(&req->rq_xmit, &pos->rq_xmit);
1306
+ INIT_LIST_HEAD(&req->rq_xmit2);
1307
+ goto out;
1308
+ }
1309
+ } else if (!req->rq_seqno) {
1310
+ list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1311
+ if (pos->rq_task->tk_owner != task->tk_owner)
1312
+ continue;
1313
+ list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
1314
+ INIT_LIST_HEAD(&req->rq_xmit);
1315
+ goto out;
1316
+ }
1317
+ }
1318
+ list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
1319
+ INIT_LIST_HEAD(&req->rq_xmit2);
1320
+out:
1321
+ set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1322
+ spin_unlock(&xprt->queue_lock);
1323
+ }
1324
+}
1325
+
1326
+/**
1327
+ * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
1328
+ * @task: pointer to rpc_task
1329
+ *
1330
+ * Remove a task from the transmission queue
1331
+ * Caller must hold xprt->queue_lock
1332
+ */
1333
+static void
1334
+xprt_request_dequeue_transmit_locked(struct rpc_task *task)
1335
+{
1336
+ struct rpc_rqst *req = task->tk_rqstp;
1337
+
1338
+ if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1339
+ return;
1340
+ if (!list_empty(&req->rq_xmit)) {
1341
+ list_del(&req->rq_xmit);
1342
+ if (!list_empty(&req->rq_xmit2)) {
1343
+ struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
1344
+ struct rpc_rqst, rq_xmit2);
1345
+ list_del(&req->rq_xmit2);
1346
+ list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
1347
+ }
1348
+ } else
1349
+ list_del(&req->rq_xmit2);
1350
+}
1351
+
1352
+/**
1353
+ * xprt_request_dequeue_transmit - remove a task from the transmission queue
1354
+ * @task: pointer to rpc_task
1355
+ *
1356
+ * Remove a task from the transmission queue
1357
+ */
1358
+static void
1359
+xprt_request_dequeue_transmit(struct rpc_task *task)
1360
+{
1361
+ struct rpc_rqst *req = task->tk_rqstp;
1362
+ struct rpc_xprt *xprt = req->rq_xprt;
1363
+
1364
+ spin_lock(&xprt->queue_lock);
1365
+ xprt_request_dequeue_transmit_locked(task);
1366
+ spin_unlock(&xprt->queue_lock);
1367
+}
1368
+
1369
+/**
1370
+ * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
1371
+ * @task: pointer to rpc_task
1372
+ *
1373
+ * Remove a task from the transmit and receive queues, and ensure that
1374
+ * it is not pinned by the receive work item.
1375
+ */
1376
+void
1377
+xprt_request_dequeue_xprt(struct rpc_task *task)
1378
+{
1379
+ struct rpc_rqst *req = task->tk_rqstp;
1380
+ struct rpc_xprt *xprt = req->rq_xprt;
1381
+
1382
+ if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
1383
+ test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
1384
+ xprt_is_pinned_rqst(req)) {
1385
+ spin_lock(&xprt->queue_lock);
1386
+ xprt_request_dequeue_transmit_locked(task);
1387
+ xprt_request_dequeue_receive_locked(task);
1388
+ while (xprt_is_pinned_rqst(req)) {
1389
+ set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1390
+ spin_unlock(&xprt->queue_lock);
1391
+ xprt_wait_on_pinned_rqst(req);
1392
+ spin_lock(&xprt->queue_lock);
1393
+ clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1394
+ }
1395
+ spin_unlock(&xprt->queue_lock);
1396
+ }
1397
+}
1398
+
1399
+/**
1400
+ * xprt_request_prepare - prepare an encoded request for transport
1401
+ * @req: pointer to rpc_rqst
1402
+ *
1403
+ * Calls into the transport layer to do whatever is needed to prepare
1404
+ * the request for transmission or receive.
1405
+ */
1406
+void
1407
+xprt_request_prepare(struct rpc_rqst *req)
1408
+{
1409
+ struct rpc_xprt *xprt = req->rq_xprt;
1410
+
1411
+ if (xprt->ops->prepare_request)
1412
+ xprt->ops->prepare_request(req);
1413
+}
1414
+
1415
+/**
1416
+ * xprt_request_need_retransmit - Test if a task needs retransmission
1417
+ * @task: pointer to rpc_task
1418
+ *
1419
+ * Test for whether a connection breakage requires the task to retransmit
1420
+ */
1421
+bool
1422
+xprt_request_need_retransmit(struct rpc_task *task)
1423
+{
1424
+ return xprt_request_retransmit_after_disconnect(task);
1425
+}
1426
+
1427
+/**
9941428 * xprt_prepare_transmit - reserve the transport before sending a request
9951429 * @task: RPC task about to send a request
9961430 *
....@@ -999,128 +1433,177 @@
9991433 {
10001434 struct rpc_rqst *req = task->tk_rqstp;
10011435 struct rpc_xprt *xprt = req->rq_xprt;
1002
- bool ret = false;
10031436
1004
- dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);
1437
+ if (!xprt_lock_write(xprt, task)) {
1438
+ /* Race breaker: someone may have transmitted us */
1439
+ if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1440
+ rpc_wake_up_queued_task_set_status(&xprt->sending,
1441
+ task, 0);
1442
+ return false;
10051443
1006
- spin_lock_bh(&xprt->transport_lock);
1007
- if (!req->rq_bytes_sent) {
1008
- if (req->rq_reply_bytes_recvd) {
1009
- task->tk_status = req->rq_reply_bytes_recvd;
1010
- goto out_unlock;
1011
- }
1012
- if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT)
1013
- && xprt_connected(xprt)
1014
- && req->rq_connect_cookie == xprt->connect_cookie) {
1015
- xprt->ops->set_retrans_timeout(task);
1016
- rpc_sleep_on(&xprt->pending, task, xprt_timer);
1017
- goto out_unlock;
1018
- }
10191444 }
1020
- if (!xprt->ops->reserve_xprt(xprt, task)) {
1021
- task->tk_status = -EAGAIN;
1022
- goto out_unlock;
1023
- }
1024
- ret = true;
1025
-out_unlock:
1026
- spin_unlock_bh(&xprt->transport_lock);
1027
- return ret;
1445
+ return true;
10281446 }
10291447
10301448 void xprt_end_transmit(struct rpc_task *task)
10311449 {
1032
- xprt_release_write(task->tk_rqstp->rq_xprt, task);
1450
+ struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1451
+
1452
+ xprt_inject_disconnect(xprt);
1453
+ xprt_release_write(xprt, task);
10331454 }
10341455
10351456 /**
1036
- * xprt_transmit - send an RPC request on a transport
1037
- * @task: controlling RPC task
1457
+ * xprt_request_transmit - send an RPC request on a transport
1458
+ * @req: pointer to request to transmit
1459
+ * @snd_task: RPC task that owns the transport lock
10381460 *
1039
- * We have to copy the iovec because sendmsg fiddles with its contents.
1461
+ * This performs the transmission of a single request.
1462
+ * Note that if the request is not the same as snd_task, then it
1463
+ * does need to be pinned.
1464
+ * Returns '0' on success.
10401465 */
1041
-void xprt_transmit(struct rpc_task *task)
1466
+static int
1467
+xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
10421468 {
1043
- struct rpc_rqst *req = task->tk_rqstp;
1044
- struct rpc_xprt *xprt = req->rq_xprt;
1469
+ struct rpc_xprt *xprt = req->rq_xprt;
1470
+ struct rpc_task *task = req->rq_task;
10451471 unsigned int connect_cookie;
1472
+ int is_retrans = RPC_WAS_SENT(task);
10461473 int status;
10471474
1048
- dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
1049
-
1050
- if (!req->rq_reply_bytes_recvd) {
1051
- if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
1052
- /*
1053
- * Add to the list only if we're expecting a reply
1054
- */
1055
- /* Update the softirq receive buffer */
1056
- memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
1057
- sizeof(req->rq_private_buf));
1058
- /* Add request to the receive list */
1059
- spin_lock(&xprt->recv_lock);
1060
- list_add_tail(&req->rq_list, &xprt->recv);
1061
- spin_unlock(&xprt->recv_lock);
1062
- xprt_reset_majortimeo(req);
1063
- /* Turn off autodisconnect */
1064
- del_singleshot_timer_sync(&xprt->timer);
1475
+ if (!req->rq_bytes_sent) {
1476
+ if (xprt_request_data_received(task)) {
1477
+ status = 0;
1478
+ goto out_dequeue;
10651479 }
1066
- } else if (!req->rq_bytes_sent)
1067
- return;
1068
-
1069
- connect_cookie = xprt->connect_cookie;
1070
- status = xprt->ops->send_request(task);
1071
- trace_xprt_transmit(xprt, req->rq_xid, status);
1072
- if (status != 0) {
1073
- task->tk_status = status;
1074
- return;
1480
+ /* Verify that our message lies in the RPCSEC_GSS window */
1481
+ if (rpcauth_xmit_need_reencode(task)) {
1482
+ status = -EBADMSG;
1483
+ goto out_dequeue;
1484
+ }
1485
+ if (RPC_SIGNALLED(task)) {
1486
+ status = -ERESTARTSYS;
1487
+ goto out_dequeue;
1488
+ }
10751489 }
1490
+
1491
+ /*
1492
+ * Update req->rq_ntrans before transmitting to avoid races with
1493
+ * xprt_update_rtt(), which needs to know that it is recording a
1494
+ * reply to the first transmission.
1495
+ */
1496
+ req->rq_ntrans++;
1497
+
1498
+ trace_rpc_xdr_sendto(task, &req->rq_snd_buf);
1499
+ connect_cookie = xprt->connect_cookie;
1500
+ status = xprt->ops->send_request(req);
1501
+ if (status != 0) {
1502
+ req->rq_ntrans--;
1503
+ trace_xprt_transmit(req, status);
1504
+ return status;
1505
+ }
1506
+
1507
+ if (is_retrans)
1508
+ task->tk_client->cl_stats->rpcretrans++;
1509
+
10761510 xprt_inject_disconnect(xprt);
10771511
1078
- dprintk("RPC: %5u xmit complete\n", task->tk_pid);
10791512 task->tk_flags |= RPC_TASK_SENT;
1080
- spin_lock_bh(&xprt->transport_lock);
1081
-
1082
- xprt->ops->set_retrans_timeout(task);
1513
+ spin_lock(&xprt->transport_lock);
10831514
10841515 xprt->stat.sends++;
10851516 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
10861517 xprt->stat.bklog_u += xprt->backlog.qlen;
10871518 xprt->stat.sending_u += xprt->sending.qlen;
10881519 xprt->stat.pending_u += xprt->pending.qlen;
1089
- spin_unlock_bh(&xprt->transport_lock);
1520
+ spin_unlock(&xprt->transport_lock);
10901521
10911522 req->rq_connect_cookie = connect_cookie;
1092
- if (rpc_reply_expected(task) && !READ_ONCE(req->rq_reply_bytes_recvd)) {
1093
- /*
1094
- * Sleep on the pending queue if we're expecting a reply.
1095
- * The spinlock ensures atomicity between the test of
1096
- * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
1097
- */
1098
- spin_lock(&xprt->recv_lock);
1099
- if (!req->rq_reply_bytes_recvd) {
1100
- rpc_sleep_on(&xprt->pending, task, xprt_timer);
1101
- /*
1102
- * Send an extra queue wakeup call if the
1103
- * connection was dropped in case the call to
1104
- * rpc_sleep_on() raced.
1105
- */
1106
- if (!xprt_connected(xprt))
1107
- xprt_wake_pending_tasks(xprt, -ENOTCONN);
1108
- }
1109
- spin_unlock(&xprt->recv_lock);
1110
- }
1523
+out_dequeue:
1524
+ trace_xprt_transmit(req, status);
1525
+ xprt_request_dequeue_transmit(task);
1526
+ rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
1527
+ return status;
11111528 }
11121529
1113
-static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
1530
+/**
1531
+ * xprt_transmit - send an RPC request on a transport
1532
+ * @task: controlling RPC task
1533
+ *
1534
+ * Attempts to drain the transmit queue. On exit, either the transport
1535
+ * signalled an error that needs to be handled before transmission can
1536
+ * resume, or @task finished transmitting, and detected that it already
1537
+ * received a reply.
1538
+ */
1539
+void
1540
+xprt_transmit(struct rpc_task *task)
1541
+{
1542
+ struct rpc_rqst *next, *req = task->tk_rqstp;
1543
+ struct rpc_xprt *xprt = req->rq_xprt;
1544
+ int status;
1545
+
1546
+ spin_lock(&xprt->queue_lock);
1547
+ for (;;) {
1548
+ next = list_first_entry_or_null(&xprt->xmit_queue,
1549
+ struct rpc_rqst, rq_xmit);
1550
+ if (!next)
1551
+ break;
1552
+ xprt_pin_rqst(next);
1553
+ spin_unlock(&xprt->queue_lock);
1554
+ status = xprt_request_transmit(next, task);
1555
+ if (status == -EBADMSG && next != req)
1556
+ status = 0;
1557
+ spin_lock(&xprt->queue_lock);
1558
+ xprt_unpin_rqst(next);
1559
+ if (status < 0) {
1560
+ if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1561
+ task->tk_status = status;
1562
+ break;
1563
+ }
1564
+ /* Was @task transmitted, and has it received a reply? */
1565
+ if (xprt_request_data_received(task) &&
1566
+ !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1567
+ break;
1568
+ cond_resched_lock(&xprt->queue_lock);
1569
+ }
1570
+ spin_unlock(&xprt->queue_lock);
1571
+}
1572
+
1573
+static void xprt_complete_request_init(struct rpc_task *task)
1574
+{
1575
+ if (task->tk_rqstp)
1576
+ xprt_request_init(task);
1577
+}
1578
+
1579
+void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
11141580 {
11151581 set_bit(XPRT_CONGESTED, &xprt->state);
1116
- rpc_sleep_on(&xprt->backlog, task, NULL);
1582
+ rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
1583
+}
1584
+EXPORT_SYMBOL_GPL(xprt_add_backlog);
1585
+
1586
+static bool __xprt_set_rq(struct rpc_task *task, void *data)
1587
+{
1588
+ struct rpc_rqst *req = data;
1589
+
1590
+ if (task->tk_rqstp == NULL) {
1591
+ memset(req, 0, sizeof(*req)); /* mark unused */
1592
+ task->tk_rqstp = req;
1593
+ return true;
1594
+ }
1595
+ return false;
11171596 }
11181597
1119
-static void xprt_wake_up_backlog(struct rpc_xprt *xprt)
1598
+bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
11201599 {
1121
- if (rpc_wake_up_next(&xprt->backlog) == NULL)
1600
+ if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
11221601 clear_bit(XPRT_CONGESTED, &xprt->state);
1602
+ return false;
1603
+ }
1604
+ return true;
11231605 }
1606
+EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);
11241607
11251608 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
11261609 {
....@@ -1130,7 +1613,7 @@
11301613 goto out;
11311614 spin_lock(&xprt->reserve_lock);
11321615 if (test_bit(XPRT_CONGESTED, &xprt->state)) {
1133
- rpc_sleep_on(&xprt->backlog, task, NULL);
1616
+ xprt_add_backlog(xprt, task);
11341617 ret = true;
11351618 }
11361619 spin_unlock(&xprt->reserve_lock);
....@@ -1141,12 +1624,15 @@
11411624 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
11421625 {
11431626 struct rpc_rqst *req = ERR_PTR(-EAGAIN);
1627
+ gfp_t gfp_mask = GFP_KERNEL;
11441628
11451629 if (xprt->num_reqs >= xprt->max_reqs)
11461630 goto out;
11471631 ++xprt->num_reqs;
11481632 spin_unlock(&xprt->reserve_lock);
1149
- req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS);
1633
+ if (current->flags & PF_WQ_WORKER)
1634
+ gfp_mask |= __GFP_NORETRY | __GFP_NOWARN;
1635
+ req = kzalloc(sizeof(*req), gfp_mask);
11501636 spin_lock(&xprt->reserve_lock);
11511637 if (req != NULL)
11521638 goto out;
....@@ -1188,7 +1674,7 @@
11881674 case -EAGAIN:
11891675 xprt_add_backlog(xprt, task);
11901676 dprintk("RPC: waiting for request slot\n");
1191
- /* fall through */
1677
+ fallthrough;
11921678 default:
11931679 task->tk_status = -EAGAIN;
11941680 }
....@@ -1204,28 +1690,14 @@
12041690 }
12051691 EXPORT_SYMBOL_GPL(xprt_alloc_slot);
12061692
1207
-void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
1208
-{
1209
- /* Note: grabbing the xprt_lock_write() ensures that we throttle
1210
- * new slot allocation if the transport is congested (i.e. when
1211
- * reconnecting a stream transport or when out of socket write
1212
- * buffer space).
1213
- */
1214
- if (xprt_lock_write(xprt, task)) {
1215
- xprt_alloc_slot(xprt, task);
1216
- xprt_release_write(xprt, task);
1217
- }
1218
-}
1219
-EXPORT_SYMBOL_GPL(xprt_lock_and_alloc_slot);
1220
-
12211693 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
12221694 {
12231695 spin_lock(&xprt->reserve_lock);
1224
- if (!xprt_dynamic_free_slot(xprt, req)) {
1696
+ if (!xprt_wake_up_backlog(xprt, req) &&
1697
+ !xprt_dynamic_free_slot(xprt, req)) {
12251698 memset(req, 0, sizeof(*req)); /* mark unused */
12261699 list_add(&req->rq_list, &xprt->free);
12271700 }
1228
- xprt_wake_up_backlog(xprt);
12291701 spin_unlock(&xprt->reserve_lock);
12301702 }
12311703 EXPORT_SYMBOL_GPL(xprt_free_slot);
....@@ -1284,6 +1756,12 @@
12841756 }
12851757 EXPORT_SYMBOL_GPL(xprt_free);
12861758
1759
+static void
1760
+xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
1761
+{
1762
+ req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
1763
+}
1764
+
12871765 static __be32
12881766 xprt_alloc_xid(struct rpc_xprt *xprt)
12891767 {
....@@ -1307,22 +1785,21 @@
13071785 struct rpc_xprt *xprt = task->tk_xprt;
13081786 struct rpc_rqst *req = task->tk_rqstp;
13091787
1310
- INIT_LIST_HEAD(&req->rq_list);
1311
- req->rq_timeout = task->tk_client->cl_timeout->to_initval;
13121788 req->rq_task = task;
13131789 req->rq_xprt = xprt;
13141790 req->rq_buffer = NULL;
13151791 req->rq_xid = xprt_alloc_xid(xprt);
1316
- req->rq_connect_cookie = xprt->connect_cookie - 1;
1317
- req->rq_bytes_sent = 0;
1792
+ xprt_init_connect_cookie(req, xprt);
13181793 req->rq_snd_buf.len = 0;
13191794 req->rq_snd_buf.buflen = 0;
13201795 req->rq_rcv_buf.len = 0;
13211796 req->rq_rcv_buf.buflen = 0;
1797
+ req->rq_snd_buf.bvec = NULL;
1798
+ req->rq_rcv_buf.bvec = NULL;
13221799 req->rq_release_snd_buf = NULL;
1323
- xprt_reset_majortimeo(req);
1324
- dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
1325
- req, ntohl(req->rq_xid));
1800
+ xprt_init_majortimeo(task, req);
1801
+
1802
+ trace_xprt_reserve(req);
13261803 }
13271804
13281805 static void
....@@ -1349,7 +1826,6 @@
13491826 if (task->tk_rqstp != NULL)
13501827 return;
13511828
1352
- task->tk_timeout = 0;
13531829 task->tk_status = -EAGAIN;
13541830 if (!xprt_throttle_congested(xprt, task))
13551831 xprt_do_reserve(xprt, task);
....@@ -1372,7 +1848,6 @@
13721848 if (task->tk_rqstp != NULL)
13731849 return;
13741850
1375
- task->tk_timeout = 0;
13761851 task->tk_status = -EAGAIN;
13771852 xprt_do_reserve(xprt, task);
13781853 }
....@@ -1390,45 +1865,52 @@
13901865 if (req == NULL) {
13911866 if (task->tk_client) {
13921867 xprt = task->tk_xprt;
1393
- if (xprt->snd_task == task)
1394
- xprt_release_write(xprt, task);
1868
+ xprt_release_write(xprt, task);
13951869 }
13961870 return;
13971871 }
13981872
13991873 xprt = req->rq_xprt;
1400
- if (task->tk_ops->rpc_count_stats != NULL)
1401
- task->tk_ops->rpc_count_stats(task, task->tk_calldata);
1402
- else if (task->tk_client)
1403
- rpc_count_iostats(task, task->tk_client->cl_metrics);
1404
- spin_lock(&xprt->recv_lock);
1405
- if (!list_empty(&req->rq_list)) {
1406
- list_del_init(&req->rq_list);
1407
- xprt_wait_on_pinned_rqst(req);
1408
- }
1409
- spin_unlock(&xprt->recv_lock);
1410
- spin_lock_bh(&xprt->transport_lock);
1874
+ xprt_request_dequeue_xprt(task);
1875
+ spin_lock(&xprt->transport_lock);
14111876 xprt->ops->release_xprt(xprt, task);
14121877 if (xprt->ops->release_request)
14131878 xprt->ops->release_request(task);
1414
- xprt->last_used = jiffies;
14151879 xprt_schedule_autodisconnect(xprt);
1416
- spin_unlock_bh(&xprt->transport_lock);
1880
+ spin_unlock(&xprt->transport_lock);
14171881 if (req->rq_buffer)
14181882 xprt->ops->buf_free(task);
1419
- xprt_inject_disconnect(xprt);
1883
+ xdr_free_bvec(&req->rq_rcv_buf);
1884
+ xdr_free_bvec(&req->rq_snd_buf);
14201885 if (req->rq_cred != NULL)
14211886 put_rpccred(req->rq_cred);
1422
- task->tk_rqstp = NULL;
14231887 if (req->rq_release_snd_buf)
14241888 req->rq_release_snd_buf(req);
14251889
1426
- dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
1890
+ task->tk_rqstp = NULL;
14271891 if (likely(!bc_prealloc(req)))
14281892 xprt->ops->free_slot(xprt, req);
14291893 else
14301894 xprt_free_bc_request(req);
14311895 }
1896
+
1897
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
1898
+void
1899
+xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
1900
+{
1901
+ struct xdr_buf *xbufp = &req->rq_snd_buf;
1902
+
1903
+ task->tk_rqstp = req;
1904
+ req->rq_task = task;
1905
+ xprt_init_connect_cookie(req, req->rq_xprt);
1906
+ /*
1907
+ * Set up the xdr_buf length.
1908
+ * This also indicates that the buffer is XDR encoded already.
1909
+ */
1910
+ xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
1911
+ xbufp->tail[0].iov_len;
1912
+}
1913
+#endif
14321914
14331915 static void xprt_init(struct rpc_xprt *xprt, struct net *net)
14341916 {
....@@ -1436,10 +1918,11 @@
14361918
14371919 spin_lock_init(&xprt->transport_lock);
14381920 spin_lock_init(&xprt->reserve_lock);
1439
- spin_lock_init(&xprt->recv_lock);
1921
+ spin_lock_init(&xprt->queue_lock);
14401922
14411923 INIT_LIST_HEAD(&xprt->free);
1442
- INIT_LIST_HEAD(&xprt->recv);
1924
+ xprt->recv_queue = RB_ROOT;
1925
+ INIT_LIST_HEAD(&xprt->xmit_queue);
14431926 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
14441927 spin_lock_init(&xprt->bc_pa_lock);
14451928 INIT_LIST_HEAD(&xprt->bc_pa_list);
....@@ -1452,7 +1935,7 @@
14521935
14531936 rpc_init_wait_queue(&xprt->binding, "xprt_binding");
14541937 rpc_init_wait_queue(&xprt->pending, "xprt_pending");
1455
- rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending");
1938
+ rpc_init_wait_queue(&xprt->sending, "xprt_sending");
14561939 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
14571940
14581941 xprt_init_xid(xprt);
....@@ -1483,11 +1966,8 @@
14831966
14841967 found:
14851968 xprt = t->setup(args);
1486
- if (IS_ERR(xprt)) {
1487
- dprintk("RPC: xprt_create_transport: failed, %ld\n",
1488
- -PTR_ERR(xprt));
1969
+ if (IS_ERR(xprt))
14891970 goto out;
1490
- }
14911971 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
14921972 xprt->idle_timeout = 0;
14931973 INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
....@@ -1508,8 +1988,7 @@
15081988
15091989 rpc_xprt_debugfs_register(xprt);
15101990
1511
- dprintk("RPC: created transport %p with %u slots\n", xprt,
1512
- xprt->max_reqs);
1991
+ trace_xprt_create(xprt);
15131992 out:
15141993 return xprt;
15151994 }
....@@ -1519,12 +1998,19 @@
15191998 struct rpc_xprt *xprt =
15201999 container_of(work, struct rpc_xprt, task_cleanup);
15212000
2001
+ trace_xprt_destroy(xprt);
2002
+
15222003 rpc_xprt_debugfs_unregister(xprt);
15232004 rpc_destroy_wait_queue(&xprt->binding);
15242005 rpc_destroy_wait_queue(&xprt->pending);
15252006 rpc_destroy_wait_queue(&xprt->sending);
15262007 rpc_destroy_wait_queue(&xprt->backlog);
15272008 kfree(xprt->servername);
2009
+ /*
2010
+ * Destroy any existing back channel
2011
+ */
2012
+ xprt_destroy_backchannel(xprt, UINT_MAX);
2013
+
15282014 /*
15292015 * Tear down transport state and free the rpc_xprt
15302016 */
....@@ -1538,14 +2024,19 @@
15382024 */
15392025 static void xprt_destroy(struct rpc_xprt *xprt)
15402026 {
1541
- dprintk("RPC: destroying transport %p\n", xprt);
1542
-
15432027 /*
15442028 * Exclude transport connect/disconnect handlers and autoclose
15452029 */
15462030 wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);
15472031
2032
+ /*
2033
+ * xprt_schedule_autodisconnect() can run after XPRT_LOCKED
2034
+ * is cleared. We use ->transport_lock to ensure the mod_timer()
2035
+ * can only run *before* del_timer_sync(), never after.
2036
+ */
2037
+ spin_lock(&xprt->transport_lock);
15482038 del_timer_sync(&xprt->timer);
2039
+ spin_unlock(&xprt->transport_lock);
15492040
15502041 /*
15512042 * Destroy sockets etc from the system workqueue so they can