From e636c8d336489bf3eed5878299e6cc045bbad077 Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Tue, 20 Feb 2024 01:17:29 +0000
Subject: [PATCH] debug lk

---
 kernel/fs/gfs2/glock.c |  682 ++++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 508 insertions(+), 174 deletions(-)

diff --git a/kernel/fs/gfs2/glock.c b/kernel/fs/gfs2/glock.c
index 14d11cc..310dee2 100644
--- a/kernel/fs/gfs2/glock.c
+++ b/kernel/fs/gfs2/glock.c
@@ -1,10 +1,7 @@
+// SPDX-License-Identifier: GPL-2.0-only
 /*
  * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
  * Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
- *
- * This copyrighted material is made available to anyone wishing to use,
- * modify, copy, or redistribute it subject to the terms and conditions
- * of the GNU General Public License version 2.
  */
 
 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
@@ -128,19 +125,45 @@
 {
 	struct gfs2_glock *gl = container_of(rcu, struct gfs2_glock, gl_rcu);
 
-	if (gl->gl_ops->go_flags & GLOF_ASPACE) {
+	kfree(gl->gl_lksb.sb_lvbptr);
+	if (gl->gl_ops->go_flags & GLOF_ASPACE)
 		kmem_cache_free(gfs2_glock_aspace_cachep, gl);
-	} else {
-		kfree(gl->gl_lksb.sb_lvbptr);
+	else
 		kmem_cache_free(gfs2_glock_cachep, gl);
-	}
+}
+
+/**
+ * glock_blocked_by_withdraw - determine if we can still use a glock
+ * @gl: the glock
+ *
+ * We need to allow some glocks to be enqueued, dequeued, promoted, and demoted
+ * when we're withdrawn. For example, to maintain metadata integrity, we should
+ * disallow the use of inode and rgrp glocks when withdrawn. Other glocks, like
+ * iopen or the transaction glocks, may be safely used because none of their
+ * metadata goes through the journal. So in general, we should disallow all
+ * glocks that are journaled, and allow all the others. One exception is:
+ * we need to allow our active journal to be promoted and demoted so others
+ * may recover it and we can reacquire it when they're done.
+ */
+static bool glock_blocked_by_withdraw(struct gfs2_glock *gl)
+{
+	struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
+
+	if (likely(!gfs2_withdrawn(sdp)))
+		return false;
+	if (gl->gl_ops->go_flags & GLOF_NONDISK)
+		return false;
+	if (!sdp->sd_jdesc ||
+	    gl->gl_name.ln_number == sdp->sd_jdesc->jd_no_addr)
+		return false;
+	return true;
 }
 
 void gfs2_glock_free(struct gfs2_glock *gl)
 {
 	struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
 
-	BUG_ON(atomic_read(&gl->gl_revokes));
+	gfs2_glock_assert_withdraw(gl, atomic_read(&gl->gl_revokes) == 0);
 	rhashtable_remove_fast(&gl_hash_table, &gl->gl_node, ht_parms);
 	smp_mb();
 	wake_up_glock(gl);
@@ -247,7 +270,12 @@
 	gfs2_glock_remove_from_lru(gl);
 	spin_unlock(&gl->gl_lockref.lock);
 	GLOCK_BUG_ON(gl, !list_empty(&gl->gl_holders));
-	GLOCK_BUG_ON(gl, mapping && mapping->nrpages);
+	if (mapping) {
+		truncate_inode_pages_final(mapping);
+		if (!gfs2_withdrawn(sdp))
+			GLOCK_BUG_ON(gl, mapping->nrpages ||
+				     mapping->nrexceptional);
+	}
 	trace_gfs2_glock_put(gl);
 	sdp->sd_lockstruct.ls_ops->lm_put_lock(gl);
 }
@@ -284,7 +312,7 @@
 
 static inline int may_grant(const struct gfs2_glock *gl, const struct gfs2_holder *gh)
 {
-	const struct gfs2_holder *gh_head = list_entry(gl->gl_holders.next, const struct gfs2_holder, gh_list);
+	const struct gfs2_holder *gh_head = list_first_entry(&gl->gl_holders, const struct gfs2_holder, gh_list);
 	if ((gh->gh_state == LM_ST_EXCLUSIVE ||
 	     gh_head->gh_state == LM_ST_EXCLUSIVE) && gh != gh_head)
 		return 0;
@@ -308,6 +336,11 @@
 	clear_bit(HIF_WAIT, &gh->gh_iflags);
 	smp_mb__after_atomic();
 	wake_up_bit(&gh->gh_iflags, HIF_WAIT);
+	if (gh->gh_flags & GL_ASYNC) {
+		struct gfs2_sbd *sdp = gh->gh_gl->gl_name.ln_sbd;
+
+		wake_up(&sdp->sd_async_glock_wait);
+	}
 }
 
 /**
@@ -425,15 +458,21 @@
 		else
 			gl->gl_lockref.count--;
 	}
-	if (held1 && held2 && list_empty(&gl->gl_holders))
-		clear_bit(GLF_QUEUED, &gl->gl_flags);
-
 	if (new_state != gl->gl_target)
 		/* shorten our minimum hold time */
 		gl->gl_hold_time = max(gl->gl_hold_time - GL_GLOCK_HOLD_DECR,
 				       GL_GLOCK_MIN_HOLD);
 	gl->gl_state = new_state;
 	gl->gl_tchange = jiffies;
+}
+
+static void gfs2_set_demote(struct gfs2_glock *gl)
+{
+	struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
+
+	set_bit(GLF_DEMOTE, &gl->gl_flags);
+	smp_mb();
+	wake_up(&sdp->sd_async_glock_wait);
 }
 
 static void gfs2_demote_wake(struct gfs2_glock *gl)
@@ -499,7 +538,8 @@
 			do_xmote(gl, gh, LM_ST_UNLOCKED);
 			break;
 		default: /* Everything else */
-			pr_err("wanted %u got %u\n", gl->gl_target, state);
+			fs_err(gl->gl_name.ln_sbd, "wanted %u got %u\n",
+			       gl->gl_target, state);
 			GLOCK_BUG_ON(gl, 1);
 		}
 		spin_unlock(&gl->gl_lockref.lock);
@@ -529,6 +569,16 @@
 	spin_unlock(&gl->gl_lockref.lock);
 }
 
+static bool is_system_glock(struct gfs2_glock *gl)
+{
+	struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
+	struct gfs2_inode *m_ip = GFS2_I(sdp->sd_statfs_inode);
+
+	if (gl == m_ip->i_gl)
+		return true;
+	return false;
+}
+
 /**
  * do_xmote - Calls the DLM to change the state of a lock
  * @gl: The lock state
@@ -546,8 +596,8 @@
 	unsigned int lck_flags = (unsigned int)(gh ? gh->gh_flags : 0);
 	int ret;
 
-	if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags)) &&
-	    target != LM_ST_UNLOCKED)
+	if (target != LM_ST_UNLOCKED && glock_blocked_by_withdraw(gl) &&
+	    gh && !(gh->gh_flags & LM_FLAG_NOEXP))
 		return;
 	lck_flags &= (LM_FLAG_TRY | LM_FLAG_TRY_1CB | LM_FLAG_NOEXP |
 		      LM_FLAG_PRIORITY);
@@ -555,7 +605,14 @@
 	GLOCK_BUG_ON(gl, gl->gl_state == gl->gl_target);
 	if ((target == LM_ST_UNLOCKED || target == LM_ST_DEFERRED) &&
 	    glops->go_inval) {
-		set_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags);
+		/*
+		 * If another process is already doing the invalidate, let that
+		 * finish first.  The glock state machine will get back to this
+		 * holder again later.
+		 */
+		if (test_and_set_bit(GLF_INVALIDATE_IN_PROGRESS,
+				     &gl->gl_flags))
+			return;
 		do_error(gl, 0); /* Fail queued try locks */
 	}
 	gl->gl_req = target;
@@ -565,13 +622,74 @@
 	    (lck_flags & (LM_FLAG_TRY|LM_FLAG_TRY_1CB)))
 		clear_bit(GLF_BLOCKING, &gl->gl_flags);
 	spin_unlock(&gl->gl_lockref.lock);
-	if (glops->go_sync)
-		glops->go_sync(gl);
-	if (test_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags))
+	if (glops->go_sync) {
+		ret = glops->go_sync(gl);
+		/* If we had a problem syncing (due to io errors or whatever),
+		 * we should not invalidate the metadata or tell dlm to
+		 * release the glock to other nodes.
+		 */
+		if (ret) {
+			if (cmpxchg(&sdp->sd_log_error, 0, ret)) {
+				fs_err(sdp, "Error %d syncing glock \n", ret);
+				gfs2_dump_glock(NULL, gl, true);
+			}
+			goto skip_inval;
+		}
+	}
+	if (test_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags)) {
+		/*
+		 * The call to go_sync should have cleared out the ail list.
+		 * If there are still items, we have a problem. We ought to
+		 * withdraw, but we can't because the withdraw code also uses
+		 * glocks. Warn about the error, dump the glock, then fall
+		 * through and wait for logd to do the withdraw for us.
+		 */
+		if ((atomic_read(&gl->gl_ail_count) != 0) &&
+		    (!cmpxchg(&sdp->sd_log_error, 0, -EIO))) {
+			gfs2_glock_assert_warn(gl,
+					       !atomic_read(&gl->gl_ail_count));
+			gfs2_dump_glock(NULL, gl, true);
+		}
 		glops->go_inval(gl, target == LM_ST_DEFERRED ? 0 : DIO_METADATA);
-	clear_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags);
+		clear_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags);
+	}
 
+skip_inval:
 	gfs2_glock_hold(gl);
+	/*
+	 * Check for an error encountered since we called go_sync and go_inval.
+	 * If so, we can't withdraw from the glock code because the withdraw
+	 * code itself uses glocks (see function signal_our_withdraw) to
+	 * change the mount to read-only. Most importantly, we must not call
+	 * dlm to unlock the glock until the journal is in a known good state
+	 * (after journal replay) otherwise other nodes may use the object
+	 * (rgrp or dinode) and then later, journal replay will corrupt the
+	 * file system. The best we can do here is wait for the logd daemon
+	 * to see sd_log_error and withdraw, and in the meantime, requeue the
+	 * work for later.
+	 *
+	 * We make a special exception for some system glocks, such as the
+	 * system statfs inode glock, which needs to be granted before the
+	 * gfs2_quotad daemon can exit, and that exit needs to finish before
+	 * we can unmount the withdrawn file system.
+	 *
+	 * However, if we're just unlocking the lock (say, for unmount, when
+	 * gfs2_gl_hash_clear calls clear_glock) and recovery is complete
+	 * then it's okay to tell dlm to unlock it.
+	 */
+	if (unlikely(sdp->sd_log_error && !gfs2_withdrawn(sdp)))
+		gfs2_withdraw_delayed(sdp);
+	if (glock_blocked_by_withdraw(gl) &&
+	    (target != LM_ST_UNLOCKED ||
+	     test_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags))) {
+		if (!is_system_glock(gl)) {
+			gfs2_glock_queue_work(gl, GL_GLOCK_DFT_HOLD);
+			goto out;
+		} else {
+			clear_bit(GLF_INVALIDATE_IN_PROGRESS, &gl->gl_flags);
+		}
+	}
+
 	if (sdp->sd_lockstruct.ls_ops->lm_lock)	{
 		/* lock_dlm */
 		ret = sdp->sd_lockstruct.ls_ops->lm_lock(gl, target, lck_flags);
@@ -580,17 +698,15 @@
 		    test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags)) {
 			finish_xmote(gl, target);
 			gfs2_glock_queue_work(gl, 0);
-		}
-		else if (ret) {
-			pr_err("lm_lock ret %d\n", ret);
-			GLOCK_BUG_ON(gl, !test_bit(SDF_SHUTDOWN,
-						   &sdp->sd_flags));
+		} else if (ret) {
+			fs_err(sdp, "lm_lock ret %d\n", ret);
+			GLOCK_BUG_ON(gl, !gfs2_withdrawn(sdp));
 		}
 	} else { /* lock_nolock */
 		finish_xmote(gl, target);
 		gfs2_glock_queue_work(gl, 0);
 	}
-
+out:
 	spin_lock(&gl->gl_lockref.lock);
 }
 
@@ -604,7 +720,7 @@
 	struct gfs2_holder *gh;
 
 	if (!list_empty(&gl->gl_holders)) {
-		gh = list_entry(gl->gl_holders.next, struct gfs2_holder, gh_list);
+		gh = list_first_entry(&gl->gl_holders, struct gfs2_holder, gh_list);
 		if (test_bit(HIF_HOLDER, &gh->gh_iflags))
 			return gh;
 	}
@@ -669,12 +785,95 @@
 	return;
 }
 
+void gfs2_inode_remember_delete(struct gfs2_glock *gl, u64 generation)
+{
+	struct gfs2_inode_lvb *ri = (void *)gl->gl_lksb.sb_lvbptr;
+
+	if (ri->ri_magic == 0)
+		ri->ri_magic = cpu_to_be32(GFS2_MAGIC);
+	if (ri->ri_magic == cpu_to_be32(GFS2_MAGIC))
+		ri->ri_generation_deleted = cpu_to_be64(generation);
+}
+
+bool gfs2_inode_already_deleted(struct gfs2_glock *gl, u64 generation)
+{
+	struct gfs2_inode_lvb *ri = (void *)gl->gl_lksb.sb_lvbptr;
+
+	if (ri->ri_magic != cpu_to_be32(GFS2_MAGIC))
+		return false;
+	return generation <= be64_to_cpu(ri->ri_generation_deleted);
+}
+
+static void gfs2_glock_poke(struct gfs2_glock *gl)
+{
+	int flags = LM_FLAG_TRY_1CB | LM_FLAG_ANY | GL_SKIP;
+	struct gfs2_holder gh;
+	int error;
+
+	gfs2_holder_init(gl, LM_ST_SHARED, flags, &gh);
+	error = gfs2_glock_nq(&gh);
+	if (!error)
+		gfs2_glock_dq(&gh);
+	gfs2_holder_uninit(&gh);
+}
+
+static bool gfs2_try_evict(struct gfs2_glock *gl)
+{
+	struct gfs2_inode *ip;
+	bool evicted = false;
+
+	/*
+	 * If there is contention on the iopen glock and we have an inode, try
+	 * to grab and release the inode so that it can be evicted.  This will
+	 * allow the remote node to go ahead and delete the inode without us
+	 * having to do it, which will avoid rgrp glock thrashing.
+	 *
+	 * The remote node is likely still holding the corresponding inode
+	 * glock, so it will run before we get to verify that the delete has
+	 * happened below.
+	 */
+	spin_lock(&gl->gl_lockref.lock);
+	ip = gl->gl_object;
+	if (ip && !igrab(&ip->i_inode))
+		ip = NULL;
+	spin_unlock(&gl->gl_lockref.lock);
+	if (ip) {
+		struct gfs2_glock *inode_gl = NULL;
+
+		gl->gl_no_formal_ino = ip->i_no_formal_ino;
+		set_bit(GIF_DEFERRED_DELETE, &ip->i_flags);
+		d_prune_aliases(&ip->i_inode);
+		iput(&ip->i_inode);
+
+		/* If the inode was evicted, gl->gl_object will now be NULL. */
+		spin_lock(&gl->gl_lockref.lock);
+		ip = gl->gl_object;
+		if (ip) {
+			inode_gl = ip->i_gl;
+			lockref_get(&inode_gl->gl_lockref);
+			clear_bit(GIF_DEFERRED_DELETE, &ip->i_flags);
+		}
+		spin_unlock(&gl->gl_lockref.lock);
+		if (inode_gl) {
+			gfs2_glock_poke(inode_gl);
+			gfs2_glock_put(inode_gl);
+		}
+		evicted = !ip;
+	}
+	return evicted;
+}
+
 static void delete_work_func(struct work_struct *work)
 {
-	struct gfs2_glock *gl = container_of(work, struct gfs2_glock, gl_delete);
+	struct delayed_work *dwork = to_delayed_work(work);
+	struct gfs2_glock *gl = container_of(dwork, struct gfs2_glock, gl_delete);
 	struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
 	struct inode *inode;
 	u64 no_addr = gl->gl_name.ln_number;
+
+	spin_lock(&gl->gl_lockref.lock);
+	clear_bit(GLF_PENDING_DELETE, &gl->gl_flags);
+	spin_unlock(&gl->gl_lockref.lock);
 
 	/* If someone's using this glock to create a new dinode, the block must
 	   have been freed by another node, then re-used, in which case our
@@ -682,8 +881,34 @@
 	if (test_bit(GLF_INODE_CREATING, &gl->gl_flags))
 		goto out;
 
-	inode = gfs2_lookup_by_inum(sdp, no_addr, NULL, GFS2_BLKST_UNLINKED);
-	if (inode && !IS_ERR(inode)) {
+	if (test_bit(GLF_DEMOTE, &gl->gl_flags)) {
+		/*
+		 * If we can evict the inode, give the remote node trying to
+		 * delete the inode some time before verifying that the delete
+		 * has happened.  Otherwise, if we cause contention on the inode glock
+		 * immediately, the remote node will think that we still have
+		 * the inode in use, and so it will give up waiting.
+		 *
+		 * If we can't evict the inode, signal to the remote node that
+		 * the inode is still in use.  We'll later try to delete the
+		 * inode locally in gfs2_evict_inode.
+		 *
+		 * FIXME: We only need to verify that the remote node has
+		 * deleted the inode because nodes before this remote delete
+		 * rework won't cooperate.  At a later time, when we no longer
+		 * care about compatibility with such nodes, we can skip this
+		 * step entirely.
+		 */
+		if (gfs2_try_evict(gl)) {
+			if (gfs2_queue_delete_work(gl, 5 * HZ))
+				return;
+		}
+		goto out;
+	}
+
+	inode = gfs2_lookup_by_inum(sdp, no_addr, gl->gl_no_formal_ino,
+				    GFS2_BLKST_UNLINKED);
+	if (!IS_ERR_OR_NULL(inode)) {
 		d_prune_aliases(inode);
 		iput(inode);
 	}
@@ -713,7 +938,7 @@
 
 		if (!delay) {
 			clear_bit(GLF_PENDING_DEMOTE, &gl->gl_flags);
-			set_bit(GLF_DEMOTE, &gl->gl_flags);
+			gfs2_set_demote(gl);
 		}
 	}
 	run_queue(gl, 0);
@@ -817,7 +1042,7 @@
 	memset(&gl->gl_lksb, 0, sizeof(struct dlm_lksb));
 
 	if (glops->go_flags & GLOF_LVB) {
-		gl->gl_lksb.sb_lvbptr = kzalloc(GFS2_MIN_LVB_SIZE, GFP_NOFS);
+		gl->gl_lksb.sb_lvbptr = kzalloc(GDLM_LVB_SIZE, GFP_NOFS);
 		if (!gl->gl_lksb.sb_lvbptr) {
 			kmem_cache_free(cachep, gl);
 			return -ENOMEM;
@@ -828,6 +1053,7 @@
 	gl->gl_node.next = NULL;
 	gl->gl_flags = 0;
 	gl->gl_name = name;
+	lockdep_set_subclass(&gl->gl_lockref.lock, glops->go_subclass);
 	gl->gl_lockref.count = 1;
 	gl->gl_state = LM_ST_UNLOCKED;
 	gl->gl_target = LM_ST_UNLOCKED;
@@ -844,7 +1070,8 @@
 	gl->gl_object = NULL;
 	gl->gl_hold_time = GL_GLOCK_DFT_HOLD;
 	INIT_DELAYED_WORK(&gl->gl_work, glock_work_func);
-	INIT_WORK(&gl->gl_delete, delete_work_func);
+	if (gl->gl_name.ln_type == LM_TYPE_IOPEN)
+		INIT_DELAYED_WORK(&gl->gl_delete, delete_work_func);
 
 	mapping = gfs2_glock2aspace(gl);
 	if (mapping) {
@@ -934,6 +1161,17 @@
 	gh->gh_ip = 0;
 }
 
+static void gfs2_glock_update_hold_time(struct gfs2_glock *gl,
+					unsigned long start_time)
+{
+	/* Have we waited longer than a second? */
+	if (time_after(jiffies, start_time + HZ)) {
+		/* Lengthen the minimum hold time. */
+		gl->gl_hold_time = min(gl->gl_hold_time + GL_GLOCK_HOLD_INCR,
+				       GL_GLOCK_MAX_HOLD);
+	}
+}
+
 /**
  * gfs2_glock_wait - wait on a glock acquisition
  * @gh: the glock holder
@@ -943,16 +1181,97 @@
 
 int gfs2_glock_wait(struct gfs2_holder *gh)
 {
-	unsigned long time1 = jiffies;
+	unsigned long start_time = jiffies;
 
 	might_sleep();
 	wait_on_bit(&gh->gh_iflags, HIF_WAIT, TASK_UNINTERRUPTIBLE);
-	if (time_after(jiffies, time1 + HZ)) /* have we waited > a second? */
-		/* Lengthen the minimum hold time. */
-		gh->gh_gl->gl_hold_time = min(gh->gh_gl->gl_hold_time +
-					      GL_GLOCK_HOLD_INCR,
-					      GL_GLOCK_MAX_HOLD);
+	gfs2_glock_update_hold_time(gh->gh_gl, start_time);
 	return gh->gh_error;
+}
+
+static int glocks_pending(unsigned int num_gh, struct gfs2_holder *ghs)
+{
+	int i;
+
+	for (i = 0; i < num_gh; i++)
+		if (test_bit(HIF_WAIT, &ghs[i].gh_iflags))
+			return 1;
+	return 0;
+}
+
+/**
+ * gfs2_glock_async_wait - wait on multiple asynchronous glock acquisitions
+ * @num_gh: the number of holders in the array
+ * @ghs: the glock holder array
+ *
+ * Returns: 0 on success, meaning all glocks have been granted and are held.
+ *          -ESTALE if the request timed out, meaning all glocks were released,
+ *          and the caller should retry the operation.
+ */
+
+int gfs2_glock_async_wait(unsigned int num_gh, struct gfs2_holder *ghs)
+{
+	struct gfs2_sbd *sdp = ghs[0].gh_gl->gl_name.ln_sbd;
+	int i, ret = 0, timeout = 0;
+	unsigned long start_time = jiffies;
+	bool keep_waiting;
+
+	might_sleep();
+	/*
+	 * Total up the (minimum hold time * 2) of all glocks and use that to
+	 * determine the max amount of time we should wait.
+	 */
+	for (i = 0; i < num_gh; i++)
+		timeout += ghs[i].gh_gl->gl_hold_time << 1;
+
+wait_for_dlm:
+	if (!wait_event_timeout(sdp->sd_async_glock_wait,
+				!glocks_pending(num_gh, ghs), timeout))
+		ret = -ESTALE; /* request timed out. */
+
+	/*
+	 * If dlm granted all our requests, we need to adjust the glock
+	 * minimum hold time values according to how long we waited.
+	 *
+	 * If our request timed out, we need to repeatedly release any held
+	 * glocks we acquired thus far to allow dlm to acquire the remaining
+	 * glocks without deadlocking.  We cannot currently cancel outstanding
+	 * glock acquisitions.
+	 *
+	 * The HIF_WAIT bit tells us which requests still need a response from
+	 * dlm.
+	 *
+	 * If dlm sent us any errors, we return the first error we find.
+	 */
+	keep_waiting = false;
+	for (i = 0; i < num_gh; i++) {
+		/* Skip holders we have already dequeued below. */
+		if (!gfs2_holder_queued(&ghs[i]))
+			continue;
+		/* Skip holders with a pending DLM response. */
+		if (test_bit(HIF_WAIT, &ghs[i].gh_iflags)) {
+			keep_waiting = true;
+			continue;
+		}
+
+		if (test_bit(HIF_HOLDER, &ghs[i].gh_iflags)) {
+			if (ret == -ESTALE)
+				gfs2_glock_dq(&ghs[i]);
+			else
+				gfs2_glock_update_hold_time(ghs[i].gh_gl,
+							    start_time);
+		}
+		if (!ret)
+			ret = ghs[i].gh_error;
+	}
+
+	if (keep_waiting)
+		goto wait_for_dlm;
+
+	/*
+	 * At this point, we've either acquired all locks or released them all.
+	 */
+	return ret;
 }
 
 /**
@@ -967,9 +1286,10 @@
 static void handle_callback(struct gfs2_glock *gl, unsigned int state,
 			    unsigned long delay, bool remote)
 {
-	int bit = delay ? GLF_PENDING_DEMOTE : GLF_DEMOTE;
-
-	set_bit(bit, &gl->gl_flags);
+	if (delay)
+		set_bit(GLF_PENDING_DEMOTE, &gl->gl_flags);
+	else
+		gfs2_set_demote(gl);
 	if (gl->gl_demote_state == LM_ST_EXCLUSIVE) {
 		gl->gl_demote_state = state;
 		gl->gl_demote_time = jiffies;
@@ -1021,9 +1341,9 @@
 	struct gfs2_holder *gh2;
 	int try_futile = 0;
 
-	BUG_ON(gh->gh_owner_pid == NULL);
+	GLOCK_BUG_ON(gl, gh->gh_owner_pid == NULL);
 	if (test_and_set_bit(HIF_WAIT, &gh->gh_iflags))
-		BUG();
+		GLOCK_BUG_ON(gl, true);
 
 	if (gh->gh_flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB)) {
 		if (test_bit(GLF_LOCK, &gl->gl_flags))
@@ -1048,7 +1368,6 @@
 		if (unlikely((gh->gh_flags & LM_FLAG_PRIORITY) && !insert_pt))
 			insert_pt = &gh2->gh_list;
 	}
-	set_bit(GLF_QUEUED, &gl->gl_flags);
 	trace_gfs2_glock_queue(gh, 1);
 	gfs2_glstats_inc(gl, GFS2_LKS_QCOUNT);
 	gfs2_sbstats_inc(gl, GFS2_LKS_QCOUNT);
@@ -1060,7 +1379,7 @@
 	}
 	list_add_tail(&gh->gh_list, insert_pt);
 do_cancel:
-	gh = list_entry(gl->gl_holders.next, struct gfs2_holder, gh_list);
+	gh = list_first_entry(&gl->gl_holders, struct gfs2_holder, gh_list);
 	if (!(gh->gh_flags & LM_FLAG_PRIORITY)) {
 		spin_unlock(&gl->gl_lockref.lock);
 		if (sdp->sd_lockstruct.ls_ops->lm_cancel)
@@ -1070,15 +1389,15 @@
 	return;
 
 trap_recursive:
-	pr_err("original: %pSR\n", (void *)gh2->gh_ip);
-	pr_err("pid: %d\n", pid_nr(gh2->gh_owner_pid));
-	pr_err("lock type: %d req lock state : %d\n",
+	fs_err(sdp, "original: %pSR\n", (void *)gh2->gh_ip);
+	fs_err(sdp, "pid: %d\n", pid_nr(gh2->gh_owner_pid));
+	fs_err(sdp, "lock type: %d req lock state : %d\n",
 	       gh2->gh_gl->gl_name.ln_type, gh2->gh_state);
-	pr_err("new: %pSR\n", (void *)gh->gh_ip);
-	pr_err("pid: %d\n", pid_nr(gh->gh_owner_pid));
-	pr_err("lock type: %d req lock state : %d\n",
+	fs_err(sdp, "new: %pSR\n", (void *)gh->gh_ip);
+	fs_err(sdp, "pid: %d\n", pid_nr(gh->gh_owner_pid));
+	fs_err(sdp, "lock type: %d req lock state : %d\n",
 	       gh->gh_gl->gl_name.ln_type, gh->gh_state);
-	gfs2_dump_glock(NULL, gl);
+	gfs2_dump_glock(NULL, gl, true);
 	BUG();
 }
 
@@ -1094,10 +1413,9 @@
 int gfs2_glock_nq(struct gfs2_holder *gh)
 {
 	struct gfs2_glock *gl = gh->gh_gl;
-	struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
 	int error = 0;
 
-	if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags)))
+	if (glock_blocked_by_withdraw(gl) && !(gh->gh_flags & LM_FLAG_NOEXP))
 		return -EIO;
 
 	if (test_bit(GLF_LRU, &gl->gl_flags))
@@ -1141,24 +1459,34 @@
 void gfs2_glock_dq(struct gfs2_holder *gh)
 {
 	struct gfs2_glock *gl = gh->gh_gl;
-	const struct gfs2_glock_operations *glops = gl->gl_ops;
+	struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
 	unsigned delay = 0;
 	int fast_path = 0;
 
 	spin_lock(&gl->gl_lockref.lock);
+	/*
+	 * If we're in the process of file system withdraw, we cannot just
+	 * dequeue any glocks until our journal is recovered, lest we
+	 * introduce file system corruption. We need two exceptions to this
+	 * rule: We need to allow unlocking of nondisk glocks and the glock
+	 * for our own journal that needs recovery.
+	 */
+	if (test_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags) &&
+	    glock_blocked_by_withdraw(gl) &&
+	    gh->gh_gl != sdp->sd_jinode_gl) {
+		sdp->sd_glock_dqs_held++;
+		spin_unlock(&gl->gl_lockref.lock);
+		might_sleep();
+		wait_on_bit(&sdp->sd_flags, SDF_WITHDRAW_RECOVERY,
+			    TASK_UNINTERRUPTIBLE);
+		spin_lock(&gl->gl_lockref.lock);
+	}
 	if (gh->gh_flags & GL_NOCACHE)
 		handle_callback(gl, LM_ST_UNLOCKED, 0, false);
 
 	list_del_init(&gh->gh_list);
 	clear_bit(HIF_HOLDER, &gh->gh_iflags);
 	if (find_first_holder(gl) == NULL) {
-		if (glops->go_unlock) {
-			GLOCK_BUG_ON(gl, test_and_set_bit(GLF_LOCK, &gl->gl_flags));
-			spin_unlock(&gl->gl_lockref.lock);
-			glops->go_unlock(gh);
-			spin_lock(&gl->gl_lockref.lock);
-			clear_bit(GLF_LOCK, &gl->gl_flags);
-		}
 		if (list_empty(&gl->gl_holders) &&
 		    !test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
 		    !test_bit(GLF_DEMOTE, &gl->gl_flags))
@@ -1342,16 +1670,15 @@
 	unsigned long now = jiffies;
 
 	gfs2_glock_hold(gl);
+	spin_lock(&gl->gl_lockref.lock);
 	holdtime = gl->gl_tchange + gl->gl_hold_time;
-	if (test_bit(GLF_QUEUED, &gl->gl_flags) &&
+	if (!list_empty(&gl->gl_holders) &&
 	    gl->gl_name.ln_type == LM_TYPE_INODE) {
 		if (time_before(now, holdtime))
 			delay = holdtime - now;
 		if (test_bit(GLF_REPLY_PENDING, &gl->gl_flags))
 			delay = gl->gl_hold_time;
 	}
-
-	spin_lock(&gl->gl_lockref.lock);
 	handle_callback(gl, state, delay, true);
 	__gfs2_glock_queue_work(gl, delay);
 	spin_unlock(&gl->gl_lockref.lock);
@@ -1455,7 +1782,7 @@
 	list_sort(NULL, list, glock_cmp);
 
 	while(!list_empty(list)) {
-		gl = list_entry(list->next, struct gfs2_glock, gl_lru);
+		gl = list_first_entry(list, struct gfs2_glock, gl_lru);
 		list_del_init(&gl->gl_lru);
 		clear_bit(GLF_LRU, &gl->gl_flags);
 		if (!spin_trylock(&gl->gl_lockref.lock)) {
@@ -1497,7 +1824,7 @@
 
 	spin_lock(&lru_lock);
 	while ((nr-- >= 0) && !list_empty(&lru_list)) {
-		gl = list_entry(lru_list.next, struct gfs2_glock, gl_lru);
+		gl = list_first_entry(&lru_list, struct gfs2_glock, gl_lru);
 
 		/* Test for being demotable */
 		if (!test_bit(GLF_LOCK, &gl->gl_flags)) {
@@ -1538,10 +1865,9 @@
 };
 
 /**
- * examine_bucket - Call a function for glock in a hash bucket
+ * glock_hash_walk - Call a function for each glock in a hash bucket
  * @examiner: the function
  * @sdp: the filesystem
- * @bucket: the bucket
  *
  * Note that the function can be called multiple times on the same
  * object.  So the user must ensure that the function can cope with
@@ -1558,15 +1884,57 @@
 	do {
 		rhashtable_walk_start(&iter);
 
-		while ((gl = rhashtable_walk_next(&iter)) && !IS_ERR(gl))
-			if (gl->gl_name.ln_sbd == sdp &&
-			    lockref_get_not_dead(&gl->gl_lockref))
+		while ((gl = rhashtable_walk_next(&iter)) && !IS_ERR(gl)) {
+			if (gl->gl_name.ln_sbd == sdp)
 				examiner(gl);
+		}
 
 		rhashtable_walk_stop(&iter);
 	} while (cond_resched(), gl == ERR_PTR(-EAGAIN));
 
 	rhashtable_walk_exit(&iter);
+}
+
+bool gfs2_queue_delete_work(struct gfs2_glock *gl, unsigned long delay)
+{
+	bool queued;
+
+	spin_lock(&gl->gl_lockref.lock);
+	queued = queue_delayed_work(gfs2_delete_workqueue,
+				    &gl->gl_delete, delay);
+	if (queued)
+		set_bit(GLF_PENDING_DELETE, &gl->gl_flags);
+	spin_unlock(&gl->gl_lockref.lock);
+	return queued;
+}
+
+void gfs2_cancel_delete_work(struct gfs2_glock *gl)
+{
+	if (cancel_delayed_work(&gl->gl_delete)) {
+		clear_bit(GLF_PENDING_DELETE, &gl->gl_flags);
+		gfs2_glock_put(gl);
+	}
+}
+
+bool gfs2_delete_work_queued(const struct gfs2_glock *gl)
+{
+	return test_bit(GLF_PENDING_DELETE, &gl->gl_flags);
+}
+
+static void flush_delete_work(struct gfs2_glock *gl)
+{
+	if (gl->gl_name.ln_type == LM_TYPE_IOPEN) {
+		if (cancel_delayed_work(&gl->gl_delete)) {
+			queue_delayed_work(gfs2_delete_workqueue,
+					   &gl->gl_delete, 0);
+		}
+	}
+}
+
+void gfs2_flush_delete_work(struct gfs2_sbd *sdp)
+{
+	glock_hash_walk(flush_delete_work, sdp);
+	flush_workqueue(gfs2_delete_workqueue);
 }
 
 /**
@@ -1577,10 +1945,10 @@
 
 static void thaw_glock(struct gfs2_glock *gl)
 {
-	if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags)) {
-		gfs2_glock_put(gl);
+	if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))
 		return;
-	}
+	if (!lockref_get_not_dead(&gl->gl_lockref))
+		return;
 	set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
 	gfs2_glock_queue_work(gl, 0);
 }
@@ -1596,9 +1964,12 @@
 	gfs2_glock_remove_from_lru(gl);
 
 	spin_lock(&gl->gl_lockref.lock);
-	if (gl->gl_state != LM_ST_UNLOCKED)
-		handle_callback(gl, LM_ST_UNLOCKED, 0, false);
-	__gfs2_glock_queue_work(gl, 0);
+	if (!__lockref_is_dead(&gl->gl_lockref)) {
+		gl->gl_lockref.count++;
+		if (gl->gl_state != LM_ST_UNLOCKED)
+			handle_callback(gl, LM_ST_UNLOCKED, 0, false);
+		__gfs2_glock_queue_work(gl, 0);
+	}
 	spin_unlock(&gl->gl_lockref.lock);
 }
 
@@ -1613,16 +1984,16 @@
 	glock_hash_walk(thaw_glock, sdp);
 }
 
-static void dump_glock(struct seq_file *seq, struct gfs2_glock *gl)
+static void dump_glock(struct seq_file *seq, struct gfs2_glock *gl, bool fsid)
 {
 	spin_lock(&gl->gl_lockref.lock);
-	gfs2_dump_glock(seq, gl);
+	gfs2_dump_glock(seq, gl, fsid);
 	spin_unlock(&gl->gl_lockref.lock);
 }
 
 static void dump_glock_func(struct gfs2_glock *gl)
 {
-	dump_glock(NULL, gl);
+	dump_glock(NULL, gl, true);
 }
 
 /**
@@ -1651,7 +2022,7 @@
 	int ret;
 
 	ret = gfs2_truncatei_resume(ip);
-	gfs2_assert_withdraw(gl->gl_name.ln_sbd, ret == 0);
+	gfs2_glock_assert_withdraw(gl, ret == 0);
 
 	spin_lock(&gl->gl_lockref.lock);
 	clear_bit(GLF_LOCK, &gl->gl_flags);
@@ -1707,10 +2078,12 @@
  * dump_holder - print information about a glock holder
  * @seq: the seq_file struct
  * @gh: the glock holder
+ * @fs_id_buf: pointer to file system id (if requested)
  *
  */
 
-static void dump_holder(struct seq_file *seq, const struct gfs2_holder *gh)
+static void dump_holder(struct seq_file *seq, const struct gfs2_holder *gh,
+			const char *fs_id_buf)
 {
 	struct task_struct *gh_owner = NULL;
 	char flags_buf[32];
@@ -1718,8 +2091,8 @@
 	rcu_read_lock();
 	if (gh->gh_owner_pid)
 		gh_owner = pid_task(gh->gh_owner_pid, PIDTYPE_PID);
-	gfs2_print_dbg(seq, " H: s:%s f:%s e:%d p:%ld [%s] %pS\n",
-		       state2str(gh->gh_state),
+	gfs2_print_dbg(seq, "%s H: s:%s f:%s e:%d p:%ld [%s] %pS\n",
+		       fs_id_buf, state2str(gh->gh_state),
 		       hflags2str(flags_buf, gh->gh_flags, gh->gh_iflags),
 		       gh->gh_error,
 		       gh->gh_owner_pid ? (long)pid_nr(gh->gh_owner_pid) : -1,
@@ -1753,7 +2126,7 @@
 		*p++ = 'I';
 	if (test_bit(GLF_FROZEN, gflags))
 		*p++ = 'F';
-	if (test_bit(GLF_QUEUED, gflags))
+	if (!list_empty(&gl->gl_holders))
 		*p++ = 'q';
 	if (test_bit(GLF_LRU, gflags))
 		*p++ = 'L';
@@ -1761,6 +2134,12 @@
 		*p++ = 'o';
 	if (test_bit(GLF_BLOCKING, gflags))
 		*p++ = 'b';
+	if (test_bit(GLF_INODE_CREATING, gflags))
+		*p++ = 'c';
+	if (test_bit(GLF_PENDING_DELETE, gflags))
+		*p++ = 'P';
+	if (test_bit(GLF_FREEING, gflags))
+		*p++ = 'x';
 	*p = 0;
 	return buf;
 }
@@ -1769,6 +2148,7 @@
  * gfs2_dump_glock - print information about a glock
  * @seq: The seq_file struct
  * @gl: the glock
+ * @fsid: If true, also dump the file system id
  *
  * The file format is as follows:
  * One line per object, capital letters are used to indicate objects
@@ -1782,33 +2162,45 @@
  *
  */
 
-void gfs2_dump_glock(struct seq_file *seq, const struct gfs2_glock *gl)
+void gfs2_dump_glock(struct seq_file *seq, struct gfs2_glock *gl, bool fsid)
 {
 	const struct gfs2_glock_operations *glops = gl->gl_ops;
 	unsigned long long dtime;
 	const struct gfs2_holder *gh;
 	char gflags_buf[32];
+	struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
+	char fs_id_buf[sizeof(sdp->sd_fsname) + 7];
+	unsigned long nrpages = 0;
 
+	if (gl->gl_ops->go_flags & GLOF_ASPACE) {
+		struct address_space *mapping = gfs2_glock2aspace(gl);
+
+		nrpages = mapping->nrpages;
+	}
+	memset(fs_id_buf, 0, sizeof(fs_id_buf));
+	if (fsid && sdp) /* safety precaution */
+		sprintf(fs_id_buf, "fsid=%s: ", sdp->sd_fsname);
 	dtime = jiffies - gl->gl_demote_time;
 	dtime *= 1000000/HZ; /* demote time in uSec */
 	if (!test_bit(GLF_DEMOTE, &gl->gl_flags))
 		dtime = 0;
-	gfs2_print_dbg(seq, "G:  s:%s n:%u/%llx f:%s t:%s d:%s/%llu a:%d v:%d r:%d m:%ld\n",
-		  state2str(gl->gl_state),
-		  gl->gl_name.ln_type,
-		  (unsigned long long)gl->gl_name.ln_number,
-		  gflags2str(gflags_buf, gl),
-		  state2str(gl->gl_target),
-		  state2str(gl->gl_demote_state), dtime,
-		  atomic_read(&gl->gl_ail_count),
-		  atomic_read(&gl->gl_revokes),
-		  (int)gl->gl_lockref.count, gl->gl_hold_time);
+	gfs2_print_dbg(seq, "%sG:  s:%s n:%u/%llx f:%s t:%s d:%s/%llu a:%d "
+		       "v:%d r:%d m:%ld p:%lu\n",
+		       fs_id_buf, state2str(gl->gl_state),
+		       gl->gl_name.ln_type,
+		       (unsigned long long)gl->gl_name.ln_number,
+		       gflags2str(gflags_buf, gl),
+		       state2str(gl->gl_target),
+		       state2str(gl->gl_demote_state), dtime,
+		       atomic_read(&gl->gl_ail_count),
+		       atomic_read(&gl->gl_revokes),
+		       (int)gl->gl_lockref.count, gl->gl_hold_time, nrpages);
 
 	list_for_each_entry(gh, &gl->gl_holders, gh_list)
-		dump_holder(seq, gh);
+		dump_holder(seq, gh, fs_id_buf);
 
 	if (gl->gl_state != LM_ST_UNLOCKED && glops->go_dump)
-		glops->go_dump(seq, gl);
+		glops->go_dump(seq, gl, fs_id_buf);
 }
 
 static int gfs2_glstats_seq_show(struct seq_file *seq, void *iter_ptr)
@@ -2009,7 +2401,7 @@
 
 static int gfs2_glock_seq_show(struct seq_file *seq, void *iter_ptr)
 {
-	dump_glock(seq, iter_ptr);
+	dump_glock(seq, iter_ptr, false);
 	return 0;
 }
 
@@ -2049,7 +2441,7 @@
 	.show  = gfs2_glstats_seq_show,
 };
 
-static const struct seq_operations gfs2_sbstats_seq_ops = {
+static const struct seq_operations gfs2_sbstats_sops = {
 	.start = gfs2_sbstats_seq_start,
 	.next  = gfs2_sbstats_seq_next,
 	.stop  = gfs2_sbstats_seq_stop,
@@ -2102,16 +2494,6 @@
 	return __gfs2_glocks_open(inode, file, &gfs2_glstats_seq_ops);
 }
 
-static int gfs2_sbstats_open(struct inode *inode, struct file *file)
-{
-	int ret = seq_open(file, &gfs2_sbstats_seq_ops);
-	if (ret == 0) {
-		struct seq_file *seq = file->private_data;
-		seq->private = inode->i_private;  /* sdp */
-	}
-	return ret;
-}
-
 static const struct file_operations gfs2_glocks_fops = {
 	.owner   = THIS_MODULE,
 	.open    = gfs2_glocks_open,
@@ -2128,79 +2510,31 @@
 	.release = gfs2_glocks_release,
 };
 
-static const struct file_operations gfs2_sbstats_fops = {
-	.owner   = THIS_MODULE,
-	.open	 = gfs2_sbstats_open,
-	.read    = seq_read,
-	.llseek  = seq_lseek,
-	.release = seq_release,
-};
+DEFINE_SEQ_ATTRIBUTE(gfs2_sbstats);
 
-int gfs2_create_debugfs_file(struct gfs2_sbd *sdp)
+void gfs2_create_debugfs_file(struct gfs2_sbd *sdp)
 {
-	struct dentry *dent;
+	sdp->debugfs_dir = debugfs_create_dir(sdp->sd_table_name, gfs2_root);
 
-	dent = debugfs_create_dir(sdp->sd_table_name, gfs2_root);
-	if (IS_ERR_OR_NULL(dent))
-		goto fail;
-	sdp->debugfs_dir = dent;
+	debugfs_create_file("glocks", S_IFREG | S_IRUGO, sdp->debugfs_dir, sdp,
+			    &gfs2_glocks_fops);
 
-	dent = debugfs_create_file("glocks",
-				   S_IFREG | S_IRUGO,
-				   sdp->debugfs_dir, sdp,
-				   &gfs2_glocks_fops);
-	if (IS_ERR_OR_NULL(dent))
-		goto fail;
-	sdp->debugfs_dentry_glocks = dent;
+	debugfs_create_file("glstats", S_IFREG | S_IRUGO, sdp->debugfs_dir, sdp,
+			    &gfs2_glstats_fops);
 
-	dent = debugfs_create_file("glstats",
-				   S_IFREG | S_IRUGO,
-				   sdp->debugfs_dir, sdp,
-				   &gfs2_glstats_fops);
-	if (IS_ERR_OR_NULL(dent))
-		goto fail;
-	sdp->debugfs_dentry_glstats = dent;
-
-	dent = debugfs_create_file("sbstats",
-				   S_IFREG | S_IRUGO,
-				   sdp->debugfs_dir, sdp,
-				   &gfs2_sbstats_fops);
-	if (IS_ERR_OR_NULL(dent))
-		goto fail;
-	sdp->debugfs_dentry_sbstats = dent;
-
-	return 0;
-fail:
-	gfs2_delete_debugfs_file(sdp);
-	return dent ? PTR_ERR(dent) : -ENOMEM;
+	debugfs_create_file("sbstats", S_IFREG | S_IRUGO, sdp->debugfs_dir, sdp,
+			    &gfs2_sbstats_fops);
 }
 
 void gfs2_delete_debugfs_file(struct gfs2_sbd *sdp)
 {
-	if (sdp->debugfs_dir) {
-		if (sdp->debugfs_dentry_glocks) {
-			debugfs_remove(sdp->debugfs_dentry_glocks);
-			sdp->debugfs_dentry_glocks = NULL;
-		}
-		if (sdp->debugfs_dentry_glstats) {
-			debugfs_remove(sdp->debugfs_dentry_glstats);
-			sdp->debugfs_dentry_glstats = NULL;
-		}
-		if (sdp->debugfs_dentry_sbstats) {
-			debugfs_remove(sdp->debugfs_dentry_sbstats);
-			sdp->debugfs_dentry_sbstats = NULL;
-		}
-		debugfs_remove(sdp->debugfs_dir);
-		sdp->debugfs_dir = NULL;
-	}
+	debugfs_remove_recursive(sdp->debugfs_dir);
+	sdp->debugfs_dir = NULL;
 }
 
-int gfs2_register_debugfs(void)
+void gfs2_register_debugfs(void)
 {
 	gfs2_root = debugfs_create_dir("gfs2", NULL);
-	if (IS_ERR(gfs2_root))
-		return PTR_ERR(gfs2_root);
-	return gfs2_root ? 0 : -ENOMEM;
 }
 
 void gfs2_unregister_debugfs(void)

--
Gitblit v1.6.2