From a5969cabbb4660eab42b6ef0412cbbd1200cf14d Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Sat, 12 Oct 2024 07:10:09 +0000
Subject: [PATCH] 修改led为gpio

---
 kernel/fs/ceph/file.c | 1104 +++++++++++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 902 insertions(+), 202 deletions(-)

diff --git a/kernel/fs/ceph/file.c b/kernel/fs/ceph/file.c
index 4ce2752..943655e 100644
--- a/kernel/fs/ceph/file.c
+++ b/kernel/fs/ceph/file.c
@@ -1,5 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0
 #include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/striper.h>
 
 #include <linux/module.h>
 #include <linux/sched.h>
@@ -9,10 +10,14 @@
 #include <linux/namei.h>
 #include <linux/writeback.h>
 #include <linux/falloc.h>
+#include <linux/iversion.h>
+#include <linux/ktime.h>
 
 #include "super.h"
 #include "mds_client.h"
 #include "cache.h"
+#include "io.h"
+#include "metric.h"
 
 static __le32 ceph_flags_sys2wire(u32 flags)
 {
@@ -177,8 +182,7 @@
 static struct ceph_mds_request *
 prepare_open_request(struct super_block *sb, int flags, int create_mode)
 {
-	struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
-	struct ceph_mds_client *mdsc = fsc->mdsc;
+	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(sb);
 	struct ceph_mds_request *req;
 	int want_auth = USE_ANY_MDS;
 	int op = (flags & O_CREAT) ? CEPH_MDS_OP_CREATE : CEPH_MDS_OP_OPEN;
@@ -199,6 +203,7 @@
 static int ceph_init_file_info(struct inode *inode, struct file *file,
 					int fmode, bool isdir)
 {
+	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_file_info *fi;
 
 	dout("%s %p %p 0%o (%s)\n", __func__, inode, file,
@@ -208,10 +213,8 @@
 	if (isdir) {
 		struct ceph_dir_file_info *dfi =
 			kmem_cache_zalloc(ceph_dir_file_cachep, GFP_KERNEL);
-		if (!dfi) {
-			ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
+		if (!dfi)
 			return -ENOMEM;
-		}
 
 		file->private_data = dfi;
 		fi = &dfi->file_info;
@@ -219,17 +222,18 @@
 		dfi->readdir_cache_idx = -1;
 	} else {
 		fi = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
-		if (!fi) {
-			ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
+		if (!fi)
 			return -ENOMEM;
-		}
 
 		file->private_data = fi;
 	}
 
+	ceph_get_fmode(ci, fmode, 1);
 	fi->fmode = fmode;
+
 	spin_lock_init(&fi->rw_contexts_lock);
 	INIT_LIST_HEAD(&fi->rw_contexts);
+	fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen);
 
 	return 0;
 }
@@ -246,17 +250,15 @@
 	case S_IFREG:
 		ceph_fscache_register_inode_cookie(inode);
 		ceph_fscache_file_set_cookie(inode, file);
+		fallthrough;
 	case S_IFDIR:
 		ret = ceph_init_file_info(inode, file, fmode,
 						S_ISDIR(inode->i_mode));
-		if (ret)
-			return ret;
 		break;
 
 	case S_IFLNK:
 		dout("init_file %p %p 0%o (symlink)\n", inode, file,
 		     inode->i_mode);
-		ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
 		break;
 
 	default:
@@ -266,7 +268,6 @@
 		 * we need to drop the open ref now, since we don't
 		 * have .release set to ceph_release.
 		 */
-		ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
 		BUG_ON(inode->i_fop->release == ceph_release);
 
 		/* call the proper open fop */
@@ -278,14 +279,15 @@
 /*
  * try renew caps after session gets killed.
  */
-int ceph_renew_caps(struct inode *inode)
+int ceph_renew_caps(struct inode *inode, int fmode)
 {
-	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
+	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_mds_request *req;
 	int err, flags, wanted;
 
 	spin_lock(&ci->i_ceph_lock);
+	__ceph_touch_fmode(ci, mdsc, fmode);
 	wanted = __ceph_caps_file_wanted(ci);
 	if (__ceph_is_any_real_caps(ci) &&
 	    (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) {
@@ -319,7 +321,6 @@
 	req->r_inode = inode;
 	ihold(inode);
 	req->r_num_caps = 1;
-	req->r_fmode = -1;
 
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
 	ceph_mdsc_put_request(req);
@@ -365,9 +366,6 @@
 
 	/* trivially open snapdir */
 	if (ceph_snap(inode) == CEPH_SNAPDIR) {
-		spin_lock(&ci->i_ceph_lock);
-		__ceph_get_fmode(ci, fmode);
-		spin_unlock(&ci->i_ceph_lock);
 		return ceph_init_file(inode, file, fmode);
 	}
 
@@ -385,7 +383,7 @@
 		dout("open %p fmode %d want %s issued %s using existing\n",
 		     inode, fmode, ceph_cap_string(wanted),
 		     ceph_cap_string(issued));
-		__ceph_get_fmode(ci, fmode);
+		__ceph_touch_fmode(ci, mdsc, fmode);
 		spin_unlock(&ci->i_ceph_lock);
 
 		/* adjust wanted? */
@@ -397,7 +395,7 @@
 		return ceph_init_file(inode, file, fmode);
 	} else if (ceph_snap(inode) != CEPH_NOSNAP &&
 		   (ci->i_snap_caps & wanted) == wanted) {
-		__ceph_get_fmode(ci, fmode);
+		__ceph_touch_fmode(ci, mdsc, fmode);
 		spin_unlock(&ci->i_ceph_lock);
 		return ceph_init_file(inode, file, fmode);
 	}
@@ -423,6 +421,264 @@
 	return err;
 }
 
+/* Clone the layout from a synchronous create, if the dir now has Dc caps */
+static void
+cache_file_layout(struct inode *dst, struct inode *src)
+{
+	struct ceph_inode_info *cdst = ceph_inode(dst);
+	struct ceph_inode_info *csrc = ceph_inode(src);
+
+	spin_lock(&cdst->i_ceph_lock);
+	if ((__ceph_caps_issued(cdst, NULL) & CEPH_CAP_DIR_CREATE) &&
+	    !ceph_file_layout_is_valid(&cdst->i_cached_layout)) {
+		memcpy(&cdst->i_cached_layout, &csrc->i_layout,
+			sizeof(cdst->i_cached_layout));
+		rcu_assign_pointer(cdst->i_cached_layout.pool_ns,
+				   ceph_try_get_string(csrc->i_layout.pool_ns));
+	}
+	spin_unlock(&cdst->i_ceph_lock);
+}
+
+/*
+ * Try to set up an async create. We need caps, a file layout, and inode number,
+ * and either a lease on the dentry or complete dir info. If any of those
+ * criteria are not satisfied, then return false and the caller can go
+ * synchronous.
+ */
+static int try_prep_async_create(struct inode *dir, struct dentry *dentry,
+				 struct ceph_file_layout *lo, u64 *pino)
+{
+	struct ceph_inode_info *ci = ceph_inode(dir);
+	struct ceph_dentry_info *di = ceph_dentry(dentry);
+	int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_CREATE;
+	u64 ino;
+
+	spin_lock(&ci->i_ceph_lock);
+	/* No auth cap means no chance for Dc caps */
+	if (!ci->i_auth_cap)
+		goto no_async;
+
+	/* Any delegated inos? */
+	if (xa_empty(&ci->i_auth_cap->session->s_delegated_inos))
+		goto no_async;
+
+	if (!ceph_file_layout_is_valid(&ci->i_cached_layout))
+		goto no_async;
+
+	if ((__ceph_caps_issued(ci, NULL) & want) != want)
+		goto no_async;
+
+	if (d_in_lookup(dentry)) {
+		if (!__ceph_dir_is_complete(ci))
+			goto no_async;
+		spin_lock(&dentry->d_lock);
+		di->lease_shared_gen = atomic_read(&ci->i_shared_gen);
+		spin_unlock(&dentry->d_lock);
+	} else if (atomic_read(&ci->i_shared_gen) !=
+		   READ_ONCE(di->lease_shared_gen)) {
+		goto no_async;
+	}
+
+	ino = ceph_get_deleg_ino(ci->i_auth_cap->session);
+	if (!ino)
+		goto no_async;
+
+	*pino = ino;
+	ceph_take_cap_refs(ci, want, false);
+	memcpy(lo, &ci->i_cached_layout, sizeof(*lo));
+	rcu_assign_pointer(lo->pool_ns,
+			   ceph_try_get_string(ci->i_cached_layout.pool_ns));
+	got = want;
+no_async:
+	spin_unlock(&ci->i_ceph_lock);
+	return got;
+}
+
+static void restore_deleg_ino(struct inode *dir, u64 ino)
+{
+	struct ceph_inode_info *ci = ceph_inode(dir);
+	struct ceph_mds_session *s = NULL;
+
+	spin_lock(&ci->i_ceph_lock);
+	if (ci->i_auth_cap)
+		s = ceph_get_mds_session(ci->i_auth_cap->session);
+	spin_unlock(&ci->i_ceph_lock);
+	if (s) {
+		int err = ceph_restore_deleg_ino(s, ino);
+		if (err)
+			pr_warn("ceph: unable to restore delegated ino 0x%llx to session: %d\n",
+				ino, err);
+		ceph_put_mds_session(s);
+	}
+}
+
+static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
+                                 struct ceph_mds_request *req)
+{
+	int result = req->r_err ? req->r_err :
+			le32_to_cpu(req->r_reply_info.head->result);
+
+	if (result == -EJUKEBOX)
+		goto out;
+
+	mapping_set_error(req->r_parent->i_mapping, result);
+
+	if (result) {
+		struct dentry *dentry = req->r_dentry;
+		int pathlen = 0;
+		u64 base = 0;
+		char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
+						  &base, 0);
+
+		ceph_dir_clear_complete(req->r_parent);
+		if (!d_unhashed(dentry))
+			d_drop(dentry);
+
+		/* FIXME: start returning I/O errors on all accesses? */
+		pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
+			base, IS_ERR(path) ? "<<bad>>" : path, result);
+		ceph_mdsc_free_path(path, pathlen);
+	}
+
+	if (req->r_target_inode) {
+		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
+		u64 ino = ceph_vino(req->r_target_inode).ino;
+
+		if (req->r_deleg_ino != ino)
+			pr_warn("%s: inode number mismatch! err=%d deleg_ino=0x%llx target=0x%llx\n",
+				__func__, req->r_err, req->r_deleg_ino, ino);
+		mapping_set_error(req->r_target_inode->i_mapping, result);
+
+		spin_lock(&ci->i_ceph_lock);
+		if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
+			ci->i_ceph_flags &= ~CEPH_I_ASYNC_CREATE;
+			wake_up_bit(&ci->i_ceph_flags, CEPH_ASYNC_CREATE_BIT);
+		}
+		ceph_kick_flushing_inode_caps(req->r_session, ci);
+		spin_unlock(&ci->i_ceph_lock);
+	} else {
+		pr_warn("%s: no req->r_target_inode for 0x%llx\n", __func__,
+			req->r_deleg_ino);
+	}
+out:
+	ceph_mdsc_release_dir_caps(req);
+}
+
+static int ceph_finish_async_create(struct inode *dir, struct dentry *dentry,
+				    struct file *file, umode_t mode,
+				    struct ceph_mds_request *req,
+				    struct ceph_acl_sec_ctx *as_ctx,
+				    struct ceph_file_layout *lo)
+{
+	int ret;
+	char xattr_buf[4];
+	struct ceph_mds_reply_inode in = { };
+	struct ceph_mds_reply_info_in iinfo = { .in = &in };
+	struct ceph_inode_info *ci = ceph_inode(dir);
+	struct inode *inode;
+	struct timespec64 now;
+	struct ceph_string *pool_ns;
+	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
+	struct ceph_vino vino = { .ino = req->r_deleg_ino,
+				  .snap = CEPH_NOSNAP };
+
+	ktime_get_real_ts64(&now);
+
+	inode = ceph_get_inode(dentry->d_sb, vino);
+	if (IS_ERR(inode))
+		return PTR_ERR(inode);
+
+	iinfo.inline_version = CEPH_INLINE_NONE;
+	iinfo.change_attr = 1;
+	ceph_encode_timespec64(&iinfo.btime, &now);
+
+	if (req->r_pagelist) {
+		iinfo.xattr_len = req->r_pagelist->length;
+		iinfo.xattr_data = req->r_pagelist->mapped_tail;
+	} else {
+		/* fake it */
+		iinfo.xattr_len = ARRAY_SIZE(xattr_buf);
+		iinfo.xattr_data = xattr_buf;
+		memset(iinfo.xattr_data, 0, iinfo.xattr_len);
+	}
+
+	in.ino = cpu_to_le64(vino.ino);
+	in.snapid = cpu_to_le64(CEPH_NOSNAP);
+	in.version = cpu_to_le64(1);	// ???
+	in.cap.caps = in.cap.wanted = cpu_to_le32(CEPH_CAP_ALL_FILE);
+	in.cap.cap_id = cpu_to_le64(1);
+	in.cap.realm = cpu_to_le64(ci->i_snap_realm->ino);
+	in.cap.flags = CEPH_CAP_FLAG_AUTH;
+	in.ctime = in.mtime = in.atime = iinfo.btime;
+	in.truncate_seq = cpu_to_le32(1);
+	in.truncate_size = cpu_to_le64(-1ULL);
+	in.xattr_version = cpu_to_le64(1);
+	in.uid = cpu_to_le32(from_kuid(&init_user_ns, current_fsuid()));
+	if (dir->i_mode & S_ISGID) {
+		in.gid = cpu_to_le32(from_kgid(&init_user_ns, dir->i_gid));
+
+		/* Directories always inherit the setgid bit. */
+		if (S_ISDIR(mode))
+			mode |= S_ISGID;
+		else if ((mode & (S_ISGID | S_IXGRP)) == (S_ISGID | S_IXGRP) &&
+			 !in_group_p(dir->i_gid) &&
+			 !capable_wrt_inode_uidgid(dir, CAP_FSETID))
+			mode &= ~S_ISGID;
+	} else {
+		in.gid = cpu_to_le32(from_kgid(&init_user_ns, current_fsgid()));
+	}
+	in.mode = cpu_to_le32((u32)mode);
+
+	in.nlink = cpu_to_le32(1);
+	in.max_size = cpu_to_le64(lo->stripe_unit);
+
+	ceph_file_layout_to_legacy(lo, &in.layout);
+	/* lo is private, so pool_ns can't change */
+	pool_ns = rcu_dereference_raw(lo->pool_ns);
+	if (pool_ns) {
+		iinfo.pool_ns_len = pool_ns->len;
+		iinfo.pool_ns_data = pool_ns->str;
+	}
+
+	down_read(&mdsc->snap_rwsem);
+	ret = ceph_fill_inode(inode, NULL, &iinfo, NULL, req->r_session,
+			      req->r_fmode, NULL);
+	up_read(&mdsc->snap_rwsem);
+	if (ret) {
+		dout("%s failed to fill inode: %d\n", __func__, ret);
+		ceph_dir_clear_complete(dir);
+		if (!d_unhashed(dentry))
+			d_drop(dentry);
+		if (inode->i_state & I_NEW)
+			discard_new_inode(inode);
+	} else {
+		struct dentry *dn;
+
+		dout("%s d_adding new inode 0x%llx to 0x%llx/%s\n", __func__,
+			vino.ino, ceph_ino(dir), dentry->d_name.name);
+		ceph_dir_clear_ordered(dir);
+		ceph_init_inode_acls(inode, as_ctx);
+		if (inode->i_state & I_NEW) {
+			/*
+			 * If it's not I_NEW, then someone created this before
+			 * we got here. Assume the server is aware of it at
+			 * that point and don't worry about setting
+			 * CEPH_I_ASYNC_CREATE.
+			 */
+			ceph_inode(inode)->i_ceph_flags = CEPH_I_ASYNC_CREATE;
+			unlock_new_inode(inode);
+		}
+		if (d_in_lookup(dentry) || d_really_is_negative(dentry)) {
+			if (!d_unhashed(dentry))
+				d_drop(dentry);
+			dn = d_splice_alias(inode, dentry);
+			WARN_ON_ONCE(dn && dn != dentry);
+		}
+		file->f_mode |= FMODE_CREATED;
+		ret = finish_open(file, dentry, ceph_open);
+	}
+	return ret;
+}
 
 /*
  * Do a lookup + open with a single request.  If we get a non-existent
@@ -435,7 +691,8 @@
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_mds_request *req;
 	struct dentry *dn;
-	struct ceph_acls_info acls = {};
+	struct ceph_acl_sec_ctx as_ctx = {};
+	bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS);
 	int mask;
 	int err;
 
@@ -446,41 +703,79 @@
 	if (dentry->d_name.len > NAME_MAX)
 		return -ENAMETOOLONG;
 
+	/*
+	 * Do not truncate the file, since atomic_open is called before the
+	 * permission check. The caller will do the truncation afterward.
+	 */
+	flags &= ~O_TRUNC;
+
 	if (flags & O_CREAT) {
 		if (ceph_quota_is_max_files_exceeded(dir))
 			return -EDQUOT;
-		err = ceph_pre_init_acls(dir, &mode, &acls);
+		err = ceph_pre_init_acls(dir, &mode, &as_ctx);
 		if (err < 0)
 			return err;
+		err = ceph_security_init_secctx(dentry, mode, &as_ctx);
+		if (err < 0)
+			goto out_ctx;
+		/* Async create can't handle more than a page of xattrs */
+		if (as_ctx.pagelist &&
+		    !list_is_singular(&as_ctx.pagelist->head))
+			try_async = false;
+	} else if (!d_in_lookup(dentry)) {
+		/* If it's not being looked up, it's negative */
+		return -ENOENT;
 	}
-
+retry:
 	/* do the open */
 	req = prepare_open_request(dir->i_sb, flags, mode);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
-		goto out_acl;
+		goto out_ctx;
 	}
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
+	mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+	if (ceph_security_xattr_wanted(dir))
+		mask |= CEPH_CAP_XATTR_SHARED;
+	req->r_args.open.mask = cpu_to_le32(mask);
+	req->r_parent = dir;
+
 	if (flags & O_CREAT) {
+		struct ceph_file_layout lo;
+
 		req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
 		req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
-		if (acls.pagelist) {
-			req->r_pagelist = acls.pagelist;
-			acls.pagelist = NULL;
+		if (as_ctx.pagelist) {
+			req->r_pagelist = as_ctx.pagelist;
+			as_ctx.pagelist = NULL;
+		}
+		if (try_async &&
+		    (req->r_dir_caps =
+		      try_prep_async_create(dir, dentry, &lo,
+					    &req->r_deleg_ino))) {
+			set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
+			req->r_args.open.flags |= cpu_to_le32(CEPH_O_EXCL);
+			req->r_callback = ceph_async_create_cb;
+			err = ceph_mdsc_submit_request(mdsc, dir, req);
+			if (!err) {
+				err = ceph_finish_async_create(dir, dentry,
+							file, mode, req,
+							&as_ctx, &lo);
+			} else if (err == -EJUKEBOX) {
+				restore_deleg_ino(dir, req->r_deleg_ino);
+				ceph_mdsc_put_request(req);
+				try_async = false;
+				ceph_put_string(rcu_dereference_raw(lo.pool_ns));
+				goto retry;
+			}
+			ceph_put_string(rcu_dereference_raw(lo.pool_ns));
+			goto out_req;
 		}
 	}
 
-       mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
-       if (ceph_security_xattr_wanted(dir))
-               mask |= CEPH_CAP_XATTR_SHARED;
-       req->r_args.open.mask = cpu_to_le32(mask);
-
-	req->r_parent = dir;
 	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
-	err = ceph_mdsc_do_request(mdsc,
-				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
-				   req);
+	err = ceph_mdsc_do_request(mdsc, (flags & O_CREAT) ? dir : NULL, req);
 	err = ceph_handle_snapdir(req, dentry, err);
 	if (err)
 		goto out_req;
@@ -505,17 +800,18 @@
 	} else {
 		dout("atomic_open finish_open on dn %p\n", dn);
 		if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
-			ceph_init_inode_acls(d_inode(dentry), &acls);
+			struct inode *newino = d_inode(dentry);
+
+			cache_file_layout(dir, newino);
+			ceph_init_inode_acls(newino, &as_ctx);
 			file->f_mode |= FMODE_CREATED;
 		}
 		err = finish_open(file, dentry, ceph_open);
 	}
 out_req:
-	if (!req->r_err && req->r_target_inode)
-		ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
 	ceph_mdsc_put_request(req);
-out_acl:
-	ceph_release_acls_info(&acls);
+out_ctx:
+	ceph_release_acl_sec_ctx(&as_ctx);
 	dout("atomic_open result=%d\n", err);
 	return err;
 }
@@ -529,7 +825,7 @@
 		dout("release inode %p dir file %p\n", inode, file);
 		WARN_ON(!list_empty(&dfi->file_info.rw_contexts));
 
-		ceph_put_fmode(ci, dfi->file_info.fmode);
+		ceph_put_fmode(ci, dfi->file_info.fmode, 1);
 
 		if (dfi->last_readdir)
 			ceph_mdsc_put_request(dfi->last_readdir);
@@ -541,7 +837,8 @@
 		dout("release inode %p regular file %p\n", inode, file);
 		WARN_ON(!list_empty(&fi->rw_contexts));
 
-		ceph_put_fmode(ci, fi->fmode);
+		ceph_put_fmode(ci, fi->fmode, 1);
+
 		kmem_cache_free(ceph_file_cachep, fi);
 	}
 
@@ -557,90 +854,26 @@
 };
 
 /*
- * Read a range of bytes striped over one or more objects.  Iterate over
- * objects we stripe over.  (That's not atomic, but good enough for now.)
+ * Completely synchronous read and write methods.  Direct from __user
+ * buffer to osd, or directly to user pages (if O_DIRECT).
+ *
+ * If the read spans object boundary, just do multiple reads.  (That's not
+ * atomic, but good enough for now.)
  *
  * If we get a short result from the OSD, check against i_size; we need to
  * only return a short read to the caller if we hit EOF.
  */
-static int striped_read(struct inode *inode,
-			u64 pos, u64 len,
-			struct page **pages, int num_pages,
-			int page_align, int *checkeof)
-{
-	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	u64 this_len;
-	loff_t i_size;
-	int page_idx;
-	int ret, read = 0;
-	bool hit_stripe, was_short;
-
-	/*
-	 * we may need to do multiple reads.  not atomic, unfortunately.
-	 */
-more:
-	this_len = len;
-	page_idx = (page_align + read) >> PAGE_SHIFT;
-	ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
-				  &ci->i_layout, pos, &this_len,
-				  ci->i_truncate_seq, ci->i_truncate_size,
-				  pages + page_idx, num_pages - page_idx,
-				  ((page_align + read) & ~PAGE_MASK));
-	if (ret == -ENOENT)
-		ret = 0;
-	hit_stripe = this_len < len;
-	was_short = ret >= 0 && ret < this_len;
-	dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
-	     ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
-
-	i_size = i_size_read(inode);
-	if (ret >= 0) {
-		if (was_short && (pos + ret < i_size)) {
-			int zlen = min(this_len - ret, i_size - pos - ret);
-			int zoff = page_align + read + ret;
-			dout(" zero gap %llu to %llu\n",
-			     pos + ret, pos + ret + zlen);
-			ceph_zero_page_vector_range(zoff, zlen, pages);
-			ret += zlen;
-		}
-
-		read += ret;
-		pos += ret;
-		len -= ret;
-
-		/* hit stripe and need continue*/
-		if (len && hit_stripe && pos < i_size)
-			goto more;
-	}
-
-	if (read > 0) {
-		ret = read;
-		/* did we bounce off eof? */
-		if (pos + len > i_size)
-			*checkeof = CHECK_EOF;
-	}
-
-	dout("striped_read returns %d\n", ret);
-	return ret;
-}
-
-/*
- * Completely synchronous read and write methods.  Direct from __user
- * buffer to osd, or directly to user pages (if O_DIRECT).
- *
- * If the read spans object boundary, just do multiple reads.
- */
 static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
-			      int *checkeof)
+			      int *retry_op)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
-	struct page **pages;
-	u64 off = iocb->ki_pos;
-	int num_pages;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_osd_client *osdc = &fsc->client->osdc;
 	ssize_t ret;
-	size_t len = iov_iter_count(to);
+	u64 off = iocb->ki_pos;
+	u64 len = iov_iter_count(to);
 
 	dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
 	     (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
@@ -653,61 +886,108 @@
 	 * but it will at least behave sensibly when they are
 	 * in sequence.
 	 */
-	ret = filemap_write_and_wait_range(inode->i_mapping, off,
-						off + len);
+	ret = filemap_write_and_wait_range(inode->i_mapping,
+					   off, off + len - 1);
 	if (ret < 0)
 		return ret;
 
-	if (unlikely(to->type & ITER_PIPE)) {
+	ret = 0;
+	while ((len = iov_iter_count(to)) > 0) {
+		struct ceph_osd_request *req;
+		struct page **pages;
+		int num_pages;
 		size_t page_off;
-		ret = iov_iter_get_pages_alloc(to, &pages, len,
-					       &page_off);
-		if (ret <= 0)
-			return -ENOMEM;
-		num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+		u64 i_size;
+		bool more;
+		int idx;
+		size_t left;
 
-		ret = striped_read(inode, off, ret, pages, num_pages,
-				   page_off, checkeof);
-		if (ret > 0) {
-			iov_iter_advance(to, ret);
-			off += ret;
-		} else {
-			iov_iter_advance(to, 0);
+		req = ceph_osdc_new_request(osdc, &ci->i_layout,
+					ci->i_vino, off, &len, 0, 1,
+					CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+					NULL, ci->i_truncate_seq,
+					ci->i_truncate_size, false);
+		if (IS_ERR(req)) {
+			ret = PTR_ERR(req);
+			break;
 		}
-		ceph_put_page_vector(pages, num_pages, false);
-	} else {
+
+		more = len < iov_iter_count(to);
+
 		num_pages = calc_pages_for(off, len);
+		page_off = off & ~PAGE_MASK;
 		pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-		if (IS_ERR(pages))
-			return PTR_ERR(pages);
+		if (IS_ERR(pages)) {
+			ceph_osdc_put_request(req);
+			ret = PTR_ERR(pages);
+			break;
+		}
 
-		ret = striped_read(inode, off, len, pages, num_pages,
-				   (off & ~PAGE_MASK), checkeof);
-		if (ret > 0) {
-			int l, k = 0;
-			size_t left = ret;
+		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
+						 false, false);
+		ret = ceph_osdc_start_request(osdc, req, false);
+		if (!ret)
+			ret = ceph_osdc_wait_request(osdc, req);
 
-			while (left) {
-				size_t page_off = off & ~PAGE_MASK;
-				size_t copy = min_t(size_t, left,
-						    PAGE_SIZE - page_off);
-				l = copy_page_to_iter(pages[k++], page_off,
-						      copy, to);
-				off += l;
-				left -= l;
-				if (l < copy)
-					break;
+		ceph_update_read_latency(&fsc->mdsc->metric,
+					 req->r_start_latency,
+					 req->r_end_latency,
+					 ret);
+
+		ceph_osdc_put_request(req);
+
+		i_size = i_size_read(inode);
+		dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
+		     off, len, ret, i_size, (more ? " MORE" : ""));
+
+		if (ret == -ENOENT)
+			ret = 0;
+		if (ret >= 0 && ret < len && (off + ret < i_size)) {
+			int zlen = min(len - ret, i_size - off - ret);
+			int zoff = page_off + ret;
+			dout("sync_read zero gap %llu~%llu\n",
+                             off + ret, off + ret + zlen);
+			ceph_zero_page_vector_range(zoff, zlen, pages);
+			ret += zlen;
+		}
+
+		idx = 0;
+		left = ret > 0 ? ret : 0;
+		while (left > 0) {
+			size_t len, copied;
+			page_off = off & ~PAGE_MASK;
+			len = min_t(size_t, left, PAGE_SIZE - page_off);
+			SetPageUptodate(pages[idx]);
+			copied = copy_page_to_iter(pages[idx++],
+						   page_off, len, to);
+			off += copied;
+			left -= copied;
+			if (copied < len) {
+				ret = -EFAULT;
+				break;
 			}
 		}
 		ceph_release_page_vector(pages, num_pages);
+
+		if (ret < 0) {
+			if (ret == -EBLOCKLISTED)
+				fsc->blocklisted = true;
+			break;
+		}
+
+		if (off >= i_size || !more)
+			break;
 	}
 
 	if (off > iocb->ki_pos) {
+		if (ret >= 0 &&
+		    iov_iter_count(to) > 0 && off >= i_size_read(inode))
+			*retry_op = CHECK_EOF;
 		ret = off - iocb->ki_pos;
 		iocb->ki_pos = off;
 	}
 
-	dout("sync_read result %zd\n", ret);
+	dout("sync_read result %zd retry_op %d\n", ret, *retry_op);
 	return ret;
 }
 
@@ -739,6 +1019,9 @@
 
 	if (!atomic_dec_and_test(&aio_req->pending_reqs))
 		return;
+
+	if (aio_req->iocb->ki_flags & IOCB_DIRECT)
+		inode_dio_end(inode);
 
 	ret = aio_req->error;
 	if (!ret)
@@ -780,12 +1063,23 @@
 	struct inode *inode = req->r_inode;
 	struct ceph_aio_request *aio_req = req->r_priv;
 	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
+	struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
 
 	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
 	BUG_ON(!osd_data->num_bvecs);
 
 	dout("ceph_aio_complete_req %p rc %d bytes %u\n",
 	     inode, rc, osd_data->bvec_pos.iter.bi_size);
+
+	/* r_start_latency == 0 means the request was not submitted */
+	if (req->r_start_latency) {
+		if (aio_req->write)
+			ceph_update_write_latency(metric, req->r_start_latency,
+						  req->r_end_latency, rc);
+		else
+			ceph_update_read_latency(metric, req->r_start_latency,
+						 req->r_end_latency, rc);
+	}
 
 	if (rc == -EOLDSNAPC) {
 		struct ceph_aio_work *aio_work;
@@ -795,7 +1089,7 @@
 		if (aio_work) {
 			INIT_WORK(&aio_work->work, ceph_aio_retry_work);
 			aio_work->req = req;
-			queue_work(ceph_inode_to_client(inode)->wb_wq,
+			queue_work(ceph_inode_to_client(inode)->inode_wq,
 				   &aio_work->work);
 			return;
 		}
@@ -821,7 +1115,7 @@
 				aio_req->total_len = rc + zlen;
 			}
 
-			iov_iter_bvec(&i, ITER_BVEC, osd_data->bvec_pos.bvecs,
+			iov_iter_bvec(&i, READ, osd_data->bvec_pos.bvecs,
 				      osd_data->num_bvecs,
 				      osd_data->bvec_pos.iter.bi_size);
 			iov_iter_advance(&i, rc);
@@ -865,7 +1159,7 @@
 	}
 	spin_unlock(&ci->i_ceph_lock);
 
-	req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
+	req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1,
 			false, GFP_NOFS);
 	if (!req) {
 		ret = -ENOMEM;
@@ -877,17 +1171,17 @@
 	ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
 	ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
 
+	req->r_ops[0] = orig_req->r_ops[0];
+
+	req->r_mtime = aio_req->mtime;
+	req->r_data_offset = req->r_ops[0].extent.offset;
+
 	ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
 	if (ret) {
 		ceph_osdc_put_request(req);
 		req = orig_req;
 		goto out;
 	}
-
-	req->r_ops[0] = orig_req->r_ops[0];
-
-	req->r_mtime = aio_req->mtime;
-	req->r_data_offset = req->r_ops[0].extent.offset;
 
 	ceph_osdc_put_request(orig_req);
 
@@ -915,13 +1209,14 @@
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_client_metric *metric = &fsc->mdsc->metric;
 	struct ceph_vino vino;
 	struct ceph_osd_request *req;
 	struct bio_vec *bvecs;
 	struct ceph_aio_request *aio_req = NULL;
 	int num_pages = 0;
 	int flags;
-	int ret;
+	int ret = 0;
 	struct timespec64 mtime = current_time(inode);
 	size_t count = iov_iter_count(iter);
 	loff_t pos = iocb->ki_pos;
@@ -933,16 +1228,12 @@
 
 	dout("sync_direct_%s on file %p %lld~%u snapc %p seq %lld\n",
 	     (write ? "write" : "read"), file, pos, (unsigned)count,
-	     snapc, snapc->seq);
-
-	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
-	if (ret < 0)
-		return ret;
+	     snapc, snapc ? snapc->seq : 0);
 
 	if (write) {
 		int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
 					pos >> PAGE_SHIFT,
-					(pos + count) >> PAGE_SHIFT);
+					(pos + count - 1) >> PAGE_SHIFT);
 		if (ret2 < 0)
 			dout("invalidate_inode_pages2_range returned %d\n", ret2);
 
@@ -1010,7 +1301,7 @@
 			 * may block.
 			 */
 			truncate_inode_pages_range(inode->i_mapping, pos,
-					(pos+len) | (PAGE_SIZE - 1));
+						   PAGE_ALIGN(pos + len) - 1);
 
 			req->r_mtime = mtime;
 		}
@@ -1025,7 +1316,7 @@
 			req->r_callback = ceph_aio_complete_req;
 			req->r_inode = inode;
 			req->r_priv = aio_req;
-			list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);
+			list_add_tail(&req->r_private_item, &aio_req->osd_reqs);
 
 			pos += len;
 			continue;
@@ -1034,6 +1325,13 @@
 		ret = ceph_osdc_start_request(req->r_osdc, req, false);
 		if (!ret)
 			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+
+		if (write)
+			ceph_update_write_latency(metric, req->r_start_latency,
+						  req->r_end_latency, ret);
+		else
+			ceph_update_read_latency(metric, req->r_start_latency,
+						 req->r_end_latency, ret);
 
 		size = i_size_read(inode);
 		if (!write) {
@@ -1044,8 +1342,7 @@
 				int zlen = min_t(size_t, len - ret,
 						 size - pos - ret);
 
-				iov_iter_bvec(&i, ITER_BVEC, bvecs, num_pages,
-					      len);
+				iov_iter_bvec(&i, READ, bvecs, num_pages, len);
 				iov_iter_advance(&i, ret);
 				iov_iter_zero(zlen, &i);
 				ret += zlen;
@@ -1083,11 +1380,12 @@
 					      CEPH_CAP_FILE_RD);
 
 		list_splice(&aio_req->osd_reqs, &osd_reqs);
+		inode_dio_begin(inode);
 		while (!list_empty(&osd_reqs)) {
 			req = list_first_entry(&osd_reqs,
 					       struct ceph_osd_request,
-					       r_unsafe_item);
-			list_del_init(&req->r_unsafe_item);
+					       r_private_item);
+			list_del_init(&req->r_private_item);
 			if (ret >= 0)
 				ret = ceph_osdc_start_request(req->r_osdc,
 							      req, false);
@@ -1139,13 +1437,14 @@
 	dout("sync_write on file %p %lld~%u snapc %p seq %lld\n",
 	     file, pos, (unsigned)count, snapc, snapc->seq);
 
-	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
+	ret = filemap_write_and_wait_range(inode->i_mapping,
+					   pos, pos + count - 1);
 	if (ret < 0)
 		return ret;
 
 	ret = invalidate_inode_pages2_range(inode->i_mapping,
 					    pos >> PAGE_SHIFT,
-					    (pos + count) >> PAGE_SHIFT);
+					    (pos + count - 1) >> PAGE_SHIFT);
 	if (ret < 0)
 		dout("invalidate_inode_pages2_range returned %d\n", ret);
 
@@ -1205,6 +1504,8 @@
 		if (!ret)
 			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
+		ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+					  req->r_end_latency, ret);
 out:
 		ceph_osdc_put_request(req);
 		if (ret != 0) {
@@ -1247,6 +1548,7 @@
 	struct inode *inode = file_inode(filp);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct page *pinned_page = NULL;
+	bool direct_lock = iocb->ki_flags & IOCB_DIRECT;
 	ssize_t ret;
 	int want, got = 0;
 	int retry_op = 0, read = 0;
@@ -1255,13 +1557,24 @@
 	dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
 	     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
 
+	if (direct_lock)
+		ceph_start_io_direct(inode);
+	else
+		ceph_start_io_read(inode);
+
 	if (fi->fmode & CEPH_FILE_MODE_LAZY)
 		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
 	else
 		want = CEPH_CAP_FILE_CACHE;
-	ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
-	if (ret < 0)
+	ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1,
+			    &got, &pinned_page);
+	if (ret < 0) {
+		if (iocb->ki_flags & IOCB_DIRECT)
+			ceph_end_io_direct(inode);
+		else
+			ceph_end_io_read(inode);
 		return ret;
+	}
 
 	if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
 	    (iocb->ki_flags & IOCB_DIRECT) ||
@@ -1292,6 +1605,7 @@
 		ret = generic_file_read_iter(iocb, to);
 		ceph_del_rw_context(fi, &rw_ctx);
 	}
+
 	dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
 	     inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
 	if (pinned_page) {
@@ -1299,6 +1613,12 @@
 		pinned_page = NULL;
 	}
 	ceph_put_cap_refs(ci, got);
+
+	if (direct_lock)
+		ceph_end_io_direct(inode);
+	else
+		ceph_end_io_read(inode);
+
 	if (retry_op > HAVE_RETRIED && ret >= 0) {
 		int statret;
 		struct page *page = NULL;
@@ -1388,6 +1708,7 @@
 	struct ceph_cap_flush *prealloc_cf;
 	ssize_t count, written = 0;
 	int err, want, got;
+	bool direct_lock = false;
 	u32 map_flags;
 	u64 pool_flags;
 	loff_t pos;
@@ -1400,8 +1721,14 @@
 	if (!prealloc_cf)
 		return -ENOMEM;
 
+	if ((iocb->ki_flags & (IOCB_DIRECT | IOCB_APPEND)) == IOCB_DIRECT)
+		direct_lock = true;
+
 retry_snap:
-	inode_lock(inode);
+	if (direct_lock)
+		ceph_start_io_direct(inode);
+	else
+		ceph_start_io_write(inode);
 
 	/* We can write back this queue in page reclaim */
 	current->backing_dev_info = inode_to_bdi(inode);
@@ -1430,20 +1757,6 @@
 		goto out;
 	}
 
-	err = file_remove_privs(file);
-	if (err)
-		goto out;
-
-	err = file_update_time(file);
-	if (err)
-		goto out;
-
-	if (ci->i_inline_version != CEPH_INLINE_NONE) {
-		err = ceph_uninline_data(file, NULL);
-		if (err < 0)
-			goto out;
-	}
-
 	down_read(&osdc->lock);
 	map_flags = osdc->osdmap->flags;
 	pool_flags = ceph_pg_pool_flags(osdc->osdmap, ci->i_layout.pool_id);
@@ -1454,6 +1767,16 @@
 		goto out;
 	}
 
+	err = file_remove_privs(file);
+	if (err)
+		goto out;
+
+	if (ci->i_inline_version != CEPH_INLINE_NONE) {
+		err = ceph_uninline_data(file, NULL);
+		if (err < 0)
+			goto out;
+	}
+
 	dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
 	     inode, ceph_vinop(inode), pos, count, i_size_read(inode));
 	if (fi->fmode & CEPH_FILE_MODE_LAZY)
@@ -1461,10 +1784,16 @@
 	else
 		want = CEPH_CAP_FILE_BUFFER;
 	got = 0;
-	err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
+	err = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, pos + count,
 			    &got, NULL);
 	if (err < 0)
 		goto out;
+
+	err = file_update_time(file);
+	if (err)
+		goto out_caps;
+
+	inode_inc_iversion_raw(inode);
 
 	dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
 	     inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
@@ -1474,7 +1803,6 @@
 	    (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) {
 		struct ceph_snap_context *snapc;
 		struct iov_iter data;
-		inode_unlock(inode);
 
 		spin_lock(&ci->i_ceph_lock);
 		if (__ceph_have_pending_cap_snap(ci)) {
@@ -1496,6 +1824,10 @@
 							 &prealloc_cf);
 		else
 			written = ceph_sync_write(iocb, &data, pos, snapc);
+		if (direct_lock)
+			ceph_end_io_direct(inode);
+		else
+			ceph_end_io_write(inode);
 		if (written > 0)
 			iov_iter_advance(from, written);
 		ceph_put_snap_context(snapc);
@@ -1510,7 +1842,7 @@
 		written = generic_perform_write(file, from, pos);
 		if (likely(written >= 0))
 			iocb->ki_pos = pos + written;
-		inode_unlock(inode);
+		ceph_end_io_write(inode);
 	}
 
 	if (written >= 0) {
@@ -1524,7 +1856,7 @@
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
 		if (ceph_quota_is_max_bytes_approaching(inode, iocb->ki_pos))
-			ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL);
+			ceph_check_caps(ci, 0, NULL);
 	}
 
 	dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
@@ -1546,9 +1878,13 @@
 	}
 
 	goto out_unlocked;
-
+out_caps:
+	ceph_put_cap_refs(ci, got);
 out:
-	inode_unlock(inode);
+	if (direct_lock)
+		ceph_end_io_direct(inode);
+	else
+		ceph_end_io_write(inode);
 out_unlocked:
 	ceph_free_cap_flush(prealloc_cf);
 	current->backing_dev_info = NULL;
@@ -1786,7 +2122,7 @@
 	else
 		want = CEPH_CAP_FILE_BUFFER;
 
-	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
+	ret = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
 	if (ret < 0)
 		goto unlock;
 
@@ -1810,6 +2146,370 @@
 	return ret;
 }
 
+/*
+ * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
+ * src_ci.  Two attempts are made to obtain both caps, and an error is return if
+ * this fails; zero is returned on success.
+ */
+static int get_rd_wr_caps(struct file *src_filp, int *src_got,
+			  struct file *dst_filp,
+			  loff_t dst_endoff, int *dst_got)
+{
+	int ret = 0;
+	bool retrying = false;
+
+retry_caps:
+	ret = ceph_get_caps(dst_filp, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+			    dst_endoff, dst_got, NULL);
+	if (ret < 0)
+		return ret;
+
+	/*
+	 * Since we're already holding the FILE_WR capability for the dst file,
+	 * we would risk a deadlock by using ceph_get_caps.  Thus, we'll do some
+	 * retry dance instead to try to get both capabilities.
+	 */
+	ret = ceph_try_get_caps(file_inode(src_filp),
+				CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+				false, src_got);
+	if (ret <= 0) {
+		/* Start by dropping dst_ci caps and getting src_ci caps */
+		ceph_put_cap_refs(ceph_inode(file_inode(dst_filp)), *dst_got);
+		if (retrying) {
+			if (!ret)
+				/* ceph_try_get_caps masks EAGAIN */
+				ret = -EAGAIN;
+			return ret;
+		}
+		ret = ceph_get_caps(src_filp, CEPH_CAP_FILE_RD,
+				    CEPH_CAP_FILE_SHARED, -1, src_got, NULL);
+		if (ret < 0)
+			return ret;
+		/*... drop src_ci caps too, and retry */
+		ceph_put_cap_refs(ceph_inode(file_inode(src_filp)), *src_got);
+		retrying = true;
+		goto retry_caps;
+	}
+	return ret;
+}
+
+static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
+			   struct ceph_inode_info *dst_ci, int dst_got)
+{
+	ceph_put_cap_refs(src_ci, src_got);
+	ceph_put_cap_refs(dst_ci, dst_got);
+}
+
+/*
+ * This function does several size-related checks, returning an error if:
+ *  - source file is smaller than off+len
+ *  - destination file size is not OK (inode_newsize_ok())
+ *  - max bytes quotas is exceeded
+ */
+static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
+			   loff_t src_off, loff_t dst_off, size_t len)
+{
+	loff_t size, endoff;
+
+	size = i_size_read(src_inode);
+	/*
+	 * Don't copy beyond source file EOF.  Instead of simply setting length
+	 * to (size - src_off), just drop to VFS default implementation, as the
+	 * local i_size may be stale due to other clients writing to the source
+	 * inode.
+	 */
+	if (src_off + len > size) {
+		dout("Copy beyond EOF (%llu + %zu > %llu)\n",
+		     src_off, len, size);
+		return -EOPNOTSUPP;
+	}
+	size = i_size_read(dst_inode);
+
+	endoff = dst_off + len;
+	if (inode_newsize_ok(dst_inode, endoff))
+		return -EOPNOTSUPP;
+
+	if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
+		return -EDQUOT;
+
+	return 0;
+}
+
+static ssize_t ceph_do_objects_copy(struct ceph_inode_info *src_ci, u64 *src_off,
+				    struct ceph_inode_info *dst_ci, u64 *dst_off,
+				    struct ceph_fs_client *fsc,
+				    size_t len, unsigned int flags)
+{
+	struct ceph_object_locator src_oloc, dst_oloc;
+	struct ceph_object_id src_oid, dst_oid;
+	size_t bytes = 0;
+	u64 src_objnum, src_objoff, dst_objnum, dst_objoff;
+	u32 src_objlen, dst_objlen;
+	u32 object_size = src_ci->i_layout.object_size;
+	int ret;
+
+	src_oloc.pool = src_ci->i_layout.pool_id;
+	src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+	dst_oloc.pool = dst_ci->i_layout.pool_id;
+	dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+
+	while (len >= object_size) {
+		ceph_calc_file_object_mapping(&src_ci->i_layout, *src_off,
+					      object_size, &src_objnum,
+					      &src_objoff, &src_objlen);
+		ceph_calc_file_object_mapping(&dst_ci->i_layout, *dst_off,
+					      object_size, &dst_objnum,
+					      &dst_objoff, &dst_objlen);
+		ceph_oid_init(&src_oid);
+		ceph_oid_printf(&src_oid, "%llx.%08llx",
+				src_ci->i_vino.ino, src_objnum);
+		ceph_oid_init(&dst_oid);
+		ceph_oid_printf(&dst_oid, "%llx.%08llx",
+				dst_ci->i_vino.ino, dst_objnum);
+		/* Do an object remote copy */
+		ret = ceph_osdc_copy_from(&fsc->client->osdc,
+					  src_ci->i_vino.snap, 0,
+					  &src_oid, &src_oloc,
+					  CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+					  CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
+					  &dst_oid, &dst_oloc,
+					  CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+					  CEPH_OSD_OP_FLAG_FADVISE_DONTNEED,
+					  dst_ci->i_truncate_seq,
+					  dst_ci->i_truncate_size,
+					  CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ);
+		if (ret) {
+			if (ret == -EOPNOTSUPP) {
+				fsc->have_copy_from2 = false;
+				pr_notice("OSDs don't support copy-from2; disabling copy offload\n");
+			}
+			dout("ceph_osdc_copy_from returned %d\n", ret);
+			if (!bytes)
+				bytes = ret;
+			goto out;
+		}
+		len -= object_size;
+		bytes += object_size;
+		*src_off += object_size;
+		*dst_off += object_size;
+	}
+
+out:
+	ceph_oloc_destroy(&src_oloc);
+	ceph_oloc_destroy(&dst_oloc);
+	return bytes;
+}
+
+static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
+				      struct file *dst_file, loff_t dst_off,
+				      size_t len, unsigned int flags)
+{
+	struct inode *src_inode = file_inode(src_file);
+	struct inode *dst_inode = file_inode(dst_file);
+	struct ceph_inode_info *src_ci = ceph_inode(src_inode);
+	struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
+	struct ceph_cap_flush *prealloc_cf;
+	struct ceph_fs_client *src_fsc = ceph_inode_to_client(src_inode);
+	loff_t size;
+	ssize_t ret = -EIO, bytes;
+	u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
+	u32 src_objlen, dst_objlen;
+	int src_got = 0, dst_got = 0, err, dirty;
+
+	if (src_inode->i_sb != dst_inode->i_sb) {
+		struct ceph_fs_client *dst_fsc = ceph_inode_to_client(dst_inode);
+
+		if (ceph_fsid_compare(&src_fsc->client->fsid,
+				      &dst_fsc->client->fsid)) {
+			dout("Copying files across clusters: src: %pU dst: %pU\n",
+			     &src_fsc->client->fsid, &dst_fsc->client->fsid);
+			return -EXDEV;
+		}
+	}
+	if (ceph_snap(dst_inode) != CEPH_NOSNAP)
+		return -EROFS;
+
+	/*
+	 * Some of the checks below will return -EOPNOTSUPP, which will force a
+	 * fallback to the default VFS copy_file_range implementation.  This is
+	 * desirable in several cases (for ex, the 'len' is smaller than the
+	 * size of the objects, or in cases where that would be more
+	 * efficient).
+	 */
+
+	if (ceph_test_mount_opt(src_fsc, NOCOPYFROM))
+		return -EOPNOTSUPP;
+
+	if (!src_fsc->have_copy_from2)
+		return -EOPNOTSUPP;
+
+	/*
+	 * Striped file layouts require that we copy partial objects, but the
+	 * OSD copy-from operation only supports full-object copies.  Limit
+	 * this to non-striped file layouts for now.
+	 */
+	if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
+	    (src_ci->i_layout.stripe_count != 1) ||
+	    (dst_ci->i_layout.stripe_count != 1) ||
+	    (src_ci->i_layout.object_size != dst_ci->i_layout.object_size)) {
+		dout("Invalid src/dst files layout\n");
+		return -EOPNOTSUPP;
+	}
+
+	if (len < src_ci->i_layout.object_size)
+		return -EOPNOTSUPP; /* no remote copy will be done */
+
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
+	/* Start by sync'ing the source and destination files */
+	ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
+	if (ret < 0) {
+		dout("failed to write src file (%zd)\n", ret);
+		goto out;
+	}
+	ret = file_write_and_wait_range(dst_file, dst_off, (dst_off + len));
+	if (ret < 0) {
+		dout("failed to write dst file (%zd)\n", ret);
+		goto out;
+	}
+
+	/*
+	 * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
+	 * clients may have dirty data in their caches.  And OSDs know nothing
+	 * about caps, so they can't safely do the remote object copies.
+	 */
+	err = get_rd_wr_caps(src_file, &src_got,
+			     dst_file, (dst_off + len), &dst_got);
+	if (err < 0) {
+		dout("get_rd_wr_caps returned %d\n", err);
+		ret = -EOPNOTSUPP;
+		goto out;
+	}
+
+	ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
+	if (ret < 0)
+		goto out_caps;
+
+	/* Drop dst file cached pages */
+	ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
+					    dst_off >> PAGE_SHIFT,
+					    (dst_off + len) >> PAGE_SHIFT);
+	if (ret < 0) {
+		dout("Failed to invalidate inode pages (%zd)\n", ret);
+		ret = 0; /* XXX */
+	}
+	ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+				      src_ci->i_layout.object_size,
+				      &src_objnum, &src_objoff, &src_objlen);
+	ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+				      dst_ci->i_layout.object_size,
+				      &dst_objnum, &dst_objoff, &dst_objlen);
+	/* object-level offsets need to the same */
+	if (src_objoff != dst_objoff) {
+		ret = -EOPNOTSUPP;
+		goto out_caps;
+	}
+
+	/*
+	 * Do a manual copy if the object offset isn't object aligned.
+	 * 'src_objlen' contains the bytes left until the end of the object,
+	 * starting at the src_off
+	 */
+	if (src_objoff) {
+		dout("Initial partial copy of %u bytes\n", src_objlen);
+
+		/*
+		 * we need to temporarily drop all caps as we'll be calling
+		 * {read,write}_iter, which will get caps again.
+		 */
+		put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+		ret = do_splice_direct(src_file, &src_off, dst_file,
+				       &dst_off, src_objlen, flags);
+		/* Abort on short copies or on error */
+		if (ret < src_objlen) {
+			dout("Failed partial copy (%zd)\n", ret);
+			goto out;
+		}
+		len -= ret;
+		err = get_rd_wr_caps(src_file, &src_got,
+				     dst_file, (dst_off + len), &dst_got);
+		if (err < 0)
+			goto out;
+		err = is_file_size_ok(src_inode, dst_inode,
+				      src_off, dst_off, len);
+		if (err < 0)
+			goto out_caps;
+	}
+
+	size = i_size_read(dst_inode);
+	bytes = ceph_do_objects_copy(src_ci, &src_off, dst_ci, &dst_off,
+				     src_fsc, len, flags);
+	if (bytes <= 0) {
+		if (!ret)
+			ret = bytes;
+		goto out_caps;
+	}
+	dout("Copied %zu bytes out of %zu\n", bytes, len);
+	len -= bytes;
+	ret += bytes;
+
+	file_update_time(dst_file);
+	inode_inc_iversion_raw(dst_inode);
+
+	if (dst_off > size) {
+		/* Let the MDS know about dst file size change */
+		if (ceph_inode_set_size(dst_inode, dst_off) ||
+		    ceph_quota_is_max_bytes_approaching(dst_inode, dst_off))
+			ceph_check_caps(dst_ci, CHECK_CAPS_AUTHONLY, NULL);
+	}
+	/* Mark Fw dirty */
+	spin_lock(&dst_ci->i_ceph_lock);
+	dst_ci->i_inline_version = CEPH_INLINE_NONE;
+	dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
+	spin_unlock(&dst_ci->i_ceph_lock);
+	if (dirty)
+		__mark_inode_dirty(dst_inode, dirty);
+
+out_caps:
+	put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+
+	/*
+	 * Do the final manual copy if we still have some bytes left, unless
+	 * there were errors in remote object copies (len >= object_size).
+	 */
+	if (len && (len < src_ci->i_layout.object_size)) {
+		dout("Final partial copy of %zu bytes\n", len);
+		bytes = do_splice_direct(src_file, &src_off, dst_file,
+					 &dst_off, len, flags);
+		if (bytes > 0)
+			ret += bytes;
+		else
+			dout("Failed partial copy (%zd)\n", bytes);
+	}
+
+out:
+	ceph_free_cap_flush(prealloc_cf);
+
+	return ret;
+}
+
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
+				    struct file *dst_file, loff_t dst_off,
+				    size_t len, unsigned int flags)
+{
+	ssize_t ret;
+
+	ret = __ceph_copy_file_range(src_file, src_off, dst_file, dst_off,
+				     len, flags);
+
+	if (ret == -EOPNOTSUPP || ret == -EXDEV)
+		ret = generic_copy_file_range(src_file, src_off, dst_file,
+					      dst_off, len, flags);
+	return ret;
+}
+
 const struct file_operations ceph_file_fops = {
 	.open = ceph_open,
 	.release = ceph_release,
@@ -1824,7 +2524,7 @@
 	.splice_read = generic_file_splice_read,
 	.splice_write = iter_file_splice_write,
 	.unlocked_ioctl = ceph_ioctl,
-	.compat_ioctl	= ceph_ioctl,
+	.compat_ioctl = compat_ptr_ioctl,
 	.fallocate	= ceph_fallocate,
+	.copy_file_range = ceph_copy_file_range,
 };
-

--
Gitblit v1.6.2