From 01573e231f18eb2d99162747186f59511f56b64d Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Fri, 08 Dec 2023 10:40:48 +0000
Subject: [PATCH] Remove rt

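Replace the per-queue timer_list with a delayed work item on
rpciod_workqueue, and track each sleeping task's deadline as an
absolute jiffies value in tk_timeout (exposed through the new
rpc_task_timeout() helper).

Add rpc_sleep_on_timeout() and rpc_sleep_on_priority_timeout() so
that callers pass the deadline explicitly instead of priming
task->tk_timeout before rpc_sleep_on(), and convert rpc_delay()
accordingly.

Replace the RPC_TASK_KILLED flag with the RPC_TASK_SIGNALLED run
state and add rpc_signal_task(). Drop the dprintk() debugging in
favour of tracepoints, drop the swapper special case when queueing
tasks, stop disabling bottom halves around wait queue locks, and
switch task and buffer allocations from GFP_NOIO to GFP_NOFS with
memalloc_nofs_save()/memalloc_nofs_restore().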
---
 kernel/net/sunrpc/sched.c |  477 ++++++++++++++++++++++++++++++++++++-----------------------
 1 file changed, 292 insertions(+), 185 deletions(-)

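Note for reviewers (ignored by git am): the sleep API change is the
one that call sites outside this file will notice. A minimal
before/after sketch of the conversion, taken directly from the
rpc_delay() hunk in this patch:

	/* before: relative timeout primed on the task */
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, __rpc_atrun);

	/* after: absolute deadline (jiffies) passed explicitly */
	rpc_sleep_on_timeout(&delay_queue, task, __rpc_atrun,
			     jiffies + delay);

rpc_set_queue_timer() converts the absolute deadline back to a
relative delay before handing it to mod_delayed_work(), so the
queue's delayed work item fires at the earliest pending deadline.
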
diff --git a/kernel/net/sunrpc/sched.c b/kernel/net/sunrpc/sched.c
index e339f8d..f0f55fb 100644
--- a/kernel/net/sunrpc/sched.c
+++ b/kernel/net/sunrpc/sched.c
@@ -1,3 +1,4 @@
+// SPDX-License-Identifier: GPL-2.0-only
 /*
  * linux/net/sunrpc/sched.c
  *
@@ -19,14 +20,12 @@
 #include <linux/spinlock.h>
 #include <linux/mutex.h>
 #include <linux/freezer.h>
+#include <linux/sched/mm.h>
 
 #include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/metrics.h>
 
 #include "sunrpc.h"
-
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-#define RPCDBG_FACILITY		RPCDBG_SCHED
-#endif
 
 #define CREATE_TRACE_POINTS
 #include <trace/events/sunrpc.h>
@@ -44,7 +43,7 @@
 
 static void			rpc_async_schedule(struct work_struct *);
 static void			 rpc_release_task(struct rpc_task *task);
-static void __rpc_queue_timer_fn(struct timer_list *t);
+static void __rpc_queue_timer_fn(struct work_struct *);
 
 /*
  * RPC tasks sit here while waiting for conditions to improve.
@@ -56,6 +55,21 @@
  */
 struct workqueue_struct *rpciod_workqueue __read_mostly;
 struct workqueue_struct *xprtiod_workqueue __read_mostly;
+EXPORT_SYMBOL_GPL(xprtiod_workqueue);
+
+unsigned long
+rpc_task_timeout(const struct rpc_task *task)
+{
+	unsigned long timeout = READ_ONCE(task->tk_timeout);
+
+	if (timeout != 0) {
+		unsigned long now = jiffies;
+		if (time_before(now, timeout))
+			return timeout - now;
+	}
+	return 0;
+}
+EXPORT_SYMBOL_GPL(rpc_task_timeout);
 
 /*
  * Disable the timer for a given RPC task. Should be called with
@@ -65,37 +79,36 @@
 static void
 __rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-	if (task->tk_timeout == 0)
+	if (list_empty(&task->u.tk_wait.timer_list))
 		return;
-	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
 	task->tk_timeout = 0;
 	list_del(&task->u.tk_wait.timer_list);
 	if (list_empty(&queue->timer_list.list))
-		del_timer(&queue->timer_list.timer);
+		cancel_delayed_work(&queue->timer_list.dwork);
 }
 
 static void
 rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
 {
+	unsigned long now = jiffies;
 	queue->timer_list.expires = expires;
-	mod_timer(&queue->timer_list.timer, expires);
+	if (time_before_eq(expires, now))
+		expires = 0;
+	else
+		expires -= now;
+	mod_delayed_work(rpciod_workqueue, &queue->timer_list.dwork, expires);
 }
 
 /*
  * Set up a timer for the current task.
  */
 static void
-__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
+__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task,
+		unsigned long timeout)
 {
-	if (!task->tk_timeout)
-		return;
-
-	dprintk("RPC: %5u setting alarm for %u ms\n",
-		task->tk_pid, jiffies_to_msecs(task->tk_timeout));
-
-	task->u.tk_wait.expires = jiffies + task->tk_timeout;
-	if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
-		rpc_set_queue_timer(queue, task->u.tk_wait.expires);
+	task->tk_timeout = timeout;
+	if (list_empty(&queue->timer_list.list) || time_before(timeout, queue->timer_list.expires))
+		rpc_set_queue_timer(queue, timeout);
 	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
 }
 
@@ -173,24 +186,14 @@
 
 /*
  * Add new request to wait queue.
- *
- * Swapper tasks always get inserted at the head of the queue.
- * This should avoid many nasty memory deadlocks and hopefully
- * improve overall performance.
- * Everyone else gets appended to the queue to ensure proper FIFO behavior.
  */
 static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
 		struct rpc_task *task,
 		unsigned char queue_priority)
 {
-	WARN_ON_ONCE(RPC_IS_QUEUED(task));
-	if (RPC_IS_QUEUED(task))
-		return;
-
+	INIT_LIST_HEAD(&task->u.tk_wait.timer_list);
 	if (RPC_IS_PRIORITY(queue))
 		__rpc_add_wait_queue_priority(queue, task, queue_priority);
-	else if (RPC_IS_SWAPPER(task))
-		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
 	else
 		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
 	task->tk_waitqueue = queue;
@@ -198,9 +201,6 @@
 	/* barrier matches the read in rpc_wake_up_task_queue_locked() */
 	smp_wmb();
 	rpc_set_queued(task);
-
-	dprintk("RPC: %5u added to queue %p \"%s\"\n",
-			task->tk_pid, queue, rpc_qname(queue));
 }
 
 /*
@@ -223,8 +223,6 @@
 	else
 		list_del(&task->u.tk_wait.list);
 	queue->qlen--;
-	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
-			task->tk_pid, queue, rpc_qname(queue));
 }
 
 static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
@@ -237,7 +235,8 @@
 	queue->maxpriority = nr_queues - 1;
 	rpc_reset_waitqueue_priority(queue);
 	queue->qlen = 0;
-	timer_setup(&queue->timer_list.timer, __rpc_queue_timer_fn, 0);
+	queue->timer_list.expires = 0;
+	INIT_DELAYED_WORK(&queue->timer_list.dwork, __rpc_queue_timer_fn);
 	INIT_LIST_HEAD(&queue->timer_list.list);
 	rpc_assign_waitqueue_name(queue, qname);
 }
@@ -256,7 +255,7 @@
 
 void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
 {
-	del_timer_sync(&queue->timer_list.timer);
+	cancel_delayed_work_sync(&queue->timer_list.dwork);
 }
 EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
 
@@ -359,60 +358,119 @@
  * NB: An RPC task will only receive interrupt-driven events as long
  * as it's on a wait queue.
  */
-static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
+static void __rpc_do_sleep_on_priority(struct rpc_wait_queue *q,
 		struct rpc_task *task,
-		rpc_action action,
 		unsigned char queue_priority)
 {
-	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
-			task->tk_pid, rpc_qname(q), jiffies);
-
 	trace_rpc_task_sleep(task, q);
 
 	__rpc_add_wait_queue(q, task, queue_priority);
-
-	WARN_ON_ONCE(task->tk_callback != NULL);
-	task->tk_callback = action;
-	__rpc_add_timer(q, task);
 }
+
+static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
+		struct rpc_task *task,
+		unsigned char queue_priority)
+{
+	if (WARN_ON_ONCE(RPC_IS_QUEUED(task)))
+		return;
+	__rpc_do_sleep_on_priority(q, task, queue_priority);
+}
+
+static void __rpc_sleep_on_priority_timeout(struct rpc_wait_queue *q,
+		struct rpc_task *task, unsigned long timeout,
+		unsigned char queue_priority)
+{
+	if (WARN_ON_ONCE(RPC_IS_QUEUED(task)))
+		return;
+	if (time_is_after_jiffies(timeout)) {
+		__rpc_do_sleep_on_priority(q, task, queue_priority);
+		__rpc_add_timer(q, task, timeout);
+	} else
+		task->tk_status = -ETIMEDOUT;
+}
+
+static void rpc_set_tk_callback(struct rpc_task *task, rpc_action action)
+{
+	if (action && !WARN_ON_ONCE(task->tk_callback != NULL))
+		task->tk_callback = action;
+}
+
+static bool rpc_sleep_check_activated(struct rpc_task *task)
+{
+	/* We shouldn't ever put an inactive task to sleep */
+	if (WARN_ON_ONCE(!RPC_IS_ACTIVATED(task))) {
+		task->tk_status = -EIO;
+		rpc_put_task_async(task);
+		return false;
+	}
+	return true;
+}
+
+void rpc_sleep_on_timeout(struct rpc_wait_queue *q, struct rpc_task *task,
+				rpc_action action, unsigned long timeout)
+{
+	if (!rpc_sleep_check_activated(task))
+		return;
+
+	rpc_set_tk_callback(task, action);
+
+	/*
+	 * Protect the queue operations.
+	 */
+	spin_lock(&q->lock);
+	__rpc_sleep_on_priority_timeout(q, task, timeout, task->tk_priority);
+	spin_unlock(&q->lock);
+}
+EXPORT_SYMBOL_GPL(rpc_sleep_on_timeout);
 
 void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 				rpc_action action)
 {
-	/* We shouldn't ever put an inactive task to sleep */
-	WARN_ON_ONCE(!RPC_IS_ACTIVATED(task));
-	if (!RPC_IS_ACTIVATED(task)) {
-		task->tk_status = -EIO;
-		rpc_put_task_async(task);
+	if (!rpc_sleep_check_activated(task))
 		return;
-	}
 
+	rpc_set_tk_callback(task, action);
+
+	WARN_ON_ONCE(task->tk_timeout != 0);
 	/*
 	 * Protect the queue operations.
 	 */
-	spin_lock_bh(&q->lock);
-	__rpc_sleep_on_priority(q, task, action, task->tk_priority);
-	spin_unlock_bh(&q->lock);
+	spin_lock(&q->lock);
+	__rpc_sleep_on_priority(q, task, task->tk_priority);
+	spin_unlock(&q->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_sleep_on);
 
-void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
-		rpc_action action, int priority)
+void rpc_sleep_on_priority_timeout(struct rpc_wait_queue *q,
+		struct rpc_task *task, unsigned long timeout, int priority)
 {
-	/* We shouldn't ever put an inactive task to sleep */
-	WARN_ON_ONCE(!RPC_IS_ACTIVATED(task));
-	if (!RPC_IS_ACTIVATED(task)) {
-		task->tk_status = -EIO;
-		rpc_put_task_async(task);
+	if (!rpc_sleep_check_activated(task))
 		return;
-	}
 
+	priority -= RPC_PRIORITY_LOW;
 	/*
 	 * Protect the queue operations.
 	 */
-	spin_lock_bh(&q->lock);
-	__rpc_sleep_on_priority(q, task, action, priority - RPC_PRIORITY_LOW);
-	spin_unlock_bh(&q->lock);
+	spin_lock(&q->lock);
+	__rpc_sleep_on_priority_timeout(q, task, timeout, priority);
+	spin_unlock(&q->lock);
+}
+EXPORT_SYMBOL_GPL(rpc_sleep_on_priority_timeout);
+
+void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
+		int priority)
+{
+	if (!rpc_sleep_check_activated(task))
+		return;
+
+	WARN_ON_ONCE(task->tk_timeout != 0);
+	priority -= RPC_PRIORITY_LOW;
+	/*
+	 * Protect the queue operations.
+	 */
+	spin_lock(&q->lock);
+	__rpc_sleep_on_priority(q, task, priority);
+	spin_unlock(&q->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);
 
@@ -428,9 +486,6 @@
 		struct rpc_wait_queue *queue,
 		struct rpc_task *task)
 {
-	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
-			task->tk_pid, jiffies);
-
 	/* Has the task been executed yet? If not, we cannot wake it up! */
 	if (!RPC_IS_ACTIVATED(task)) {
 		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
@@ -442,41 +497,36 @@
 	__rpc_remove_wait_queue(queue, task);
 
 	rpc_make_runnable(wq, task);
-
-	dprintk("RPC:       __rpc_wake_up_task done\n");
 }
 
 /*
  * Wake up a queued task while the queue lock is being held
  */
-static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
-		struct rpc_wait_queue *queue, struct rpc_task *task)
+static struct rpc_task *
+rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq,
+		struct rpc_wait_queue *queue, struct rpc_task *task,
+		bool (*action)(struct rpc_task *, void *), void *data)
 {
 	if (RPC_IS_QUEUED(task)) {
 		smp_rmb();
-		if (task->tk_waitqueue == queue)
-			__rpc_do_wake_up_task_on_wq(wq, queue, task);
+		if (task->tk_waitqueue == queue) {
+			if (action == NULL || action(task, data)) {
+				__rpc_do_wake_up_task_on_wq(wq, queue, task);
+				return task;
+			}
+		}
 	}
+	return NULL;
 }
 
 /*
  * Wake up a queued task while the queue lock is being held
  */
-static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue,
+					  struct rpc_task *task)
 {
-	rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
-}
-
-/*
- * Wake up a task on a specific queue
- */
-void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
-		struct rpc_wait_queue *queue,
-		struct rpc_task *task)
-{
-	spin_lock_bh(&queue->lock);
-	rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
-	spin_unlock_bh(&queue->lock);
+	rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
+						   task, NULL, NULL);
 }
 
 /*
@@ -484,11 +534,47 @@
  */
 void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-	spin_lock_bh(&queue->lock);
+	if (!RPC_IS_QUEUED(task))
+		return;
+	spin_lock(&queue->lock);
 	rpc_wake_up_task_queue_locked(queue, task);
-	spin_unlock_bh(&queue->lock);
+	spin_unlock(&queue->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
+
+static bool rpc_task_action_set_status(struct rpc_task *task, void *status)
+{
+	task->tk_status = *(int *)status;
+	return true;
+}
+
+static void
+rpc_wake_up_task_queue_set_status_locked(struct rpc_wait_queue *queue,
+		struct rpc_task *task, int status)
+{
+	rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
+			task, rpc_task_action_set_status, &status);
+}
+
+/**
+ * rpc_wake_up_queued_task_set_status - wake up a task and set task->tk_status
+ * @queue: pointer to rpc_wait_queue
+ * @task: pointer to rpc_task
+ * @status: integer error value
+ *
+ * If @task is queued on @queue, then it is woken up, and @task->tk_status is
+ * set to the value of @status.
+ */
+void
+rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *queue,
+		struct rpc_task *task, int status)
+{
+	if (!RPC_IS_QUEUED(task))
+		return;
+	spin_lock(&queue->lock);
+	rpc_wake_up_task_queue_set_status_locked(queue, task, status);
+	spin_unlock(&queue->lock);
+}
 
 /*
  * Wake up the next task on a priority queue.
@@ -558,17 +644,12 @@
 {
 	struct rpc_task	*task = NULL;
 
-	dprintk("RPC:       wake_up_first(%p \"%s\")\n",
-			queue, rpc_qname(queue));
-	spin_lock_bh(&queue->lock);
+	spin_lock(&queue->lock);
 	task = __rpc_find_next_queued(queue);
-	if (task != NULL) {
-		if (func(task, data))
-			rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
-		else
-			task = NULL;
-	}
-	spin_unlock_bh(&queue->lock);
+	if (task != NULL)
+		task = rpc_wake_up_task_on_wq_queue_action_locked(wq, queue,
+				task, func, data);
+	spin_unlock(&queue->lock);
 
 	return task;
 }
@@ -598,6 +679,23 @@
 EXPORT_SYMBOL_GPL(rpc_wake_up_next);
 
 /**
+ * rpc_wake_up_locked - wake up all rpc_tasks
+ * @queue: rpc_wait_queue on which the tasks are sleeping
+ *
+ */
+static void rpc_wake_up_locked(struct rpc_wait_queue *queue)
+{
+	struct rpc_task *task;
+
+	for (;;) {
+		task = __rpc_find_next_queued(queue);
+		if (task == NULL)
+			break;
+		rpc_wake_up_task_queue_locked(queue, task);
+	}
+}
+
+/**
  * rpc_wake_up - wake up all rpc_tasks
  * @queue: rpc_wait_queue on which the tasks are sleeping
  *
@@ -605,25 +703,28 @@
  */
 void rpc_wake_up(struct rpc_wait_queue *queue)
 {
-	struct list_head *head;
-
-	spin_lock_bh(&queue->lock);
-	head = &queue->tasks[queue->maxpriority];
-	for (;;) {
-		while (!list_empty(head)) {
-			struct rpc_task *task;
-			task = list_first_entry(head,
-					struct rpc_task,
-					u.tk_wait.list);
-			rpc_wake_up_task_queue_locked(queue, task);
-		}
-		if (head == &queue->tasks[0])
-			break;
-		head--;
-	}
-	spin_unlock_bh(&queue->lock);
+	spin_lock(&queue->lock);
+	rpc_wake_up_locked(queue);
+	spin_unlock(&queue->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_wake_up);
+
+/**
+ * rpc_wake_up_status_locked - wake up all rpc_tasks and set their status value.
+ * @queue: rpc_wait_queue on which the tasks are sleeping
+ * @status: status value to set
+ */
+static void rpc_wake_up_status_locked(struct rpc_wait_queue *queue, int status)
+{
+	struct rpc_task *task;
+
+	for (;;) {
+		task = __rpc_find_next_queued(queue);
+		if (task == NULL)
+			break;
+		rpc_wake_up_task_queue_set_status_locked(queue, task, status);
+	}
+}
 
 /**
  * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
@@ -634,39 +735,26 @@
  */
 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
 {
-	struct list_head *head;
-
-	spin_lock_bh(&queue->lock);
-	head = &queue->tasks[queue->maxpriority];
-	for (;;) {
-		while (!list_empty(head)) {
-			struct rpc_task *task;
-			task = list_first_entry(head,
-					struct rpc_task,
-					u.tk_wait.list);
-			task->tk_status = status;
-			rpc_wake_up_task_queue_locked(queue, task);
-		}
-		if (head == &queue->tasks[0])
-			break;
-		head--;
-	}
-	spin_unlock_bh(&queue->lock);
+	spin_lock(&queue->lock);
+	rpc_wake_up_status_locked(queue, status);
+	spin_unlock(&queue->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_wake_up_status);
 
-static void __rpc_queue_timer_fn(struct timer_list *t)
+static void __rpc_queue_timer_fn(struct work_struct *work)
 {
-	struct rpc_wait_queue *queue = from_timer(queue, t, timer_list.timer);
+	struct rpc_wait_queue *queue = container_of(work,
+			struct rpc_wait_queue,
+			timer_list.dwork.work);
 	struct rpc_task *task, *n;
 	unsigned long expires, now, timeo;
 
 	spin_lock(&queue->lock);
 	expires = now = jiffies;
 	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
-		timeo = task->u.tk_wait.expires;
+		timeo = task->tk_timeout;
 		if (time_after_eq(now, timeo)) {
-			dprintk("RPC: %5u timeout\n", task->tk_pid);
+			trace_rpc_task_timeout(task, task->tk_action);
 			task->tk_status = -ETIMEDOUT;
 			rpc_wake_up_task_queue_locked(queue, task);
 			continue;
@@ -690,8 +778,7 @@
  */
 void rpc_delay(struct rpc_task *task, unsigned long delay)
 {
-	task->tk_timeout = delay;
-	rpc_sleep_on(&delay_queue, task, __rpc_atrun);
+	rpc_sleep_on_timeout(&delay_queue, task, __rpc_atrun, jiffies + delay);
 }
 EXPORT_SYMBOL_GPL(rpc_delay);
 
@@ -719,8 +806,7 @@
 rpc_reset_task_statistics(struct rpc_task *task)
 {
 	task->tk_timeouts = 0;
-	task->tk_flags &= ~(RPC_CALL_MAJORSEEN|RPC_TASK_KILLED|RPC_TASK_SENT);
-
+	task->tk_flags &= ~(RPC_CALL_MAJORSEEN|RPC_TASK_SENT);
 	rpc_init_task_statistics(task);
 }
 
@@ -729,11 +815,15 @@
  */
 void rpc_exit_task(struct rpc_task *task)
 {
+	trace_rpc_task_end(task, task->tk_action);
 	task->tk_action = NULL;
+	if (task->tk_ops->rpc_count_stats)
+		task->tk_ops->rpc_count_stats(task, task->tk_calldata);
+	else if (task->tk_client)
+		rpc_count_iostats(task, task->tk_client->cl_metrics);
 	if (task->tk_ops->rpc_call_done != NULL) {
 		task->tk_ops->rpc_call_done(task, task->tk_calldata);
 		if (task->tk_action != NULL) {
-			WARN_ON(RPC_ASSASSINATED(task));
 			/* Always release the RPC slot and buffer memory */
 			xprt_release(task);
 			rpc_reset_task_statistics(task);
@@ -741,12 +831,26 @@
 	}
 }
 
+void rpc_signal_task(struct rpc_task *task)
+{
+	struct rpc_wait_queue *queue;
+
+	if (!RPC_IS_ACTIVATED(task))
+		return;
+
+	trace_rpc_task_signalled(task, task->tk_action);
+	set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate);
+	smp_mb__after_atomic();
+	queue = READ_ONCE(task->tk_waitqueue);
+	if (queue)
+		rpc_wake_up_queued_task_set_status(queue, task, -ERESTARTSYS);
+}
+
 void rpc_exit(struct rpc_task *task, int status)
 {
 	task->tk_status = status;
 	task->tk_action = rpc_exit_task;
-	if (RPC_IS_QUEUED(task))
-		rpc_wake_up_queued_task(task->tk_waitqueue, task);
+	rpc_wake_up_queued_task(task->tk_waitqueue, task);
 }
 EXPORT_SYMBOL_GPL(rpc_exit);
 
@@ -764,9 +868,6 @@
 	struct rpc_wait_queue *queue;
 	int task_is_async = RPC_IS_ASYNC(task);
 	int status = 0;
-
-	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
-			task->tk_pid, task->tk_flags);
 
 	WARN_ON_ONCE(RPC_IS_QUEUED(task));
 	if (RPC_IS_QUEUED(task))
@@ -797,6 +898,15 @@
 		 */
 		if (!RPC_IS_QUEUED(task))
 			continue;
+
+		/*
+		 * Signalled tasks should exit rather than sleep.
+		 */
+		if (RPC_SIGNALLED(task)) {
+			task->tk_rpc_status = -ERESTARTSYS;
+			rpc_exit(task, -ERESTARTSYS);
+		}
+
 		/*
 		 * The queue->lock protects against races with
 		 * rpc_make_runnable().
@@ -807,37 +917,36 @@
 		 * rpc_task pointer may still be dereferenced.
 		 */
 		queue = task->tk_waitqueue;
-		spin_lock_bh(&queue->lock);
+		spin_lock(&queue->lock);
 		if (!RPC_IS_QUEUED(task)) {
-			spin_unlock_bh(&queue->lock);
+			spin_unlock(&queue->lock);
 			continue;
 		}
 		rpc_clear_running(task);
-		spin_unlock_bh(&queue->lock);
+		spin_unlock(&queue->lock);
 		if (task_is_async)
 			return;
 
 		/* sync task: sleep here */
-		dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
+		trace_rpc_task_sync_sleep(task, task->tk_action);
 		status = out_of_line_wait_on_bit(&task->tk_runstate,
 				RPC_TASK_QUEUED, rpc_wait_bit_killable,
 				TASK_KILLABLE);
-		if (status == -ERESTARTSYS) {
+		if (status < 0) {
 			/*
 			 * When a sync task receives a signal, it exits with
 			 * -ERESTARTSYS. In order to catch any callbacks that
 			 * clean up after sleeping on some queue, we don't
 			 * break the loop here, but go around once more.
 			 */
-			dprintk("RPC: %5u got signal\n", task->tk_pid);
-			task->tk_flags |= RPC_TASK_KILLED;
+			trace_rpc_task_signalled(task, task->tk_action);
+			set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate);
+			task->tk_rpc_status = -ERESTARTSYS;
 			rpc_exit(task, -ERESTARTSYS);
 		}
-		dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
+		trace_rpc_task_sync_wake(task, task->tk_action);
 	}
 
-	dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
-			task->tk_status);
 	/* Release all resources associated with the task */
 	rpc_release_task(task);
 }
@@ -857,13 +966,19 @@
 
 	rpc_set_active(task);
 	rpc_make_runnable(rpciod_workqueue, task);
-	if (!is_async)
+	if (!is_async) {
+		unsigned int pflags = memalloc_nofs_save();
 		__rpc_execute(task);
+		memalloc_nofs_restore(pflags);
+	}
 }
 
 static void rpc_async_schedule(struct work_struct *work)
 {
+	unsigned int pflags = memalloc_nofs_save();
+
 	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
+	memalloc_nofs_restore(pflags);
 }
 
 /**
@@ -882,19 +997,18 @@
  * Most requests are 'small' (under 2KiB) and can be serviced from a
  * mempool, ensuring that NFS reads and writes can always proceed,
  * and that there is good locality of reference for these buffers.
- *
- * In order to avoid memory starvation triggering more writebacks of
- * NFS requests, we avoid using GFP_KERNEL.
  */
 int rpc_malloc(struct rpc_task *task)
 {
 	struct rpc_rqst *rqst = task->tk_rqstp;
 	size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
 	struct rpc_buffer *buf;
-	gfp_t gfp = GFP_NOIO | __GFP_NOWARN;
+	gfp_t gfp = GFP_NOFS;
 
+	if (RPC_IS_ASYNC(task))
+		gfp = GFP_NOWAIT | __GFP_NOWARN;
 	if (RPC_IS_SWAPPER(task))
-		gfp = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
+		gfp |= __GFP_MEMALLOC;
 
 	size += sizeof(struct rpc_buffer);
 	if (size <= RPC_BUFFER_MAXSIZE)
@@ -906,8 +1020,6 @@
 		return -ENOMEM;
 
 	buf->len = size;
-	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
-			task->tk_pid, size, buf);
 	rqst->rq_buffer = buf->data;
 	rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize;
 	return 0;
@@ -927,9 +1039,6 @@
 
 	buf = container_of(buffer, struct rpc_buffer, data);
 	size = buf->len;
-
-	dprintk("RPC:       freeing buffer of size %zu at %p\n",
-			size, buf);
 
 	if (size <= RPC_BUFFER_MAXSIZE)
 		mempool_free(buf, rpc_buffer_mempool);
@@ -956,21 +1065,21 @@
 	/* Initialize workqueue for async tasks */
 	task->tk_workqueue = task_setup_data->workqueue;
 
-	task->tk_xprt = xprt_get(task_setup_data->rpc_xprt);
+	task->tk_xprt = rpc_task_get_xprt(task_setup_data->rpc_client,
+			xprt_get(task_setup_data->rpc_xprt));
+
+	task->tk_op_cred = get_rpccred(task_setup_data->rpc_op_cred);
 
 	if (task->tk_ops->rpc_call_prepare != NULL)
 		task->tk_action = rpc_prepare_task;
 
 	rpc_init_task_statistics(task);
-
-	dprintk("RPC:       new task initialized, procpid %u\n",
-				task_pid_nr(current));
 }
 
 static struct rpc_task *
 rpc_alloc_task(void)
 {
-	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOIO);
+	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
 }
 
 /*
@@ -988,7 +1097,6 @@
 
 	rpc_init_task(task, setup_data);
 	task->tk_flags |= flags;
-	dprintk("RPC:       allocated task %p\n", task);
 	return task;
 }
 
@@ -1015,24 +1123,27 @@
 {
 	unsigned short tk_flags = task->tk_flags;
 
+	put_rpccred(task->tk_op_cred);
 	rpc_release_calldata(task->tk_ops, task->tk_calldata);
 
-	if (tk_flags & RPC_TASK_DYNAMIC) {
-		dprintk("RPC: %5u freeing task\n", task->tk_pid);
+	if (tk_flags & RPC_TASK_DYNAMIC)
 		mempool_free(task, rpc_task_mempool);
-	}
 }
 
 static void rpc_async_release(struct work_struct *work)
 {
+	unsigned int pflags = memalloc_nofs_save();
+
 	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
+	memalloc_nofs_restore(pflags);
 }
 
 static void rpc_release_resources_task(struct rpc_task *task)
 {
 	xprt_release(task);
 	if (task->tk_msg.rpc_cred) {
-		put_rpccred(task->tk_msg.rpc_cred);
+		if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
+			put_cred(task->tk_msg.rpc_cred);
 		task->tk_msg.rpc_cred = NULL;
 	}
 	rpc_task_release_client(task);
@@ -1070,8 +1181,6 @@
 
 static void rpc_release_task(struct rpc_task *task)
 {
-	dprintk("RPC: %5u release task\n", task->tk_pid);
-
 	WARN_ON_ONCE(RPC_IS_QUEUED(task));
 
 	rpc_release_resources_task(task);
@@ -1112,7 +1221,6 @@
 	/*
 	 * Create the rpciod thread and wait for it to start.
 	 */
-	dprintk("RPC:       creating workqueue rpciod\n");
 	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
 	if (!wq)
 		goto out_failed;
@@ -1137,7 +1245,6 @@
 
 	if (rpciod_workqueue == NULL)
 		return;
-	dprintk("RPC:       destroying workqueue rpciod\n");
 
 	wq = rpciod_workqueue;
 	rpciod_workqueue = NULL;

--
Gitblit v1.6.2