patch-2.3.99-pre3 linux/net/sunrpc/sched.c

diff -u --recursive --new-file v2.3.99-pre2/linux/net/sunrpc/sched.c linux/net/sunrpc/sched.c
@@ -69,6 +69,16 @@
 static int			rpc_inhibit = 0;
 
 /*
+ * Spinlock for wait queues. Access to the latter also has to be
+ * interrupt-safe in order to allow timers to wake up sleeping tasks.
+ */
+spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
+/*
+ * Spinlock for other critical sections of code.
+ */
+spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;
+
+/*
  * This is the last-ditch buffer for NFS swap requests
  */
 static u32			swap_buffer[PAGE_SIZE >> 2];
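
The hunk above hoists the two scheduler spinlocks to the top of the file (they were previously declared further down, as the next hunk shows). rpc_queue_lock has to be taken with the _bh variants, because the RPC timer callbacks run at bottom-half time and touch the same wait queues; rpc_sched_lock only serializes process-context bookkeeping such as the global task list. A minimal sketch of that locking discipline (the two functions are illustrative, not part of the patch):

#include <linux/spinlock.h>

/* Wait-queue state is also touched from timer (bottom-half) context,
 * so process context must block bottom halves while holding the lock. */
static void illustrative_queue_op(struct rpc_wait_queue *q)
{
	spin_lock_bh(&rpc_queue_lock);
	/* ... add or remove tasks on q ... */
	spin_unlock_bh(&rpc_queue_lock);
}

/* Scheduler bookkeeping is only touched from process context,
 * so the plain lock/unlock pair suffices. */
static void illustrative_sched_op(void)
{
	spin_lock(&rpc_sched_lock);
	/* ... walk the global task list ... */
	spin_unlock(&rpc_sched_lock);
}
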
@@ -87,14 +97,52 @@
 }
 
 /*
- * Spinlock for wait queues. Access to the latter also has to be
- * interrupt-safe in order to allow timers to wake up sleeping tasks.
+ * Set up a timer for the current task.
  */
-spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
+static inline void
+__rpc_add_timer(struct rpc_task *task, rpc_action timer)
+{
+	if (!task->tk_timeout)
+		return;
+
+	dprintk("RPC: %4d setting alarm for %lu ms\n",
+			task->tk_pid, task->tk_timeout * 1000 / HZ);
+
+	if (timer_pending(&task->tk_timer)) {
+		printk(KERN_ERR "RPC: Bug! Overwriting active timer\n");
+		del_timer(&task->tk_timer);
+	}
+	if (!timer)
+		timer = __rpc_default_timer;
+	init_timer(&task->tk_timer);
+	task->tk_timer.expires  = jiffies + task->tk_timeout;
+	task->tk_timer.data     = (unsigned long) task;
+	task->tk_timer.function = (void (*)(unsigned long)) timer;
+	add_timer(&task->tk_timer);
+}
+
 /*
- * Spinlock for other critical sections of code.
+ * Set up a timer for an already sleeping task.
  */
-spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;
+void rpc_add_timer(struct rpc_task *task, rpc_action timer)
+{
+	spin_lock_bh(&rpc_queue_lock);
+	if (!(RPC_IS_RUNNING(task) || task->tk_wakeup))
+		__rpc_add_timer(task, timer);
+	spin_unlock_bh(&rpc_queue_lock);
+}
+
+/*
+ * Delete any timer for the current task.
+ */
+static inline void
+__rpc_del_timer(struct rpc_task *task)
+{
+	dprintk("RPC: %4d deleting timer\n", task->tk_pid);
+	if (timer_pending(&task->tk_timer))
+		del_timer(&task->tk_timer);
+	task->tk_timeout = 0;
+}
 
 /*
  * Add new request to wait queue.
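
The new __rpc_add_timer()/__rpc_del_timer() pair above follows the timer_list conventions of this kernel generation: the callback takes an unsigned long, the object pointer travels in the data field, and a still-pending timer must be deleted before it is rearmed. The same pattern in isolation (all names are illustrative):

#include <linux/timer.h>
#include <linux/sched.h>	/* jiffies */

struct my_obj {
	struct timer_list	timer;
	unsigned long		timeout;	/* in jiffies */
};

static void my_timeout_fn(unsigned long data)
{
	struct my_obj *obj = (struct my_obj *) data;

	obj->timeout = 0;	/* runs in bottom-half context */
}

static void my_arm_timer(struct my_obj *obj)
{
	if (timer_pending(&obj->timer))	/* never rearm a live timer */
		del_timer(&obj->timer);
	init_timer(&obj->timer);
	obj->timer.expires  = jiffies + obj->timeout;
	obj->timer.data     = (unsigned long) obj;
	obj->timer.function = my_timeout_fn;
	add_timer(&obj->timer);
}
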
@@ -104,16 +152,15 @@
  * improve overall performance.
  * Everyone else gets appended to the queue to ensure proper FIFO behavior.
  */
-static int
+static inline int
 __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-	if (task->tk_rpcwait) {
-		if (task->tk_rpcwait != queue)
-		{
-			printk(KERN_WARNING "RPC: doubly enqueued task!\n");
-			return -EWOULDBLOCK;
-		}
+	if (task->tk_rpcwait == queue)
 		return 0;
+
+	if (task->tk_rpcwait) {
+		printk(KERN_WARNING "RPC: doubly enqueued task!\n");
+		return -EWOULDBLOCK;
 	}
 	if (RPC_IS_SWAPPER(task))
 		rpc_insert_list(&queue->task, task);
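
The restructured __rpc_add_wait_queue() keeps the old placement policy: swap-out requests go to the front of the queue so that memory pressure never waits behind ordinary traffic, and everything else is appended to preserve FIFO order. A simplified stand-alone rendering of that policy (the real helpers, rpc_insert_list() and rpc_append_list(), thread the links through struct rpc_task itself):

/* Simplified queue with external links. */
struct qtask {
	struct qtask	*next;
	int		swapper;
};

struct queue {
	struct qtask	*head, *tail;
};

static void enqueue(struct queue *q, struct qtask *t)
{
	if (t->swapper) {
		/* front of the queue: scheduled first */
		t->next = q->head;
		q->head = t;
		if (!q->tail)
			q->tail = t;
	} else {
		/* back of the queue: FIFO for everyone else */
		t->next = NULL;
		if (q->tail)
			q->tail->next = t;
		else
			q->head = t;
		q->tail = t;
	}
}
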
@@ -130,7 +177,7 @@
 int
 rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
 {
-	int result;
+	int		result;
 
 	spin_lock_bh(&rpc_queue_lock);
 	result = __rpc_add_wait_queue(q, task);
@@ -142,13 +189,14 @@
  * Remove request from queue.
  * Note: must be called with spin lock held.
  */
-static void
+static inline void
 __rpc_remove_wait_queue(struct rpc_task *task)
 {
-	struct rpc_wait_queue *queue;
+	struct rpc_wait_queue *queue = task->tk_rpcwait;
 
-	if (!(queue = task->tk_rpcwait))
+	if (!queue)
 		return;
+
 	rpc_remove_list(&queue->task, task);
 	task->tk_rpcwait = NULL;
 
@@ -159,51 +207,14 @@
 void
 rpc_remove_wait_queue(struct rpc_task *task)
 {
+	if (!task->tk_rpcwait)
+		return;
 	spin_lock_bh(&rpc_queue_lock);
 	__rpc_remove_wait_queue(task);
 	spin_unlock_bh(&rpc_queue_lock);
 }
 
 /*
- * Set up a timer for the current task.
- */
-inline void
-rpc_add_timer(struct rpc_task *task, rpc_action timer)
-{
-	unsigned long	expires = jiffies + task->tk_timeout;
-
-	dprintk("RPC: %4d setting alarm for %lu ms\n",
-			task->tk_pid, task->tk_timeout * 1000 / HZ);
-	if (!timer)
-		timer = __rpc_default_timer;
-	if (time_before(expires, jiffies)) {
-		printk(KERN_ERR "RPC: bad timeout value %ld - setting to 10 sec!\n",
-					task->tk_timeout);
-		expires = jiffies + 10 * HZ;
-	}
-	task->tk_timer.expires  = expires;
-	task->tk_timer.data     = (unsigned long) task;
-	task->tk_timer.function = (void (*)(unsigned long)) timer;
-	task->tk_timer.prev     = NULL;
-	task->tk_timer.next     = NULL;
-	add_timer(&task->tk_timer);
-}
-
-/*
- * Delete any timer for the current task.
- * Must be called with interrupts off.
- */
-inline void
-rpc_del_timer(struct rpc_task *task)
-{
-	if (task->tk_timeout) {
-		dprintk("RPC: %4d deleting timer\n", task->tk_pid);
-		del_timer(&task->tk_timer);
-		task->tk_timeout = 0;
-	}
-}
-
-/*
  * Make an RPC task runnable.
  *
  * Note: If the task is ASYNC, this must be called with 
@@ -218,31 +229,44 @@
 	}
 	task->tk_flags |= RPC_TASK_RUNNING;
 	if (RPC_IS_ASYNC(task)) {
-		int status;
-		status = __rpc_add_wait_queue(&schedq, task);
-		if (status)
-		{
-			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
-			task->tk_status = status;
+		if (RPC_IS_SLEEPING(task)) {
+			int status;
+			status = __rpc_add_wait_queue(&schedq, task);
+			if (status < 0) {
+				printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
+				task->tk_status = status;
+			} else
+				task->tk_sleeping = 0;
 		}
 		wake_up(&rpciod_idle);
 	} else {
+		task->tk_sleeping = 0;
 		wake_up(&task->tk_wait);
 	}
 }
 
+/*
+ * Place a newly initialized task on the schedq.
+ */
+static inline void
+rpc_schedule_run(struct rpc_task *task)
+{
+	/* Don't run a child twice! */
+	if (RPC_IS_ACTIVATED(task))
+		return;
+	task->tk_active = 1;
+	task->tk_sleeping = 1;
+	rpc_make_runnable(task);
+}
 
 /*
  *	For other people who may need to wake the I/O daemon
  *	but should (for now) know nothing about its innards
  */
- 
 void rpciod_wake_up(void)
 {
 	if(rpciod_pid==0)
-	{
 		printk(KERN_ERR "rpciod: wot no daemon?\n");
-	}
 	wake_up(&rpciod_idle);
 }
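
rpc_make_runnable() and rpc_schedule_run() above juggle three pieces of per-task state: the RPC_TASK_RUNNING bit in tk_flags plus the new tk_active and tk_sleeping fields. The predicates used throughout this file presumably reduce to something like the following; the real definitions live in the sched.h header, which this page does not show, so treat these as assumptions:

/* Assumed shape of the state tests; not taken from this patch. */
#define RPC_IS_RUNNING(t)	((t)->tk_flags & RPC_TASK_RUNNING)
#define RPC_IS_ASYNC(t)		((t)->tk_flags & RPC_TASK_ASYNC)
#define RPC_IS_ACTIVATED(t)	((t)->tk_active)
#define RPC_IS_SLEEPING(t)	((t)->tk_sleeping)

On that reading, tk_active records that the task has entered the scheduler at all (so waking it is meaningful), while tk_sleeping distinguishes a task that actually went to sleep from one whose wakeup raced ahead of __rpc_execute().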
 
@@ -261,19 +285,25 @@
 	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
 				rpc_qname(q), jiffies);
 
+	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
+		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
+		return;
+	}
+
+	/* Mark the task as being activated if so needed */
+	if (!RPC_IS_ACTIVATED(task)) {
+		task->tk_active = 1;
+		task->tk_sleeping = 1;
+	}
+
 	status = __rpc_add_wait_queue(q, task);
-	if (status)
-	{
+	if (status) {
 		printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
 		task->tk_status = status;
-		task->tk_flags |= RPC_TASK_RUNNING;
-	}
-	else
-	{
-		task->tk_callback = action;
-		if (task->tk_timeout)
-			rpc_add_timer(task, timer);
+	} else {
 		task->tk_flags &= ~RPC_TASK_RUNNING;
+		task->tk_callback = action;
+		__rpc_add_timer(task, timer);
 	}
 
 	return;
@@ -291,6 +321,19 @@
 	spin_unlock_bh(&rpc_queue_lock);
 }
 
+void
+rpc_sleep_locked(struct rpc_wait_queue *q, struct rpc_task *task,
+		 rpc_action action, rpc_action timer)
+{
+	/*
+	 * Protect the queue operations.
+	 */
+	spin_lock_bh(&rpc_queue_lock);
+	__rpc_sleep_on(q, task, action, timer);
+	rpc_lock_task(task);
+	spin_unlock_bh(&rpc_queue_lock);
+}
+
 /*
  * Wake up a single task -- must be invoked with spin lock held.
  *
@@ -307,16 +350,33 @@
 	if (task->tk_magic != 0xf00baa) {
 		printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n");
 		rpc_debug = ~0;
+		rpc_show_tasks();
 		return;
 	}
 #endif
-	rpc_del_timer(task);
+	/* Has the task been executed yet? If not, we cannot wake it up! */
+	if (!RPC_IS_ACTIVATED(task)) {
+		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
+		return;
+	}
+	if (RPC_IS_RUNNING(task))
+		return;
+
+	__rpc_del_timer(task);
+
+	/* If the task has been locked, then set tk_wakeup so that
+	 * rpc_unlock_task() wakes us up... */
+	if (task->tk_lock) {
+		task->tk_wakeup = 1;
+		return;
+	} else
+		task->tk_wakeup = 0;
+
 	if (task->tk_rpcwait != &schedq)
 		__rpc_remove_wait_queue(task);
-	if (!RPC_IS_RUNNING(task)) {
-		task->tk_flags |= RPC_TASK_CALLBACK;
-		rpc_make_runnable(task);
-	}
+	task->tk_flags |= RPC_TASK_CALLBACK;
+	rpc_make_runnable(task);
+
 	dprintk("RPC:      __rpc_wake_up done\n");
 }
 
@@ -338,6 +398,8 @@
 void
 rpc_wake_up_task(struct rpc_task *task)
 {
+	if (RPC_IS_RUNNING(task))
+		return;
 	spin_lock_bh(&rpc_queue_lock);
 	__rpc_wake_up(task);
 	spin_unlock_bh(&rpc_queue_lock);
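
rpc_wake_up_task() now tests RPC_IS_RUNNING() before taking rpc_queue_lock, much as rpc_remove_wait_queue() grew an unlocked tk_rpcwait check earlier in the patch. The unlocked test is only a hint; __rpc_wake_up() repeats it under the lock, where it is authoritative, so a lost race merely costs one lock round-trip. The pattern in isolation (the function name is illustrative):

static void wake_if_sleeping(struct rpc_task *task)
{
	if (RPC_IS_RUNNING(task))	/* unlocked hint, may race */
		return;
	spin_lock_bh(&rpc_queue_lock);
	__rpc_wake_up(task);		/* re-tests RPC_IS_RUNNING inside */
	spin_unlock_bh(&rpc_queue_lock);
}
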
@@ -389,6 +451,30 @@
 }
 
 /*
+ * Lock down a sleeping task to prevent it from waking up
+ * and disappearing from beneath us.
+ *
+ * This function should always be called with the
+ * rpc_queue_lock held.
+ */
+int
+rpc_lock_task(struct rpc_task *task)
+{
+	if (!RPC_IS_RUNNING(task))
+		return ++task->tk_lock;
+	return 0;
+}
+
+void
+rpc_unlock_task(struct rpc_task *task)
+{
+	spin_lock_bh(&rpc_queue_lock);
+	if (task->tk_lock && !--task->tk_lock && task->tk_wakeup)
+		__rpc_wake_up(task);
+	spin_unlock_bh(&rpc_queue_lock);
+}
+
+/*
  * Run a task at a later time
  */
 static void	__rpc_atrun(struct rpc_task *);
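
rpc_lock_task() and rpc_unlock_task() let a caller pin a sleeping task so that it cannot wake up and be released underneath them: while tk_lock is nonzero, __rpc_wake_up() merely records the wakeup in tk_wakeup, and the final rpc_unlock_task() replays it. A sketch of the intended calling pattern (illustrative, not from the patch):

static void examine_sleeping_task(struct rpc_task *task)
{
	int	pinned;

	spin_lock_bh(&rpc_queue_lock);
	pinned = rpc_lock_task(task);	/* 0 if the task is already running */
	spin_unlock_bh(&rpc_queue_lock);

	if (!pinned)
		return;

	/* The task can neither wake up nor be released here, so its
	 * fields may be inspected without holding rpc_queue_lock. */

	rpc_unlock_task(task);		/* replays any deferred wakeup */
}
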
@@ -426,7 +512,7 @@
 		/*
 		 * Execute any pending callback.
 		 */
-		if (task->tk_flags & RPC_TASK_CALLBACK) {
+		if (RPC_DO_CALLBACK(task)) {
 			/* Define a callback save pointer */
 			void (*save_callback)(struct rpc_task *);
 	
@@ -446,101 +532,89 @@
 		}
 
 		/*
-		 * No handler for next step means exit.
-		 */
-		if (!task->tk_action)
-			break;
-
-		/*
 		 * Perform the next FSM step.
 		 * tk_action may be NULL when the task has been killed
 		 * by someone else.
 		 */
-		if (RPC_IS_RUNNING(task) && task->tk_action)
+		if (RPC_IS_RUNNING(task)) {
+			if (!task->tk_action)
+				break;
 			task->tk_action(task);
+		}
 
 		/*
 		 * Check whether task is sleeping.
-		 * Note that if the task may go to sleep in tk_action,
+		 * Note that if the task goes to sleep in tk_action,
 		 * and the RPC reply arrives before we get here, it will
 		 * have state RUNNING, but will still be on schedq.
+		 * 27/9/99: An attempt to fix the above was made by
+		 *          introducing task->tk_sleeping.
 		 */
 		spin_lock_bh(&rpc_queue_lock);
-		if (RPC_IS_RUNNING(task)) {
-			if (task->tk_rpcwait == &schedq)
-				__rpc_remove_wait_queue(task);
-		} else while (!RPC_IS_RUNNING(task)) {
+		if (!RPC_IS_RUNNING(task)) {
+			task->tk_sleeping = 1;
 			if (RPC_IS_ASYNC(task)) {
 				spin_unlock_bh(&rpc_queue_lock);
 				return 0;
 			}
+		} else
+			task->tk_sleeping = 0;
+		spin_unlock_bh(&rpc_queue_lock);
 
+		while (RPC_IS_SLEEPING(task)) {
 			/* sync task: sleep here */
 			dprintk("RPC: %4d sync task going to sleep\n",
 							task->tk_pid);
 			if (current->pid == rpciod_pid)
 				printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");
 
-			spin_unlock_bh(&rpc_queue_lock);
-			__wait_event(task->tk_wait, RPC_IS_RUNNING(task));
-			spin_lock_bh(&rpc_queue_lock);
+			__wait_event(task->tk_wait, !RPC_IS_SLEEPING(task));
+			dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
 
 			/*
-			 * When the task received a signal, remove from
-			 * any queues etc, and make runnable again.
+			 * When a sync task receives a signal, it exits with
+			 * -ERESTARTSYS. In order to catch any callbacks that
+			 * clean up after sleeping on some queue, we don't
+			 * break the loop here, but go around once more.
 			 */
-			if (signalled())
-				__rpc_wake_up(task);
-
-			dprintk("RPC: %4d sync task resuming\n",
-							task->tk_pid);
-		}
-		spin_unlock_bh(&rpc_queue_lock);
-
-		/*
-		 * When a sync task receives a signal, it exits with
-		 * -ERESTARTSYS. In order to catch any callbacks that
-		 * clean up after sleeping on some queue, we don't
-		 * break the loop here, but go around once more.
-		 */
-		if (!RPC_IS_ASYNC(task) && signalled()) {
-			dprintk("RPC: %4d got signal\n", task->tk_pid);
-			rpc_exit(task, -ERESTARTSYS);
+			if (task->tk_client->cl_intr && signalled()) {
+				dprintk("RPC: %4d got signal\n", task->tk_pid);
+				task->tk_flags |= RPC_TASK_KILLED;
+				rpc_exit(task, -ERESTARTSYS);
+				rpc_wake_up_task(task);
+			}
 		}
 	}
 
 	dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
-	if (task->tk_exit) {
-		status = task->tk_status;
+	status = task->tk_status;
+	if (task->tk_exit)
 		task->tk_exit(task);
-	}
 
 	return status;
 }
 
 /*
  * User-visible entry point to the scheduler.
- * The recursion protection is for debugging. It should go away once
- * the code has stabilized.
+ *
+ * This may be called recursively if e.g. an async NFS task updates
+ * the attributes and finds that dirty pages must be flushed.
  */
-void
+int
 rpc_execute(struct rpc_task *task)
 {
-	static int	executing = 0;
-	int		incr = RPC_IS_ASYNC(task)? 1 : 0;
-
-	if (incr) {
-		if (rpc_inhibit) {
-			printk(KERN_INFO "RPC: execution inhibited!\n");
-			return;
-		}
-		if (executing)
-			printk(KERN_WARNING "RPC: %d tasks executed\n", executing);
+	if (rpc_inhibit) {
+		printk(KERN_INFO "RPC: execution inhibited!\n");
+		return -EIO;
 	}
+	task->tk_flags |= RPC_TASK_RUNNING;
+	if (task->tk_active) {
+		printk(KERN_ERR "RPC: active task was run twice!\n");
+		return -EWOULDBLOCK;
+	}
+	task->tk_active = 1;
 	
-	executing += incr;
-	__rpc_execute(task);
-	executing -= incr;
+	return __rpc_execute(task);
 }
 
 /*
@@ -551,28 +625,33 @@
 {
 	struct rpc_task	*task;
 	int		count = 0;
-	int need_resched = current->need_resched;
 
 	dprintk("RPC:      rpc_schedule enter\n");
 	while (1) {
+		/* Ensure equal rights for tcp tasks... */
+		rpciod_tcp_dispatcher();
+
 		spin_lock_bh(&rpc_queue_lock);
 		if (!(task = schedq.task)) {
 			spin_unlock_bh(&rpc_queue_lock);
 			break;
 		}
-		rpc_del_timer(task);
+		if (task->tk_lock) {
+			spin_unlock_bh(&rpc_queue_lock);
+			printk(KERN_ERR "RPC: Locked task was scheduled!\n");
+			rpc_debug = ~0;
+			rpc_show_tasks();
+			break;
+		}
 		__rpc_remove_wait_queue(task);
-		task->tk_flags |= RPC_TASK_RUNNING;
 		spin_unlock_bh(&rpc_queue_lock);
 
 		__rpc_execute(task);
 
-		if (++count >= 200) {
+		if (++count >= 200 || current->need_resched) {
 			count = 0;
-			need_resched = 1;
-		}
-		if (need_resched)
 			schedule();
+		}
 	}
 	dprintk("RPC:      rpc_schedule leave\n");
 }
@@ -646,8 +725,9 @@
 				rpc_action callback, int flags)
 {
 	memset(task, 0, sizeof(*task));
+	init_timer(&task->tk_timer);
 	task->tk_client = clnt;
-	task->tk_flags  = RPC_TASK_RUNNING | flags;
+	task->tk_flags  = flags;
 	task->tk_exit   = callback;
 	init_waitqueue_head(&task->tk_wait);
 	if (current->uid != current->fsuid || current->gid != current->fsgid)
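
Since rpc_init_task() no longer sets RPC_TASK_RUNNING, a freshly initialized task is neither running nor active; rpc_execute() is what first marks it running, and it now returns the task's final status to synchronous callers. An illustrative caller under those assumptions (request setup is elided):

static int illustrative_sync_call(struct rpc_clnt *clnt)
{
	struct rpc_task	task;

	rpc_init_task(&task, clnt, NULL, 0);	/* no exit callback, no flags */
	/* ... fill in tk_action and the call arguments ... */

	return rpc_execute(&task);	/* runs the FSM to completion */
}
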
@@ -717,6 +797,15 @@
 
 	dprintk("RPC: %4d release task\n", task->tk_pid);
 
+#ifdef RPC_DEBUG
+	if (task->tk_magic != 0xf00baa) {
+		printk(KERN_ERR "RPC: attempt to release a non-existing task!\n");
+		rpc_debug = ~0;
+		rpc_show_tasks();
+		return;
+	}
+#endif
+
 	/* Remove from global task list */
 	spin_lock(&rpc_sched_lock);
 	prev = task->tk_prev_task;
@@ -734,18 +823,20 @@
 	spin_lock_bh(&rpc_queue_lock);
 
 	/* Delete any running timer */
-	rpc_del_timer(task);
+	__rpc_del_timer(task);
 
 	/* Remove from any wait queue we're still on */
 	__rpc_remove_wait_queue(task);
 
+	task->tk_active = 0;
+
 	spin_unlock_bh(&rpc_queue_lock);
 
 	/* Release resources */
 	if (task->tk_rqstp)
 		xprt_release(task);
-	if (task->tk_cred)
-		rpcauth_releasecred(task);
+	if (task->tk_msg.rpc_cred)
+		rpcauth_unbindcred(task);
 	if (task->tk_buffer) {
 		rpc_free(task->tk_buffer);
 		task->tk_buffer = NULL;
@@ -824,7 +915,7 @@
 	spin_lock_bh(&rpc_queue_lock);
 	/* N.B. Is it possible for the child to have already finished? */
 	__rpc_sleep_on(&childq, task, func, NULL);
-	rpc_make_runnable(child);
+	rpc_schedule_run(child);
 	spin_unlock_bh(&rpc_queue_lock);
 }
 
@@ -903,8 +994,6 @@
 			schedule();
 			rounds = 0;
 		}
-		dprintk("RPC: rpciod running checking dispatch\n");
-		rpciod_tcp_dispatcher();
 
 		if (!rpciod_task_pending()) {
 			dprintk("RPC: rpciod back to sleep\n");
@@ -1032,11 +1121,9 @@
 }
 
 #ifdef RPC_DEBUG
-#include <linux/nfs_fs.h>
 void rpc_show_tasks(void)
 {
 	struct rpc_task *t = all_tasks, *next;
-	struct nfs_wreq *wreq;
 
 	spin_lock(&rpc_sched_lock);
 	t = all_tasks;
@@ -1049,22 +1136,11 @@
 	for (; t; t = next) {
 		next = t->tk_next_task;
 		printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
-			t->tk_pid, t->tk_proc, t->tk_flags, t->tk_status,
+			t->tk_pid, t->tk_msg.rpc_proc, t->tk_flags, t->tk_status,
 			t->tk_client, t->tk_client->cl_prog,
 			t->tk_rqstp, t->tk_timeout,
 			t->tk_rpcwait ? rpc_qname(t->tk_rpcwait) : " <NULL> ",
 			t->tk_action, t->tk_exit);
-
-		if (!(t->tk_flags & RPC_TASK_NFSWRITE))
-			continue;
-		/* NFS write requests */
-		wreq = (struct nfs_wreq *) t->tk_calldata;
-		printk("     NFS: flgs=%08x, pid=%d, pg=%p, off=(%d, %d)\n",
-			wreq->wb_flags, wreq->wb_pid, wreq->wb_page,
-			wreq->wb_offset, wreq->wb_bytes);
-		printk("          name=%s/%s\n",
-			wreq->wb_file->f_dentry->d_parent->d_name.name,
-			wreq->wb_file->f_dentry->d_name.name);
 	}
 	spin_unlock(&rpc_sched_lock);
 }
