From 102a0743326a03cd1a1202ceda21e175b7d3575c Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Tue, 20 Feb 2024 01:20:52 +0000
Subject: [PATCH] add new system file

---
 kernel/fs/ceph/snap.c |  288 +++++++++++++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 249 insertions(+), 39 deletions(-)

diff --git a/kernel/fs/ceph/snap.c b/kernel/fs/ceph/snap.c
index 5cf7b5f..db46468 100644
--- a/kernel/fs/ceph/snap.c
+++ b/kernel/fs/ceph/snap.c
@@ -3,11 +3,13 @@
 
 #include <linux/sort.h>
 #include <linux/slab.h>
-
+#include <linux/iversion.h>
 #include "super.h"
 #include "mds_client.h"
-
 #include <linux/ceph/decode.h>
+
+/* unused map expires after 5 minutes */
+#define CEPH_SNAPID_MAP_TIMEOUT	(5 * 60 * HZ)
 
 /*
  * Snapshots in ceph are driven in large part by cooperation from the
@@ -58,24 +60,26 @@
 /*
  * increase ref count for the realm
  *
- * caller must hold snap_rwsem for write.
+ * caller must hold snap_rwsem.
  */
 void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
 			 struct ceph_snap_realm *realm)
 {
-	dout("get_realm %p %d -> %d\n", realm,
-	     atomic_read(&realm->nref), atomic_read(&realm->nref)+1);
+	lockdep_assert_held(&mdsc->snap_rwsem);
+
 	/*
-	 * since we _only_ increment realm refs or empty the empty
-	 * list with snap_rwsem held, adjusting the empty list here is
-	 * safe.  we do need to protect against concurrent empty list
-	 * additions, however.
+	 * The 0->1 and 1->0 transitions must take the snap_empty_lock
+	 * atomically with the refcount change. Go ahead and bump the
+	 * nref here, unless it's 0, in which case we take the spinlock
+	 * and then do the increment and remove it from the list.
 	 */
-	if (atomic_inc_return(&realm->nref) == 1) {
-		spin_lock(&mdsc->snap_empty_lock);
+	if (atomic_inc_not_zero(&realm->nref))
+		return;
+
+	spin_lock(&mdsc->snap_empty_lock);
+	if (atomic_inc_return(&realm->nref) == 1)
 		list_del_init(&realm->empty_item);
-		spin_unlock(&mdsc->snap_empty_lock);
-	}
+	spin_unlock(&mdsc->snap_empty_lock);
 }
 
 static void __insert_snap_realm(struct rb_root *root,
@@ -111,6 +115,8 @@
 {
 	struct ceph_snap_realm *realm;
 
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
 	realm = kzalloc(sizeof(*realm), GFP_NOFS);
 	if (!realm)
 		return ERR_PTR(-ENOMEM);
@@ -124,6 +130,8 @@
 	INIT_LIST_HEAD(&realm->inodes_with_caps);
 	spin_lock_init(&realm->inodes_with_caps_lock);
 	__insert_snap_realm(&mdsc->snap_realms, realm);
+	mdsc->num_snap_realms++;
+
 	dout("create_snap_realm %llx %p\n", realm->ino, realm);
 	return realm;
 }
@@ -131,13 +139,15 @@
 /*
  * lookup the realm rooted at @ino.
  *
- * caller must hold snap_rwsem for write.
+ * caller must hold snap_rwsem.
  */
 static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
 						   u64 ino)
 {
 	struct rb_node *n = mdsc->snap_realms.rb_node;
 	struct ceph_snap_realm *r;
+
+	lockdep_assert_held(&mdsc->snap_rwsem);
 
 	while (n) {
 		r = rb_entry(n, struct ceph_snap_realm, node);
@@ -172,9 +182,12 @@
 static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
 				 struct ceph_snap_realm *realm)
 {
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
 	dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);
 
 	rb_erase(&realm->node, &mdsc->snap_realms);
+	mdsc->num_snap_realms--;
 
 	if (realm->parent) {
 		list_del_init(&realm->child_item);
@@ -193,28 +206,30 @@
 static void __put_snap_realm(struct ceph_mds_client *mdsc,
 			     struct ceph_snap_realm *realm)
 {
-	dout("__put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
-	     atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
+	/*
+	 * We do not require the snap_empty_lock here, as any caller that
+	 * increments the value must hold the snap_rwsem.
+	 */
 	if (atomic_dec_and_test(&realm->nref))
 		__destroy_snap_realm(mdsc, realm);
 }
 
 /*
- * caller needn't hold any locks
+ * See comments in ceph_get_snap_realm. Caller needn't hold any locks.
  */
 void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
 			 struct ceph_snap_realm *realm)
 {
-	dout("put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
-	     atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
-	if (!atomic_dec_and_test(&realm->nref))
+	if (!atomic_dec_and_lock(&realm->nref, &mdsc->snap_empty_lock))
 		return;
 
 	if (down_write_trylock(&mdsc->snap_rwsem)) {
+		spin_unlock(&mdsc->snap_empty_lock);
 		__destroy_snap_realm(mdsc, realm);
 		up_write(&mdsc->snap_rwsem);
 	} else {
-		spin_lock(&mdsc->snap_empty_lock);
 		list_add(&realm->empty_item, &mdsc->snap_empty);
 		spin_unlock(&mdsc->snap_empty_lock);
 	}
@@ -230,6 +245,8 @@
 static void __cleanup_empty_realms(struct ceph_mds_client *mdsc)
 {
 	struct ceph_snap_realm *realm;
+
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
 
 	spin_lock(&mdsc->snap_empty_lock);
 	while (!list_empty(&mdsc->snap_empty)) {
@@ -263,6 +280,8 @@
 				    u64 parentino)
 {
 	struct ceph_snap_realm *parent;
+
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
 
 	if (realm->parent_ino == parentino)
 		return 0;
@@ -468,6 +487,9 @@
 		pr_err("ENOMEM allocating ceph_cap_snap on %p\n", inode);
 		return;
 	}
+	capsnap->cap_flush.is_capsnap = true;
+	INIT_LIST_HEAD(&capsnap->cap_flush.i_list);
+	INIT_LIST_HEAD(&capsnap->cap_flush.g_list);
 
 	spin_lock(&ci->i_ceph_lock);
 	used = __ceph_caps_used(ci);
@@ -597,13 +619,15 @@
 			    struct ceph_cap_snap *capsnap)
 {
 	struct inode *inode = &ci->vfs_inode;
-	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
+	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
 
 	BUG_ON(capsnap->writing);
 	capsnap->size = inode->i_size;
 	capsnap->mtime = inode->i_mtime;
 	capsnap->atime = inode->i_atime;
 	capsnap->ctime = inode->i_ctime;
+	capsnap->btime = ci->i_btime;
+	capsnap->change_attr = inode_peek_iversion_raw(inode);
 	capsnap->time_warp_seq = ci->i_time_warp_seq;
 	capsnap->truncate_size = ci->i_truncate_size;
 	capsnap->truncate_seq = ci->i_truncate_seq;
@@ -623,8 +647,10 @@
 	     capsnap->size);
 
 	spin_lock(&mdsc->snap_flush_lock);
-	if (list_empty(&ci->i_snap_flush_item))
+	if (list_empty(&ci->i_snap_flush_item)) {
+		ihold(inode);
 		list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list);
+	}
 	spin_unlock(&mdsc->snap_flush_lock);
 	return 1;  /* caller may want to ceph_flush_snaps */
 }
@@ -646,13 +672,15 @@
 		if (!inode)
 			continue;
 		spin_unlock(&realm->inodes_with_caps_lock);
-		iput(lastinode);
+		/* avoid calling iput_final() while holding
+		 * mdsc->snap_rwsem or in mds dispatch threads */
+		ceph_async_iput(lastinode);
 		lastinode = inode;
 		ceph_queue_cap_snap(ci);
 		spin_lock(&realm->inodes_with_caps_lock);
 	}
 	spin_unlock(&realm->inodes_with_caps_lock);
-	iput(lastinode);
+	ceph_async_iput(lastinode);
 
 	dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
 }
@@ -671,14 +699,19 @@
 	struct ceph_mds_snap_realm *ri;    /* encoded */
 	__le64 *snaps;                     /* encoded */
 	__le64 *prior_parent_snaps;        /* encoded */
-	struct ceph_snap_realm *realm = NULL;
+	struct ceph_snap_realm *realm;
 	struct ceph_snap_realm *first_realm = NULL;
-	int invalidate = 0;
+	struct ceph_snap_realm *realm_to_rebuild = NULL;
+	int rebuild_snapcs;
 	int err = -ENOMEM;
 	LIST_HEAD(dirty_realms);
 
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
 	dout("update_snap_trace deletion=%d\n", deletion);
 more:
+	realm = NULL;
+	rebuild_snapcs = 0;
 	ceph_decode_need(&p, e, sizeof(*ri), bad);
 	ri = p;
 	p += sizeof(*ri);
@@ -702,7 +735,7 @@
 	err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
 	if (err < 0)
 		goto fail;
-	invalidate += err;
+	rebuild_snapcs += err;
 
 	if (le64_to_cpu(ri->seq) > realm->seq) {
 		dout("update_snap_trace updating %llx %p %lld -> %lld\n",
@@ -727,22 +760,30 @@
 		if (realm->seq > mdsc->last_snap_seq)
 			mdsc->last_snap_seq = realm->seq;
 
-		invalidate = 1;
+		rebuild_snapcs = 1;
 	} else if (!realm->cached_context) {
 		dout("update_snap_trace %llx %p seq %lld new\n",
 		     realm->ino, realm, realm->seq);
-		invalidate = 1;
+		rebuild_snapcs = 1;
 	} else {
 		dout("update_snap_trace %llx %p seq %lld unchanged\n",
 		     realm->ino, realm, realm->seq);
 	}
 
-	dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
-	     realm, invalidate, p, e);
+	dout("done with %llx %p, rebuild_snapcs=%d, %p %p\n", realm->ino,
+	     realm, rebuild_snapcs, p, e);
 
-	/* invalidate when we reach the _end_ (root) of the trace */
-	if (invalidate && p >= e)
-		rebuild_snap_realms(realm, &dirty_realms);
+	/*
+	 * this will always track the uppest parent realm from which
+	 * we need to rebuild the snapshot contexts _downward_ in
+	 * hierarchy.
+	 */
+	if (rebuild_snapcs)
+		realm_to_rebuild = realm;
+
+	/* rebuild_snapcs when we reach the _end_ (root) of the trace */
+	if (realm_to_rebuild && p >= e)
+		rebuild_snap_realms(realm_to_rebuild, &dirty_realms);
 
 	if (!first_realm)
 		first_realm = realm;
@@ -804,7 +845,9 @@
 		ihold(inode);
 		spin_unlock(&mdsc->snap_flush_lock);
 		ceph_flush_snaps(ci, &session);
-		iput(inode);
+		/* avoid calling iput_final() while holding
+		 * session->s_mutex or in mds dispatch threads */
+		ceph_async_iput(inode);
 		spin_lock(&mdsc->snap_flush_lock);
 	}
 	spin_unlock(&mdsc->snap_flush_lock);
@@ -862,7 +905,7 @@
 	     ceph_snap_op_name(op), split, trace_len);
 
 	mutex_lock(&session->s_mutex);
-	session->s_seq++;
+	inc_session_sequence(session);
 	mutex_unlock(&session->s_mutex);
 
 	down_write(&mdsc->snap_rwsem);
@@ -948,12 +991,14 @@
 			ceph_get_snap_realm(mdsc, realm);
 			ceph_put_snap_realm(mdsc, oldrealm);
 
-			iput(inode);
+			/* avoid calling iput_final() while holding
+			 * mdsc->snap_rwsem or mds in dispatch threads */
+			ceph_async_iput(inode);
 			continue;
 
 skip_inode:
 			spin_unlock(&ci->i_ceph_lock);
-			iput(inode);
+			ceph_async_iput(inode);
 		}
 
 		/* we may have taken some of the old realm's children. */
@@ -965,6 +1010,19 @@
 				continue;
 			adjust_snap_realm_parent(mdsc, child, realm->ino);
 		}
+	} else {
+		/*
+		 * In the non-split case both 'num_split_inos' and
+		 * 'num_split_realms' should be 0, making this a no-op.
+		 * However the MDS happens to populate 'split_realms' list
+		 * in one of the UPDATE op cases by mistake.
+		 *
+		 * Skip both lists just in case to ensure that 'p' is
+		 * positioned at the start of realm info, as expected by
+		 * ceph_update_snap_trace().
+		 */
+		p += sizeof(u64) * num_split_inos;
+		p += sizeof(u64) * num_split_realms;
 	}
 
 	/*
@@ -993,3 +1051,155 @@
 		up_write(&mdsc->snap_rwsem);
 	return;
 }
+
+struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc,
+					    u64 snap)
+{
+	struct ceph_snapid_map *sm, *exist;
+	struct rb_node **p, *parent;
+	int ret;
+
+	exist = NULL;
+	spin_lock(&mdsc->snapid_map_lock);
+	p = &mdsc->snapid_map_tree.rb_node;
+	while (*p) {
+		exist = rb_entry(*p, struct ceph_snapid_map, node);
+		if (snap > exist->snap) {
+			p = &(*p)->rb_left;
+		} else if (snap < exist->snap) {
+			p = &(*p)->rb_right;
+		} else {
+			if (atomic_inc_return(&exist->ref) == 1)
+				list_del_init(&exist->lru);
+			break;
+		}
+		exist = NULL;
+	}
+	spin_unlock(&mdsc->snapid_map_lock);
+	if (exist) {
+		dout("found snapid map %llx -> %x\n", exist->snap, exist->dev);
+		return exist;
+	}
+
+	sm = kmalloc(sizeof(*sm), GFP_NOFS);
+	if (!sm)
+		return NULL;
+
+	ret = get_anon_bdev(&sm->dev);
+	if (ret < 0) {
+		kfree(sm);
+		return NULL;
+	}
+
+	INIT_LIST_HEAD(&sm->lru);
+	atomic_set(&sm->ref, 1);
+	sm->snap = snap;
+
+	exist = NULL;
+	parent = NULL;
+	p = &mdsc->snapid_map_tree.rb_node;
+	spin_lock(&mdsc->snapid_map_lock);
+	while (*p) {
+		parent = *p;
+		exist = rb_entry(*p, struct ceph_snapid_map, node);
+		if (snap > exist->snap)
+			p = &(*p)->rb_left;
+		else if (snap < exist->snap)
+			p = &(*p)->rb_right;
+		else
+			break;
+		exist = NULL;
+	}
+	if (exist) {
+		if (atomic_inc_return(&exist->ref) == 1)
+			list_del_init(&exist->lru);
+	} else {
+		rb_link_node(&sm->node, parent, p);
+		rb_insert_color(&sm->node, &mdsc->snapid_map_tree);
+	}
+	spin_unlock(&mdsc->snapid_map_lock);
+	if (exist) {
+		free_anon_bdev(sm->dev);
+		kfree(sm);
+		dout("found snapid map %llx -> %x\n", exist->snap, exist->dev);
+		return exist;
+	}
+
+	dout("create snapid map %llx -> %x\n", sm->snap, sm->dev);
+	return sm;
+}
+
+void ceph_put_snapid_map(struct ceph_mds_client* mdsc,
+			 struct ceph_snapid_map *sm)
+{
+	if (!sm)
+		return;
+	if (atomic_dec_and_lock(&sm->ref, &mdsc->snapid_map_lock)) {
+		if (!RB_EMPTY_NODE(&sm->node)) {
+			sm->last_used = jiffies;
+			list_add_tail(&sm->lru, &mdsc->snapid_map_lru);
+			spin_unlock(&mdsc->snapid_map_lock);
+		} else {
+			/* already cleaned up by
+			 * ceph_cleanup_snapid_map() */
+			spin_unlock(&mdsc->snapid_map_lock);
+			kfree(sm);
+		}
+	}
+}
+
+void ceph_trim_snapid_map(struct ceph_mds_client *mdsc)
+{
+	struct ceph_snapid_map *sm;
+	unsigned long now;
+	LIST_HEAD(to_free);
+
+	spin_lock(&mdsc->snapid_map_lock);
+	now = jiffies;
+
+	while (!list_empty(&mdsc->snapid_map_lru)) {
+		sm = list_first_entry(&mdsc->snapid_map_lru,
+				      struct ceph_snapid_map, lru);
+		if (time_after(sm->last_used + CEPH_SNAPID_MAP_TIMEOUT, now))
+			break;
+
+		rb_erase(&sm->node, &mdsc->snapid_map_tree);
+		list_move(&sm->lru, &to_free);
+	}
+	spin_unlock(&mdsc->snapid_map_lock);
+
+	while (!list_empty(&to_free)) {
+		sm = list_first_entry(&to_free, struct ceph_snapid_map, lru);
+		list_del(&sm->lru);
+		dout("trim snapid map %llx -> %x\n", sm->snap, sm->dev);
+		free_anon_bdev(sm->dev);
+		kfree(sm);
+	}
+}
+
+void ceph_cleanup_snapid_map(struct ceph_mds_client *mdsc)
+{
+	struct ceph_snapid_map *sm;
+	struct rb_node *p;
+	LIST_HEAD(to_free);
+
+	spin_lock(&mdsc->snapid_map_lock);
+	while ((p = rb_first(&mdsc->snapid_map_tree))) {
+		sm = rb_entry(p, struct ceph_snapid_map, node);
+		rb_erase(p, &mdsc->snapid_map_tree);
+		RB_CLEAR_NODE(p);
+		list_move(&sm->lru, &to_free);
+	}
+	spin_unlock(&mdsc->snapid_map_lock);
+
+	while (!list_empty(&to_free)) {
+		sm = list_first_entry(&to_free, struct ceph_snapid_map, lru);
+		list_del(&sm->lru);
+		free_anon_bdev(sm->dev);
+		if (WARN_ON_ONCE(atomic_read(&sm->ref))) {
+			pr_err("snapid map %llx -> %x still in use\n",
+			       sm->snap, sm->dev);
+		}
+		kfree(sm);
+	}
+}

--
Gitblit v1.6.2