From 2f7c68cb55ecb7331f2381deb497c27155f32faf Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Wed, 03 Jan 2024 09:43:39 +0000
Subject: [PATCH] update kernel to 5.10.198

---
 kernel/fs/ceph/mds_client.c | 2100 +++++++++++++++++++++++++++++++++++++++++++----------------
 1 files changed, 1,522 insertions(+), 578 deletions(-)

diff --git a/kernel/fs/ceph/mds_client.c b/kernel/fs/ceph/mds_client.c
index 5f3707a..df1ecb8 100644
--- a/kernel/fs/ceph/mds_client.c
+++ b/kernel/fs/ceph/mds_client.c
@@ -9,6 +9,8 @@
 #include <linux/debugfs.h>
 #include <linux/seq_file.h>
 #include <linux/ratelimit.h>
+#include <linux/bits.h>
+#include <linux/ktime.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -19,6 +21,8 @@
 #include <linux/ceph/pagelist.h>
 #include <linux/ceph/auth.h>
 #include <linux/ceph/debugfs.h>
+
+#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
 
 /*
  * A cluster of MDS (metadata server) daemons is responsible for
@@ -46,13 +50,17 @@
  */
 
 struct ceph_reconnect_state {
-	int nr_caps;
+	struct ceph_mds_session *session;
+	int nr_caps, nr_realms;
 	struct ceph_pagelist *pagelist;
 	unsigned msg_version;
+	bool allow_multi;
 };
 
 static void __wake_requests(struct ceph_mds_client *mdsc,
 			    struct list_head *head);
+static void ceph_cap_release_work(struct work_struct *work);
+static void ceph_cap_reclaim_work(struct work_struct *work);
 
 static const struct ceph_connection_operations mds_con_ops;
 
@@ -61,6 +69,29 @@
  * mds reply parsing
  */
 
+static int parse_reply_info_quota(void **p, void *end,
+				  struct ceph_mds_reply_info_in *info)
+{
+	u8 struct_v, struct_compat;
+	u32 struct_len;
+
+	ceph_decode_8_safe(p, end, struct_v, bad);
+	ceph_decode_8_safe(p, end, struct_compat, bad);
+	/* struct_v is expected to be >= 1. we only
+	 * understand encoding with struct_compat == 1. */
+	if (!struct_v || struct_compat != 1)
+		goto bad;
+	ceph_decode_32_safe(p, end, struct_len, bad);
+	ceph_decode_need(p, end, struct_len, bad);
+	end = *p + struct_len;
+	ceph_decode_64_safe(p, end, info->max_bytes, bad);
+	ceph_decode_64_safe(p, end, info->max_files, bad);
+	*p = end;
+	return 0;
+bad:
+	return -EIO;
+}
+
 /*
  * parse individual inode info
  */
@@ -68,8 +99,24 @@
 			       struct ceph_mds_reply_info_in *info,
 			       u64 features)
 {
-	int err = -EIO;
+	int err = 0;
+	u8 struct_v = 0;
 
+	if (features == (u64)-1) {
+		u32 struct_len;
+		u8 struct_compat;
+		ceph_decode_8_safe(p, end, struct_v, bad);
+		ceph_decode_8_safe(p, end, struct_compat, bad);
+		/* struct_v is expected to be >= 1. we only understand
+		 * encoding with struct_compat == 1. */
+		if (!struct_v || struct_compat != 1)
+			goto bad;
+		ceph_decode_32_safe(p, end, struct_len, bad);
+		ceph_decode_need(p, end, struct_len, bad);
+		end = *p + struct_len;
+	}
+
+	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
 	info->in = *p;
 	*p += sizeof(struct ceph_mds_reply_inode) +
 		sizeof(*info->in->fragtree.splits) *
@@ -80,60 +127,158 @@
 	info->symlink = *p;
 	*p += info->symlink_len;
 
-	if (features & CEPH_FEATURE_DIRLAYOUTHASH)
-		ceph_decode_copy_safe(p, end, &info->dir_layout,
-				      sizeof(info->dir_layout), bad);
-	else
-		memset(&info->dir_layout, 0, sizeof(info->dir_layout));
-
+	ceph_decode_copy_safe(p, end, &info->dir_layout,
+			      sizeof(info->dir_layout), bad);
 	ceph_decode_32_safe(p, end, info->xattr_len, bad);
 	ceph_decode_need(p, end, info->xattr_len, bad);
 	info->xattr_data = *p;
 	*p += info->xattr_len;
 
-	if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
+	if (features == (u64)-1) {
+		/* inline data */
 		ceph_decode_64_safe(p, end, info->inline_version, bad);
 		ceph_decode_32_safe(p, end, info->inline_len, bad);
 		ceph_decode_need(p, end, info->inline_len, bad);
 		info->inline_data = *p;
 		*p += info->inline_len;
-	} else
-		info->inline_version = CEPH_INLINE_NONE;
-
-	if (features & CEPH_FEATURE_MDS_QUOTA) {
-		u8 struct_v, struct_compat;
-		u32 struct_len;
-
-		/*
-		 * both struct_v and struct_compat are expected to be >= 1
-		 */
-		ceph_decode_8_safe(p, end, struct_v, bad);
-		ceph_decode_8_safe(p, end, struct_compat, bad);
-		if (!struct_v || !struct_compat)
-			goto bad;
-		ceph_decode_32_safe(p, end, struct_len, bad);
-		ceph_decode_need(p, end, struct_len, bad);
-		ceph_decode_64_safe(p, end, info->max_bytes, bad);
-		ceph_decode_64_safe(p, end, info->max_files, bad);
-	} else {
-		info->max_bytes = 0;
-		info->max_files = 0;
-	}
-
-	info->pool_ns_len = 0;
-	info->pool_ns_data = NULL;
-	if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
+		/* quota */
+		err = parse_reply_info_quota(p, end, info);
+		if (err < 0)
+			goto out_bad;
+		/* pool namespace */
 		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
 		if (info->pool_ns_len > 0) {
 			ceph_decode_need(p, end, info->pool_ns_len, bad);
 			info->pool_ns_data = *p;
 			*p += info->pool_ns_len;
 		}
-	}
 
+		/* btime */
+		ceph_decode_need(p, end, sizeof(info->btime), bad);
+		ceph_decode_copy(p, &info->btime, sizeof(info->btime));
+
+		/* change attribute */
+		ceph_decode_64_safe(p, end, info->change_attr, bad);
+
+		/* dir pin */
+		if (struct_v >= 2) {
+			ceph_decode_32_safe(p, end, info->dir_pin, bad);
+		} else {
+			info->dir_pin = -ENODATA;
+		}
+
+		/* snapshot birth time, remains zero for v<=2 */
+		if (struct_v >= 3) {
+			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
+			ceph_decode_copy(p, &info->snap_btime,
+					 sizeof(info->snap_btime));
+		} else {
+			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
+		}
+
+		*p = end;
+	} else {
+		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
+			ceph_decode_64_safe(p, end, info->inline_version, bad);
+			ceph_decode_32_safe(p, end, info->inline_len, bad);
+			ceph_decode_need(p, end, info->inline_len, bad);
+			info->inline_data = *p;
+			*p += info->inline_len;
+		} else
+			info->inline_version = CEPH_INLINE_NONE;
+
+		if (features & CEPH_FEATURE_MDS_QUOTA) {
+			err = parse_reply_info_quota(p, end, info);
+			if (err < 0)
+				goto out_bad;
+		} else {
+			info->max_bytes = 0;
+			info->max_files = 0;
+		}
+
+		info->pool_ns_len = 0;
+		info->pool_ns_data = NULL;
+		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
+			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
+			if (info->pool_ns_len > 0) {
+				ceph_decode_need(p, end, info->pool_ns_len, bad);
+				info->pool_ns_data = *p;
+				*p += info->pool_ns_len;
+			}
+		}
+
+		if (features & CEPH_FEATURE_FS_BTIME) {
+			ceph_decode_need(p, end, sizeof(info->btime), bad);
+			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
+			ceph_decode_64_safe(p, end, info->change_attr, bad);
+		}
+
+		info->dir_pin = -ENODATA;
+		/* info->snap_btime remains zero */
+	}
 	return 0;
 bad:
+	err = -EIO;
+out_bad:
 	return err;
+}
+
+static int parse_reply_info_dir(void **p, void *end,
+				struct ceph_mds_reply_dirfrag **dirfrag,
+				u64 features)
+{
+	if (features == (u64)-1) {
+		u8 struct_v, struct_compat;
+		u32 struct_len;
+		ceph_decode_8_safe(p, end, struct_v, bad);
+		ceph_decode_8_safe(p, end, struct_compat, bad);
+		/* struct_v is expected to be >= 1. we only understand
+		 * encoding whose struct_compat == 1. */
+		if (!struct_v || struct_compat != 1)
+			goto bad;
+		ceph_decode_32_safe(p, end, struct_len, bad);
+		ceph_decode_need(p, end, struct_len, bad);
+		end = *p + struct_len;
+	}
+
+	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
+	*dirfrag = *p;
+	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
+	if (unlikely(*p > end))
+		goto bad;
+	if (features == (u64)-1)
+		*p = end;
+	return 0;
+bad:
+	return -EIO;
+}
+
+static int parse_reply_info_lease(void **p, void *end,
+				  struct ceph_mds_reply_lease **lease,
+				  u64 features)
+{
+	if (features == (u64)-1) {
+		u8 struct_v, struct_compat;
+		u32 struct_len;
+		ceph_decode_8_safe(p, end, struct_v, bad);
+		ceph_decode_8_safe(p, end, struct_compat, bad);
+		/* struct_v is expected to be >= 1. we only understand
+		 * encoding whose struct_compat == 1. */
+		if (!struct_v || struct_compat != 1)
+			goto bad;
+		ceph_decode_32_safe(p, end, struct_len, bad);
+		ceph_decode_need(p, end, struct_len, bad);
+		end = *p + struct_len;
+	}
+
+	ceph_decode_need(p, end, sizeof(**lease), bad);
+	*lease = *p;
+	*p += sizeof(**lease);
+	if (features == (u64)-1)
+		*p = end;
+	return 0;
+bad:
+	return -EIO;
 }
 
 /*
@@ -151,20 +296,18 @@
 		if (err < 0)
 			goto out_bad;
 
-		if (unlikely(*p + sizeof(*info->dirfrag) > end))
-			goto bad;
-		info->dirfrag = *p;
-		*p += sizeof(*info->dirfrag) +
-			sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
-		if (unlikely(*p > end))
-			goto bad;
+		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
+		if (err < 0)
+			goto out_bad;
 
 		ceph_decode_32_safe(p, end, info->dname_len, bad);
 		ceph_decode_need(p, end, info->dname_len, bad);
 		info->dname = *p;
 		*p += info->dname_len;
-		info->dlease = *p;
-		*p += sizeof(*info->dlease);
+
+		err = parse_reply_info_lease(p, end, &info->dlease, features);
+		if (err < 0)
+			goto out_bad;
 	}
 
 	if (info->head->is_target) {
@@ -187,20 +330,16 @@
 /*
  * parse readdir results
  */
-static int parse_reply_info_dir(void **p, void *end,
+static int parse_reply_info_readdir(void **p, void *end,
 				struct ceph_mds_reply_info_parsed *info,
 				u64 features)
 {
 	u32 num, i = 0;
 	int err;
 
-	info->dir_dir = *p;
-	if (*p + sizeof(*info->dir_dir) > end)
-		goto bad;
-	*p += sizeof(*info->dir_dir) +
-		sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
-	if (*p > end)
-		goto bad;
+	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
+	if (err < 0)
+		goto out_bad;
 
 	ceph_decode_need(p, end, sizeof(num) + 2, bad);
 	num = ceph_decode_32(p);
@@ -226,15 +365,16 @@
 	while (num) {
 		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
 		/* dentry */
-		ceph_decode_need(p, end, sizeof(u32)*2, bad);
-		rde->name_len = ceph_decode_32(p);
+		ceph_decode_32_safe(p, end, rde->name_len, bad);
 		ceph_decode_need(p, end, rde->name_len, bad);
 		rde->name = *p;
 		*p += rde->name_len;
 		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
-		rde->lease = *p;
-		*p += sizeof(struct ceph_mds_reply_lease);
 
+		/* dentry lease */
+		err = parse_reply_info_lease(p, end, &rde->lease, features);
+		if (err)
+			goto out_bad;
 		/* inode */
 		err = parse_reply_info_in(p, end, &rde->inode, features);
 		if (err < 0)
@@ -246,8 +386,8 @@
 	}
 
 done:
-	if (*p != end)
-		goto bad;
+	/* Skip over any unrecognized fields */
+	*p = end;
 	return 0;
 
 bad:
@@ -268,36 +408,145 @@
 		goto bad;
 
 	info->filelock_reply = *p;
-	*p += sizeof(*info->filelock_reply);
 
-	if (unlikely(*p != end))
-		goto bad;
+	/* Skip over any unrecognized fields */
+	*p = end;
 	return 0;
-
 bad:
 	return -EIO;
 }
+
+
+#if BITS_PER_LONG == 64
+
+#define DELEGATED_INO_AVAILABLE		xa_mk_value(1)
+
+static int ceph_parse_deleg_inos(void **p, void *end,
+				 struct ceph_mds_session *s)
+{
+	u32 sets;
+
+	ceph_decode_32_safe(p, end, sets, bad);
+	dout("got %u sets of delegated inodes\n", sets);
+	while (sets--) {
+		u64 start, len, ino;
+
+		ceph_decode_64_safe(p, end, start, bad);
+		ceph_decode_64_safe(p, end, len, bad);
+
+		/* Don't accept a delegation of system inodes */
+		if (start < CEPH_INO_SYSTEM_BASE) {
+			pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
+					start, len);
+			continue;
+		}
+		while (len--) {
+			int err = xa_insert(&s->s_delegated_inos, ino = start++,
+					    DELEGATED_INO_AVAILABLE,
+					    GFP_KERNEL);
+			if (!err) {
+				dout("added delegated inode 0x%llx\n",
+				     start - 1);
+			} else if (err == -EBUSY) {
+				pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
+					start - 1);
+			} else {
+				return err;
+			}
+		}
+	}
+	return 0;
+bad:
+	return -EIO;
+}
+
+u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
+{
+	unsigned long ino;
+	void *val;
+
+	xa_for_each(&s->s_delegated_inos, ino, val) {
+		val = xa_erase(&s->s_delegated_inos, ino);
+		if (val == DELEGATED_INO_AVAILABLE)
+			return ino;
+	}
+	return 0;
+}
+
+int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
+{
+	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
+			 GFP_KERNEL);
+}
+#else /* BITS_PER_LONG == 64 */
+/*
+ * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
+ * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
+ * and bottom words?
+ */
+static int ceph_parse_deleg_inos(void **p, void *end,
+				 struct ceph_mds_session *s)
+{
+	u32 sets;
+
+	ceph_decode_32_safe(p, end, sets, bad);
+	if (sets)
+		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
+	return 0;
+bad:
+	return -EIO;
+}
+
+u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
+{
+	return 0;
+}
+
+int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
+{
+	return 0;
+}
+#endif /* BITS_PER_LONG == 64 */
 
 /*
  * parse create results
  */
 static int parse_reply_info_create(void **p, void *end,
 				  struct ceph_mds_reply_info_parsed *info,
-				  u64 features)
+				  u64 features, struct ceph_mds_session *s)
 {
-	if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
+	int ret;
+
+	if (features == (u64)-1 ||
+	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
 		if (*p == end) {
+			/* Malformed reply? */
 			info->has_create_ino = false;
-		} else {
+		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
+			u8 struct_v, struct_compat;
+			u32 len;
+
 			info->has_create_ino = true;
-			info->ino = ceph_decode_64(p);
+			ceph_decode_8_safe(p, end, struct_v, bad);
+			ceph_decode_8_safe(p, end, struct_compat, bad);
+			ceph_decode_32_safe(p, end, len, bad);
+			ceph_decode_64_safe(p, end, info->ino, bad);
+			ret = ceph_parse_deleg_inos(p, end, s);
+			if (ret)
+				return ret;
+		} else {
+			/* legacy */
+			ceph_decode_64_safe(p, end, info->ino, bad);
+			info->has_create_ino = true;
 		}
+	} else {
+		if (*p != end)
+			goto bad;
 	}
 
-	if (unlikely(*p != end))
-		goto bad;
+	/* Skip over any unrecognized fields */
+	*p = end;
 	return 0;
-
 bad:
 	return -EIO;
 }
@@ -307,16 +556,16 @@
  */
 static int parse_reply_info_extra(void **p, void *end,
 				  struct ceph_mds_reply_info_parsed *info,
-				  u64 features)
+				  u64 features, struct ceph_mds_session *s)
 {
 	u32 op = le32_to_cpu(info->head->op);
 
 	if (op == CEPH_MDS_OP_GETFILELOCK)
 		return parse_reply_info_filelock(p, end, info, features);
 	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
-		return parse_reply_info_dir(p, end, info, features);
+		return parse_reply_info_readdir(p, end, info, features);
 	else if (op == CEPH_MDS_OP_CREATE)
-		return parse_reply_info_create(p, end, info, features);
+		return parse_reply_info_create(p, end, info, features, s);
 	else
 		return -EIO;
 }
@@ -324,7 +573,7 @@
 /*
  * parse entire mds reply
  */
-static int parse_reply_info(struct ceph_msg *msg,
+static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
 			    struct ceph_mds_reply_info_parsed *info,
 			    u64 features)
 {
@@ -349,7 +598,7 @@
 	ceph_decode_32_safe(&p, end, len, bad);
 	if (len > 0) {
 		ceph_decode_need(&p, end, len, bad);
-		err = parse_reply_info_extra(&p, p+len, info, features);
+		err = parse_reply_info_extra(&p, p+len, info, features, s);
 		if (err < 0)
 			goto out_bad;
 	}
@@ -390,6 +639,7 @@
 	case CEPH_MDS_SESSION_OPEN: return "open";
 	case CEPH_MDS_SESSION_HUNG: return "hung";
 	case CEPH_MDS_SESSION_CLOSING: return "closing";
+	case CEPH_MDS_SESSION_CLOSED: return "closed";
 	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
 	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
 	case CEPH_MDS_SESSION_REJECTED: return "rejected";
@@ -397,7 +647,7 @@
 	}
 }
 
-static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
+struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
 {
 	if (refcount_inc_not_zero(&s->s_ref)) {
 		dout("mdsc get_session %p %d -> %d\n", s,
@@ -411,11 +661,16 @@
 
 void ceph_put_mds_session(struct ceph_mds_session *s)
 {
+	if (IS_ERR_OR_NULL(s))
+		return;
+
 	dout("mdsc put_session %p %d -> %d\n", s,
 	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
 	if (refcount_dec_and_test(&s->s_ref)) {
 		if (s->s_auth.authorizer)
 			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
+		WARN_ON(mutex_is_locked(&s->s_mutex));
+		xa_destroy(&s->s_delegated_inos);
 		kfree(s);
 	}
 }
@@ -426,15 +681,9 @@
 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
 						   int mds)
 {
-	struct ceph_mds_session *session;
-
 	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
 		return NULL;
-	session = mdsc->sessions[mds];
-	dout("lookup_mds_session %p %d\n", session,
-	     refcount_read(&session->s_ref));
-	get_session(session);
-	return session;
+	return ceph_get_mds_session(mdsc->sessions[mds]);
 }
 
 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
@@ -463,7 +712,7 @@
 {
 	struct ceph_mds_session *s;
 
-	if (mds >= mdsc->mdsmap->m_num_mds)
+	if (mds >= mdsc->mdsmap->possible_max_rank)
 		return ERR_PTR(-EINVAL);
 
 	s = kzalloc(sizeof(*s), GFP_NOFS);
@@ -498,7 +747,7 @@
 	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
 
 	spin_lock_init(&s->s_gen_ttl_lock);
-	s->s_cap_gen = 0;
+	s->s_cap_gen = 1;
 	s->s_cap_ttl = jiffies - 1;
 
 	spin_lock_init(&s->s_cap_lock);
@@ -506,14 +755,17 @@
 	s->s_renew_seq = 0;
 	INIT_LIST_HEAD(&s->s_caps);
 	s->s_nr_caps = 0;
-	s->s_trim_caps = 0;
 	refcount_set(&s->s_ref, 1);
 	INIT_LIST_HEAD(&s->s_waiting);
 	INIT_LIST_HEAD(&s->s_unsafe);
+	xa_init(&s->s_delegated_inos);
 	s->s_num_cap_releases = 0;
 	s->s_cap_reconnect = 0;
 	s->s_cap_iterator = NULL;
 	INIT_LIST_HEAD(&s->s_cap_releases);
+	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
+
+	INIT_LIST_HEAD(&s->s_cap_dirty);
 	INIT_LIST_HEAD(&s->s_cap_flushing);
 
 	mdsc->sessions[mds] = s;
@@ -557,11 +809,39 @@
 	}
 }
 
+void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
+				void (*cb)(struct ceph_mds_session *),
+				bool check_state)
+{
+	int mds;
+
+	mutex_lock(&mdsc->mutex);
+	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
+		struct ceph_mds_session *s;
+
+		s = __ceph_lookup_mds_session(mdsc, mds);
+		if (!s)
+			continue;
+
+		if (check_state && !check_session_state(s)) {
+			ceph_put_mds_session(s);
+			continue;
+		}
+
+		mutex_unlock(&mdsc->mutex);
+		cb(s);
+		ceph_put_mds_session(s);
+		mutex_lock(&mdsc->mutex);
+	}
+	mutex_unlock(&mdsc->mutex);
+}
+
 void ceph_mdsc_release_request(struct kref *kref)
 {
 	struct ceph_mds_request *req = container_of(kref,
 						    struct ceph_mds_request,
 						    r_kref);
+	ceph_mdsc_release_dir_caps_no_check(req);
 	destroy_reply_info(&req->r_reply_info);
 	if (req->r_request)
 		ceph_msg_put(req->r_request);
@@ -569,11 +849,14 @@
 		ceph_msg_put(req->r_reply);
 	if (req->r_inode) {
 		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
-		iput(req->r_inode);
+		/* avoid calling iput_final() in mds dispatch threads */
+		ceph_async_iput(req->r_inode);
 	}
-	if (req->r_parent)
+	if (req->r_parent) {
 		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
-	iput(req->r_target_inode);
+		ceph_async_iput(req->r_parent);
+	}
+	ceph_async_iput(req->r_target_inode);
 	if (req->r_dentry)
 		dput(req->r_dentry);
 	if (req->r_old_dentry)
@@ -587,7 +870,7 @@
 		 */
 		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
 				  CEPH_CAP_PIN);
-		iput(req->r_old_dentry_dir);
+		ceph_async_iput(req->r_old_dentry_dir);
 	}
 	kfree(req->r_path1);
 	kfree(req->r_path2);
@@ -595,7 +878,8 @@
 		ceph_pagelist_release(req->r_pagelist);
 	put_request_session(req);
 	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
-	kfree(req);
+	WARN_ON_ONCE(!list_empty(&req->r_wait));
+	kmem_cache_free(ceph_mds_request_cachep, req);
 }
 
 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
@@ -652,8 +936,13 @@
 		mdsc->oldest_tid = req->r_tid;
 
 	if (dir) {
+		struct ceph_inode_info *ci = ceph_inode(dir);
+
 		ihold(dir);
 		req->r_unsafe_dir = dir;
+		spin_lock(&ci->i_unsafe_lock);
+		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
+		spin_unlock(&ci->i_unsafe_lock);
 	}
 }
 
@@ -681,8 +970,7 @@
 
 	erase_request(&mdsc->request_tree, req);
 
-	if (req->r_unsafe_dir  &&
-	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
+	if (req->r_unsafe_dir) {
 		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
 		spin_lock(&ci->i_unsafe_lock);
 		list_del_init(&req->r_unsafe_dir_item);
@@ -697,7 +985,8 @@
 	}
 
 	if (req->r_unsafe_dir) {
-		iput(req->r_unsafe_dir);
+		/* avoid calling iput_final() in mds dispatch threads */
+		ceph_async_iput(req->r_unsafe_dir);
 		req->r_unsafe_dir = NULL;
 	}
 
@@ -737,7 +1026,8 @@
  * Called under mdsc->mutex.
  */
 static int __choose_mds(struct ceph_mds_client *mdsc,
-			struct ceph_mds_request *req)
+			struct ceph_mds_request *req,
+			bool *random)
 {
 	struct inode *inode;
 	struct ceph_inode_info *ci;
@@ -747,6 +1037,9 @@
 	u32 hash = req->r_direct_hash;
 	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
 
+	if (random)
+		*random = false;
+
 	/*
 	 * is there a specific mds we should try?  ignore hint if we have
 	 * no session and the mds is not up (active or recovering).
@@ -754,7 +1047,7 @@
 	if (req->r_resend_mds >= 0 &&
 	    (__have_session(mdsc, req->r_resend_mds) ||
 	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
-		dout("choose_mds using resend_mds mds%d\n",
+		dout("%s using resend_mds mds%d\n", __func__,
 		     req->r_resend_mds);
 		return req->r_resend_mds;
 	}
@@ -772,7 +1065,7 @@
 			rcu_read_lock();
 			inode = get_nonsnap_parent(req->r_dentry);
 			rcu_read_unlock();
-			dout("__choose_mds using snapdir's parent %p\n", inode);
+			dout("%s using snapdir's parent %p\n", __func__, inode);
 		}
 	} else if (req->r_dentry) {
 		/* ignore race with rename; old or new d_parent is okay */
@@ -780,7 +1073,7 @@
 		struct inode *dir;
 
 		rcu_read_lock();
-		parent = req->r_dentry->d_parent;
+		parent = READ_ONCE(req->r_dentry->d_parent);
 		dir = req->r_parent ? : d_inode_rcu(parent);
 
 		if (!dir || dir->i_sb != mdsc->fsc->sb) {
@@ -792,7 +1085,7 @@
 			/* direct snapped/virtual snapdir requests
 			 * based on parent dir inode */
 			inode = get_nonsnap_parent(parent);
-			dout("__choose_mds using nonsnap parent %p\n", inode);
+			dout("%s using nonsnap parent %p\n", __func__, inode);
 		} else {
 			/* dentry target */
 			inode = d_inode(req->r_dentry);
@@ -808,8 +1101,8 @@
 		rcu_read_unlock();
 	}
 
-	dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
-	     (int)hash, mode);
+	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
+	     hash, mode);
 	if (!inode)
 		goto random;
 	ci = ceph_inode(inode);
@@ -827,30 +1120,32 @@
 				get_random_bytes(&r, 1);
 				r %= frag.ndist;
 				mds = frag.dist[r];
-				dout("choose_mds %p %llx.%llx "
-				     "frag %u mds%d (%d/%d)\n",
-				     inode, ceph_vinop(inode),
-				     frag.frag, mds,
-				     (int)r, frag.ndist);
+				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
+				     __func__, inode, ceph_vinop(inode),
+				     frag.frag, mds, (int)r, frag.ndist);
 				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
-				    CEPH_MDS_STATE_ACTIVE)
+				    CEPH_MDS_STATE_ACTIVE &&
+				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
 					goto out;
 			}
 
 			/* since this file/dir wasn't known to be
 			 * replicated, then we want to look for the
 			 * authoritative mds. */
-			mode = USE_AUTH_MDS;
 			if (frag.mds >= 0) {
 				/* choose auth mds */
 				mds = frag.mds;
-				dout("choose_mds %p %llx.%llx "
-				     "frag %u mds%d (auth)\n",
-				     inode, ceph_vinop(inode), frag.frag, mds);
+				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
+				     __func__, inode, ceph_vinop(inode),
+				     frag.frag, mds);
 				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
-				    CEPH_MDS_STATE_ACTIVE)
-					goto out;
+				    CEPH_MDS_STATE_ACTIVE) {
+					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
+								  mds))
+						goto out;
+				}
 			}
+			mode = USE_AUTH_MDS;
 		}
 	}
 
@@ -862,21 +1157,26 @@
 		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
 	if (!cap) {
 		spin_unlock(&ci->i_ceph_lock);
-		iput(inode);
+		ceph_async_iput(inode);
 		goto random;
 	}
 	mds = cap->session->s_mds;
-	dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
+	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
 	     inode, ceph_vinop(inode), mds,
 	     cap == ci->i_auth_cap ? "auth " : "", cap);
 	spin_unlock(&ci->i_ceph_lock);
 out:
-	iput(inode);
+	/* avoid calling iput_final() while holding mdsc->mutex or
+	 * in mds dispatch threads */
+	ceph_async_iput(inode);
 	return mds;
 
 random:
+	if (random)
+		*random = true;
+
 	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
-	dout("choose_mds chose random mds%d\n", mds);
+	dout("%s chose random mds%d\n", __func__, mds);
 	return mds;
 }
 
@@ -884,7 +1184,7 @@
 /*
  * session messages
  */
-static struct ceph_msg *create_session_msg(u32 op, u64 seq)
+struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
 {
 	struct ceph_msg *msg;
 	struct ceph_mds_session_head *h;
@@ -892,7 +1192,8 @@
 	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
 			   false);
 	if (!msg) {
-		pr_err("create_session_msg ENOMEM creating msg\n");
+		pr_err("ENOMEM creating session %s msg\n",
+		       ceph_session_op_name(op));
 		return NULL;
 	}
 	h = msg->front.iov_base;
@@ -902,25 +1203,77 @@
 	return msg;
 }
 
-static void encode_supported_features(void **p, void *end)
+static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
+#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
+static int encode_supported_features(void **p, void *end)
 {
-	static const unsigned char bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
-	static const size_t count = ARRAY_SIZE(bits);
+	static const size_t count = ARRAY_SIZE(feature_bits);
 
 	if (count > 0) {
 		size_t i;
-		size_t size = ((size_t)bits[count - 1] + 64) / 64 * 8;
+		size_t size = FEATURE_BYTES(count);
+		unsigned long bit;
 
-		BUG_ON(*p + 4 + size > end);
+		if (WARN_ON_ONCE(*p + 4 + size > end))
+			return -ERANGE;
+
+		ceph_encode_32(p, size);
+		memset(*p, 0, size);
+		for (i = 0; i < count; i++) {
+			bit = feature_bits[i];
+			((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
+		}
+		*p += size;
+	} else {
+		if (WARN_ON_ONCE(*p + 4 > end))
+			return -ERANGE;
+
+		ceph_encode_32(p, 0);
+	}
+
+	return 0;
+}
+
+static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
+#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
+static int encode_metric_spec(void **p, void *end)
+{
+	static const size_t count = ARRAY_SIZE(metric_bits);
+
+	/* header */
+	if (WARN_ON_ONCE(*p + 2 > end))
+		return -ERANGE;
+
+	ceph_encode_8(p, 1); /* version */
+	ceph_encode_8(p, 1); /* compat */
+
+	if (count > 0) {
+		size_t i;
+		size_t size = METRIC_BYTES(count);
+
+		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
+			return -ERANGE;
+
+		/* metric spec info length */
+		ceph_encode_32(p, 4 + size);
+
+		/* metric spec */
 		ceph_encode_32(p, size);
 		memset(*p, 0, size);
 		for (i = 0; i < count; i++)
-			((unsigned char*)(*p))[i / 8] |= 1 << (bits[i] % 8);
+			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
 		*p += size;
 	} else {
-		BUG_ON(*p + 4 > end);
+		if (WARN_ON_ONCE(*p + 4 + 4 > end))
+			return -ERANGE;
+
+		/* metric spec info length */
+		ceph_encode_32(p, 4);
+		/* metric spec */
 		ceph_encode_32(p, 0);
 	}
+
+	return 0;
 }
 
 /*
@@ -936,7 +1289,9 @@
 	int metadata_key_count = 0;
 	struct ceph_options *opt = mdsc->fsc->client->options;
 	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
+	size_t size, count;
 	void *p, *end;
+	int ret;
 
 	const char* metadata[][2] = {
 		{"hostname", mdsc->nodename},
@@ -953,15 +1308,27 @@
 			strlen(metadata[i][1]);
 		metadata_key_count++;
 	}
+
 	/* supported feature */
-	extra_bytes += 4 + 8;
+	size = 0;
+	count = ARRAY_SIZE(feature_bits);
+	if (count > 0)
+		size = FEATURE_BYTES(count);
+	extra_bytes += 4 + size;
+
+	/* metric spec */
+	size = 0;
+	count = ARRAY_SIZE(metric_bits);
+	if (count > 0)
+		size = METRIC_BYTES(count);
+	extra_bytes += 2 + 4 + 4 + size;
 
 	/* Allocate the message */
 	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
 			   GFP_NOFS, false);
 	if (!msg) {
-		pr_err("create_session_msg ENOMEM creating msg\n");
-		return NULL;
+		pr_err("ENOMEM creating session open msg\n");
+		return ERR_PTR(-ENOMEM);
 	}
 	p = msg->front.iov_base;
 	end = p + msg->front.iov_len;
@@ -974,9 +1341,9 @@
 	 * Serialize client metadata into waiting buffer space, using
 	 * the format that userspace expects for map<string, string>
 	 *
-	 * ClientSession messages with metadata are v2
+	 * ClientSession messages with metadata are v4
 	 */
-	msg->hdr.version = cpu_to_le16(3);
+	msg->hdr.version = cpu_to_le16(4);
 	msg->hdr.compat_version = cpu_to_le16(1);
 
 	/* The write pointer, following the session_head structure */
@@ -998,7 +1365,20 @@
 		p += val_len;
 	}
 
-	encode_supported_features(&p, end);
+	ret = encode_supported_features(&p, end);
+	if (ret) {
+		pr_err("encode_supported_features failed!\n");
+		ceph_msg_put(msg);
+		return ERR_PTR(ret);
+	}
+
+	ret = encode_metric_spec(&p, end);
+	if (ret) {
+		pr_err("encode_metric_spec failed!\n");
+		ceph_msg_put(msg);
+		return ERR_PTR(ret);
+	}
+
 	msg->front.iov_len = p - msg->front.iov_base;
 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 
@@ -1026,8 +1406,8 @@
 
 	/* send connect message */
 	msg = create_session_open_msg(mdsc, session->s_seq);
-	if (!msg)
-		return -ENOMEM;
+	if (IS_ERR(msg))
+		return PTR_ERR(msg);
 	ceph_con_send(&session->s_con, msg);
 	return 0;
 }
@@ -1041,6 +1421,7 @@
 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
 {
 	struct ceph_mds_session *session;
+	int ret;
 
 	session = __ceph_lookup_mds_session(mdsc, target);
 	if (!session) {
@@ -1049,8 +1430,11 @@
 			return session;
 	}
 	if (session->s_state == CEPH_MDS_SESSION_NEW ||
-	    session->s_state == CEPH_MDS_SESSION_CLOSING)
-		__open_session(mdsc, session);
+	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
+		ret = __open_session(mdsc, session);
+		if (ret)
+			return ERR_PTR(ret);
+	}
 
 	return session;
 }
@@ -1076,7 +1460,7 @@
 	struct ceph_mds_session *ts;
 	int i, mds = session->s_mds;
 
-	if (mds >= mdsc->mdsmap->m_num_mds)
+	if (mds >= mdsc->mdsmap->possible_max_rank)
 		return;
 
 	mi = &mdsc->mdsmap->m_info[mds];
@@ -1085,8 +1469,7 @@
 
 	for (i = 0; i < mi->num_export_targets; i++) {
 		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
-		if (!IS_ERR(ts))
-			ceph_put_mds_session(ts);
+		ceph_put_mds_session(ts);
 	}
 }
 
@@ -1137,6 +1520,10 @@
 				       struct ceph_mds_request, r_unsafe_item);
 		pr_warn_ratelimited(" dropping unsafe request %llu\n",
 				    req->r_tid);
+		if (req->r_target_inode)
+			mapping_set_error(req->r_target_inode->i_mapping, -EIO);
+		if (req->r_unsafe_dir)
+			mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
 		__unregister_request(mdsc, req);
 	}
 	/* zero r_attempts, so kick_requests() will re-send requests */
@@ -1157,9 +1544,9 @@
  *
  * Caller must hold session s_mutex.
  */
-static int iterate_session_caps(struct ceph_mds_session *session,
-				 int (*cb)(struct inode *, struct ceph_cap *,
-					    void *), void *arg)
+int ceph_iterate_session_caps(struct ceph_mds_session *session,
+			      int (*cb)(struct inode *, struct ceph_cap *,
+					void *), void *arg)
 {
 	struct list_head *p;
 	struct ceph_cap *cap;
@@ -1181,7 +1568,9 @@
 		spin_unlock(&session->s_cap_lock);
 
 		if (last_inode) {
-			iput(last_inode);
+			/* avoid calling iput_final() while holding
+			 * s_mutex or in mds dispatch threads */
+			ceph_async_iput(last_inode);
 			last_inode = NULL;
 		}
 		if (old_cap) {
@@ -1201,13 +1590,11 @@
 			cap->session = NULL;
 			list_del_init(&cap->session_caps);
 			session->s_nr_caps--;
-			if (cap->queue_release) {
-				list_add_tail(&cap->session_caps,
-					      &session->s_cap_releases);
-				session->s_num_cap_releases++;
-			} else {
+			atomic64_dec(&session->s_mdsc->metric.total_caps);
+			if (cap->queue_release)
+				__ceph_queue_cap_release(session, cap);
+			else
 				old_cap = cap;  /* put_cap it w/o locks held */
-			}
 		}
 		if (ret < 0)
 			goto out;
@@ -1217,21 +1604,46 @@
 	session->s_cap_iterator = NULL;
 	spin_unlock(&session->s_cap_lock);
 
-	iput(last_inode);
+	ceph_async_iput(last_inode);
 	if (old_cap)
 		ceph_put_cap(session->s_mdsc, old_cap);
 
 	return ret;
 }
 
+static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_cap_snap *capsnap;
+	int capsnap_release = 0;
+
+	lockdep_assert_held(&ci->i_ceph_lock);
+
+	dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode);
+
+	while (!list_empty(&ci->i_cap_snaps)) {
+		capsnap = list_first_entry(&ci->i_cap_snaps,
+					   struct ceph_cap_snap, ci_item);
+		__ceph_remove_capsnap(inode, capsnap, NULL, NULL);
+		ceph_put_snap_context(capsnap->context);
+		ceph_put_cap_snap(capsnap);
+		capsnap_release++;
+	}
+	wake_up_all(&ci->i_cap_wq);
+	wake_up_all(&mdsc->cap_flushing_wq);
+	return capsnap_release;
+}
+
 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 				  void *arg)
 {
 	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
+	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	LIST_HEAD(to_remove);
-	bool drop = false;
+	bool dirty_dropped = false;
 	bool invalidate = false;
+	int capsnap_release = 0;
 
 	dout("removing cap %p, ci is %p, inode is %p\n",
 	     cap, ci, &ci->vfs_inode);
@@ -1239,13 +1651,13 @@
 	__ceph_remove_cap(cap, false);
 	if (!ci->i_auth_cap) {
 		struct ceph_cap_flush *cf;
-		struct ceph_mds_client *mdsc = fsc->mdsc;
 
-		ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
-
-		if (ci->i_wrbuffer_ref > 0 &&
-		    READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
-			invalidate = true;
+		if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+			if (inode->i_data.nrpages > 0)
+				invalidate = true;
+			if (ci->i_wrbuffer_ref > 0)
+				mapping_set_error(&inode->i_data, -EIO);
+		}
 
 		while (!list_empty(&ci->i_cap_flush_list)) {
 			cf = list_first_entry(&ci->i_cap_flush_list,
@@ -1256,7 +1668,7 @@
 		spin_lock(&mdsc->cap_dirty_lock);
 
 		list_for_each_entry(cf, &to_remove, i_list)
-			list_del(&cf->g_list);
+			list_del_init(&cf->g_list);
 
 		if (!list_empty(&ci->i_dirty_item)) {
 			pr_warn_ratelimited(
@@ -1265,7 +1677,7 @@
 				inode, ceph_ino(inode));
 			ci->i_dirty_caps = 0;
 			list_del_init(&ci->i_dirty_item);
-			drop = true;
+			dirty_dropped = true;
 		}
 		if (!list_empty(&ci->i_flushing_item)) {
 			pr_warn_ratelimited(
@@ -1275,9 +1687,21 @@
 			ci->i_flushing_caps = 0;
 			list_del_init(&ci->i_flushing_item);
 			mdsc->num_cap_flushing--;
-			drop = true;
+			dirty_dropped = true;
 		}
 		spin_unlock(&mdsc->cap_dirty_lock);
+
+		if (dirty_dropped) {
+			mapping_set_error(inode->i_mapping, -EIO);
+
+			if (ci->i_wrbuffer_ref_head == 0 &&
+			    ci->i_wr_ref == 0 &&
+			    ci->i_dirty_caps == 0 &&
+			    ci->i_flushing_caps == 0) {
+				ceph_put_snap_context(ci->i_head_snapc);
+				ci->i_head_snapc = NULL;
+			}
+		}
 
 		if (atomic_read(&ci->i_filelock_ref) > 0) {
 			/* make further file lock syscall return -EIO */
@@ -1291,28 +1715,25 @@
 			ci->i_prealloc_cap_flush = NULL;
 		}
 
-               if (drop &&
-                  ci->i_wrbuffer_ref_head == 0 &&
-                  ci->i_wr_ref == 0 &&
-                  ci->i_dirty_caps == 0 &&
-                  ci->i_flushing_caps == 0) {
-                      ceph_put_snap_context(ci->i_head_snapc);
-                      ci->i_head_snapc = NULL;
-               }
+		if (!list_empty(&ci->i_cap_snaps))
+			capsnap_release = remove_capsnaps(mdsc, inode);
 	}
 	spin_unlock(&ci->i_ceph_lock);
 	while (!list_empty(&to_remove)) {
 		struct ceph_cap_flush *cf;
 		cf = list_first_entry(&to_remove,
 				      struct ceph_cap_flush, i_list);
-		list_del(&cf->i_list);
-		ceph_free_cap_flush(cf);
+		list_del_init(&cf->i_list);
+		if (!cf->is_capsnap)
+			ceph_free_cap_flush(cf);
 	}
 
 	wake_up_all(&ci->i_cap_wq);
 	if (invalidate)
 		ceph_queue_invalidate(inode);
-	if (drop)
+	if (dirty_dropped)
+		iput(inode);
+	while (capsnap_release--)
 		iput(inode);
 	return 0;
 }
@@ -1327,7 +1748,7 @@
 	LIST_HEAD(dispose);
 
 	dout("remove_session_caps on %p\n", session);
-	iterate_session_caps(session, remove_session_caps_cb, fsc);
+	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
 
 	wake_up_all(&fsc->mdsc->cap_flushing_wq);
 
@@ -1353,7 +1774,8 @@
 			spin_unlock(&session->s_cap_lock);
 
 			inode = ceph_find_inode(sb, vino);
-			iput(inode);
+			 /* avoid calling iput_final() while holding s_mutex */
+			ceph_async_iput(inode);
 
 			spin_lock(&session->s_cap_lock);
 		}
@@ -1368,6 +1790,12 @@
 	dispose_cap_releases(session->s_mdsc, &dispose);
 }
 
+enum {
+	RECONNECT,
+	RENEWCAPS,
+	FORCE_RO,
+};
+
 /*
  * wake up any threads waiting on this session's caps.  if the cap is
  * old (didn't get renewed on the client reconnect), remove it now.
@@ -1378,23 +1806,31 @@
 			      void *arg)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	unsigned long ev = (unsigned long)arg;
 
-	if (arg) {
+	if (ev == RECONNECT) {
 		spin_lock(&ci->i_ceph_lock);
 		ci->i_wanted_max_size = 0;
 		ci->i_requested_max_size = 0;
 		spin_unlock(&ci->i_ceph_lock);
+	} else if (ev == RENEWCAPS) {
+		if (cap->cap_gen < cap->session->s_cap_gen) {
+			/* mds did not re-issue stale cap */
+			spin_lock(&ci->i_ceph_lock);
+			cap->issued = cap->implemented = CEPH_CAP_PIN;
+			spin_unlock(&ci->i_ceph_lock);
+		}
+	} else if (ev == FORCE_RO) {
 	}
 	wake_up_all(&ci->i_cap_wq);
 	return 0;
 }
 
-static void wake_up_session_caps(struct ceph_mds_session *session,
-				 int reconnect)
+static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
 {
 	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
-	iterate_session_caps(session, wake_up_session_cb,
-			     (void *)(unsigned long)reconnect);
+	ceph_iterate_session_caps(session, wake_up_session_cb,
+				  (void *)(unsigned long)ev);
 }
 
 /*
@@ -1425,8 +1861,8 @@
 
 	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
 		ceph_mds_state_name(state));
-	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
-				 ++session->s_renew_seq);
+	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
+				      ++session->s_renew_seq);
 	if (!msg)
 		return -ENOMEM;
 	ceph_con_send(&session->s_con, msg);
@@ -1440,7 +1876,7 @@
 
 	dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
 	     session->s_mds, ceph_session_state_name(session->s_state), seq);
-	msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
+	msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
 	if (!msg)
 		return -ENOMEM;
 	ceph_con_send(&session->s_con, msg);
@@ -1479,21 +1915,21 @@
 	spin_unlock(&session->s_cap_lock);
 
 	if (wake)
-		wake_up_session_caps(session, 0);
+		wake_up_session_caps(session, RENEWCAPS);
 }
 
 /*
  * send a session close request
  */
-static int request_close_session(struct ceph_mds_client *mdsc,
-				 struct ceph_mds_session *session)
+static int request_close_session(struct ceph_mds_session *session)
 {
 	struct ceph_msg *msg;
 
 	dout("request_close_session mds%d state %s seq %lld\n",
 	     session->s_mds, ceph_session_state_name(session->s_state),
 	     session->s_seq);
-	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
+	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
+				      session->s_seq);
 	if (!msg)
 		return -ENOMEM;
 	ceph_con_send(&session->s_con, msg);
@@ -1509,7 +1945,7 @@
 	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
 		return 0;
 	session->s_state = CEPH_MDS_SESSION_CLOSING;
-	return request_close_session(mdsc, session);
+	return request_close_session(session);
 }
 
 static bool drop_negative_children(struct dentry *dentry)
@@ -1547,11 +1983,11 @@
  */
 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
 {
-	struct ceph_mds_session *session = arg;
+	int *remaining = arg;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int used, wanted, oissued, mine;
 
-	if (session->s_trim_caps <= 0)
+	if (*remaining <= 0)
 		return -1;
 
 	spin_lock(&ci->i_ceph_lock);
@@ -1577,7 +2013,8 @@
 	}
 	/* The inode has cached pages, but it's no longer used.
 	 * we can safely drop it */
-	if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
+	if (S_ISREG(inode->i_mode) &&
+	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
 	    !(oissued & CEPH_CAP_FILE_CACHE)) {
 	  used = 0;
 	  oissued = 0;
@@ -1588,7 +2025,7 @@
 	if (oissued) {
 		/* we aren't the only cap.. just remove us */
 		__ceph_remove_cap(cap, true);
-		session->s_trim_caps--;
+		(*remaining)--;
 	} else {
 		struct dentry *dentry;
 		/* try dropping referring dentries */
@@ -1600,7 +2037,7 @@
 			d_prune_aliases(inode);
 			count = atomic_read(&inode->i_count);
 			if (count == 1)
-				session->s_trim_caps--;
+				(*remaining)--;
 			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
 			     inode, cap, count);
 		} else {
@@ -1626,15 +2063,15 @@
 	dout("trim_caps mds%d start: %d / %d, trim %d\n",
 	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
 	if (trim_caps > 0) {
-		session->s_trim_caps = trim_caps;
-		iterate_session_caps(session, trim_caps_cb, session);
+		int remaining = trim_caps;
+
+		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
 		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
 		     session->s_mds, session->s_nr_caps, max_caps,
-			trim_caps - session->s_trim_caps);
-		session->s_trim_caps = 0;
+			trim_caps - remaining);
 	}
 
-	ceph_send_cap_releases(mdsc, session);
+	ceph_flush_cap_releases(mdsc, session);
 	return 0;
 }
 
@@ -1677,8 +2114,8 @@
 /*
  * called under s_mutex
  */
-void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
-			    struct ceph_mds_session *session)
+static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
+				   struct ceph_mds_session *session)
 {
 	struct ceph_msg *msg = NULL;
 	struct ceph_mds_cap_release *head;
@@ -1720,7 +2157,8 @@
 		num_cap_releases--;
 
 		head = msg->front.iov_base;
-		le32_add_cpu(&head->num, 1);
+		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
+				   &head->num);
 		item = msg->front.iov_base + msg->front.iov_len;
 		item->ino = cpu_to_le64(cap->cap_ino);
 		item->cap_id = cpu_to_le64(cap->cap_id);
@@ -1770,6 +2208,81 @@
 	spin_unlock(&session->s_cap_lock);
 }
 
+static void ceph_cap_release_work(struct work_struct *work)
+{
+	struct ceph_mds_session *session =
+		container_of(work, struct ceph_mds_session, s_cap_release_work);
+
+	mutex_lock(&session->s_mutex);
+	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
+	    session->s_state == CEPH_MDS_SESSION_HUNG)
+		ceph_send_cap_releases(session->s_mdsc, session);
+	mutex_unlock(&session->s_mutex);
+	ceph_put_mds_session(session);
+}
+
+void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
+		             struct ceph_mds_session *session)
+{
+	if (mdsc->stopping)
+		return;
+
+	ceph_get_mds_session(session);
+	if (queue_work(mdsc->fsc->cap_wq,
+		       &session->s_cap_release_work)) {
+		dout("cap release work queued\n");
+	} else {
+		ceph_put_mds_session(session);
+		dout("failed to queue cap release work\n");
+	}
+}
+
+/*
+ * caller holds session->s_cap_lock
+ */
+void __ceph_queue_cap_release(struct ceph_mds_session *session,
+			      struct ceph_cap *cap)
+{
+	list_add_tail(&cap->session_caps, &session->s_cap_releases);
+	session->s_num_cap_releases++;
+
+	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
+		ceph_flush_cap_releases(session->s_mdsc, session);
+}
+
+static void ceph_cap_reclaim_work(struct work_struct *work)
+{
+	struct ceph_mds_client *mdsc =
+		container_of(work, struct ceph_mds_client, cap_reclaim_work);
+	int ret = ceph_trim_dentries(mdsc);
+	if (ret == -EAGAIN)
+		ceph_queue_cap_reclaim_work(mdsc);
+}
+
+void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
+{
+	if (mdsc->stopping)
+		return;
+
+        if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
+                dout("caps reclaim work queued\n");
+        } else {
+                dout("failed to queue caps release work\n");
+        }
+}
+
+void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
+{
+	int val;
+	if (!nr)
+		return;
+	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
+	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
+		atomic_set(&mdsc->cap_reclaim_pending, 0);
+		ceph_queue_cap_reclaim_work(mdsc);
+	}
+}
+
 /*
  * requests
  */
@@ -1781,12 +2294,13 @@
 	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
 	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
 	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
-	int order, num_entries;
+	unsigned int num_entries;
+	int order;
 
 	spin_lock(&ci->i_ceph_lock);
 	num_entries = ci->i_files + ci->i_subdirs;
 	spin_unlock(&ci->i_ceph_lock);
-	num_entries = max(num_entries, 1);
+	num_entries = max(num_entries, 1U);
 	num_entries = min(num_entries, opt->max_readdir);
 
 	order = get_order(size * num_entries);
@@ -1817,15 +2331,16 @@
 struct ceph_mds_request *
 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
 {
-	struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
-	struct timespec64 ts;
+	struct ceph_mds_request *req;
 
+	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
 	if (!req)
 		return ERR_PTR(-ENOMEM);
 
 	mutex_init(&req->r_fill_mutex);
 	req->r_mdsc = mdsc;
 	req->r_started = jiffies;
+	req->r_start_latency = ktime_get();
 	req->r_resend_mds = -1;
 	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
 	INIT_LIST_HEAD(&req->r_unsafe_target_item);
@@ -1837,8 +2352,7 @@
 	init_completion(&req->r_safe_completion);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 
-	ktime_get_coarse_real_ts64(&ts);
-	req->r_stamp = timespec64_trunc(ts, mdsc->fsc->sb->s_time_gran);
+	ktime_get_coarse_real_ts64(&req->r_stamp);
 
 	req->r_op = op;
 	req->r_direct_mode = mode;
@@ -1873,43 +2387,29 @@
  * Encode hidden .snap dirs as a double /, i.e.
  *   foo/.snap/bar -> foo//bar
  */
-char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
+char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
 			   int stop_on_nosnap)
 {
 	struct dentry *temp;
 	char *path;
-	int len, pos;
+	int pos;
 	unsigned seq;
+	u64 base;
 
 	if (!dentry)
 		return ERR_PTR(-EINVAL);
 
-retry:
-	len = 0;
-	seq = read_seqbegin(&rename_lock);
-	rcu_read_lock();
-	for (temp = dentry; !IS_ROOT(temp);) {
-		struct inode *inode = d_inode(temp);
-		if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
-			len++;  /* slash only */
-		else if (stop_on_nosnap && inode &&
-			 ceph_snap(inode) == CEPH_NOSNAP)
-			break;
-		else
-			len += 1 + temp->d_name.len;
-		temp = temp->d_parent;
-	}
-	rcu_read_unlock();
-	if (len)
-		len--;  /* no leading '/' */
-
-	path = kmalloc(len+1, GFP_NOFS);
+	path = __getname();
 	if (!path)
 		return ERR_PTR(-ENOMEM);
-	pos = len;
-	path[pos] = 0;	/* trailing null */
+retry:
+	pos = PATH_MAX - 1;
+	path[pos] = '\0';
+
+	seq = read_seqbegin(&rename_lock);
 	rcu_read_lock();
-	for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
+	temp = dentry;
+	for (;;) {
 		struct inode *inode;
 
 		spin_lock(&temp->d_lock);
@@ -1917,9 +2417,10 @@
 		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
 			dout("build_path path+%d: %p SNAPDIR\n",
 			     pos, temp);
-		} else if (stop_on_nosnap && inode &&
+		} else if (stop_on_nosnap && inode && dentry != temp &&
 			   ceph_snap(inode) == CEPH_NOSNAP) {
 			spin_unlock(&temp->d_lock);
+			pos++; /* get rid of any prepended '/' */
 			break;
 		} else {
 			pos -= temp->d_name.len;
@@ -1927,83 +2428,58 @@
 				spin_unlock(&temp->d_lock);
 				break;
 			}
-			strncpy(path + pos, temp->d_name.name,
-				temp->d_name.len);
+			memcpy(path + pos, temp->d_name.name, temp->d_name.len);
 		}
 		spin_unlock(&temp->d_lock);
-		if (pos)
-			path[--pos] = '/';
-		temp = temp->d_parent;
+		temp = READ_ONCE(temp->d_parent);
+
+		/* Are we at the root? */
+		if (IS_ROOT(temp))
+			break;
+
+		/* Are we out of buffer? */
+		if (--pos < 0)
+			break;
+
+		path[pos] = '/';
 	}
+	base = ceph_ino(d_inode(temp));
 	rcu_read_unlock();
-	if (pos != 0 || read_seqretry(&rename_lock, seq)) {
-		pr_err("build_path did not end path lookup where "
-		       "expected, namelen is %d, pos is %d\n", len, pos);
-		/* presumably this is only possible if racing with a
-		   rename of one of the parent directories (we can not
-		   lock the dentries above us to prevent this, but
-		   retrying should be harmless) */
-		kfree(path);
+
+	if (read_seqretry(&rename_lock, seq))
+		goto retry;
+
+	if (pos < 0) {
+		/*
+		 * A rename didn't occur, but somehow we didn't end up where
+		 * we thought we would. Throw a warning and try again.
+		 */
+		pr_warn("build_path did not end path lookup where "
+			"expected, pos is %d\n", pos);
 		goto retry;
 	}
 
-	*base = ceph_ino(d_inode(temp));
-	*plen = len;
+	*pbase = base;
+	*plen = PATH_MAX - 1 - pos;
 	dout("build_path on %p %d built %llx '%.*s'\n",
-	     dentry, d_count(dentry), *base, len, path);
-	return path;
-}
-
-/* Duplicate the dentry->d_name.name safely */
-static int clone_dentry_name(struct dentry *dentry, const char **ppath,
-			     int *ppathlen)
-{
-	u32 len;
-	char *name;
-
-retry:
-	len = READ_ONCE(dentry->d_name.len);
-	name = kmalloc(len + 1, GFP_NOFS);
-	if (!name)
-		return -ENOMEM;
-
-	spin_lock(&dentry->d_lock);
-	if (dentry->d_name.len != len) {
-		spin_unlock(&dentry->d_lock);
-		kfree(name);
-		goto retry;
-	}
-	memcpy(name, dentry->d_name.name, len);
-	spin_unlock(&dentry->d_lock);
-
-	name[len] = '\0';
-	*ppath = name;
-	*ppathlen = len;
-	return 0;
+	     dentry, d_count(dentry), base, *plen, path + pos);
+	return path + pos;
 }
 
 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
 			     const char **ppath, int *ppathlen, u64 *pino,
 			     bool *pfreepath, bool parent_locked)
 {
-	int ret;
 	char *path;
 
 	rcu_read_lock();
 	if (!dir)
 		dir = d_inode_rcu(dentry->d_parent);
-	if (dir && ceph_snap(dir) == CEPH_NOSNAP) {
+	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
 		*pino = ceph_ino(dir);
 		rcu_read_unlock();
-		if (parent_locked) {
-			*ppath = dentry->d_name.name;
-			*ppathlen = dentry->d_name.len;
-		} else {
-			ret = clone_dentry_name(dentry, ppath, ppathlen);
-			if (ret)
-				return ret;
-			*pfreepath = true;
-		}
+		*ppath = dentry->d_name.name;
+		*ppathlen = dentry->d_name.len;
 		return 0;
 	}
 	rcu_read_unlock();
@@ -2115,11 +2591,11 @@
 		(!!req->r_inode_drop + !!req->r_dentry_drop +
 		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
 	if (req->r_dentry_drop)
-		len += req->r_dentry->d_name.len;
+		len += pathlen1;
 	if (req->r_old_dentry_drop)
-		len += req->r_old_dentry->d_name.len;
+		len += pathlen2;
 
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
+	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
 	if (!msg) {
 		msg = ERR_PTR(-ENOMEM);
 		goto out_free2;
@@ -2136,6 +2612,7 @@
 	head->op = cpu_to_le32(req->r_op);
 	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
 	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
+	head->ino = cpu_to_le64(req->r_deleg_ino);
 	head->args = req->r_args;
 
 	ceph_encode_filepath(&p, end, ino1, path1);
@@ -2149,7 +2626,8 @@
 	if (req->r_inode_drop)
 		releases += ceph_encode_inode_release(&p,
 		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
-		      mds, req->r_inode_drop, req->r_inode_unless, 0);
+		      mds, req->r_inode_drop, req->r_inode_unless,
+		      req->r_op == CEPH_MDS_OP_READDIR);
 	if (req->r_dentry_drop)
 		releases += ceph_encode_dentry_release(&p, req->r_dentry,
 				req->r_parent, mds, req->r_dentry_drop,
@@ -2178,13 +2656,17 @@
 		ceph_encode_copy(&p, &ts, sizeof(ts));
 	}
 
-	BUG_ON(p > end);
+	if (WARN_ON_ONCE(p > end)) {
+		ceph_msg_put(msg);
+		msg = ERR_PTR(-ERANGE);
+		goto out_free2;
+	}
+
 	msg->front.iov_len = p - msg->front.iov_base;
 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 
 	if (req->r_pagelist) {
 		struct ceph_pagelist *pagelist = req->r_pagelist;
-		refcount_inc(&pagelist->refcnt);
 		ceph_msg_data_add_pagelist(msg, pagelist);
 		msg->hdr.data_len = cpu_to_le32(pagelist->length);
 	} else {
@@ -2195,10 +2677,10 @@
 
 out_free2:
 	if (freepath2)
-		kfree((char *)path2);
+		ceph_mdsc_free_path((char *)path2, pathlen2);
 out_free1:
 	if (freepath1)
-		kfree((char *)path1);
+		ceph_mdsc_free_path((char *)path1, pathlen1);
 out:
 	return msg;
 }
@@ -2210,10 +2692,11 @@
 static void complete_request(struct ceph_mds_client *mdsc,
 			     struct ceph_mds_request *req)
 {
+	req->r_end_latency = ktime_get();
+
 	if (req->r_callback)
 		req->r_callback(mdsc, req);
-	else
-		complete_all(&req->r_completion);
+	complete_all(&req->r_completion);
 }
 
 /*
@@ -2291,15 +2774,36 @@
 	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
 	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
 		flags |= CEPH_MDS_FLAG_REPLAY;
+	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
+		flags |= CEPH_MDS_FLAG_ASYNC;
 	if (req->r_parent)
 		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
 	rhead->flags = cpu_to_le32(flags);
 	rhead->num_fwd = req->r_num_fwd;
 	rhead->num_retry = req->r_attempts - 1;
-	rhead->ino = 0;
 
 	dout(" r_parent = %p\n", req->r_parent);
 	return 0;
+}
+
+/*
+ * called under mdsc->mutex
+ */
+static int __send_request(struct ceph_mds_client *mdsc,
+			  struct ceph_mds_session *session,
+			  struct ceph_mds_request *req,
+			  bool drop_cap_releases)
+{
+	int err;
+
+	err = __prepare_send_request(mdsc, req, session->s_mds,
+				     drop_cap_releases);
+	if (!err) {
+		ceph_msg_get(req->r_request);
+		ceph_con_send(&session->s_con, req->r_request);
+	}
+
+	return err;
 }
 
 /*
@@ -2311,6 +2815,7 @@
 	struct ceph_mds_session *session = NULL;
 	int mds = -1;
 	int err = 0;
+	bool random;
 
 	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
 		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
@@ -2321,7 +2826,7 @@
 	if (req->r_timeout &&
 	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
 		dout("do_request timed out\n");
-		err = -EIO;
+		err = -ETIMEDOUT;
 		goto finish;
 	}
 	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
@@ -2350,9 +2855,13 @@
 
 	put_request_session(req);
 
-	mds = __choose_mds(mdsc, req);
+	mds = __choose_mds(mdsc, req, &random);
 	if (mds < 0 ||
 	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
+		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
+			err = -EJUKEBOX;
+			goto finish;
+		}
 		dout("do_request no mds or not active, waiting for map\n");
 		list_add(&req->r_wait, &mdsc->waiting_for_map);
 		return;
@@ -2367,7 +2876,7 @@
 			goto finish;
 		}
 	}
-	req->r_session = get_session(session);
+	req->r_session = ceph_get_mds_session(session);
 
 	dout("do_request mds%d session %p state %s\n", mds, session,
 	     ceph_session_state_name(session->s_state));
@@ -2377,9 +2886,24 @@
 			err = -EACCES;
 			goto out_session;
 		}
+		/*
+		 * We cannot queue async requests since the caps and delegated
+		 * inodes are bound to the session. Just return -EJUKEBOX and
+		 * let the caller retry a sync request in that case.
+		 */
+		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
+			err = -EJUKEBOX;
+			goto out_session;
+		}
 		if (session->s_state == CEPH_MDS_SESSION_NEW ||
-		    session->s_state == CEPH_MDS_SESSION_CLOSING)
-			__open_session(mdsc, session);
+		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
+			err = __open_session(mdsc, session);
+			if (err)
+				goto out_session;
+			/* retry the same mds later */
+			if (random)
+				req->r_resend_mds = mds;
+		}
 		list_add(&req->r_wait, &session->s_waiting);
 		goto out_session;
 	}
@@ -2390,11 +2914,7 @@
 	if (req->r_request_started == 0)   /* note request start time */
 		req->r_request_started = jiffies;
 
-	err = __prepare_send_request(mdsc, req, mds, false);
-	if (!err) {
-		ceph_msg_get(req->r_request);
-		ceph_con_send(&session->s_con, req->r_request);
-	}
+	err = __send_request(mdsc, session, req, false);
 
 out_session:
 	ceph_put_mds_session(session);
@@ -2454,49 +2974,61 @@
 	}
 }
 
-void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
+int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
 			      struct ceph_mds_request *req)
 {
-	dout("submit_request on %p\n", req);
-	mutex_lock(&mdsc->mutex);
-	__register_request(mdsc, req, NULL);
-	__do_request(mdsc, req);
-	mutex_unlock(&mdsc->mutex);
-}
-
-/*
- * Synchrously perform an mds request.  Take care of all of the
- * session setup, forwarding, retry details.
- */
-int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
-			 struct inode *dir,
-			 struct ceph_mds_request *req)
-{
-	int err;
-
-	dout("do_request on %p\n", req);
+	int err = 0;
 
 	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
 	if (req->r_inode)
 		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
-	if (req->r_parent)
-		ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
+	if (req->r_parent) {
+		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
+		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
+			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
+		spin_lock(&ci->i_ceph_lock);
+		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
+		__ceph_touch_fmode(ci, mdsc, fmode);
+		spin_unlock(&ci->i_ceph_lock);
+		ihold(req->r_parent);
+	}
 	if (req->r_old_dentry_dir)
 		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
 				  CEPH_CAP_PIN);
 
-	/* issue */
+	if (req->r_inode) {
+		err = ceph_wait_on_async_create(req->r_inode);
+		if (err) {
+			dout("%s: wait for async create returned: %d\n",
+			     __func__, err);
+			return err;
+		}
+	}
+
+	if (!err && req->r_old_inode) {
+		err = ceph_wait_on_async_create(req->r_old_inode);
+		if (err) {
+			dout("%s: wait for async create returned: %d\n",
+			     __func__, err);
+			return err;
+		}
+	}
+
+	dout("submit_request on %p for inode %p\n", req, dir);
 	mutex_lock(&mdsc->mutex);
 	__register_request(mdsc, req, dir);
 	__do_request(mdsc, req);
+	err = req->r_err;
+	mutex_unlock(&mdsc->mutex);
+	return err;
+}
 
-	if (req->r_err) {
-		err = req->r_err;
-		goto out;
-	}
+static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
+				  struct ceph_mds_request *req)
+{
+	int err;
 
 	/* wait */
-	mutex_unlock(&mdsc->mutex);
 	dout("do_request waiting\n");
 	if (!req->r_timeout && req->r_wait_for_completion) {
 		err = req->r_wait_for_completion(mdsc, req);
@@ -2507,7 +3039,7 @@
 		if (timeleft > 0)
 			err = 0;
 		else if (!timeleft)
-			err = -EIO;  /* timed out */
+			err = -ETIMEDOUT;  /* timed out */
 		else
 			err = timeleft;  /* killed */
 	}
@@ -2537,8 +3069,26 @@
 		err = req->r_err;
 	}
 
-out:
 	mutex_unlock(&mdsc->mutex);
+	return err;
+}
+
+/*
+ * Synchrously perform an mds request.  Take care of all of the
+ * session setup, forwarding, retry details.
+ */
+int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
+			 struct inode *dir,
+			 struct ceph_mds_request *req)
+{
+	int err;
+
+	dout("do_request on %p\n", req);
+
+	/* issue */
+	err = ceph_mdsc_submit_request(mdsc, dir, req);
+	if (!err)
+		err = ceph_mdsc_wait_request(mdsc, req);
 	dout("do_request %p done, result %d\n", req, err);
 	return err;
 }
@@ -2641,7 +3191,7 @@
 			mutex_unlock(&mdsc->mutex);
 			goto out;
 		} else  {
-			int mds = __choose_mds(mdsc, req);
+			int mds = __choose_mds(mdsc, req, NULL);
 			if (mds >= 0 && mds != req->r_session->s_mds) {
 				dout("but auth changed, so resending\n");
 				__do_request(mdsc, req);
@@ -2657,6 +3207,10 @@
 		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
 		__unregister_request(mdsc, req);
 
+		/* last request during umount? */
+		if (mdsc->stopping && !__get_oldest_req(mdsc))
+			complete_all(&mdsc->safe_umount_waiters);
+
 		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
 			/*
 			 * We already handled the unsafe response, now do the
@@ -2667,28 +3221,20 @@
 			 */
 			dout("got safe reply %llu, mds%d\n", tid, mds);
 
-			/* last unsafe request during umount? */
-			if (mdsc->stopping && !__get_oldest_req(mdsc))
-				complete_all(&mdsc->safe_umount_waiters);
 			mutex_unlock(&mdsc->mutex);
 			goto out;
 		}
 	} else {
 		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
 		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
-		if (req->r_unsafe_dir) {
-			struct ceph_inode_info *ci =
-					ceph_inode(req->r_unsafe_dir);
-			spin_lock(&ci->i_unsafe_lock);
-			list_add_tail(&req->r_unsafe_dir_item,
-				      &ci->i_unsafe_dirops);
-			spin_unlock(&ci->i_unsafe_lock);
-		}
 	}
 
 	dout("handle_reply tid %lld result %d\n", tid, result);
 	rinfo = &req->r_reply_info;
-	err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
+	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
+		err = parse_reply_info(session, msg, rinfo, (u64)-1);
+	else
+		err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
 	mutex_unlock(&mdsc->mutex);
 
 	mutex_lock(&session->s_mutex);
@@ -2719,7 +3265,6 @@
 		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
 				    req->r_op == CEPH_MDS_OP_LSSNAP))
 			ceph_readdir_prepopulate(req, req->r_session);
-		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
 	}
 	current->journal_info = NULL;
 	mutex_unlock(&req->r_fill_mutex);
@@ -2728,12 +3273,18 @@
 	if (realm)
 		ceph_put_snap_realm(mdsc, realm);
 
-	if (err == 0 && req->r_target_inode &&
-	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
-		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
-		spin_lock(&ci->i_unsafe_lock);
-		list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
-		spin_unlock(&ci->i_unsafe_lock);
+	if (err == 0) {
+		if (req->r_target_inode &&
+		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
+			struct ceph_inode_info *ci =
+				ceph_inode(req->r_target_inode);
+			spin_lock(&ci->i_unsafe_lock);
+			list_add_tail(&req->r_unsafe_target_item,
+				      &ci->i_unsafe_iops);
+			spin_unlock(&ci->i_unsafe_lock);
+		}
+
+		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
 	}
 out_err:
 	mutex_lock(&mdsc->mutex);
@@ -2753,6 +3304,9 @@
 
 	/* kick calling process */
 	complete_request(mdsc, req);
+
+	ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency,
+				     req->r_end_latency, err);
 out:
 	ceph_mdsc_put_request(req);
 	return;
@@ -2812,6 +3366,34 @@
 	pr_err("mdsc_handle_forward decode error err=%d\n", err);
 }
 
+static int __decode_session_metadata(void **p, void *end,
+				     bool *blocklisted)
+{
+	/* map<string,string> */
+	u32 n;
+	bool err_str;
+	ceph_decode_32_safe(p, end, n, bad);
+	while (n-- > 0) {
+		u32 len;
+		ceph_decode_32_safe(p, end, len, bad);
+		ceph_decode_need(p, end, len, bad);
+		err_str = !strncmp(*p, "error_string", len);
+		*p += len;
+		ceph_decode_32_safe(p, end, len, bad);
+		ceph_decode_need(p, end, len, bad);
+		/*
+		 * Match "blocklisted (blacklisted)" from newer MDSes,
+		 * or "blacklisted" from older MDSes.
+		 */
+		if (err_str && strnstr(*p, "blacklisted", len))
+			*blocklisted = true;
+		*p += len;
+	}
+	return 0;
+bad:
+	return -1;
+}
+
 /*
  * handle a mds session control message
  */
@@ -2819,21 +3401,40 @@
 			   struct ceph_msg *msg)
 {
 	struct ceph_mds_client *mdsc = session->s_mdsc;
-	u32 op;
-	u64 seq;
 	int mds = session->s_mds;
-	struct ceph_mds_session_head *h = msg->front.iov_base;
+	int msg_version = le16_to_cpu(msg->hdr.version);
+	void *p = msg->front.iov_base;
+	void *end = p + msg->front.iov_len;
+	struct ceph_mds_session_head *h;
+	u32 op;
+	u64 seq, features = 0;
 	int wake = 0;
+	bool blocklisted = false;
 
 	/* decode */
-	if (msg->front.iov_len < sizeof(*h))
-		goto bad;
+	ceph_decode_need(&p, end, sizeof(*h), bad);
+	h = p;
+	p += sizeof(*h);
+
 	op = le32_to_cpu(h->op);
 	seq = le64_to_cpu(h->seq);
 
+	if (msg_version >= 3) {
+		u32 len;
+		/* version >= 2, metadata */
+		if (__decode_session_metadata(&p, end, &blocklisted) < 0)
+			goto bad;
+		/* version >= 3, feature bits */
+		ceph_decode_32_safe(&p, end, len, bad);
+		if (len) {
+			ceph_decode_64_safe(&p, end, features, bad);
+			p += len - sizeof(features);
+		}
+	}
+
 	mutex_lock(&mdsc->mutex);
 	if (op == CEPH_SESSION_CLOSE) {
-		get_session(session);
+		ceph_get_mds_session(session);
 		__unregister_session(mdsc, session);
 	}
 	/* FIXME: this ttl calculation is generous */
@@ -2856,7 +3457,10 @@
 		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
 			pr_info("mds%d reconnect success\n", session->s_mds);
 		session->s_state = CEPH_MDS_SESSION_OPEN;
+		session->s_features = features;
 		renewed_caps(mdsc, session, 0);
+		if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
+			metric_schedule_delayed(&mdsc->metric);
 		wake = 1;
 		if (mdsc->stopping)
 			__close_session(mdsc, session);
@@ -2870,6 +3474,7 @@
 	case CEPH_SESSION_CLOSE:
 		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
 			pr_info("mds%d reconnect denied\n", session->s_mds);
+		session->s_state = CEPH_MDS_SESSION_CLOSED;
 		cleanup_session_requests(mdsc, session);
 		remove_session_caps(session);
 		wake = 2; /* for good measure */
@@ -2891,6 +3496,12 @@
 		break;
 
 	case CEPH_SESSION_FLUSHMSG:
+		/* flush cap releases */
+		spin_lock(&session->s_cap_lock);
+		if (session->s_num_cap_releases)
+			ceph_flush_cap_releases(mdsc, session);
+		spin_unlock(&session->s_cap_lock);
+
 		send_flushmsg_ack(mdsc, session, seq);
 		break;
 
@@ -2899,7 +3510,7 @@
 		spin_lock(&session->s_cap_lock);
 		session->s_readonly = true;
 		spin_unlock(&session->s_cap_lock);
-		wake_up_session_caps(session, 0);
+		wake_up_session_caps(session, FORCE_RO);
 		break;
 
 	case CEPH_SESSION_REJECT:
@@ -2908,6 +3519,8 @@
 		session->s_state = CEPH_MDS_SESSION_REJECTED;
 		cleanup_session_requests(mdsc, session);
 		remove_session_caps(session);
+		if (blocklisted)
+			mdsc->fsc->blocklisted = true;
 		wake = 2; /* for good measure */
 		break;
 
@@ -2935,6 +3548,28 @@
 	return;
 }
 
+void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
+{
+	int dcaps;
+
+	dcaps = xchg(&req->r_dir_caps, 0);
+	if (dcaps) {
+		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
+		ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
+	}
+}
+
+void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
+{
+	int dcaps;
+
+	dcaps = xchg(&req->r_dir_caps, 0);
+	if (dcaps) {
+		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
+		ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
+						dcaps);
+	}
+}
 
 /*
  * called under session->mutex.
@@ -2944,18 +3579,12 @@
 {
 	struct ceph_mds_request *req, *nreq;
 	struct rb_node *p;
-	int err;
 
 	dout("replay_unsafe_requests mds%d\n", session->s_mds);
 
 	mutex_lock(&mdsc->mutex);
-	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
-		err = __prepare_send_request(mdsc, req, session->s_mds, true);
-		if (!err) {
-			ceph_msg_get(req->r_request);
-			ceph_con_send(&session->s_con, req->r_request);
-		}
-	}
+	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
+		__send_request(mdsc, session, req, true);
 
 	/*
 	 * also re-send old requests when MDS enters reconnect stage. So that MDS
@@ -2969,23 +3598,131 @@
 			continue;
 		if (req->r_attempts == 0)
 			continue; /* only old requests */
-		if (req->r_session &&
-		    req->r_session->s_mds == session->s_mds) {
-			err = __prepare_send_request(mdsc, req,
-						     session->s_mds, true);
-			if (!err) {
-				ceph_msg_get(req->r_request);
-				ceph_con_send(&session->s_con, req->r_request);
-			}
-		}
+		if (!req->r_session)
+			continue;
+		if (req->r_session->s_mds != session->s_mds)
+			continue;
+
+		ceph_mdsc_release_dir_caps_no_check(req);
+
+		__send_request(mdsc, session, req, true);
 	}
 	mutex_unlock(&mdsc->mutex);
+}
+
+static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
+{
+	struct ceph_msg *reply;
+	struct ceph_pagelist *_pagelist;
+	struct page *page;
+	__le32 *addr;
+	int err = -ENOMEM;
+
+	if (!recon_state->allow_multi)
+		return -ENOSPC;
+
+	/* can't handle message that contains both caps and realm */
+	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
+
+	/* pre-allocate new pagelist */
+	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
+	if (!_pagelist)
+		return -ENOMEM;
+
+	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
+	if (!reply)
+		goto fail_msg;
+
+	/* placeholder for nr_caps */
+	err = ceph_pagelist_encode_32(_pagelist, 0);
+	if (err < 0)
+		goto fail;
+
+	if (recon_state->nr_caps) {
+		/* currently encoding caps */
+		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
+		if (err)
+			goto fail;
+	} else {
+		/* placeholder for nr_realms (currently encoding relams) */
+		err = ceph_pagelist_encode_32(_pagelist, 0);
+		if (err < 0)
+			goto fail;
+	}
+
+	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
+	if (err)
+		goto fail;
+
+	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
+	addr = kmap_atomic(page);
+	if (recon_state->nr_caps) {
+		/* currently encoding caps */
+		*addr = cpu_to_le32(recon_state->nr_caps);
+	} else {
+		/* currently encoding relams */
+		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
+	}
+	kunmap_atomic(addr);
+
+	reply->hdr.version = cpu_to_le16(5);
+	reply->hdr.compat_version = cpu_to_le16(4);
+
+	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
+	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
+
+	ceph_con_send(&recon_state->session->s_con, reply);
+	ceph_pagelist_release(recon_state->pagelist);
+
+	recon_state->pagelist = _pagelist;
+	recon_state->nr_caps = 0;
+	recon_state->nr_realms = 0;
+	recon_state->msg_version = 5;
+	return 0;
+fail:
+	ceph_msg_put(reply);
+fail_msg:
+	ceph_pagelist_release(_pagelist);
+	return err;
+}
+
+static struct dentry* d_find_primary(struct inode *inode)
+{
+	struct dentry *alias, *dn = NULL;
+
+	if (hlist_empty(&inode->i_dentry))
+		return NULL;
+
+	spin_lock(&inode->i_lock);
+	if (hlist_empty(&inode->i_dentry))
+		goto out_unlock;
+
+	if (S_ISDIR(inode->i_mode)) {
+		alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
+		if (!IS_ROOT(alias))
+			dn = dget(alias);
+		goto out_unlock;
+	}
+
+	hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
+		spin_lock(&alias->d_lock);
+		if (!d_unhashed(alias) &&
+		    (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
+			dn = dget_dlock(alias);
+		}
+		spin_unlock(&alias->d_lock);
+		if (dn)
+			break;
+	}
+out_unlock:
+	spin_unlock(&inode->i_lock);
+	return dn;
 }
 
 /*
  * Encode information about a cap for a reconnect with the MDS.
  */
-static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
+static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
 			  void *arg)
 {
 	union {
@@ -2995,29 +3732,28 @@
 	struct ceph_inode_info *ci = cap->ci;
 	struct ceph_reconnect_state *recon_state = arg;
 	struct ceph_pagelist *pagelist = recon_state->pagelist;
+	struct dentry *dentry;
 	char *path;
-	int pathlen, err;
+	int pathlen = 0, err;
 	u64 pathbase;
 	u64 snap_follows;
-	struct dentry *dentry;
 
 	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
 	     inode, ceph_vinop(inode), cap, cap->cap_id,
 	     ceph_cap_string(cap->issued));
-	err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
-	if (err)
-		return err;
 
-	dentry = d_find_alias(inode);
+	dentry = d_find_primary(inode);
 	if (dentry) {
-		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
+		/* set pathbase to parent dir when msg_version >= 2 */
+		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
+					    recon_state->msg_version >= 2);
+		dput(dentry);
 		if (IS_ERR(path)) {
 			err = PTR_ERR(path);
-			goto out_dput;
+			goto out_err;
 		}
 	} else {
 		path = NULL;
-		pathlen = 0;
 		pathbase = 0;
 	}
 
@@ -3026,6 +3762,15 @@
 	cap->issue_seq = 0;  /* and issue_seq */
 	cap->mseq = 0;       /* and migrate_seq */
 	cap->cap_gen = cap->session->s_cap_gen;
+
+	/* These are lost when the session goes away */
+	if (S_ISDIR(inode->i_mode)) {
+		if (cap->issued & CEPH_CAP_DIR_CREATE) {
+			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
+			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
+		}
+		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
+	}
 
 	if (recon_state->msg_version >= 2) {
 		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
@@ -3059,7 +3804,7 @@
 	if (recon_state->msg_version >= 2) {
 		int num_fcntl_locks, num_flock_locks;
 		struct ceph_filelock *flocks = NULL;
-		size_t struct_len, total_len = 0;
+		size_t struct_len, total_len = sizeof(u64);
 		u8 struct_v = 0;
 
 encode_again:
@@ -3075,7 +3820,7 @@
 					       GFP_NOFS);
 			if (!flocks) {
 				err = -ENOMEM;
-				goto out_free;
+				goto out_err;
 			}
 			err = ceph_encode_locks_to_buffer(inode, flocks,
 							  num_fcntl_locks,
@@ -3085,7 +3830,7 @@
 				flocks = NULL;
 				if (err == -ENOSPC)
 					goto encode_again;
-				goto out_free;
+				goto out_err;
 			}
 		} else {
 			kfree(flocks);
@@ -3094,7 +3839,7 @@
 
 		if (recon_state->msg_version >= 3) {
 			/* version, compat_version and struct_len */
-			total_len = 2 * sizeof(u8) + sizeof(u32);
+			total_len += 2 * sizeof(u8) + sizeof(u32);
 			struct_v = 2;
 		}
 		/*
@@ -3105,44 +3850,113 @@
 			    sizeof(struct ceph_filelock);
 		rec.v2.flock_len = cpu_to_le32(struct_len);
 
-		struct_len += sizeof(rec.v2);
-		struct_len += sizeof(u32) + pathlen;
+		struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
 
 		if (struct_v >= 2)
 			struct_len += sizeof(u64); /* snap_follows */
 
 		total_len += struct_len;
-		err = ceph_pagelist_reserve(pagelist, total_len);
 
-		if (!err) {
-			if (recon_state->msg_version >= 3) {
-				ceph_pagelist_encode_8(pagelist, struct_v);
-				ceph_pagelist_encode_8(pagelist, 1);
-				ceph_pagelist_encode_32(pagelist, struct_len);
-			}
-			ceph_pagelist_encode_string(pagelist, path, pathlen);
-			ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
-			ceph_locks_to_pagelist(flocks, pagelist,
-					       num_fcntl_locks,
-					       num_flock_locks);
-			if (struct_v >= 2)
-				ceph_pagelist_encode_64(pagelist, snap_follows);
+		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
+			err = send_reconnect_partial(recon_state);
+			if (err)
+				goto out_freeflocks;
+			pagelist = recon_state->pagelist;
 		}
+
+		err = ceph_pagelist_reserve(pagelist, total_len);
+		if (err)
+			goto out_freeflocks;
+
+		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
+		if (recon_state->msg_version >= 3) {
+			ceph_pagelist_encode_8(pagelist, struct_v);
+			ceph_pagelist_encode_8(pagelist, 1);
+			ceph_pagelist_encode_32(pagelist, struct_len);
+		}
+		ceph_pagelist_encode_string(pagelist, path, pathlen);
+		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
+		ceph_locks_to_pagelist(flocks, pagelist,
+				       num_fcntl_locks, num_flock_locks);
+		if (struct_v >= 2)
+			ceph_pagelist_encode_64(pagelist, snap_follows);
+out_freeflocks:
 		kfree(flocks);
 	} else {
-		size_t size = sizeof(u32) + pathlen + sizeof(rec.v1);
-		err = ceph_pagelist_reserve(pagelist, size);
-		if (!err) {
-			ceph_pagelist_encode_string(pagelist, path, pathlen);
-			ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
-		}
+		err = ceph_pagelist_reserve(pagelist,
+					    sizeof(u64) + sizeof(u32) +
+					    pathlen + sizeof(rec.v1));
+		if (err)
+			goto out_err;
+
+		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
+		ceph_pagelist_encode_string(pagelist, path, pathlen);
+		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
 	}
 
-	recon_state->nr_caps++;
-out_free:
-	kfree(path);
-out_dput:
-	dput(dentry);
+out_err:
+	ceph_mdsc_free_path(path, pathlen);
+	if (!err)
+		recon_state->nr_caps++;
+	return err;
+}
+
+static int encode_snap_realms(struct ceph_mds_client *mdsc,
+			      struct ceph_reconnect_state *recon_state)
+{
+	struct rb_node *p;
+	struct ceph_pagelist *pagelist = recon_state->pagelist;
+	int err = 0;
+
+	if (recon_state->msg_version >= 4) {
+		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
+		if (err < 0)
+			goto fail;
+	}
+
+	/*
+	 * snaprealms.  we provide mds with the ino, seq (version), and
+	 * parent for all of our realms.  If the mds has any newer info,
+	 * it will tell us.
+	 */
+	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
+		struct ceph_snap_realm *realm =
+		       rb_entry(p, struct ceph_snap_realm, node);
+		struct ceph_mds_snaprealm_reconnect sr_rec;
+
+		if (recon_state->msg_version >= 4) {
+			size_t need = sizeof(u8) * 2 + sizeof(u32) +
+				      sizeof(sr_rec);
+
+			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
+				err = send_reconnect_partial(recon_state);
+				if (err)
+					goto fail;
+				pagelist = recon_state->pagelist;
+			}
+
+			err = ceph_pagelist_reserve(pagelist, need);
+			if (err)
+				goto fail;
+
+			ceph_pagelist_encode_8(pagelist, 1);
+			ceph_pagelist_encode_8(pagelist, 1);
+			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
+		}
+
+		dout(" adding snap realm %llx seq %lld parent %llx\n",
+		     realm->ino, realm->seq, realm->parent_ino);
+		sr_rec.ino = cpu_to_le64(realm->ino);
+		sr_rec.seq = cpu_to_le64(realm->seq);
+		sr_rec.parent = cpu_to_le64(realm->parent_ino);
+
+		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
+		if (err)
+			goto fail;
+
+		recon_state->nr_realms++;
+	}
+fail:
 	return err;
 }
 
@@ -3156,31 +3970,29 @@
  * recovering MDS might have.
  *
  * This is a relatively heavyweight operation, but it's rare.
- *
- * called with mdsc->mutex held.
  */
 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 			       struct ceph_mds_session *session)
 {
 	struct ceph_msg *reply;
-	struct rb_node *p;
 	int mds = session->s_mds;
 	int err = -ENOMEM;
-	int s_nr_caps;
-	struct ceph_pagelist *pagelist;
-	struct ceph_reconnect_state recon_state;
+	struct ceph_reconnect_state recon_state = {
+		.session = session,
+	};
 	LIST_HEAD(dispose);
 
 	pr_info("mds%d reconnect start\n", mds);
 
-	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
-	if (!pagelist)
+	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
+	if (!recon_state.pagelist)
 		goto fail_nopagelist;
-	ceph_pagelist_init(pagelist);
 
-	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
+	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
 	if (!reply)
 		goto fail_nomsg;
+
+	xa_destroy(&session->s_delegated_inos);
 
 	mutex_lock(&session->s_mutex);
 	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
@@ -3219,65 +4031,90 @@
 	/* replay unsafe requests */
 	replay_unsafe_requests(mdsc, session);
 
+	ceph_early_kick_flushing_caps(mdsc, session);
+
 	down_read(&mdsc->snap_rwsem);
 
-	/* traverse this session's caps */
-	s_nr_caps = session->s_nr_caps;
-	err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
+	/* placeholder for nr_caps */
+	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
 	if (err)
 		goto fail;
 
-	recon_state.nr_caps = 0;
-	recon_state.pagelist = pagelist;
-	if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
+	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
 		recon_state.msg_version = 3;
-	else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK)
+		recon_state.allow_multi = true;
+	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
+		recon_state.msg_version = 3;
+	} else {
 		recon_state.msg_version = 2;
-	else
-		recon_state.msg_version = 1;
-	err = iterate_session_caps(session, encode_caps_cb, &recon_state);
-	if (err < 0)
-		goto fail;
+	}
+	/* trsaverse this session's caps */
+	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
 
 	spin_lock(&session->s_cap_lock);
 	session->s_cap_reconnect = 0;
 	spin_unlock(&session->s_cap_lock);
 
-	/*
-	 * snaprealms.  we provide mds with the ino, seq (version), and
-	 * parent for all of our realms.  If the mds has any newer info,
-	 * it will tell us.
-	 */
-	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
-		struct ceph_snap_realm *realm =
-			rb_entry(p, struct ceph_snap_realm, node);
-		struct ceph_mds_snaprealm_reconnect sr_rec;
+	if (err < 0)
+		goto fail;
 
-		dout(" adding snap realm %llx seq %lld parent %llx\n",
-		     realm->ino, realm->seq, realm->parent_ino);
-		sr_rec.ino = cpu_to_le64(realm->ino);
-		sr_rec.seq = cpu_to_le64(realm->seq);
-		sr_rec.parent = cpu_to_le64(realm->parent_ino);
-		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
-		if (err)
+	/* check if all realms can be encoded into current message */
+	if (mdsc->num_snap_realms) {
+		size_t total_len =
+			recon_state.pagelist->length +
+			mdsc->num_snap_realms *
+			sizeof(struct ceph_mds_snaprealm_reconnect);
+		if (recon_state.msg_version >= 4) {
+			/* number of realms */
+			total_len += sizeof(u32);
+			/* version, compat_version and struct_len */
+			total_len += mdsc->num_snap_realms *
+				     (2 * sizeof(u8) + sizeof(u32));
+		}
+		if (total_len > RECONNECT_MAX_SIZE) {
+			if (!recon_state.allow_multi) {
+				err = -ENOSPC;
+				goto fail;
+			}
+			if (recon_state.nr_caps) {
+				err = send_reconnect_partial(&recon_state);
+				if (err)
+					goto fail;
+			}
+			recon_state.msg_version = 5;
+		}
+	}
+
+	err = encode_snap_realms(mdsc, &recon_state);
+	if (err < 0)
+		goto fail;
+
+	if (recon_state.msg_version >= 5) {
+		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
+		if (err < 0)
 			goto fail;
 	}
 
-	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
-
-	/* raced with cap release? */
-	if (s_nr_caps != recon_state.nr_caps) {
-		struct page *page = list_first_entry(&pagelist->head,
-						     struct page, lru);
+	if (recon_state.nr_caps || recon_state.nr_realms) {
+		struct page *page =
+			list_first_entry(&recon_state.pagelist->head,
+					struct page, lru);
 		__le32 *addr = kmap_atomic(page);
-		*addr = cpu_to_le32(recon_state.nr_caps);
+		if (recon_state.nr_caps) {
+			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
+			*addr = cpu_to_le32(recon_state.nr_caps);
+		} else if (recon_state.msg_version >= 4) {
+			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
+		}
 		kunmap_atomic(addr);
 	}
 
-	reply->hdr.data_len = cpu_to_le32(pagelist->length);
-	ceph_msg_data_add_pagelist(reply, pagelist);
+	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
+	if (recon_state.msg_version >= 4)
+		reply->hdr.compat_version = cpu_to_le16(4);
 
-	ceph_early_kick_flushing_caps(mdsc, session);
+	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
+	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
 
 	ceph_con_send(&session->s_con, reply);
 
@@ -3288,6 +4125,7 @@
 	mutex_unlock(&mdsc->mutex);
 
 	up_read(&mdsc->snap_rwsem);
+	ceph_pagelist_release(recon_state.pagelist);
 	return;
 
 fail:
@@ -3295,7 +4133,7 @@
 	up_read(&mdsc->snap_rwsem);
 	mutex_unlock(&session->s_mutex);
 fail_nomsg:
-	ceph_pagelist_release(pagelist);
+	ceph_pagelist_release(recon_state.pagelist);
 fail_nopagelist:
 	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
 	return;
@@ -3319,7 +4157,7 @@
 	dout("check_new_map new %u old %u\n",
 	     newmap->m_epoch, oldmap->m_epoch);
 
-	for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
+	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
 		if (!mdsc->sessions[i])
 			continue;
 		s = mdsc->sessions[i];
@@ -3333,42 +4171,35 @@
 		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
 		     ceph_session_state_name(s->s_state));
 
-		if (i >= newmap->m_num_mds ||
-		    memcmp(ceph_mdsmap_get_addr(oldmap, i),
+		if (i >= newmap->possible_max_rank) {
+			/* force close session for stopped mds */
+			ceph_get_mds_session(s);
+			__unregister_session(mdsc, s);
+			__wake_requests(mdsc, &s->s_waiting);
+			mutex_unlock(&mdsc->mutex);
+
+			mutex_lock(&s->s_mutex);
+			cleanup_session_requests(mdsc, s);
+			remove_session_caps(s);
+			mutex_unlock(&s->s_mutex);
+
+			ceph_put_mds_session(s);
+
+			mutex_lock(&mdsc->mutex);
+			kick_requests(mdsc, i);
+			continue;
+		}
+
+		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
 			   ceph_mdsmap_get_addr(newmap, i),
 			   sizeof(struct ceph_entity_addr))) {
-			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
-				/* the session never opened, just close it
-				 * out now */
-				get_session(s);
-				__unregister_session(mdsc, s);
-				__wake_requests(mdsc, &s->s_waiting);
-				ceph_put_mds_session(s);
-			} else if (i >= newmap->m_num_mds) {
-				/* force close session for stopped mds */
-				get_session(s);
-				__unregister_session(mdsc, s);
-				__wake_requests(mdsc, &s->s_waiting);
-				kick_requests(mdsc, i);
-				mutex_unlock(&mdsc->mutex);
-
-				mutex_lock(&s->s_mutex);
-				cleanup_session_requests(mdsc, s);
-				remove_session_caps(s);
-				mutex_unlock(&s->s_mutex);
-
-				ceph_put_mds_session(s);
-
-				mutex_lock(&mdsc->mutex);
-			} else {
-				/* just close it */
-				mutex_unlock(&mdsc->mutex);
-				mutex_lock(&s->s_mutex);
-				mutex_lock(&mdsc->mutex);
-				ceph_con_close(&s->s_con);
-				mutex_unlock(&s->s_mutex);
-				s->s_state = CEPH_MDS_SESSION_RESTARTING;
-			}
+			/* just close it */
+			mutex_unlock(&mdsc->mutex);
+			mutex_lock(&s->s_mutex);
+			mutex_lock(&mdsc->mutex);
+			ceph_con_close(&s->s_con);
+			mutex_unlock(&s->s_mutex);
+			s->s_state = CEPH_MDS_SESSION_RESTARTING;
 		} else if (oldstate == newstate) {
 			continue;  /* nothing new with this mds */
 		}
@@ -3392,12 +4223,16 @@
 			    oldstate != CEPH_MDS_STATE_STARTING)
 				pr_info("mds%d recovery completed\n", s->s_mds);
 			kick_requests(mdsc, i);
+			mutex_unlock(&mdsc->mutex);
+			mutex_lock(&s->s_mutex);
+			mutex_lock(&mdsc->mutex);
 			ceph_kick_flushing_caps(mdsc, s);
-			wake_up_session_caps(s, 1);
+			mutex_unlock(&s->s_mutex);
+			wake_up_session_caps(s, RECONNECT);
 		}
 	}
 
-	for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
+	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
 		s = mdsc->sessions[i];
 		if (!s)
 			continue;
@@ -3465,7 +4300,7 @@
 	     dname.len, dname.name);
 
 	mutex_lock(&session->s_mutex);
-	session->s_seq++;
+	inc_session_sequence(session);
 
 	if (!inode) {
 		dout("handle_lease no inode %llx\n", vino.ino);
@@ -3526,8 +4361,9 @@
 	ceph_con_send(&session->s_con, msg);
 
 out:
-	iput(inode);
 	mutex_unlock(&session->s_mutex);
+	/* avoid calling iput_final() in mds dispatch threads */
+	ceph_async_iput(inode);
 	return;
 
 bad:
@@ -3536,31 +4372,33 @@
 }
 
 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
-			      struct inode *inode,
 			      struct dentry *dentry, char action,
 			      u32 seq)
 {
 	struct ceph_msg *msg;
 	struct ceph_mds_lease *lease;
-	int len = sizeof(*lease) + sizeof(u32);
-	int dnamelen = 0;
+	struct inode *dir;
+	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
 
-	dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
-	     inode, dentry, ceph_lease_op_name(action), session->s_mds);
-	dnamelen = dentry->d_name.len;
-	len += dnamelen;
+	dout("lease_send_msg identry %p %s to mds%d\n",
+	     dentry, ceph_lease_op_name(action), session->s_mds);
 
 	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
 	if (!msg)
 		return;
 	lease = msg->front.iov_base;
 	lease->action = action;
-	lease->ino = cpu_to_le64(ceph_vino(inode).ino);
-	lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
 	lease->seq = cpu_to_le32(seq);
-	put_unaligned_le32(dnamelen, lease + 1);
-	memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
 
+	spin_lock(&dentry->d_lock);
+	dir = d_inode(dentry->d_parent);
+	lease->ino = cpu_to_le64(ceph_ino(dir));
+	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
+
+	put_unaligned_le32(dentry->d_name.len, lease + 1);
+	memcpy((void *)(lease + 1) + 4,
+	       dentry->d_name.name, dentry->d_name.len);
+	spin_unlock(&dentry->d_lock);
 	/*
 	 * if this is a preemptive lease RELEASE, no need to
 	 * flush request stream, since the actual request will
@@ -3572,50 +4410,108 @@
 }
 
 /*
- * lock unlock sessions, to wait ongoing session activities
+ * lock unlock the session, to wait ongoing session activities
  */
-static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
+static void lock_unlock_session(struct ceph_mds_session *s)
 {
-	int i;
-
-	mutex_lock(&mdsc->mutex);
-	for (i = 0; i < mdsc->max_sessions; i++) {
-		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
-		if (!s)
-			continue;
-		mutex_unlock(&mdsc->mutex);
-		mutex_lock(&s->s_mutex);
-		mutex_unlock(&s->s_mutex);
-		ceph_put_mds_session(s);
-		mutex_lock(&mdsc->mutex);
-	}
-	mutex_unlock(&mdsc->mutex);
+	mutex_lock(&s->s_mutex);
+	mutex_unlock(&s->s_mutex);
 }
 
+static void maybe_recover_session(struct ceph_mds_client *mdsc)
+{
+	struct ceph_fs_client *fsc = mdsc->fsc;
 
+	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
+		return;
+
+	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
+		return;
+
+	if (!READ_ONCE(fsc->blocklisted))
+		return;
+
+	if (fsc->last_auto_reconnect &&
+	    time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
+		return;
+
+	pr_info("auto reconnect after blocklisted\n");
+	fsc->last_auto_reconnect = jiffies;
+	ceph_force_reconnect(fsc->sb);
+}
+
+bool check_session_state(struct ceph_mds_session *s)
+{
+	switch (s->s_state) {
+	case CEPH_MDS_SESSION_OPEN:
+		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
+			s->s_state = CEPH_MDS_SESSION_HUNG;
+			pr_info("mds%d hung\n", s->s_mds);
+		}
+		break;
+	case CEPH_MDS_SESSION_CLOSING:
+		/* Should never reach this when we're unmounting */
+		WARN_ON_ONCE(s->s_ttl);
+		fallthrough;
+	case CEPH_MDS_SESSION_NEW:
+	case CEPH_MDS_SESSION_RESTARTING:
+	case CEPH_MDS_SESSION_CLOSED:
+	case CEPH_MDS_SESSION_REJECTED:
+		return false;
+	}
+
+	return true;
+}
 
 /*
- * delayed work -- periodically trim expired leases, renew caps with mds
+ * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
+ * then we need to retransmit that request.
  */
-static void schedule_delayed(struct ceph_mds_client *mdsc)
+void inc_session_sequence(struct ceph_mds_session *s)
 {
-	int delay = 5;
-	unsigned hz = round_jiffies_relative(HZ * delay);
-	schedule_delayed_work(&mdsc->delayed_work, hz);
+	lockdep_assert_held(&s->s_mutex);
+
+	s->s_seq++;
+
+	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
+		int ret;
+
+		dout("resending session close request for mds%d\n", s->s_mds);
+		ret = request_close_session(s);
+		if (ret < 0)
+			pr_err("unable to close session to mds%d: %d\n",
+			       s->s_mds, ret);
+	}
+}
+
+/*
+ * delayed work -- periodically trim expired leases, renew caps with mds.  If
+ * the @delay parameter is set to 0 or if it's more than 5 secs, the default
+ * workqueue delay value of 5 secs will be used.
+ */
+static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
+{
+	unsigned long max_delay = HZ * 5;
+
+	/* 5 secs default delay */
+	if (!delay || (delay > max_delay))
+		delay = max_delay;
+	schedule_delayed_work(&mdsc->delayed_work,
+			      round_jiffies_relative(delay));
 }
 
 static void delayed_work(struct work_struct *work)
 {
-	int i;
 	struct ceph_mds_client *mdsc =
 		container_of(work, struct ceph_mds_client, delayed_work.work);
+	unsigned long delay;
 	int renew_interval;
 	int renew_caps;
+	int i;
 
 	dout("mdsc delayed_work\n");
-	ceph_check_delayed_caps(mdsc);
 
-	if (mdsc->stopping)
+	if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
 		return;
 
 	mutex_lock(&mdsc->mutex);
@@ -3629,23 +4525,8 @@
 		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
 		if (!s)
 			continue;
-		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
-			dout("resending session close request for mds%d\n",
-			     s->s_mds);
-			request_close_session(mdsc, s);
-			ceph_put_mds_session(s);
-			continue;
-		}
-		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
-			if (s->s_state == CEPH_MDS_SESSION_OPEN) {
-				s->s_state = CEPH_MDS_SESSION_HUNG;
-				pr_info("mds%d hung\n", s->s_mds);
-			}
-		}
-		if (s->s_state == CEPH_MDS_SESSION_NEW ||
-		    s->s_state == CEPH_MDS_SESSION_RESTARTING ||
-		    s->s_state == CEPH_MDS_SESSION_REJECTED) {
-			/* this mds is failed or recovering, just wait */
+
+		if (!check_session_state(s)) {
 			ceph_put_mds_session(s);
 			continue;
 		}
@@ -3666,13 +4547,22 @@
 	}
 	mutex_unlock(&mdsc->mutex);
 
-	schedule_delayed(mdsc);
+	delay = ceph_check_delayed_caps(mdsc);
+
+	ceph_queue_cap_reclaim_work(mdsc);
+
+	ceph_trim_snapid_map(mdsc);
+
+	maybe_recover_session(mdsc);
+
+	schedule_delayed(mdsc, delay);
 }
 
 int ceph_mdsc_init(struct ceph_fs_client *fsc)
 
 {
 	struct ceph_mds_client *mdsc;
+	int err;
 
 	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
 	if (!mdsc)
@@ -3681,8 +4571,8 @@
 	mutex_init(&mdsc->mutex);
 	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
 	if (!mdsc->mdsmap) {
-		kfree(mdsc);
-		return -ENOMEM;
+		err = -ENOMEM;
+		goto err_mdsc;
 	}
 
 	init_completion(&mdsc->safe_umount_waiters);
@@ -3693,10 +4583,13 @@
 	mdsc->max_sessions = 0;
 	mdsc->stopping = 0;
 	atomic64_set(&mdsc->quotarealms_count, 0);
+	mdsc->quotarealms_inodes = RB_ROOT;
+	mutex_init(&mdsc->quotarealms_inodes_mutex);
 	mdsc->last_snap_seq = 0;
 	init_rwsem(&mdsc->snap_rwsem);
 	mdsc->snap_realms = RB_ROOT;
 	INIT_LIST_HEAD(&mdsc->snap_empty);
+	mdsc->num_snap_realms = 0;
 	spin_lock_init(&mdsc->snap_empty_lock);
 	mdsc->last_tid = 0;
 	mdsc->oldest_tid = 0;
@@ -3704,21 +4597,32 @@
 	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
 	mdsc->last_renew_caps = jiffies;
 	INIT_LIST_HEAD(&mdsc->cap_delay_list);
+	INIT_LIST_HEAD(&mdsc->cap_wait_list);
 	spin_lock_init(&mdsc->cap_delay_lock);
 	INIT_LIST_HEAD(&mdsc->snap_flush_list);
 	spin_lock_init(&mdsc->snap_flush_lock);
 	mdsc->last_cap_flush_tid = 1;
 	INIT_LIST_HEAD(&mdsc->cap_flush_list);
-	INIT_LIST_HEAD(&mdsc->cap_dirty);
 	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
 	mdsc->num_cap_flushing = 0;
 	spin_lock_init(&mdsc->cap_dirty_lock);
 	init_waitqueue_head(&mdsc->cap_flushing_wq);
-	spin_lock_init(&mdsc->dentry_lru_lock);
-	INIT_LIST_HEAD(&mdsc->dentry_lru);
+	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
+	atomic_set(&mdsc->cap_reclaim_pending, 0);
+	err = ceph_metric_init(&mdsc->metric);
+	if (err)
+		goto err_mdsmap;
+
+	spin_lock_init(&mdsc->dentry_list_lock);
+	INIT_LIST_HEAD(&mdsc->dentry_leases);
+	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
 
 	ceph_caps_init(mdsc);
-	ceph_adjust_min_caps(mdsc, fsc->min_caps);
+	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
+
+	spin_lock_init(&mdsc->snapid_map_lock);
+	mdsc->snapid_map_tree = RB_ROOT;
+	INIT_LIST_HEAD(&mdsc->snapid_map_lru);
 
 	init_rwsem(&mdsc->pool_perm_rwsem);
 	mdsc->pool_perm_tree = RB_ROOT;
@@ -3728,6 +4632,12 @@
 
 	fsc->mdsc = mdsc;
 	return 0;
+
+err_mdsmap:
+	kfree(mdsc->mdsmap);
+err_mdsc:
+	kfree(mdsc);
+	return err;
 }
 
 /*
@@ -3752,11 +4662,36 @@
 		while ((req = __get_oldest_req(mdsc))) {
 			dout("wait_requests timed out on tid %llu\n",
 			     req->r_tid);
+			list_del_init(&req->r_wait);
 			__unregister_request(mdsc, req);
 		}
 	}
 	mutex_unlock(&mdsc->mutex);
 	dout("wait_requests done\n");
+}
+
+void send_flush_mdlog(struct ceph_mds_session *s)
+{
+	struct ceph_msg *msg;
+
+	/*
+	 * Pre-luminous MDS crashes when it sees an unknown session request
+	 */
+	if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
+		return;
+
+	mutex_lock(&s->s_mutex);
+	dout("request mdlog flush to mds%d (%s)s seq %lld\n", s->s_mds,
+	     ceph_session_state_name(s->s_state), s->s_seq);
+	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
+				      s->s_seq);
+	if (!msg) {
+		pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
+		       s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
+	} else {
+		ceph_con_send(&s->s_con, msg);
+	}
+	mutex_unlock(&s->s_mutex);
 }
 
 /*
@@ -3766,9 +4701,10 @@
 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
 {
 	dout("pre_umount\n");
-	mdsc->stopping = 1;
+	mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;
 
-	lock_unlock_sessions(mdsc);
+	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
+	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
 	ceph_flush_dirty_caps(mdsc);
 	wait_requests(mdsc);
 
@@ -3777,6 +4713,8 @@
 	 * their inode/dcache refs
 	 */
 	ceph_msgr_flush();
+
+	ceph_cleanup_quotarealms_inodes(mdsc);
 }
 
 /*
@@ -3902,7 +4840,7 @@
 	mutex_lock(&mdsc->mutex);
 	for (i = 0; i < mdsc->max_sessions; i++) {
 		if (mdsc->sessions[i]) {
-			session = get_session(mdsc->sessions[i]);
+			session = ceph_get_mds_session(mdsc->sessions[i]);
 			__unregister_session(mdsc, session);
 			mutex_unlock(&mdsc->mutex);
 			mutex_lock(&session->s_mutex);
@@ -3915,8 +4853,10 @@
 	WARN_ON(!list_empty(&mdsc->cap_delay_list));
 	mutex_unlock(&mdsc->mutex);
 
+	ceph_cleanup_snapid_map(mdsc);
 	ceph_cleanup_empty_realms(mdsc);
 
+	cancel_work_sync(&mdsc->cap_reclaim_work);
 	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
 
 	dout("stopped\n");
@@ -3934,7 +4874,12 @@
 		session = __ceph_lookup_mds_session(mdsc, mds);
 		if (!session)
 			continue;
+
+		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
+			__unregister_session(mdsc, session);
+		__wake_requests(mdsc, &session->s_waiting);
 		mutex_unlock(&mdsc->mutex);
+
 		mutex_lock(&session->s_mutex);
 		__close_session(mdsc, session);
 		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
@@ -3943,6 +4888,7 @@
 		}
 		mutex_unlock(&session->s_mutex);
 		ceph_put_mds_session(session);
+
 		mutex_lock(&mdsc->mutex);
 		kick_requests(mdsc, mds);
 	}
@@ -3982,6 +4928,8 @@
 	ceph_msgr_flush();
 
 	ceph_mdsc_stop(mdsc);
+
+	ceph_metric_destroy(&mdsc->metric);
 
 	fsc->mdsc = NULL;
 	kfree(mdsc);
@@ -4117,7 +5065,7 @@
 			  mdsc->mdsmap->m_epoch);
 
 	mutex_unlock(&mdsc->mutex);
-	schedule_delayed(mdsc);
+	schedule_delayed(mdsc, 0);
 	return;
 
 bad_unlock:
@@ -4131,11 +5079,8 @@
 {
 	struct ceph_mds_session *s = con->private;
 
-	if (get_session(s)) {
-		dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
+	if (ceph_get_mds_session(s))
 		return con;
-	}
-	dout("mdsc con_get %p FAIL\n", s);
 	return NULL;
 }
 
@@ -4143,7 +5088,6 @@
 {
 	struct ceph_mds_session *s = con->private;
 
-	dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
 	ceph_put_mds_session(s);
 }
 

--
Gitblit v1.6.2