From cde9070d9970eef1f7ec2360586c802a16230ad8 Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Fri, 10 May 2024 07:43:50 +0000
Subject: [PATCH] rtl88x2CE_WiFi_linux driver

---
 kernel/fs/ceph/caps.c | 1653 ++++++++++++++++++++++++++++++++++++----------------------
 1 files changed, 1,025 insertions(+), 628 deletions(-)

diff --git a/kernel/fs/ceph/caps.c b/kernel/fs/ceph/caps.c
index 6443ba1..432dc2a 100644
--- a/kernel/fs/ceph/caps.c
+++ b/kernel/fs/ceph/caps.c
@@ -8,6 +8,7 @@
 #include <linux/vmalloc.h>
 #include <linux/wait.h>
 #include <linux/writeback.h>
+#include <linux/iversion.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -148,11 +149,17 @@
 	spin_unlock(&mdsc->caps_list_lock);
 }
 
-void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
+void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
+			      struct ceph_mount_options *fsopt)
 {
 	spin_lock(&mdsc->caps_list_lock);
-	mdsc->caps_min_count += delta;
-	BUG_ON(mdsc->caps_min_count < 0);
+	mdsc->caps_min_count = fsopt->max_readdir;
+	if (mdsc->caps_min_count < 1024)
+		mdsc->caps_min_count = 1024;
+	mdsc->caps_use_max = fsopt->caps_max;
+	if (mdsc->caps_use_max > 0 &&
+	    mdsc->caps_use_max < mdsc->caps_min_count)
+		mdsc->caps_use_max = mdsc->caps_min_count;
 	spin_unlock(&mdsc->caps_list_lock);
 }
 
@@ -272,6 +279,7 @@
 	if (!err) {
 		BUG_ON(have + alloc != need);
 		ctx->count = need;
+		ctx->used = 0;
 	}
 
 	spin_lock(&mdsc->caps_list_lock);
@@ -295,13 +303,24 @@
 }
 
 void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
-			struct ceph_cap_reservation *ctx)
+			 struct ceph_cap_reservation *ctx)
 {
+	bool reclaim = false;
+	if (!ctx->count)
+		return;
+
 	dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
 	spin_lock(&mdsc->caps_list_lock);
 	__ceph_unreserve_caps(mdsc, ctx->count);
 	ctx->count = 0;
+
+	if (mdsc->caps_use_max > 0 &&
+	    mdsc->caps_use_count > mdsc->caps_use_max)
+		reclaim = true;
 	spin_unlock(&mdsc->caps_list_lock);
+
+	if (reclaim)
+		ceph_reclaim_caps_nr(mdsc, ctx->used);
 }
 
 struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
@@ -346,6 +365,7 @@
 	BUG_ON(list_empty(&mdsc->caps_list));
 
 	ctx->count--;
+	ctx->used++;
 	mdsc->caps_reserve_count--;
 	mdsc->caps_use_count++;
 
@@ -438,37 +458,6 @@
 }
 
 /*
- * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
- */
-static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
-{
-	struct ceph_cap *cap;
-	int mds = -1;
-	struct rb_node *p;
-
-	/* prefer mds with WR|BUFFER|EXCL caps */
-	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
-		cap = rb_entry(p, struct ceph_cap, ci_node);
-		mds = cap->mds;
-		if (cap->issued & (CEPH_CAP_FILE_WR |
-				   CEPH_CAP_FILE_BUFFER |
-				   CEPH_CAP_FILE_EXCL))
-			break;
-	}
-	return mds;
-}
-
-int ceph_get_cap_mds(struct inode *inode)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	int mds;
-	spin_lock(&ci->i_ceph_lock);
-	mds = __ceph_get_cap_mds(ceph_inode(inode));
-	spin_unlock(&ci->i_ceph_lock);
-	return mds;
-}
-
-/*
  * Called under i_ceph_lock.
  */
 static void __insert_cap_node(struct ceph_inode_info *ci,
@@ -500,14 +489,11 @@
 static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
 			       struct ceph_inode_info *ci)
 {
-	struct ceph_mount_options *ma = mdsc->fsc->mount_options;
-
-	ci->i_hold_caps_min = round_jiffies(jiffies +
-					    ma->caps_wanted_delay_min * HZ);
+	struct ceph_mount_options *opt = mdsc->fsc->mount_options;
 	ci->i_hold_caps_max = round_jiffies(jiffies +
-					    ma->caps_wanted_delay_max * HZ);
-	dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
-	     ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
+					    opt->caps_wanted_delay_max * HZ);
+	dout("__cap_set_timeouts %p %lu\n", &ci->vfs_inode,
+	     ci->i_hold_caps_max - jiffies);
 }
 
 /*
@@ -521,8 +507,7 @@
 static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
 				struct ceph_inode_info *ci)
 {
-	__cap_set_timeouts(mdsc, ci);
-	dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode,
+	dout("__cap_delay_requeue %p flags 0x%lx at %lu\n", &ci->vfs_inode,
 	     ci->i_ceph_flags, ci->i_hold_caps_max);
 	if (!mdsc->stopping) {
 		spin_lock(&mdsc->cap_delay_lock);
@@ -531,6 +516,7 @@
 				goto no_change;
 			list_del_init(&ci->i_cap_delay_list);
 		}
+		__cap_set_timeouts(mdsc, ci);
 		list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
 no_change:
 		spin_unlock(&mdsc->cap_delay_lock);
@@ -570,19 +556,20 @@
 	spin_unlock(&mdsc->cap_delay_lock);
 }
 
-/*
- * Common issue checks for add_cap, handle_cap_grant.
- */
+/* Common issue checks for add_cap, handle_cap_grant. */
 static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
 			      unsigned issued)
 {
 	unsigned had = __ceph_caps_issued(ci, NULL);
 
+	lockdep_assert_held(&ci->i_ceph_lock);
+
 	/*
 	 * Each time we receive FILE_CACHE anew, we increment
 	 * i_rdcache_gen.
 	 */
-	if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+	if (S_ISREG(ci->vfs_inode.i_mode) &&
+	    (issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
 	    (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
 		ci->i_rdcache_gen++;
 	}
@@ -601,12 +588,40 @@
 			__ceph_dir_clear_complete(ci);
 		}
 	}
+
+	/* Wipe saved layout if we're losing DIR_CREATE caps */
+	if (S_ISDIR(ci->vfs_inode.i_mode) && (had & CEPH_CAP_DIR_CREATE) &&
+		!(issued & CEPH_CAP_DIR_CREATE)) {
+	     ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
+	     memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
+	}
+}
+
+/**
+ * change_auth_cap_ses - move inode to appropriate lists when auth caps change
+ * @ci: inode to be moved
+ * @session: new auth caps session
+ */
+static void change_auth_cap_ses(struct ceph_inode_info *ci,
+				struct ceph_mds_session *session)
+{
+	lockdep_assert_held(&ci->i_ceph_lock);
+
+	if (list_empty(&ci->i_dirty_item) && list_empty(&ci->i_flushing_item))
+		return;
+
+	spin_lock(&session->s_mdsc->cap_dirty_lock);
+	if (!list_empty(&ci->i_dirty_item))
+		list_move(&ci->i_dirty_item, &session->s_cap_dirty);
+	if (!list_empty(&ci->i_flushing_item))
+		list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
+	spin_unlock(&session->s_mdsc->cap_dirty_lock);
 }
 
 /*
  * Add a capability under the given MDS session.
  *
- * Caller should hold session snap_rwsem (read) and s_mutex.
+ * Caller should hold session snap_rwsem (read) and ci->i_ceph_lock
  *
  * @fmode is the open file mode, if we are opening a file, otherwise
  * it is < 0.  (This is so we can atomically add the cap and add an
@@ -614,7 +629,7 @@
  */
 void ceph_add_cap(struct inode *inode,
 		  struct ceph_mds_session *session, u64 cap_id,
-		  int fmode, unsigned issued, unsigned wanted,
+		  unsigned issued, unsigned wanted,
 		  unsigned seq, unsigned mseq, u64 realmino, int flags,
 		  struct ceph_cap **new_cap)
 {
@@ -623,16 +638,16 @@
 	struct ceph_cap *cap;
 	int mds = session->s_mds;
 	int actual_wanted;
+	u32 gen;
+
+	lockdep_assert_held(&ci->i_ceph_lock);
 
 	dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
 	     session->s_mds, cap_id, ceph_cap_string(issued), seq);
 
-	/*
-	 * If we are opening the file, include file mode wanted bits
-	 * in wanted.
-	 */
-	if (fmode >= 0)
-		wanted |= ceph_caps_for_mode(fmode);
+	spin_lock(&session->s_gen_ttl_lock);
+	gen = session->s_cap_gen;
+	spin_unlock(&session->s_gen_ttl_lock);
 
 	cap = __get_cap_for_mds(ci, mds);
 	if (!cap) {
@@ -653,8 +668,16 @@
 		spin_lock(&session->s_cap_lock);
 		list_add_tail(&cap->session_caps, &session->s_caps);
 		session->s_nr_caps++;
+		atomic64_inc(&mdsc->metric.total_caps);
 		spin_unlock(&session->s_cap_lock);
 	} else {
+		spin_lock(&session->s_cap_lock);
+		list_move_tail(&cap->session_caps, &session->s_caps);
+		spin_unlock(&session->s_cap_lock);
+
+		if (cap->cap_gen < gen)
+			cap->issued = cap->implemented = CEPH_CAP_PIN;
+
 		/*
 		 * auth mds of the inode changed. we received the cap export
 		 * message, but still haven't received the cap import message.
@@ -726,6 +749,9 @@
 	if (flags & CEPH_CAP_FLAG_AUTH) {
 		if (!ci->i_auth_cap ||
 		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
+			if (ci->i_auth_cap &&
+			    ci->i_auth_cap->session != cap->session)
+				change_auth_cap_ses(ci, cap->session);
 			ci->i_auth_cap = cap;
 			cap->mds_wanted = wanted;
 		}
@@ -746,10 +772,7 @@
 	cap->seq = seq;
 	cap->issue_seq = seq;
 	cap->mseq = mseq;
-	cap->cap_gen = session->s_cap_gen;
-
-	if (fmode >= 0)
-		__ceph_get_fmode(ci, fmode);
+	cap->cap_gen = gen;
 }
 
 /*
@@ -864,8 +887,8 @@
 	int have = ci->i_snap_caps;
 
 	if ((have & mask) == mask) {
-		dout("__ceph_caps_issued_mask %p snap issued %s"
-		     " (mask %s)\n", &ci->vfs_inode,
+		dout("__ceph_caps_issued_mask ino 0x%llx snap issued %s"
+		     " (mask %s)\n", ceph_ino(&ci->vfs_inode),
 		     ceph_cap_string(have),
 		     ceph_cap_string(mask));
 		return 1;
@@ -876,8 +899,8 @@
 		if (!__cap_is_valid(cap))
 			continue;
 		if ((cap->issued & mask) == mask) {
-			dout("__ceph_caps_issued_mask %p cap %p issued %s"
-			     " (mask %s)\n", &ci->vfs_inode, cap,
+			dout("__ceph_caps_issued_mask ino 0x%llx cap %p issued %s"
+			     " (mask %s)\n", ceph_ino(&ci->vfs_inode), cap,
 			     ceph_cap_string(cap->issued),
 			     ceph_cap_string(mask));
 			if (touch)
@@ -888,8 +911,8 @@
 		/* does a combination of caps satisfy mask? */
 		have |= cap->issued;
 		if ((have & mask) == mask) {
-			dout("__ceph_caps_issued_mask %p combo issued %s"
-			     " (mask %s)\n", &ci->vfs_inode,
+			dout("__ceph_caps_issued_mask ino 0x%llx combo issued %s"
+			     " (mask %s)\n", ceph_ino(&ci->vfs_inode),
 			     ceph_cap_string(cap->issued),
 			     ceph_cap_string(mask));
 			if (touch) {
@@ -903,7 +926,8 @@
 						       ci_node);
 					if (!__cap_is_valid(cap))
 						continue;
-					__touch_cap(cap);
+					if (cap->issued & mask)
+						__touch_cap(cap);
 				}
 			}
 			return 1;
@@ -911,6 +935,20 @@
 	}
 
 	return 0;
+}
+
+int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
+				   int touch)
+{
+	struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
+	int r;
+
+	r = __ceph_caps_issued_mask(ci, mask, touch);
+	if (r)
+		ceph_update_cap_hit(&fsc->mdsc->metric);
+	else
+		ceph_update_cap_mis(&fsc->mdsc->metric);
+	return r;
 }
 
 /*
@@ -952,29 +990,97 @@
 	if (ci->i_rd_ref)
 		used |= CEPH_CAP_FILE_RD;
 	if (ci->i_rdcache_ref ||
-	    (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */
+	    (S_ISREG(ci->vfs_inode.i_mode) &&
 	     ci->vfs_inode.i_data.nrpages))
 		used |= CEPH_CAP_FILE_CACHE;
 	if (ci->i_wr_ref)
 		used |= CEPH_CAP_FILE_WR;
 	if (ci->i_wb_ref || ci->i_wrbuffer_ref)
 		used |= CEPH_CAP_FILE_BUFFER;
+	if (ci->i_fx_ref)
+		used |= CEPH_CAP_FILE_EXCL;
 	return used;
 }
+
+#define FMODE_WAIT_BIAS 1000
 
 /*
  * wanted, by virtue of open file modes
  */
 int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
 {
-	int i, bits = 0;
-	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
-		if (ci->i_nr_by_mode[i])
-			bits |= 1 << i;
+	const int PIN_SHIFT = ffs(CEPH_FILE_MODE_PIN);
+	const int RD_SHIFT = ffs(CEPH_FILE_MODE_RD);
+	const int WR_SHIFT = ffs(CEPH_FILE_MODE_WR);
+	const int LAZY_SHIFT = ffs(CEPH_FILE_MODE_LAZY);
+	struct ceph_mount_options *opt =
+		ceph_inode_to_client(&ci->vfs_inode)->mount_options;
+	unsigned long used_cutoff = jiffies - opt->caps_wanted_delay_max * HZ;
+	unsigned long idle_cutoff = jiffies - opt->caps_wanted_delay_min * HZ;
+
+	if (S_ISDIR(ci->vfs_inode.i_mode)) {
+		int want = 0;
+
+		/* use used_cutoff here, to keep dir's wanted caps longer */
+		if (ci->i_nr_by_mode[RD_SHIFT] > 0 ||
+		    time_after(ci->i_last_rd, used_cutoff))
+			want |= CEPH_CAP_ANY_SHARED;
+
+		if (ci->i_nr_by_mode[WR_SHIFT] > 0 ||
+		    time_after(ci->i_last_wr, used_cutoff)) {
+			want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
+			if (opt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
+				want |= CEPH_CAP_ANY_DIR_OPS;
+		}
+
+		if (want || ci->i_nr_by_mode[PIN_SHIFT] > 0)
+			want |= CEPH_CAP_PIN;
+
+		return want;
+	} else {
+		int bits = 0;
+
+		if (ci->i_nr_by_mode[RD_SHIFT] > 0) {
+			if (ci->i_nr_by_mode[RD_SHIFT] >= FMODE_WAIT_BIAS ||
+			    time_after(ci->i_last_rd, used_cutoff))
+				bits |= 1 << RD_SHIFT;
+		} else if (time_after(ci->i_last_rd, idle_cutoff)) {
+			bits |= 1 << RD_SHIFT;
+		}
+
+		if (ci->i_nr_by_mode[WR_SHIFT] > 0) {
+			if (ci->i_nr_by_mode[WR_SHIFT] >= FMODE_WAIT_BIAS ||
+			    time_after(ci->i_last_wr, used_cutoff))
+				bits |= 1 << WR_SHIFT;
+		} else if (time_after(ci->i_last_wr, idle_cutoff)) {
+			bits |= 1 << WR_SHIFT;
+		}
+
+		/* check lazyio only when read/write is wanted */
+		if ((bits & (CEPH_FILE_MODE_RDWR << 1)) &&
+		    ci->i_nr_by_mode[LAZY_SHIFT] > 0)
+			bits |= 1 << LAZY_SHIFT;
+
+		return bits ? ceph_caps_for_mode(bits >> 1) : 0;
 	}
-	if (bits == 0)
-		return 0;
-	return ceph_caps_for_mode(bits >> 1);
+}
+
+/*
+ * wanted, by virtue of open file modes AND cap refs (buffered/cached data)
+ */
+int __ceph_caps_wanted(struct ceph_inode_info *ci)
+{
+	int w = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci);
+	if (S_ISDIR(ci->vfs_inode.i_mode)) {
+		/* we want EXCL if holding caps of dir ops */
+		if (w & CEPH_CAP_ANY_DIR_OPS)
+			w |= CEPH_CAP_FILE_EXCL;
+	} else {
+		/* we want EXCL if dirty data */
+		if (w & CEPH_CAP_FILE_BUFFER)
+			w |= CEPH_CAP_FILE_EXCL;
+	}
+	return w;
 }
 
 /*
@@ -998,26 +1104,13 @@
 	return mds_wanted;
 }
 
-/*
- * called under i_ceph_lock
- */
-static int __ceph_is_single_caps(struct ceph_inode_info *ci)
-{
-	return rb_first(&ci->i_caps) == rb_last(&ci->i_caps);
-}
-
-static int __ceph_is_any_caps(struct ceph_inode_info *ci)
-{
-	return !RB_EMPTY_ROOT(&ci->i_caps);
-}
-
 int ceph_is_any_caps(struct inode *inode)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int ret;
 
 	spin_lock(&ci->i_ceph_lock);
-	ret = __ceph_is_any_caps(ci);
+	ret = __ceph_is_any_real_caps(ci);
 	spin_unlock(&ci->i_ceph_lock);
 
 	return ret;
@@ -1062,8 +1155,10 @@
 
 	/* remove from inode's cap rbtree, and clear auth cap */
 	rb_erase(&cap->ci_node, &ci->i_caps);
-	if (ci->i_auth_cap == cap)
+	if (ci->i_auth_cap == cap) {
+		WARN_ON_ONCE(!list_empty(&ci->i_dirty_item));
 		ci->i_auth_cap = NULL;
+	}
 
 	/* remove from session list */
 	spin_lock(&session->s_cap_lock);
@@ -1074,6 +1169,7 @@
 	} else {
 		list_del_init(&cap->session_caps);
 		session->s_nr_caps--;
+		atomic64_dec(&mdsc->metric.total_caps);
 		cap->session = NULL;
 		removed = 1;
 	}
@@ -1088,9 +1184,7 @@
 	    (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
 		cap->queue_release = 1;
 		if (removed) {
-			list_add_tail(&cap->session_caps,
-				      &session->s_cap_releases);
-			session->s_num_cap_releases++;
+			__ceph_queue_cap_release(session, cap);
 			removed = 0;
 		}
 	} else {
@@ -1103,15 +1197,16 @@
 	if (removed)
 		ceph_put_cap(mdsc, cap);
 
-	/* when reconnect denied, we remove session caps forcibly,
-	 * i_wr_ref can be non-zero. If there are ongoing write,
-	 * keep i_snap_realm.
-	 */
-	if (!__ceph_is_any_caps(ci) && ci->i_wr_ref == 0 && ci->i_snap_realm)
-		drop_inode_snap_realm(ci);
+	if (!__ceph_is_any_real_caps(ci)) {
+		/* when reconnect denied, we remove session caps forcibly,
+		 * i_wr_ref can be non-zero. If there are ongoing write,
+		 * keep i_snap_realm.
+		 */
+		if (ci->i_wr_ref == 0 && ci->i_snap_realm)
+			drop_inode_snap_realm(ci);
 
-	if (!__ceph_is_any_real_caps(ci))
 		__cap_delay_cancel(mdsc, ci);
+	}
 }
 
 struct cap_msg_args {
@@ -1119,8 +1214,10 @@
 	u64			ino, cid, follows;
 	u64			flush_tid, oldest_flush_tid, size, max_size;
 	u64			xattr_version;
+	u64			change_attr;
 	struct ceph_buffer	*xattr_buf;
-	struct timespec64	atime, mtime, ctime;
+	struct ceph_buffer	*old_xattr_buf;
+	struct timespec64	atime, mtime, ctime, btime;
 	int			op, caps, wanted, dirty;
 	u32			seq, issue_seq, mseq, time_warp_seq;
 	u32			flags;
@@ -1128,39 +1225,30 @@
 	kgid_t			gid;
 	umode_t			mode;
 	bool			inline_data;
+	bool			wake;
 };
 
 /*
- * Build and send a cap message to the given MDS.
- *
- * Caller should be holding s_mutex.
+ * cap struct size + flock buffer size + inline version + inline data size +
+ * osd_epoch_barrier + oldest_flush_tid
  */
-static int send_cap_msg(struct cap_msg_args *arg)
+#define CAP_MSG_SIZE (sizeof(struct ceph_mds_caps) + \
+		      4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4)
+
+/* Marshal up the cap msg to the MDS */
+static void encode_cap_msg(struct ceph_msg *msg, struct cap_msg_args *arg)
 {
 	struct ceph_mds_caps *fc;
-	struct ceph_msg *msg;
 	void *p;
-	size_t extra_len;
-	struct timespec64 zerotime = {0};
 	struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc;
 
-	dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
-	     " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
-	     " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op),
-	     arg->cid, arg->ino, ceph_cap_string(arg->caps),
-	     ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty),
-	     arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid,
-	     arg->mseq, arg->follows, arg->size, arg->max_size,
-	     arg->xattr_version,
+	dout("%s %s %llx %llx caps %s wanted %s dirty %s seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu xattr_ver %llu xattr_len %d\n",
+	     __func__, ceph_cap_op_name(arg->op), arg->cid, arg->ino,
+	     ceph_cap_string(arg->caps), ceph_cap_string(arg->wanted),
+	     ceph_cap_string(arg->dirty), arg->seq, arg->issue_seq,
+	     arg->flush_tid, arg->oldest_flush_tid, arg->mseq, arg->follows,
+	     arg->size, arg->max_size, arg->xattr_version,
 	     arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0);
-
-	/* flock buffer size + inline version + inline data size +
-	 * osd_epoch_barrier + oldest_flush_tid */
-	extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4;
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
-			   GFP_NOFS, false);
-	if (!msg)
-		return -ENOMEM;
 
 	msg->hdr.version = cpu_to_le16(10);
 	msg->hdr.tid = cpu_to_le64(arg->flush_tid);
@@ -1226,29 +1314,20 @@
 	/* pool namespace (version 8) (mds always ignores this) */
 	ceph_encode_32(&p, 0);
 
-	/*
-	 * btime and change_attr (version 9)
-	 *
-	 * We just zero these out for now, as the MDS ignores them unless
-	 * the requisite feature flags are set (which we don't do yet).
-	 */
-	ceph_encode_timespec64(p, &zerotime);
+	/* btime and change_attr (version 9) */
+	ceph_encode_timespec64(p, &arg->btime);
 	p += sizeof(struct ceph_timespec);
-	ceph_encode_64(&p, 0);
+	ceph_encode_64(&p, arg->change_attr);
 
 	/* Advisory flags (version 10) */
 	ceph_encode_32(&p, arg->flags);
-
-	ceph_con_send(&arg->session->s_con, msg);
-	return 0;
 }
 
 /*
  * Queue cap releases when an inode is dropped from our cache.
  */
-void ceph_queue_caps_release(struct inode *inode)
+void __ceph_remove_caps(struct ceph_inode_info *ci)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct rb_node *p;
 
 	/* lock i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU)
@@ -1264,141 +1343,133 @@
 }
 
 /*
- * Send a cap msg on the given inode.  Update our caps state, then
- * drop i_ceph_lock and send the message.
+ * Prepare to send a cap message to an MDS. Update the cap state, and populate
+ * the arg struct with the parameters that will need to be sent. This should
+ * be done under the i_ceph_lock to guard against changes to cap state.
  *
  * Make note of max_size reported/requested from mds, revoked caps
  * that have now been implemented.
- *
- * Make half-hearted attempt ot to invalidate page cache if we are
- * dropping RDCACHE.  Note that this will leave behind locked pages
- * that we'll then need to deal with elsewhere.
- *
- * Return non-zero if delayed release, or we experienced an error
- * such that the caller should requeue + retry later.
- *
- * called with i_ceph_lock, then drops it.
- * caller should hold snap_rwsem (read), s_mutex.
  */
-static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
-		      int op, bool sync, int used, int want, int retain,
-		      int flushing, u64 flush_tid, u64 oldest_flush_tid)
-	__releases(cap->ci->i_ceph_lock)
+static void __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap,
+		       int op, int flags, int used, int want, int retain,
+		       int flushing, u64 flush_tid, u64 oldest_flush_tid)
 {
 	struct ceph_inode_info *ci = cap->ci;
 	struct inode *inode = &ci->vfs_inode;
-	struct ceph_buffer *old_blob = NULL;
-	struct cap_msg_args arg;
 	int held, revoking;
-	int wake = 0;
-	int delayed = 0;
-	int ret;
+
+	lockdep_assert_held(&ci->i_ceph_lock);
 
 	held = cap->issued | cap->implemented;
 	revoking = cap->implemented & ~cap->issued;
 	retain &= ~revoking;
 
-	dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
-	     inode, cap, cap->session,
+	dout("%s %p cap %p session %p %s -> %s (revoking %s)\n",
+	     __func__, inode, cap, cap->session,
 	     ceph_cap_string(held), ceph_cap_string(held & retain),
 	     ceph_cap_string(revoking));
 	BUG_ON((retain & CEPH_CAP_PIN) == 0);
 
-	arg.session = cap->session;
-
-	/* don't release wanted unless we've waited a bit. */
-	if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
-	    time_before(jiffies, ci->i_hold_caps_min)) {
-		dout(" delaying issued %s -> %s, wanted %s -> %s on send\n",
-		     ceph_cap_string(cap->issued),
-		     ceph_cap_string(cap->issued & retain),
-		     ceph_cap_string(cap->mds_wanted),
-		     ceph_cap_string(want));
-		want |= cap->mds_wanted;
-		retain |= cap->issued;
-		delayed = 1;
-	}
-	ci->i_ceph_flags &= ~(CEPH_I_NODELAY | CEPH_I_FLUSH);
-	if (want & ~cap->mds_wanted) {
-		/* user space may open/close single file frequently.
-		 * This avoids droping mds_wanted immediately after
-		 * requesting new mds_wanted.
-		 */
-		__cap_set_timeouts(mdsc, ci);
-	}
+	ci->i_ceph_flags &= ~CEPH_I_FLUSH;
 
 	cap->issued &= retain;  /* drop bits we don't want */
-	if (cap->implemented & ~cap->issued) {
-		/*
-		 * Wake up any waiters on wanted -> needed transition.
-		 * This is due to the weird transition from buffered
-		 * to sync IO... we need to flush dirty pages _before_
-		 * allowing sync writes to avoid reordering.
-		 */
-		wake = 1;
-	}
+	/*
+	 * Wake up any waiters on wanted -> needed transition. This is due to
+	 * the weird transition from buffered to sync IO... we need to flush
+	 * dirty pages _before_ allowing sync writes to avoid reordering.
+	 */
+	arg->wake = cap->implemented & ~cap->issued;
 	cap->implemented &= cap->issued | used;
 	cap->mds_wanted = want;
 
-	arg.ino = ceph_vino(inode).ino;
-	arg.cid = cap->cap_id;
-	arg.follows = flushing ? ci->i_head_snapc->seq : 0;
-	arg.flush_tid = flush_tid;
-	arg.oldest_flush_tid = oldest_flush_tid;
+	arg->session = cap->session;
+	arg->ino = ceph_vino(inode).ino;
+	arg->cid = cap->cap_id;
+	arg->follows = flushing ? ci->i_head_snapc->seq : 0;
+	arg->flush_tid = flush_tid;
+	arg->oldest_flush_tid = oldest_flush_tid;
 
-	arg.size = inode->i_size;
-	ci->i_reported_size = arg.size;
-	arg.max_size = ci->i_wanted_max_size;
-	ci->i_requested_max_size = arg.max_size;
+	arg->size = inode->i_size;
+	ci->i_reported_size = arg->size;
+	arg->max_size = ci->i_wanted_max_size;
+	if (cap == ci->i_auth_cap) {
+		if (want & CEPH_CAP_ANY_FILE_WR)
+			ci->i_requested_max_size = arg->max_size;
+		else
+			ci->i_requested_max_size = 0;
+	}
 
 	if (flushing & CEPH_CAP_XATTR_EXCL) {
-		old_blob = __ceph_build_xattrs_blob(ci);
-		arg.xattr_version = ci->i_xattrs.version;
-		arg.xattr_buf = ci->i_xattrs.blob;
+		arg->old_xattr_buf = __ceph_build_xattrs_blob(ci);
+		arg->xattr_version = ci->i_xattrs.version;
+		arg->xattr_buf = ci->i_xattrs.blob;
 	} else {
-		arg.xattr_buf = NULL;
+		arg->xattr_buf = NULL;
+		arg->old_xattr_buf = NULL;
 	}
 
-	arg.mtime = inode->i_mtime;
-	arg.atime = inode->i_atime;
-	arg.ctime = inode->i_ctime;
+	arg->mtime = inode->i_mtime;
+	arg->atime = inode->i_atime;
+	arg->ctime = inode->i_ctime;
+	arg->btime = ci->i_btime;
+	arg->change_attr = inode_peek_iversion_raw(inode);
 
-	arg.op = op;
-	arg.caps = cap->implemented;
-	arg.wanted = want;
-	arg.dirty = flushing;
+	arg->op = op;
+	arg->caps = cap->implemented;
+	arg->wanted = want;
+	arg->dirty = flushing;
 
-	arg.seq = cap->seq;
-	arg.issue_seq = cap->issue_seq;
-	arg.mseq = cap->mseq;
-	arg.time_warp_seq = ci->i_time_warp_seq;
+	arg->seq = cap->seq;
+	arg->issue_seq = cap->issue_seq;
+	arg->mseq = cap->mseq;
+	arg->time_warp_seq = ci->i_time_warp_seq;
 
-	arg.uid = inode->i_uid;
-	arg.gid = inode->i_gid;
-	arg.mode = inode->i_mode;
+	arg->uid = inode->i_uid;
+	arg->gid = inode->i_gid;
+	arg->mode = inode->i_mode;
 
-	arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
-	if (list_empty(&ci->i_cap_snaps))
-		arg.flags = CEPH_CLIENT_CAPS_NO_CAPSNAP;
-	else
-		arg.flags = CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
-	if (sync)
-		arg.flags |= CEPH_CLIENT_CAPS_SYNC;
+	arg->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+	if (!(flags & CEPH_CLIENT_CAPS_PENDING_CAPSNAP) &&
+	    !list_empty(&ci->i_cap_snaps)) {
+		struct ceph_cap_snap *capsnap;
+		list_for_each_entry_reverse(capsnap, &ci->i_cap_snaps, ci_item) {
+			if (capsnap->cap_flush.tid)
+				break;
+			if (capsnap->need_flush) {
+				flags |= CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
+				break;
+			}
+		}
+	}
+	arg->flags = flags;
+}
 
-	spin_unlock(&ci->i_ceph_lock);
+/*
+ * Send a cap msg on the given inode.
+ *
+ * Caller should hold snap_rwsem (read), s_mutex.
+ */
+static void __send_cap(struct cap_msg_args *arg, struct ceph_inode_info *ci)
+{
+	struct ceph_msg *msg;
+	struct inode *inode = &ci->vfs_inode;
 
-	ceph_buffer_put(old_blob);
-
-	ret = send_cap_msg(&arg);
-	if (ret < 0) {
-		dout("error sending cap msg, must requeue %p\n", inode);
-		delayed = 1;
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, CAP_MSG_SIZE, GFP_NOFS, false);
+	if (!msg) {
+		pr_err("error allocating cap msg: ino (%llx.%llx) flushing %s tid %llu, requeuing cap.\n",
+		       ceph_vinop(inode), ceph_cap_string(arg->dirty),
+		       arg->flush_tid);
+		spin_lock(&ci->i_ceph_lock);
+		__cap_delay_requeue(arg->session->s_mdsc, ci);
+		spin_unlock(&ci->i_ceph_lock);
+		return;
 	}
 
-	if (wake)
+	encode_cap_msg(msg, arg);
+	ceph_con_send(&arg->session->s_con, msg);
+	ceph_buffer_put(arg->old_xattr_buf);
+	if (arg->wake)
 		wake_up_all(&ci->i_cap_wq);
-
-	return delayed;
 }
 
 static inline int __send_flush_snap(struct inode *inode,
@@ -1407,6 +1478,11 @@
 				    u32 mseq, u64 oldest_flush_tid)
 {
 	struct cap_msg_args	arg;
+	struct ceph_msg		*msg;
+
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, CAP_MSG_SIZE, GFP_NOFS, false);
+	if (!msg)
+		return -ENOMEM;
 
 	arg.session = session;
 	arg.ino = ceph_vino(inode).ino;
@@ -1419,10 +1495,13 @@
 	arg.max_size = 0;
 	arg.xattr_version = capsnap->xattr_version;
 	arg.xattr_buf = capsnap->xattr_blob;
+	arg.old_xattr_buf = NULL;
 
 	arg.atime = capsnap->atime;
 	arg.mtime = capsnap->mtime;
 	arg.ctime = capsnap->ctime;
+	arg.btime = capsnap->btime;
+	arg.change_attr = capsnap->change_attr;
 
 	arg.op = CEPH_CAP_OP_FLUSHSNAP;
 	arg.caps = capsnap->issued;
@@ -1440,8 +1519,11 @@
 
 	arg.inline_data = capsnap->inline_data;
 	arg.flags = 0;
+	arg.wake = false;
 
-	return send_cap_msg(&arg);
+	encode_cap_msg(msg, &arg);
+	ceph_con_send(&arg.session->s_con, msg);
+	return 0;
 }
 
 /*
@@ -1554,6 +1636,7 @@
 	struct inode *inode = &ci->vfs_inode;
 	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	struct ceph_mds_session *session = NULL;
+	bool need_put = false;
 	int mds;
 
 	dout("ceph_flush_snaps %p\n", inode);
@@ -1590,10 +1673,8 @@
 	}
 
 	// make sure flushsnap messages are sent in proper order.
-	if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
+	if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
 		__kick_flushing_caps(mdsc, session, ci, 0);
-		ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
-	}
 
 	__ceph_flush_snaps(ci, session);
 out:
@@ -1607,8 +1688,13 @@
 	}
 	/* we flushed them all; remove this inode from the queue */
 	spin_lock(&mdsc->snap_flush_lock);
+	if (!list_empty(&ci->i_snap_flush_item))
+		need_put = true;
 	list_del_init(&ci->i_snap_flush_item);
 	spin_unlock(&mdsc->snap_flush_lock);
+
+	if (need_put)
+		iput(inode);
 }
 
 /*
@@ -1625,6 +1711,8 @@
 	int was = ci->i_dirty_caps;
 	int dirty = 0;
 
+	lockdep_assert_held(&ci->i_ceph_lock);
+
 	if (!ci->i_auth_cap) {
 		pr_warn("__mark_dirty_caps %p %llx mask %s, "
 			"but no auth cap (session was closed?)\n",
@@ -1637,6 +1725,8 @@
 	     ceph_cap_string(was | mask));
 	ci->i_dirty_caps |= mask;
 	if (was == 0) {
+		struct ceph_mds_session *session = ci->i_auth_cap->session;
+
 		WARN_ON_ONCE(ci->i_prealloc_cap_flush);
 		swap(ci->i_prealloc_cap_flush, *pcf);
 
@@ -1649,7 +1739,7 @@
 		     &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
 		BUG_ON(!list_empty(&ci->i_dirty_item));
 		spin_lock(&mdsc->cap_dirty_lock);
-		list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
+		list_add(&ci->i_dirty_item, &session->s_cap_dirty);
 		spin_unlock(&mdsc->cap_dirty_lock);
 		if (ci->i_flushing_caps == 0) {
 			ihold(inode);
@@ -1668,7 +1758,14 @@
 
 struct ceph_cap_flush *ceph_alloc_cap_flush(void)
 {
-	return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
+	struct ceph_cap_flush *cf;
+
+	cf = kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
+	if (!cf)
+		return NULL;
+
+	cf->is_capsnap = false;
+	return cf;
 }
 
 void ceph_free_cap_flush(struct ceph_cap_flush *cf)
@@ -1692,30 +1789,33 @@
  * Remove cap_flush from the mdsc's or inode's flushing cap list.
  * Return true if caller needs to wake up flush waiters.
  */
-static bool __finish_cap_flush(struct ceph_mds_client *mdsc,
-			       struct ceph_inode_info *ci,
-			       struct ceph_cap_flush *cf)
+static bool __detach_cap_flush_from_mdsc(struct ceph_mds_client *mdsc,
+					 struct ceph_cap_flush *cf)
 {
 	struct ceph_cap_flush *prev;
 	bool wake = cf->wake;
-	if (mdsc) {
-		/* are there older pending cap flushes? */
-		if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
-			prev = list_prev_entry(cf, g_list);
-			prev->wake = true;
-			wake = false;
-		}
-		list_del(&cf->g_list);
-	} else if (ci) {
-		if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
-			prev = list_prev_entry(cf, i_list);
-			prev->wake = true;
-			wake = false;
-		}
-		list_del(&cf->i_list);
-	} else {
-		BUG_ON(1);
+
+	if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
+		prev = list_prev_entry(cf, g_list);
+		prev->wake = true;
+		wake = false;
 	}
+	list_del_init(&cf->g_list);
+	return wake;
+}
+
+static bool __detach_cap_flush_from_ci(struct ceph_inode_info *ci,
+				       struct ceph_cap_flush *cf)
+{
+	struct ceph_cap_flush *prev;
+	bool wake = cf->wake;
+
+	if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
+		prev = list_prev_entry(cf, i_list);
+		prev->wake = true;
+		wake = false;
+	}
+	list_del_init(&cf->i_list);
 	return wake;
 }
 
@@ -1723,17 +1823,18 @@
  * Add dirty inode to the flushing list.  Assigned a seq number so we
  * can wait for caps to flush without starving.
  *
- * Called under i_ceph_lock.
+ * Called under i_ceph_lock. Returns the flush tid.
  */
-static int __mark_caps_flushing(struct inode *inode,
+static u64 __mark_caps_flushing(struct inode *inode,
 				struct ceph_mds_session *session, bool wake,
-				u64 *flush_tid, u64 *oldest_flush_tid)
+				u64 *oldest_flush_tid)
 {
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_cap_flush *cf = NULL;
 	int flushing;
 
+	lockdep_assert_held(&ci->i_ceph_lock);
 	BUG_ON(ci->i_dirty_caps == 0);
 	BUG_ON(list_empty(&ci->i_dirty_item));
 	BUG_ON(!ci->i_prealloc_cap_flush);
@@ -1766,8 +1867,7 @@
 
 	list_add_tail(&cf->i_list, &ci->i_cap_flush_list);
 
-	*flush_tid = cf->tid;
-	return flushing;
+	return cf->tid;
 }
 
 /*
@@ -1817,8 +1917,6 @@
  * versus held caps.  Release, flush, ack revoked caps to mds as
  * appropriate.
  *
- *  CHECK_CAPS_NODELAY - caller is delayed work and we should not delay
- *    cap release further.
  *  CHECK_CAPS_AUTHONLY - we should only check the auth cap
  *  CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without
  *    further delay.
@@ -1826,9 +1924,8 @@
 void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 		     struct ceph_mds_session *session)
 {
-	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
-	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct inode *inode = &ci->vfs_inode;
+	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
 	struct ceph_cap *cap;
 	u64 flush_tid, oldest_flush_tid;
 	int file_wanted, used, cap_used;
@@ -1837,48 +1934,53 @@
 	int mds = -1;   /* keep track of how far we've gone through i_caps list
 			   to avoid an infinite loop on retry */
 	struct rb_node *p;
-	int delayed = 0, sent = 0;
-	bool no_delay = flags & CHECK_CAPS_NODELAY;
 	bool queue_invalidate = false;
 	bool tried_invalidate = false;
 
-	/* if we are unmounting, flush any unused caps immediately. */
-	if (mdsc->stopping)
-		no_delay = true;
-
 	spin_lock(&ci->i_ceph_lock);
-
 	if (ci->i_ceph_flags & CEPH_I_FLUSH)
 		flags |= CHECK_CAPS_FLUSH;
-
-	if (!(flags & CHECK_CAPS_AUTHONLY) ||
-	    (ci->i_auth_cap && __ceph_is_single_caps(ci)))
-		__cap_delay_cancel(mdsc, ci);
 
 	goto retry_locked;
 retry:
 	spin_lock(&ci->i_ceph_lock);
 retry_locked:
+	/* Caps wanted by virtue of active open files. */
 	file_wanted = __ceph_caps_file_wanted(ci);
+
+	/* Caps which have active references against them */
 	used = __ceph_caps_used(ci);
+
+	/*
+	 * "issued" represents the current caps that the MDS wants us to have.
+	 * "implemented" is the set that we have been granted, and includes the
+	 * ones that have not yet been returned to the MDS (the "revoking" set,
+	 * usually because they have outstanding references).
+	 */
 	issued = __ceph_caps_issued(ci, &implemented);
 	revoking = implemented & ~issued;
 
 	want = file_wanted;
+
+	/* The ones we currently want to retain (may be adjusted below) */
 	retain = file_wanted | used | CEPH_CAP_PIN;
 	if (!mdsc->stopping && inode->i_nlink > 0) {
 		if (file_wanted) {
 			retain |= CEPH_CAP_ANY;       /* be greedy */
 		} else if (S_ISDIR(inode->i_mode) &&
 			   (issued & CEPH_CAP_FILE_SHARED) &&
-			    __ceph_dir_is_complete(ci)) {
+			   __ceph_dir_is_complete(ci)) {
 			/*
 			 * If a directory is complete, we want to keep
 			 * the exclusive cap. So that MDS does not end up
 			 * revoking the shared cap on every create/unlink
 			 * operation.
 			 */
-			want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
+			if (IS_RDONLY(inode)) {
+				want = CEPH_CAP_ANY_SHARED;
+			} else {
+				want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
+			}
 			retain |= want;
 		} else {
 
@@ -1894,14 +1996,13 @@
 	}
 
 	dout("check_caps %p file_want %s used %s dirty %s flushing %s"
-	     " issued %s revoking %s retain %s %s%s%s\n", inode,
+	     " issued %s revoking %s retain %s %s%s\n", inode,
 	     ceph_cap_string(file_wanted),
 	     ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
 	     ceph_cap_string(ci->i_flushing_caps),
 	     ceph_cap_string(issued), ceph_cap_string(revoking),
 	     ceph_cap_string(retain),
 	     (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
-	     (flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "",
 	     (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "");
 
 	/*
@@ -1909,8 +2010,8 @@
 	 * have cached pages, but don't want them, then try to invalidate.
 	 * If we fail, it's because pages are locked.... try again later.
 	 */
-	if ((!no_delay || mdsc->stopping) &&
-	    !S_ISDIR(inode->i_mode) &&		/* ignore readdir cache */
+	if ((!(flags & CHECK_CAPS_NOINVAL) || mdsc->stopping) &&
+	    S_ISREG(inode->i_mode) &&
 	    !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */
 	    inode->i_data.nrpages &&		/* have cached pages */
 	    (revoking & (CEPH_CAP_FILE_CACHE|
@@ -1927,6 +2028,9 @@
 	}
 
 	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+		int mflags = 0;
+		struct cap_msg_args arg;
+
 		cap = rb_entry(p, struct ceph_cap, ci_node);
 
 		/* avoid looping forever */
@@ -1936,6 +2040,10 @@
 
 		/* NOTE: no side-effects allowed, until we take s_mutex */
 
+		/*
+		 * If we have an auth cap, we don't need to consider any
+		 * overlapping caps as used.
+		 */
 		cap_used = used;
 		if (ci->i_auth_cap && cap != ci->i_auth_cap)
 			cap_used &= ~ci->i_auth_cap->issued;
@@ -1990,31 +2098,10 @@
 		}
 
 		/* things we might delay */
-		if ((cap->issued & ~retain) == 0 &&
-		    cap->mds_wanted == want)
+		if ((cap->issued & ~retain) == 0)
 			continue;     /* nope, all good */
 
-		if (no_delay)
-			goto ack;
-
-		/* delay? */
-		if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
-		    time_before(jiffies, ci->i_hold_caps_max)) {
-			dout(" delaying issued %s -> %s, wanted %s -> %s\n",
-			     ceph_cap_string(cap->issued),
-			     ceph_cap_string(cap->issued & retain),
-			     ceph_cap_string(cap->mds_wanted),
-			     ceph_cap_string(want));
-			delayed++;
-			continue;
-		}
-
 ack:
-		if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
-			dout(" skipping %p I_NOFLUSH set\n", inode);
-			continue;
-		}
-
 		if (session && session != cap->session) {
 			dout("oops, wrong session %p mutex\n", session);
 			mutex_unlock(&session->s_mutex);
@@ -2052,10 +2139,8 @@
 		if (cap == ci->i_auth_cap &&
 		    (ci->i_ceph_flags &
 		     (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
-			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
+			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
 				__kick_flushing_caps(mdsc, session, ci, 0);
-				ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
-			}
 			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
 				__ceph_flush_snaps(ci, session);
 
@@ -2076,9 +2161,12 @@
 		}
 
 		if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
-			flushing = __mark_caps_flushing(inode, session, false,
-							&flush_tid,
-							&oldest_flush_tid);
+			flushing = ci->i_dirty_caps;
+			flush_tid = __mark_caps_flushing(inode, session, false,
+							 &oldest_flush_tid);
+			if (flags & CHECK_CAPS_FLUSH &&
+			    list_empty(&session->s_cap_dirty))
+				mflags |= CEPH_CLIENT_CAPS_SYNC;
 		} else {
 			flushing = 0;
 			flush_tid = 0;
@@ -2088,18 +2176,23 @@
 		}
 
 		mds = cap->mds;  /* remember mds, so we don't repeat */
-		sent++;
 
-		/* __send_cap drops i_ceph_lock */
-		delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false,
-				cap_used, want, retain, flushing,
-				flush_tid, oldest_flush_tid);
+		__prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, mflags, cap_used,
+			   want, retain, flushing, flush_tid, oldest_flush_tid);
+		spin_unlock(&ci->i_ceph_lock);
+
+		__send_cap(&arg, ci);
+
 		goto retry; /* retake i_ceph_lock and restart our cap scan. */
 	}
 
-	/* Reschedule delayed caps release if we delayed anything */
-	if (delayed)
+	/* periodically re-calculate caps wanted by open files */
+	if (__ceph_is_any_real_caps(ci) &&
+	    list_empty(&ci->i_cap_delay_list) &&
+	    (file_wanted & ~CEPH_CAP_PIN) &&
+	    !(used & (CEPH_CAP_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
 		__cap_delay_requeue(mdsc, ci);
+	}
 
 	spin_unlock(&ci->i_ceph_lock);
 
@@ -2125,18 +2218,12 @@
 
 retry:
 	spin_lock(&ci->i_ceph_lock);
-	if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
-		spin_unlock(&ci->i_ceph_lock);
-		dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode);
-		goto out;
-	}
+retry_locked:
 	if (ci->i_dirty_caps && ci->i_auth_cap) {
 		struct ceph_cap *cap = ci->i_auth_cap;
-		int used = __ceph_caps_used(ci);
-		int want = __ceph_caps_wanted(ci);
-		int delayed;
+		struct cap_msg_args arg;
 
-		if (!session || session != cap->session) {
+		if (session != cap->session) {
 			spin_unlock(&ci->i_ceph_lock);
 			if (session)
 				mutex_unlock(&session->s_mutex);
@@ -2149,19 +2236,26 @@
 			goto out;
 		}
 
-		flushing = __mark_caps_flushing(inode, session, true,
-						&flush_tid, &oldest_flush_tid);
-
-		/* __send_cap drops i_ceph_lock */
-		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true,
-				used, want, (cap->issued | cap->implemented),
-				flushing, flush_tid, oldest_flush_tid);
-
-		if (delayed) {
-			spin_lock(&ci->i_ceph_lock);
-			__cap_delay_requeue(mdsc, ci);
-			spin_unlock(&ci->i_ceph_lock);
+		if (ci->i_ceph_flags &
+		    (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS)) {
+			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
+				__kick_flushing_caps(mdsc, session, ci, 0);
+			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
+				__ceph_flush_snaps(ci, session);
+			goto retry_locked;
 		}
+
+		flushing = ci->i_dirty_caps;
+		flush_tid = __mark_caps_flushing(inode, session, true,
+						 &oldest_flush_tid);
+
+		__prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC,
+			   __ceph_caps_used(ci), __ceph_caps_wanted(ci),
+			   (cap->issued | cap->implemented),
+			   flushing, flush_tid, oldest_flush_tid);
+		spin_unlock(&ci->i_ceph_lock);
+
+		__send_cap(&arg, ci);
 	} else {
 		if (!list_empty(&ci->i_cap_flush_list)) {
 			struct ceph_cap_flush *cf =
@@ -2206,6 +2300,7 @@
  */
 static int unsafe_request_wait(struct inode *inode)
 {
+	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_mds_request *req1 = NULL, *req2 = NULL;
 	int ret, err = 0;
@@ -2225,6 +2320,76 @@
 	}
 	spin_unlock(&ci->i_unsafe_lock);
 
+	/*
+	 * Trigger to flush the journal logs in all the relevant MDSes
+	 * manually, or in the worst case we must wait at most 5 seconds
+	 * to wait the journal logs to be flushed by the MDSes periodically.
+	 */
+	if (req1 || req2) {
+		struct ceph_mds_request *req;
+		struct ceph_mds_session **sessions;
+		struct ceph_mds_session *s;
+		unsigned int max_sessions;
+		int i;
+
+		mutex_lock(&mdsc->mutex);
+		max_sessions = mdsc->max_sessions;
+
+		sessions = kcalloc(max_sessions, sizeof(s), GFP_KERNEL);
+		if (!sessions) {
+			mutex_unlock(&mdsc->mutex);
+			err = -ENOMEM;
+			goto out;
+		}
+
+		spin_lock(&ci->i_unsafe_lock);
+		if (req1) {
+			list_for_each_entry(req, &ci->i_unsafe_dirops,
+					    r_unsafe_dir_item) {
+				s = req->r_session;
+				if (!s)
+					continue;
+				if (!sessions[s->s_mds]) {
+					s = ceph_get_mds_session(s);
+					sessions[s->s_mds] = s;
+				}
+			}
+		}
+		if (req2) {
+			list_for_each_entry(req, &ci->i_unsafe_iops,
+					    r_unsafe_target_item) {
+				s = req->r_session;
+				if (!s)
+					continue;
+				if (!sessions[s->s_mds]) {
+					s = ceph_get_mds_session(s);
+					sessions[s->s_mds] = s;
+				}
+			}
+		}
+		spin_unlock(&ci->i_unsafe_lock);
+
+		/* the auth MDS */
+		spin_lock(&ci->i_ceph_lock);
+		if (ci->i_auth_cap) {
+			s = ci->i_auth_cap->session;
+			if (!sessions[s->s_mds])
+				sessions[s->s_mds] = ceph_get_mds_session(s);
+		}
+		spin_unlock(&ci->i_ceph_lock);
+		mutex_unlock(&mdsc->mutex);
+
+		/* send flush mdlog request to MDSes */
+		for (i = 0; i < max_sessions; i++) {
+			s = sessions[i];
+			if (s) {
+				send_flush_mdlog(s);
+				ceph_put_mds_session(s);
+			}
+		}
+		kfree(sessions);
+	}
+
 	dout("unsafe_request_wait %p wait on tid %llu %llu\n",
 	     inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL);
 	if (req1) {
@@ -2232,15 +2397,19 @@
 					ceph_timeout_jiffies(req1->r_timeout));
 		if (ret)
 			err = -EIO;
-		ceph_mdsc_put_request(req1);
 	}
 	if (req2) {
 		ret = !wait_for_completion_timeout(&req2->r_safe_completion,
 					ceph_timeout_jiffies(req2->r_timeout));
 		if (ret)
 			err = -EIO;
-		ceph_mdsc_put_request(req2);
 	}
+
+out:
+	if (req1)
+		ceph_mdsc_put_request(req1);
+	if (req2)
+		ceph_mdsc_put_request(req2);
 	return err;
 }
 
@@ -2249,35 +2418,40 @@
 	struct inode *inode = file->f_mapping->host;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	u64 flush_tid;
-	int ret;
+	int ret, err;
 	int dirty;
 
 	dout("fsync %p%s\n", inode, datasync ? " datasync" : "");
 
 	ret = file_write_and_wait_range(file, start, end);
-	if (ret < 0)
-		goto out;
-
 	if (datasync)
 		goto out;
 
-	inode_lock(inode);
+	ret = ceph_wait_on_async_create(inode);
+	if (ret)
+		goto out;
 
 	dirty = try_flush_caps(inode, &flush_tid);
 	dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
 
-	ret = unsafe_request_wait(inode);
+	err = unsafe_request_wait(inode);
 
 	/*
 	 * only wait on non-file metadata writeback (the mds
 	 * can recover size and mtime, so we don't need to
 	 * wait for that)
 	 */
-	if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
-		ret = wait_event_interruptible(ci->i_cap_wq,
+	if (!err && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
+		err = wait_event_interruptible(ci->i_cap_wq,
 					caps_are_flushed(inode, flush_tid));
 	}
-	inode_unlock(inode);
+
+	if (err < 0)
+		ret = err;
+
+	err = file_check_and_advance_wb_err(file);
+	if (err < 0)
+		ret = err;
 out:
 	dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret);
 	return ret;
@@ -2327,6 +2501,16 @@
 	struct ceph_cap_flush *cf;
 	int ret;
 	u64 first_tid = 0;
+	u64 last_snap_flush = 0;
+
+	ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+
+	list_for_each_entry_reverse(cf, &ci->i_cap_flush_list, i_list) {
+		if (cf->is_capsnap) {
+			last_snap_flush = cf->tid;
+			break;
+		}
+	}
 
 	list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
 		if (cf->tid < first_tid)
@@ -2341,22 +2525,20 @@
 
 		first_tid = cf->tid + 1;
 
-		if (cf->caps) {
+		if (!cf->is_capsnap) {
+			struct cap_msg_args arg;
+
 			dout("kick_flushing_caps %p cap %p tid %llu %s\n",
 			     inode, cap, cf->tid, ceph_cap_string(cf->caps));
-			ci->i_ceph_flags |= CEPH_I_NODELAY;
-			ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
-					  false, __ceph_caps_used(ci),
+			__prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH,
+					 (cf->tid < last_snap_flush ?
+					  CEPH_CLIENT_CAPS_PENDING_CAPSNAP : 0),
+					  __ceph_caps_used(ci),
 					  __ceph_caps_wanted(ci),
-					  cap->issued | cap->implemented,
+					  (cap->issued | cap->implemented),
 					  cf->caps, cf->tid, oldest_flush_tid);
-			if (ret) {
-				pr_err("kick_flushing_caps: error sending "
-					"cap flush, ino (%llx.%llx) "
-					"tid %llu flushing %s\n",
-					ceph_vinop(inode), cf->tid,
-					ceph_cap_string(cf->caps));
-			}
+			spin_unlock(&ci->i_ceph_lock);
+			__send_cap(&arg, ci);
 		} else {
 			struct ceph_cap_snap *capsnap =
 					container_of(cf, struct ceph_cap_snap,
@@ -2417,7 +2599,12 @@
 		 */
 		if ((cap->issued & ci->i_flushing_caps) !=
 		    ci->i_flushing_caps) {
-			ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+			/* encode_caps_cb() also will reset these sequence
+			 * numbers. make sure sequence numbers in cap flush
+			 * message match later reconnect message */
+			cap->seq = 0;
+			cap->issue_seq = 0;
+			cap->mseq = 0;
 			__kick_flushing_caps(mdsc, session, ci,
 					     oldest_flush_tid);
 		} else {
@@ -2435,6 +2622,8 @@
 	struct ceph_cap *cap;
 	u64 oldest_flush_tid;
 
+	lockdep_assert_held(&session->s_mutex);
+
 	dout("kick_flushing_caps mds%d\n", session->s_mds);
 
 	spin_lock(&mdsc->cap_dirty_lock);
@@ -2451,7 +2640,6 @@
 			continue;
 		}
 		if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
-			ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
 			__kick_flushing_caps(mdsc, session, ci,
 					     oldest_flush_tid);
 		}
@@ -2459,16 +2647,15 @@
 	}
 }
 
-static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
-				     struct ceph_mds_session *session,
-				     struct inode *inode)
-	__releases(ci->i_ceph_lock)
+void ceph_kick_flushing_inode_caps(struct ceph_mds_session *session,
+				   struct ceph_inode_info *ci)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_cap *cap;
+	struct ceph_mds_client *mdsc = session->s_mdsc;
+	struct ceph_cap *cap = ci->i_auth_cap;
 
-	cap = ci->i_auth_cap;
-	dout("kick_flushing_inode_caps %p flushing %s\n", inode,
+	lockdep_assert_held(&ci->i_ceph_lock);
+
+	dout("%s %p flushing %s\n", __func__, &ci->vfs_inode,
 	     ceph_cap_string(ci->i_flushing_caps));
 
 	if (!list_empty(&ci->i_cap_flush_list)) {
@@ -2479,11 +2666,7 @@
 		oldest_flush_tid = __get_oldest_flush_tid(mdsc);
 		spin_unlock(&mdsc->cap_dirty_lock);
 
-		ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
 		__kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
-		spin_unlock(&ci->i_ceph_lock);
-	} else {
-		spin_unlock(&ci->i_ceph_lock);
 	}
 }
 
@@ -2491,18 +2674,20 @@
 /*
  * Take references to capabilities we hold, so that we don't release
  * them to the MDS prematurely.
- *
- * Protected by i_ceph_lock.
  */
-static void __take_cap_refs(struct ceph_inode_info *ci, int got,
+void ceph_take_cap_refs(struct ceph_inode_info *ci, int got,
 			    bool snap_rwsem_locked)
 {
+	lockdep_assert_held(&ci->i_ceph_lock);
+
 	if (got & CEPH_CAP_PIN)
 		ci->i_pin_ref++;
 	if (got & CEPH_CAP_FILE_RD)
 		ci->i_rd_ref++;
 	if (got & CEPH_CAP_FILE_CACHE)
 		ci->i_rdcache_ref++;
+	if (got & CEPH_CAP_FILE_EXCL)
+		ci->i_fx_ref++;
 	if (got & CEPH_CAP_FILE_WR) {
 		if (ci->i_wr_ref == 0 && !ci->i_head_snapc) {
 			BUG_ON(!snap_rwsem_locked);
@@ -2515,7 +2700,7 @@
 		if (ci->i_wb_ref == 0)
 			ihold(&ci->vfs_inode);
 		ci->i_wb_ref++;
-		dout("__take_cap_refs %p wb %d -> %d (?)\n",
+		dout("%s %p wb %d -> %d (?)\n", __func__,
 		     &ci->vfs_inode, ci->i_wb_ref-1, ci->i_wb_ref);
 	}
 }
@@ -2526,15 +2711,26 @@
  * to (when applicable), and check against max_size here as well.
  * Note that caller is responsible for ensuring max_size increases are
  * requested from the MDS.
+ *
+ * Returns 0 if caps were not able to be acquired (yet), 1 if succeed,
+ * or a negative error code. There are 3 speical error codes:
+ *  -EAGAIN: need to sleep but non-blocking is specified
+ *  -EFBIG:  ask caller to call check_max_size() and try again.
+ *  -ESTALE: ask caller to call ceph_renew_caps() and try again.
  */
-static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
-			    loff_t endoff, bool nonblock, int *got, int *err)
+enum {
+	/* first 8 bits are reserved for CEPH_FILE_MODE_FOO */
+	NON_BLOCKING	= (1 << 8),
+	CHECK_FILELOCK	= (1 << 9),
+};
+
+static int try_get_cap_refs(struct inode *inode, int need, int want,
+			    loff_t endoff, int flags, int *got)
 {
-	struct inode *inode = &ci->vfs_inode;
+	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	int ret = 0;
 	int have, implemented;
-	int file_wanted;
 	bool snap_rwsem_locked = false;
 
 	dout("get_cap_refs %p need %s want %s\n", inode,
@@ -2543,13 +2739,10 @@
 again:
 	spin_lock(&ci->i_ceph_lock);
 
-	/* make sure file is actually open */
-	file_wanted = __ceph_caps_file_wanted(ci);
-	if ((file_wanted & need) != need) {
-		dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
-		     ceph_cap_string(need), ceph_cap_string(file_wanted));
-		*err = -EBADF;
-		ret = 1;
+	if ((flags & CHECK_FILELOCK) &&
+	    (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK)) {
+		dout("try_get_cap_refs %p error filelock\n", inode);
+		ret = -EIO;
 		goto out_unlock;
 	}
 
@@ -2570,10 +2763,8 @@
 		if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
 			dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
 			     inode, endoff, ci->i_max_size);
-			if (endoff > ci->i_requested_max_size) {
-				*err = -EAGAIN;
-				ret = 1;
-			}
+			if (endoff > ci->i_requested_max_size)
+				ret = ci->i_auth_cap ? -EFBIG : -ESTALE;
 			goto out_unlock;
 		}
 		/*
@@ -2607,9 +2798,8 @@
 					 * we can not call down_read() when
 					 * task isn't in TASK_RUNNING state
 					 */
-					if (nonblock) {
-						*err = -EAGAIN;
-						ret = 1;
+					if (flags & NON_BLOCKING) {
+						ret = -EAGAIN;
 						goto out_unlock;
 					}
 
@@ -2620,57 +2810,63 @@
 				}
 				snap_rwsem_locked = true;
 			}
-			*got = need | (have & want);
-			if ((need & CEPH_CAP_FILE_RD) &&
+			if ((have & want) == want)
+				*got = need | want;
+			else
+				*got = need;
+			if (S_ISREG(inode->i_mode) &&
+			    (need & CEPH_CAP_FILE_RD) &&
 			    !(*got & CEPH_CAP_FILE_CACHE))
 				ceph_disable_fscache_readpage(ci);
-			__take_cap_refs(ci, *got, true);
+			ceph_take_cap_refs(ci, *got, true);
 			ret = 1;
 		}
 	} else {
 		int session_readonly = false;
-		if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) {
+		int mds_wanted;
+		if (ci->i_auth_cap &&
+		    (need & (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_EXCL))) {
 			struct ceph_mds_session *s = ci->i_auth_cap->session;
 			spin_lock(&s->s_cap_lock);
 			session_readonly = s->s_readonly;
 			spin_unlock(&s->s_cap_lock);
 		}
 		if (session_readonly) {
-			dout("get_cap_refs %p needed %s but mds%d readonly\n",
+			dout("get_cap_refs %p need %s but mds%d readonly\n",
 			     inode, ceph_cap_string(need), ci->i_auth_cap->mds);
-			*err = -EROFS;
-			ret = 1;
+			ret = -EROFS;
 			goto out_unlock;
 		}
 
-		if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) {
-			int mds_wanted;
-			if (READ_ONCE(mdsc->fsc->mount_state) ==
-			    CEPH_MOUNT_SHUTDOWN) {
-				dout("get_cap_refs %p forced umount\n", inode);
-				*err = -EIO;
-				ret = 1;
-				goto out_unlock;
-			}
-			mds_wanted = __ceph_caps_mds_wanted(ci, false);
-			if (need & ~(mds_wanted & need)) {
-				dout("get_cap_refs %p caps were dropped"
-				     " (session killed?)\n", inode);
-				*err = -ESTALE;
-				ret = 1;
-				goto out_unlock;
-			}
-			if (!(file_wanted & ~mds_wanted))
-				ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED;
+		if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+			dout("get_cap_refs %p forced umount\n", inode);
+			ret = -EIO;
+			goto out_unlock;
+		}
+		mds_wanted = __ceph_caps_mds_wanted(ci, false);
+		if (need & ~mds_wanted) {
+			dout("get_cap_refs %p need %s > mds_wanted %s\n",
+			     inode, ceph_cap_string(need),
+			     ceph_cap_string(mds_wanted));
+			ret = -ESTALE;
+			goto out_unlock;
 		}
 
-		dout("get_cap_refs %p have %s needed %s\n", inode,
+		dout("get_cap_refs %p have %s need %s\n", inode,
 		     ceph_cap_string(have), ceph_cap_string(need));
 	}
 out_unlock:
+
+	__ceph_touch_fmode(ci, mdsc, flags);
+
 	spin_unlock(&ci->i_ceph_lock);
 	if (snap_rwsem_locked)
 		up_read(&mdsc->snap_rwsem);
+
+	if (!ret)
+		ceph_update_cap_mis(&mdsc->metric);
+	else if (ret == 1)
+		ceph_update_cap_hit(&mdsc->metric);
 
 	dout("get_cap_refs %p ret %d got %s\n", inode,
 	     ret, ceph_cap_string(*got));
@@ -2705,24 +2901,39 @@
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
 }
 
-int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
+static inline int get_used_fmode(int caps)
 {
-	int ret, err = 0;
+	int fmode = 0;
+	if (caps & CEPH_CAP_FILE_RD)
+		fmode |= CEPH_FILE_MODE_RD;
+	if (caps & CEPH_CAP_FILE_WR)
+		fmode |= CEPH_FILE_MODE_WR;
+	return fmode;
+}
+
+int ceph_try_get_caps(struct inode *inode, int need, int want,
+		      bool nonblock, int *got)
+{
+	int ret, flags;
 
 	BUG_ON(need & ~CEPH_CAP_FILE_RD);
-	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
-	ret = ceph_pool_perm_check(ci, need);
-	if (ret < 0)
-		return ret;
-
-	ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
-	if (ret) {
-		if (err == -EAGAIN) {
-			ret = 0;
-		} else if (err < 0) {
-			ret = err;
-		}
+	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO |
+			CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
+			CEPH_CAP_ANY_DIR_OPS));
+	if (need) {
+		ret = ceph_pool_perm_check(inode, need);
+		if (ret < 0)
+			return ret;
 	}
+
+	flags = get_used_fmode(need | want);
+	if (nonblock)
+		flags |= NON_BLOCKING;
+
+	ret = try_get_cap_refs(inode, need, want, 0, flags, got);
+	/* three special error codes */
+	if (ret == -EAGAIN || ret == -EFBIG || ret == -ESTALE)
+		ret = 0;
 	return ret;
 }
 
@@ -2731,34 +2942,54 @@
  * due to a small max_size, make sure we check_max_size (and possibly
  * ask the mds) so we don't get hung up indefinitely.
  */
-int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
+int ceph_get_caps(struct file *filp, int need, int want,
 		  loff_t endoff, int *got, struct page **pinned_page)
 {
-	int _got, ret, err = 0;
+	struct ceph_file_info *fi = filp->private_data;
+	struct inode *inode = file_inode(filp);
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	int ret, _got, flags;
 
-	ret = ceph_pool_perm_check(ci, need);
+	ret = ceph_pool_perm_check(inode, need);
 	if (ret < 0)
 		return ret;
 
-	while (true) {
-		if (endoff > 0)
-			check_max_size(&ci->vfs_inode, endoff);
+	if ((fi->fmode & CEPH_FILE_MODE_WR) &&
+	    fi->filp_gen != READ_ONCE(fsc->filp_gen))
+		return -EBADF;
 
-		err = 0;
+	flags = get_used_fmode(need | want);
+
+	while (true) {
+		flags &= CEPH_FILE_MODE_MASK;
+		if (vfs_inode_has_locks(inode))
+			flags |= CHECK_FILELOCK;
 		_got = 0;
-		ret = try_get_cap_refs(ci, need, want, endoff,
-				       false, &_got, &err);
-		if (ret) {
-			if (err == -EAGAIN)
-				continue;
-			if (err < 0)
-				ret = err;
-		} else {
+		ret = try_get_cap_refs(inode, need, want, endoff,
+				       flags, &_got);
+		WARN_ON_ONCE(ret == -EAGAIN);
+		if (!ret) {
+			struct ceph_mds_client *mdsc = fsc->mdsc;
+			struct cap_wait cw;
 			DEFINE_WAIT_FUNC(wait, woken_wake_function);
+
+			cw.ino = ceph_ino(inode);
+			cw.tgid = current->tgid;
+			cw.need = need;
+			cw.want = want;
+
+			spin_lock(&mdsc->caps_list_lock);
+			list_add(&cw.list, &mdsc->cap_wait_list);
+			spin_unlock(&mdsc->caps_list_lock);
+
+			/* make sure used fmode not timeout */
+			ceph_get_fmode(ci, flags, FMODE_WAIT_BIAS);
 			add_wait_queue(&ci->i_cap_wq, &wait);
 
-			while (!try_get_cap_refs(ci, need, want, endoff,
-						 true, &_got, &err)) {
+			flags |= NON_BLOCKING;
+			while (!(ret = try_get_cap_refs(inode, need, want,
+							endoff, flags, &_got))) {
 				if (signal_pending(current)) {
 					ret = -ERESTARTSYS;
 					break;
@@ -2767,27 +2998,48 @@
 			}
 
 			remove_wait_queue(&ci->i_cap_wq, &wait);
+			ceph_put_fmode(ci, flags, FMODE_WAIT_BIAS);
 
-			if (err == -EAGAIN)
+			spin_lock(&mdsc->caps_list_lock);
+			list_del(&cw.list);
+			spin_unlock(&mdsc->caps_list_lock);
+
+			if (ret == -EAGAIN)
 				continue;
-			if (err < 0)
-				ret = err;
 		}
+
+		if ((fi->fmode & CEPH_FILE_MODE_WR) &&
+		    fi->filp_gen != READ_ONCE(fsc->filp_gen)) {
+			if (ret >= 0 && _got)
+				ceph_put_cap_refs(ci, _got);
+			return -EBADF;
+		}
+
 		if (ret < 0) {
-			if (err == -ESTALE) {
+			if (ret == -EFBIG || ret == -ESTALE) {
+				int ret2 = ceph_wait_on_async_create(inode);
+				if (ret2 < 0)
+					return ret2;
+			}
+			if (ret == -EFBIG) {
+				check_max_size(inode, endoff);
+				continue;
+			}
+			if (ret == -ESTALE) {
 				/* session was killed, try renew caps */
-				ret = ceph_renew_caps(&ci->vfs_inode);
+				ret = ceph_renew_caps(inode, flags);
 				if (ret == 0)
 					continue;
 			}
 			return ret;
 		}
 
-		if (ci->i_inline_version != CEPH_INLINE_NONE &&
+		if (S_ISREG(ci->vfs_inode.i_mode) &&
+		    ci->i_inline_version != CEPH_INLINE_NONE &&
 		    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
-		    i_size_read(&ci->vfs_inode) > 0) {
+		    i_size_read(inode) > 0) {
 			struct page *page =
-				find_get_page(ci->vfs_inode.i_mapping, 0);
+				find_get_page(inode->i_mapping, 0);
 			if (page) {
 				if (PageUptodate(page)) {
 					*pinned_page = page;
@@ -2806,7 +3058,7 @@
 			 * getattr request will bring inline data into
 			 * page cache
 			 */
-			ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
+			ret = __ceph_do_getattr(inode, NULL,
 						CEPH_STAT_CAP_INLINE_DATA,
 						true);
 			if (ret < 0)
@@ -2816,7 +3068,8 @@
 		break;
 	}
 
-	if ((_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE))
+	if (S_ISREG(ci->vfs_inode.i_mode) &&
+	    (_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE))
 		ceph_fscache_revalidate_cookie(ci);
 
 	*got = _got;
@@ -2830,7 +3083,7 @@
 void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
 {
 	spin_lock(&ci->i_ceph_lock);
-	__take_cap_refs(ci, caps, false);
+	ceph_take_cap_refs(ci, caps, false);
 	spin_unlock(&ci->i_ceph_lock);
 }
 
@@ -2867,7 +3120,8 @@
  * If we are releasing a WR cap (from a sync write), finalize any affected
  * cap_snap, and wake up any waiters.
  */
-void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
+				bool skip_checking_caps)
 {
 	struct inode *inode = &ci->vfs_inode;
 	int last = 0, put = 0, flushsnaps = 0, wake = 0;
@@ -2880,6 +3134,9 @@
 			last++;
 	if (had & CEPH_CAP_FILE_CACHE)
 		if (--ci->i_rdcache_ref == 0)
+			last++;
+	if (had & CEPH_CAP_FILE_EXCL)
+		if (--ci->i_fx_ref == 0)
 			last++;
 	if (had & CEPH_CAP_FILE_BUFFER) {
 		if (--ci->i_wb_ref == 0) {
@@ -2912,7 +3169,7 @@
 				ci->i_head_snapc = NULL;
 			}
 			/* see comment in __ceph_remove_cap() */
-			if (!__ceph_is_any_caps(ci) && ci->i_snap_realm)
+			if (!__ceph_is_any_real_caps(ci) && ci->i_snap_realm)
 				drop_inode_snap_realm(ci);
 		}
 	spin_unlock(&ci->i_ceph_lock);
@@ -2920,14 +3177,26 @@
 	dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
 	     last ? " last" : "", put ? " put" : "");
 
-	if (last && !flushsnaps)
-		ceph_check_caps(ci, 0, NULL);
-	else if (flushsnaps)
-		ceph_flush_snaps(ci, NULL);
+	if (!skip_checking_caps) {
+		if (last)
+			ceph_check_caps(ci, 0, NULL);
+		else if (flushsnaps)
+			ceph_flush_snaps(ci, NULL);
+	}
 	if (wake)
 		wake_up_all(&ci->i_cap_wq);
 	while (put-- > 0)
 		iput(inode);
+}
+
+void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+{
+	__ceph_put_cap_refs(ci, had, false);
+}
+
+void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci, int had)
+{
+	__ceph_put_cap_refs(ci, had, true);
 }
 
 /*
@@ -2977,7 +3246,16 @@
 				break;
 			}
 		}
-		BUG_ON(!found);
+
+		if (!found) {
+			/*
+			 * The capsnap should already be removed when removing
+			 * auth cap in the case of a forced unmount.
+			 */
+			WARN_ON_ONCE(ci->i_auth_cap);
+			goto unlock;
+		}
+
 		capsnap->dirty_pages -= nr;
 		if (capsnap->dirty_pages == 0) {
 			complete_capsnap = true;
@@ -2999,17 +3277,20 @@
 		     complete_capsnap ? " (complete capsnap)" : "");
 	}
 
+unlock:
 	spin_unlock(&ci->i_ceph_lock);
 
 	if (last) {
-		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
+		ceph_check_caps(ci, 0, NULL);
 	} else if (flush_snaps) {
 		ceph_flush_snaps(ci, NULL);
 	}
 	if (complete_capsnap)
 		wake_up_all(&ci->i_cap_wq);
-	while (put-- > 0)
-		iput(inode);
+	while (put-- > 0) {
+		/* avoid calling iput_final() in osd dispatch threads */
+		ceph_async_iput(inode);
+	}
 }
 
 /*
@@ -3054,8 +3335,10 @@
 	bool dirstat_valid;
 	u64 nfiles;
 	u64 nsubdirs;
+	u64 change_attr;
 	/* currently issued */
 	int issued;
+	struct timespec64 btime;
 };
 
 /*
@@ -3079,7 +3362,8 @@
 	int used, wanted, dirty;
 	u64 size = le64_to_cpu(grant->size);
 	u64 max_size = le64_to_cpu(grant->max_size);
-	int check_caps = 0;
+	unsigned char check_caps = 0;
+	bool was_stale = cap->cap_gen < session->s_cap_gen;
 	bool wake = false;
 	bool writeback = false;
 	bool queue_trunc = false;
@@ -3092,6 +3376,28 @@
 	dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
 		inode->i_size);
 
+
+	/*
+	 * If CACHE is being revoked, and we have no dirty buffers,
+	 * try to invalidate (once).  (If there are dirty buffers, we
+	 * will invalidate _after_ writeback.)
+	 */
+	if (S_ISREG(inode->i_mode) && /* don't invalidate readdir cache */
+	    ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
+	    (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
+	    !(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
+		if (try_nonblocking_invalidate(inode)) {
+			/* there were locked pages.. invalidate later
+			   in a separate thread. */
+			if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
+				queue_invalidate = true;
+				ci->i_rdcache_revoking = ci->i_rdcache_gen;
+			}
+		}
+	}
+
+	if (was_stale)
+		cap->issued = cap->implemented = CEPH_CAP_PIN;
 
 	/*
 	 * auth mds of the inode changed. we received the cap export message,
@@ -3108,36 +3414,20 @@
 		newcaps |= cap->issued;
 	}
 
-	/*
-	 * If CACHE is being revoked, and we have no dirty buffers,
-	 * try to invalidate (once).  (If there are dirty buffers, we
-	 * will invalidate _after_ writeback.)
-	 */
-	if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */
-	    ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
-	    (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
-	    !(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
-		if (try_nonblocking_invalidate(inode)) {
-			/* there were locked pages.. invalidate later
-			   in a separate thread. */
-			if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
-				queue_invalidate = true;
-				ci->i_rdcache_revoking = ci->i_rdcache_gen;
-			}
-		}
-	}
-
 	/* side effects now are allowed */
 	cap->cap_gen = session->s_cap_gen;
 	cap->seq = seq;
 
 	__check_cap_issue(ci, cap, newcaps);
 
+	inode_set_max_iversion_raw(inode, extra_info->change_attr);
+
 	if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
 	    (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) {
 		inode->i_mode = le32_to_cpu(grant->mode);
 		inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
 		inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
+		ci->i_btime = extra_info->btime;
 		dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode,
 		     from_kuid(&init_user_ns, inode->i_uid),
 		     from_kgid(&init_user_ns, inode->i_gid));
@@ -3164,6 +3454,7 @@
 			ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
 			ci->i_xattrs.version = version;
 			ceph_forget_all_cached_acls(inode);
+			ceph_security_invalidate_secctx(inode);
 		}
 	}
 
@@ -3216,10 +3507,6 @@
 				ci->i_requested_max_size = 0;
 			}
 			wake = true;
-		} else if (ci->i_wanted_max_size > ci->i_max_size &&
-			   ci->i_wanted_max_size > ci->i_requested_max_size) {
-			/* CEPH_CAP_OP_IMPORT */
-			wake = true;
 		}
 	}
 
@@ -3231,13 +3518,20 @@
 	     ceph_cap_string(wanted),
 	     ceph_cap_string(used),
 	     ceph_cap_string(dirty));
-	if (wanted != le32_to_cpu(grant->wanted)) {
-		dout("mds wanted %s -> %s\n",
-		     ceph_cap_string(le32_to_cpu(grant->wanted)),
-		     ceph_cap_string(wanted));
-		/* imported cap may not have correct mds_wanted */
-		if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT)
-			check_caps = 1;
+
+	if ((was_stale || le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) &&
+	    (wanted & ~(cap->mds_wanted | newcaps))) {
+		/*
+		 * If mds is importing cap, prior cap messages that update
+		 * 'wanted' may get dropped by mds (migrate seq mismatch).
+		 *
+		 * We don't send cap message to update 'wanted' if what we
+		 * want are already issued. If mds revokes caps, cap message
+		 * that releases caps also tells mds what we want. But if
+		 * caps got revoked by mds forcedly (session stale). We may
+		 * haven't told mds what we want.
+		 */
+		check_caps = 1;
 	}
 
 	/* revocation, grant, or no-op? */
@@ -3248,11 +3542,12 @@
 		     ceph_cap_string(cap->issued),
 		     ceph_cap_string(newcaps),
 		     ceph_cap_string(revoking));
-		if (revoking & used & CEPH_CAP_FILE_BUFFER)
+		if (S_ISREG(inode->i_mode) &&
+		    (revoking & used & CEPH_CAP_FILE_BUFFER))
 			writeback = true;  /* initiate writeback; will delay ack */
-		else if (revoking == CEPH_CAP_FILE_CACHE &&
-			 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
-			 queue_invalidate)
+		else if (queue_invalidate &&
+			 revoking == CEPH_CAP_FILE_CACHE &&
+			 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0)
 			; /* do nothing yet, invalidation will be queued */
 		else if (cap == ci->i_auth_cap)
 			check_caps = 1; /* check auth cap only */
@@ -3279,6 +3574,15 @@
 	}
 	BUG_ON(cap->issued & ~cap->implemented);
 
+	/* don't let check_caps skip sending a response to MDS for revoke msgs */
+	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_REVOKE) {
+		cap->mds_wanted = 0;
+		if (cap == ci->i_auth_cap)
+			check_caps = 1; /* check auth cap only */
+		else
+			check_caps = 2; /* check all caps */
+	}
+
 	if (extra_info->inline_version > 0 &&
 	    extra_info->inline_version >= ci->i_inline_version) {
 		ci->i_inline_version = extra_info->inline_version;
@@ -3288,13 +3592,22 @@
 	}
 
 	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
-		if (newcaps & ~extra_info->issued)
-			wake = true;
-		kick_flushing_inode_caps(session->s_mdsc, session, inode);
+		if (ci->i_auth_cap == cap) {
+			if (newcaps & ~extra_info->issued)
+				wake = true;
+
+			if (ci->i_requested_max_size > max_size ||
+			    !(le32_to_cpu(grant->wanted) & CEPH_CAP_ANY_FILE_WR)) {
+				/* re-request max_size if necessary */
+				ci->i_requested_max_size = 0;
+				wake = true;
+			}
+
+			ceph_kick_flushing_inode_caps(session, ci);
+		}
 		up_read(&session->s_mdsc->snap_rwsem);
-	} else {
-		spin_unlock(&ci->i_ceph_lock);
 	}
+	spin_unlock(&ci->i_ceph_lock);
 
 	if (fill_inline)
 		ceph_fill_inline_data(inode, NULL, extra_info->inline_data,
@@ -3318,10 +3631,10 @@
 		wake_up_all(&ci->i_cap_wq);
 
 	if (check_caps == 1)
-		ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
+		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY | CHECK_CAPS_NOINVAL,
 				session);
 	else if (check_caps == 2)
-		ceph_check_caps(ci, CHECK_CAPS_NODELAY, session);
+		ceph_check_caps(ci, CHECK_CAPS_NOINVAL, session);
 	else
 		mutex_unlock(&session->s_mutex);
 }
@@ -3348,15 +3661,26 @@
 	bool wake_mdsc = false;
 
 	list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
+		/* Is this the one that was flushed? */
 		if (cf->tid == flush_tid)
 			cleaned = cf->caps;
-		if (cf->caps == 0) /* capsnap */
+
+		/* Is this a capsnap? */
+		if (cf->is_capsnap)
 			continue;
+
 		if (cf->tid <= flush_tid) {
-			if (__finish_cap_flush(NULL, ci, cf))
-				wake_ci = true;
+			/*
+			 * An earlier or current tid. The FLUSH_ACK should
+			 * represent a superset of this flush's caps.
+			 */
+			wake_ci |= __detach_cap_flush_from_ci(ci, cf);
 			list_add_tail(&cf->i_list, &to_remove);
 		} else {
+			/*
+			 * This is a later one. Any caps in it are still dirty
+			 * so don't count them as cleaned.
+			 */
 			cleaned &= ~cf->caps;
 			if (!cleaned)
 				break;
@@ -3376,10 +3700,8 @@
 
 	spin_lock(&mdsc->cap_dirty_lock);
 
-	list_for_each_entry(cf, &to_remove, i_list) {
-		if (__finish_cap_flush(mdsc, NULL, cf))
-			wake_mdsc = true;
-	}
+	list_for_each_entry(cf, &to_remove, i_list)
+		wake_mdsc |= __detach_cap_flush_from_mdsc(mdsc, cf);
 
 	if (ci->i_flushing_caps == 0) {
 		if (list_empty(&ci->i_cap_flush_list)) {
@@ -3417,8 +3739,9 @@
 	while (!list_empty(&to_remove)) {
 		cf = list_first_entry(&to_remove,
 				      struct ceph_cap_flush, i_list);
-		list_del(&cf->i_list);
-		ceph_free_cap_flush(cf);
+		list_del_init(&cf->i_list);
+		if (!cf->is_capsnap)
+			ceph_free_cap_flush(cf);
 	}
 
 	if (wake_ci)
@@ -3427,6 +3750,43 @@
 		wake_up_all(&mdsc->cap_flushing_wq);
 	if (drop)
 		iput(inode);
+}
+
+void __ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap,
+			   bool *wake_ci, bool *wake_mdsc)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
+	bool ret;
+
+	lockdep_assert_held(&ci->i_ceph_lock);
+
+	dout("removing capsnap %p, inode %p ci %p\n", capsnap, inode, ci);
+
+	list_del_init(&capsnap->ci_item);
+	ret = __detach_cap_flush_from_ci(ci, &capsnap->cap_flush);
+	if (wake_ci)
+		*wake_ci = ret;
+
+	spin_lock(&mdsc->cap_dirty_lock);
+	if (list_empty(&ci->i_cap_flush_list))
+		list_del_init(&ci->i_flushing_item);
+
+	ret = __detach_cap_flush_from_mdsc(mdsc, &capsnap->cap_flush);
+	if (wake_mdsc)
+		*wake_mdsc = ret;
+	spin_unlock(&mdsc->cap_dirty_lock);
+}
+
+void ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap,
+			 bool *wake_ci, bool *wake_mdsc)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+
+	lockdep_assert_held(&ci->i_ceph_lock);
+
+	WARN_ON_ONCE(capsnap->dirty_pages || capsnap->writing);
+	__ceph_remove_capsnap(inode, capsnap, wake_ci, wake_mdsc);
 }
 
 /*
@@ -3466,25 +3826,10 @@
 			     capsnap, capsnap->follows);
 		}
 	}
-	if (flushed) {
-		WARN_ON(capsnap->dirty_pages || capsnap->writing);
-		dout(" removing %p cap_snap %p follows %lld\n",
-		     inode, capsnap, follows);
-		list_del(&capsnap->ci_item);
-		if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush))
-			wake_ci = true;
-
-		spin_lock(&mdsc->cap_dirty_lock);
-
-		if (list_empty(&ci->i_cap_flush_list))
-			list_del_init(&ci->i_flushing_item);
-
-		if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush))
-			wake_mdsc = true;
-
-		spin_unlock(&mdsc->cap_dirty_lock);
-	}
+	if (flushed)
+		ceph_remove_capsnap(inode, capsnap, &wake_ci, &wake_mdsc);
 	spin_unlock(&ci->i_ceph_lock);
+
 	if (flushed) {
 		ceph_put_snap_context(capsnap->context);
 		ceph_put_cap_snap(capsnap);
@@ -3501,10 +3846,9 @@
  *
  * caller hold s_mutex.
  */
-static void handle_cap_trunc(struct inode *inode,
+static bool handle_cap_trunc(struct inode *inode,
 			     struct ceph_mds_caps *trunc,
 			     struct ceph_mds_session *session)
-	__releases(ci->i_ceph_lock)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int mds = session->s_mds;
@@ -3515,7 +3859,9 @@
 	int implemented = 0;
 	int dirty = __ceph_caps_dirty(ci);
 	int issued = __ceph_caps_issued(ceph_inode(inode), &implemented);
-	int queue_trunc = 0;
+	bool queue_trunc = false;
+
+	lockdep_assert_held(&ci->i_ceph_lock);
 
 	issued |= implemented | dirty;
 
@@ -3523,10 +3869,7 @@
 	     inode, mds, seq, truncate_size, truncate_seq);
 	queue_trunc = ceph_fill_file_size(inode, issued,
 					  truncate_seq, truncate_size, size);
-	spin_unlock(&ci->i_ceph_lock);
-
-	if (queue_trunc)
-		ceph_queue_vmtruncate(inode);
+	return queue_trunc;
 }
 
 /*
@@ -3571,8 +3914,6 @@
 
 	if (target < 0) {
 		__ceph_remove_cap(cap, false);
-		if (!ci->i_auth_cap)
-			ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
 		goto out_unlock;
 	}
 
@@ -3602,15 +3943,9 @@
 			tcap->issue_seq = t_seq - 1;
 			tcap->issued |= issued;
 			tcap->implemented |= issued;
-			if (cap == ci->i_auth_cap)
+			if (cap == ci->i_auth_cap) {
 				ci->i_auth_cap = tcap;
-
-			if (!list_empty(&ci->i_cap_flush_list) &&
-			    ci->i_auth_cap == tcap) {
-				spin_lock(&mdsc->cap_dirty_lock);
-				list_move_tail(&ci->i_flushing_item,
-					       &tcap->session->s_cap_flushing);
-				spin_unlock(&mdsc->cap_dirty_lock);
+				change_auth_cap_ses(ci, tcap->session);
 			}
 		}
 		__ceph_remove_cap(cap, false);
@@ -3619,7 +3954,7 @@
 		/* add placeholder for the export tagert */
 		int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
 		tcap = new_cap;
-		ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
+		ceph_add_cap(inode, tsession, t_cap_id, issued, 0,
 			     t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);
 
 		if (!list_empty(&ci->i_cap_flush_list) &&
@@ -3679,7 +4014,6 @@
 			      struct ceph_mds_cap_peer *ph,
 			      struct ceph_mds_session *session,
 			      struct ceph_cap **target_cap, int *old_issued)
-	__acquires(ci->i_ceph_lock)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_cap *cap, *ocap, *new_cap = NULL;
@@ -3704,14 +4038,13 @@
 
 	dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
 	     inode, ci, mds, mseq, peer);
-
 retry:
-	spin_lock(&ci->i_ceph_lock);
 	cap = __get_cap_for_mds(ci, mds);
 	if (!cap) {
 		if (!new_cap) {
 			spin_unlock(&ci->i_ceph_lock);
 			new_cap = ceph_get_cap(mdsc, NULL);
+			spin_lock(&ci->i_ceph_lock);
 			goto retry;
 		}
 		cap = new_cap;
@@ -3725,7 +4058,7 @@
 	__ceph_caps_issued(ci, &issued);
 	issued |= __ceph_caps_dirty(ci);
 
-	ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq,
+	ceph_add_cap(inode, session, cap_id, caps, wanted, seq, mseq,
 		     realmino, CEPH_CAP_FLAG_AUTH, &new_cap);
 
 	ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
@@ -3745,9 +4078,6 @@
 		}
 		__ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
 	}
-
-	/* make sure we re-request max_size, if necessary */
-	ci->i_requested_max_size = 0;
 
 	*old_issued = issued;
 	*target_cap = cap;
@@ -3777,6 +4107,7 @@
 	size_t snaptrace_len;
 	void *p, *end;
 	struct cap_extra_info extra_info = {};
+	bool queue_trunc;
 
 	dout("handle_caps from mds%d\n", session->s_mds);
 
@@ -3852,17 +4183,19 @@
 		}
 	}
 
-	if (msg_version >= 11) {
+	if (msg_version >= 9) {
 		struct ceph_timespec *btime;
-		u64 change_attr;
-		u32 flags;
 
-		/* version >= 9 */
 		if (p + sizeof(*btime) > end)
 			goto bad;
 		btime = p;
+		ceph_decode_timespec64(&extra_info.btime, btime);
 		p += sizeof(*btime);
-		ceph_decode_64_safe(&p, end, change_attr, bad);
+		ceph_decode_64_safe(&p, end, extra_info.change_attr, bad);
+	}
+
+	if (msg_version >= 11) {
+		u32 flags;
 		/* version >= 10 */
 		ceph_decode_32_safe(&p, end, flags, bad);
 		/* version >= 11 */
@@ -3878,7 +4211,7 @@
 	     vino.snap, inode);
 
 	mutex_lock(&session->s_mutex);
-	session->s_seq++;
+	inc_session_sequence(session);
 	dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
 	     (unsigned)seq);
 
@@ -3894,9 +4227,7 @@
 			cap->seq = seq;
 			cap->issue_seq = seq;
 			spin_lock(&session->s_cap_lock);
-			list_add_tail(&cap->session_caps,
-					&session->s_cap_releases);
-			session->s_num_cap_releases++;
+			__ceph_queue_cap_release(session, cap);
 			spin_unlock(&session->s_cap_lock);
 		}
 		goto flush_cap_releases;
@@ -3924,6 +4255,7 @@
 		} else {
 			down_read(&mdsc->snap_rwsem);
 		}
+		spin_lock(&ci->i_ceph_lock);
 		handle_cap_import(mdsc, inode, h, peer, session,
 				  &cap, &extra_info.issued);
 		handle_cap_grant(inode, session, cap,
@@ -3960,7 +4292,10 @@
 		break;
 
 	case CEPH_CAP_OP_TRUNC:
-		handle_cap_trunc(inode, h, session);
+		queue_trunc = handle_cap_trunc(inode, h, session);
+		spin_unlock(&ci->i_ceph_lock);
+		if (queue_trunc)
+			ceph_queue_vmtruncate(inode);
 		break;
 
 	default:
@@ -3969,7 +4304,13 @@
 		       ceph_cap_op_name(op));
 	}
 
-	goto done;
+done:
+	mutex_unlock(&session->s_mutex);
+done_unlocked:
+	ceph_put_string(extra_info.pool_ns);
+	/* avoid calling iput_final() in mds dispatch threads */
+	ceph_async_iput(inode);
+	return;
 
 flush_cap_releases:
 	/*
@@ -3977,14 +4318,8 @@
 	 * along for the mds (who clearly thinks we still have this
 	 * cap).
 	 */
-	ceph_send_cap_releases(mdsc, session);
-
-done:
-	mutex_unlock(&session->s_mutex);
-done_unlocked:
-	iput(inode);
-	ceph_put_string(extra_info.pool_ns);
-	return;
+	ceph_flush_cap_releases(mdsc, session);
+	goto done;
 
 bad:
 	pr_err("ceph_handle_caps: corrupt message\n");
@@ -3994,56 +4329,70 @@
 
 /*
  * Delayed work handler to process end of delayed cap release LRU list.
+ *
+ * If new caps are added to the list while processing it, these won't get
+ * processed in this run.  In this case, the ci->i_hold_caps_max will be
+ * returned so that the work can be scheduled accordingly.
  */
-void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
+unsigned long ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
 {
 	struct inode *inode;
 	struct ceph_inode_info *ci;
-	int flags = CHECK_CAPS_NODELAY;
+	struct ceph_mount_options *opt = mdsc->fsc->mount_options;
+	unsigned long delay_max = opt->caps_wanted_delay_max * HZ;
+	unsigned long loop_start = jiffies;
+	unsigned long delay = 0;
 
 	dout("check_delayed_caps\n");
-	while (1) {
-		spin_lock(&mdsc->cap_delay_lock);
-		if (list_empty(&mdsc->cap_delay_list))
-			break;
+	spin_lock(&mdsc->cap_delay_lock);
+	while (!list_empty(&mdsc->cap_delay_list)) {
 		ci = list_first_entry(&mdsc->cap_delay_list,
 				      struct ceph_inode_info,
 				      i_cap_delay_list);
+		if (time_before(loop_start, ci->i_hold_caps_max - delay_max)) {
+			dout("%s caps added recently.  Exiting loop", __func__);
+			delay = ci->i_hold_caps_max;
+			break;
+		}
 		if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
 		    time_before(jiffies, ci->i_hold_caps_max))
 			break;
 		list_del_init(&ci->i_cap_delay_list);
 
 		inode = igrab(&ci->vfs_inode);
-		spin_unlock(&mdsc->cap_delay_lock);
-
 		if (inode) {
+			spin_unlock(&mdsc->cap_delay_lock);
 			dout("check_delayed_caps on %p\n", inode);
-			ceph_check_caps(ci, flags, NULL);
-			iput(inode);
+			ceph_check_caps(ci, 0, NULL);
+			/* avoid calling iput_final() in tick thread */
+			ceph_async_iput(inode);
+			spin_lock(&mdsc->cap_delay_lock);
 		}
 	}
 	spin_unlock(&mdsc->cap_delay_lock);
+
+	return delay;
 }
 
 /*
  * Flush all dirty caps to the mds
  */
-void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
+static void flush_dirty_session_caps(struct ceph_mds_session *s)
 {
+	struct ceph_mds_client *mdsc = s->s_mdsc;
 	struct ceph_inode_info *ci;
 	struct inode *inode;
 
 	dout("flush_dirty_caps\n");
 	spin_lock(&mdsc->cap_dirty_lock);
-	while (!list_empty(&mdsc->cap_dirty)) {
-		ci = list_first_entry(&mdsc->cap_dirty, struct ceph_inode_info,
+	while (!list_empty(&s->s_cap_dirty)) {
+		ci = list_first_entry(&s->s_cap_dirty, struct ceph_inode_info,
 				      i_dirty_item);
 		inode = &ci->vfs_inode;
 		ihold(inode);
 		dout("flush_dirty_caps %p\n", inode);
 		spin_unlock(&mdsc->cap_dirty_lock);
-		ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH, NULL);
+		ceph_check_caps(ci, CHECK_CAPS_FLUSH, NULL);
 		iput(inode);
 		spin_lock(&mdsc->cap_dirty_lock);
 	}
@@ -4051,14 +4400,53 @@
 	dout("flush_dirty_caps done\n");
 }
 
-void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode)
+void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
 {
-	int i;
+	ceph_mdsc_iterate_sessions(mdsc, flush_dirty_session_caps, true);
+}
+
+void __ceph_touch_fmode(struct ceph_inode_info *ci,
+			struct ceph_mds_client *mdsc, int fmode)
+{
+	unsigned long now = jiffies;
+	if (fmode & CEPH_FILE_MODE_RD)
+		ci->i_last_rd = now;
+	if (fmode & CEPH_FILE_MODE_WR)
+		ci->i_last_wr = now;
+	/* queue periodic check */
+	if (fmode &&
+	    __ceph_is_any_real_caps(ci) &&
+	    list_empty(&ci->i_cap_delay_list))
+		__cap_delay_requeue(mdsc, ci);
+}
+
+void ceph_get_fmode(struct ceph_inode_info *ci, int fmode, int count)
+{
+	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->vfs_inode.i_sb);
 	int bits = (fmode << 1) | 1;
+	bool already_opened = false;
+	int i;
+
+	if (count == 1)
+		atomic64_inc(&mdsc->metric.opened_files);
+
+	spin_lock(&ci->i_ceph_lock);
 	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
+		/*
+		 * If any of the mode ref is larger than 0,
+		 * that means it has been already opened by
+		 * others. Just skip checking the PIN ref.
+		 */
+		if (i && ci->i_nr_by_mode[i])
+			already_opened = true;
+
 		if (bits & (1 << i))
-			ci->i_nr_by_mode[i]++;
+			ci->i_nr_by_mode[i] += count;
 	}
+
+	if (!already_opened)
+		percpu_counter_inc(&mdsc->metric.opened_inodes);
+	spin_unlock(&ci->i_ceph_lock);
 }
 
 /*
@@ -4066,30 +4454,39 @@
  * we may need to release capabilities to the MDS (or schedule
  * their delayed release).
  */
-void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
+void ceph_put_fmode(struct ceph_inode_info *ci, int fmode, int count)
 {
-	int i, last = 0;
+	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->vfs_inode.i_sb);
 	int bits = (fmode << 1) | 1;
+	bool is_closed = true;
+	int i;
+
+	if (count == 1)
+		atomic64_dec(&mdsc->metric.opened_files);
+
 	spin_lock(&ci->i_ceph_lock);
 	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
 		if (bits & (1 << i)) {
-			BUG_ON(ci->i_nr_by_mode[i] == 0);
-			if (--ci->i_nr_by_mode[i] == 0)
-				last++;
+			BUG_ON(ci->i_nr_by_mode[i] < count);
+			ci->i_nr_by_mode[i] -= count;
 		}
-	}
-	dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n",
-	     &ci->vfs_inode, fmode,
-	     ci->i_nr_by_mode[0], ci->i_nr_by_mode[1],
-	     ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]);
-	spin_unlock(&ci->i_ceph_lock);
 
-	if (last && ci->i_vino.snap == CEPH_NOSNAP)
-		ceph_check_caps(ci, 0, NULL);
+		/*
+		 * If any of the mode ref is not 0 after
+		 * decreased, that means it is still opened
+		 * by others. Just skip checking the PIN ref.
+		 */
+		if (i && ci->i_nr_by_mode[i])
+			is_closed = false;
+	}
+
+	if (is_closed)
+		percpu_counter_dec(&mdsc->metric.opened_inodes);
+	spin_unlock(&ci->i_ceph_lock);
 }
 
 /*
- * For a soon-to-be unlinked file, drop the AUTH_RDCACHE caps. If it
+ * For a soon-to-be unlinked file, drop the LINK caps. If it
  * looks like the link count will hit 0, drop any other caps (other
  * than PIN) we don't specifically want (due to the file still being
  * open).
@@ -4103,7 +4500,6 @@
 	if (inode->i_nlink == 1) {
 		drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);
 
-		ci->i_ceph_flags |= CEPH_I_NODELAY;
 		if (__ceph_caps_dirty(ci)) {
 			struct ceph_mds_client *mdsc =
 				ceph_inode_to_client(inode)->mdsc;
@@ -4159,8 +4555,6 @@
 		if (force || (cap->issued & drop)) {
 			if (cap->issued & drop) {
 				int wanted = __ceph_caps_wanted(ci);
-				if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
-					wanted |= cap->mds_wanted;
 				dout("encode_inode_release %p cap %p "
 				     "%s -> %s, wanted %s -> %s\n", inode, cap,
 				     ceph_cap_string(cap->issued),
@@ -4171,6 +4565,9 @@
 				cap->issued &= ~drop;
 				cap->implemented &= ~drop;
 				cap->mds_wanted = wanted;
+				if (cap == ci->i_auth_cap &&
+				    !(wanted & CEPH_CAP_ANY_FILE_WR))
+					ci->i_requested_max_size = 0;
 			} else {
 				dout("encode_inode_release %p cap %p %s"
 				     " (force)\n", inode, cap,

--
Gitblit v1.6.2