From a5969cabbb4660eab42b6ef0412cbbd1200cf14d Mon Sep 17 00:00:00 2001 From: hc <hc@nodka.com> Date: Sat, 12 Oct 2024 07:10:09 +0000 Subject: [PATCH] 修改led为gpio --- kernel/fs/ceph/file.c | 1104 +++++++++++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 902 insertions(+), 202 deletions(-) diff --git a/kernel/fs/ceph/file.c b/kernel/fs/ceph/file.c index 4ce2752..943655e 100644 --- a/kernel/fs/ceph/file.c +++ b/kernel/fs/ceph/file.c @@ -1,5 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 #include <linux/ceph/ceph_debug.h> +#include <linux/ceph/striper.h> #include <linux/module.h> #include <linux/sched.h> @@ -9,10 +10,14 @@ #include <linux/namei.h> #include <linux/writeback.h> #include <linux/falloc.h> +#include <linux/iversion.h> +#include <linux/ktime.h> #include "super.h" #include "mds_client.h" #include "cache.h" +#include "io.h" +#include "metric.h" static __le32 ceph_flags_sys2wire(u32 flags) { @@ -177,8 +182,7 @@ static struct ceph_mds_request * prepare_open_request(struct super_block *sb, int flags, int create_mode) { - struct ceph_fs_client *fsc = ceph_sb_to_client(sb); - struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(sb); struct ceph_mds_request *req; int want_auth = USE_ANY_MDS; int op = (flags & O_CREAT) ? CEPH_MDS_OP_CREATE : CEPH_MDS_OP_OPEN; @@ -199,6 +203,7 @@ static int ceph_init_file_info(struct inode *inode, struct file *file, int fmode, bool isdir) { + struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_file_info *fi; dout("%s %p %p 0%o (%s)\n", __func__, inode, file, @@ -208,10 +213,8 @@ if (isdir) { struct ceph_dir_file_info *dfi = kmem_cache_zalloc(ceph_dir_file_cachep, GFP_KERNEL); - if (!dfi) { - ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ + if (!dfi) return -ENOMEM; - } file->private_data = dfi; fi = &dfi->file_info; @@ -219,17 +222,18 @@ dfi->readdir_cache_idx = -1; } else { fi = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL); - if (!fi) { - ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ + if (!fi) return -ENOMEM; - } file->private_data = fi; } + ceph_get_fmode(ci, fmode, 1); fi->fmode = fmode; + spin_lock_init(&fi->rw_contexts_lock); INIT_LIST_HEAD(&fi->rw_contexts); + fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen); return 0; } @@ -246,17 +250,15 @@ case S_IFREG: ceph_fscache_register_inode_cookie(inode); ceph_fscache_file_set_cookie(inode, file); + fallthrough; case S_IFDIR: ret = ceph_init_file_info(inode, file, fmode, S_ISDIR(inode->i_mode)); - if (ret) - return ret; break; case S_IFLNK: dout("init_file %p %p 0%o (symlink)\n", inode, file, inode->i_mode); - ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ break; default: @@ -266,7 +268,6 @@ * we need to drop the open ref now, since we don't * have .release set to ceph_release. */ - ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ BUG_ON(inode->i_fop->release == ceph_release); /* call the proper open fop */ @@ -278,14 +279,15 @@ /* * try renew caps after session gets killed. */ -int ceph_renew_caps(struct inode *inode) +int ceph_renew_caps(struct inode *inode, int fmode) { - struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_request *req; int err, flags, wanted; spin_lock(&ci->i_ceph_lock); + __ceph_touch_fmode(ci, mdsc, fmode); wanted = __ceph_caps_file_wanted(ci); if (__ceph_is_any_real_caps(ci) && (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) { @@ -319,7 +321,6 @@ req->r_inode = inode; ihold(inode); req->r_num_caps = 1; - req->r_fmode = -1; err = ceph_mdsc_do_request(mdsc, NULL, req); ceph_mdsc_put_request(req); @@ -365,9 +366,6 @@ /* trivially open snapdir */ if (ceph_snap(inode) == CEPH_SNAPDIR) { - spin_lock(&ci->i_ceph_lock); - __ceph_get_fmode(ci, fmode); - spin_unlock(&ci->i_ceph_lock); return ceph_init_file(inode, file, fmode); } @@ -385,7 +383,7 @@ dout("open %p fmode %d want %s issued %s using existing\n", inode, fmode, ceph_cap_string(wanted), ceph_cap_string(issued)); - __ceph_get_fmode(ci, fmode); + __ceph_touch_fmode(ci, mdsc, fmode); spin_unlock(&ci->i_ceph_lock); /* adjust wanted? */ @@ -397,7 +395,7 @@ return ceph_init_file(inode, file, fmode); } else if (ceph_snap(inode) != CEPH_NOSNAP && (ci->i_snap_caps & wanted) == wanted) { - __ceph_get_fmode(ci, fmode); + __ceph_touch_fmode(ci, mdsc, fmode); spin_unlock(&ci->i_ceph_lock); return ceph_init_file(inode, file, fmode); } @@ -423,6 +421,264 @@ return err; } +/* Clone the layout from a synchronous create, if the dir now has Dc caps */ +static void +cache_file_layout(struct inode *dst, struct inode *src) +{ + struct ceph_inode_info *cdst = ceph_inode(dst); + struct ceph_inode_info *csrc = ceph_inode(src); + + spin_lock(&cdst->i_ceph_lock); + if ((__ceph_caps_issued(cdst, NULL) & CEPH_CAP_DIR_CREATE) && + !ceph_file_layout_is_valid(&cdst->i_cached_layout)) { + memcpy(&cdst->i_cached_layout, &csrc->i_layout, + sizeof(cdst->i_cached_layout)); + rcu_assign_pointer(cdst->i_cached_layout.pool_ns, + ceph_try_get_string(csrc->i_layout.pool_ns)); + } + spin_unlock(&cdst->i_ceph_lock); +} + +/* + * Try to set up an async create. We need caps, a file layout, and inode number, + * and either a lease on the dentry or complete dir info. If any of those + * criteria are not satisfied, then return false and the caller can go + * synchronous. + */ +static int try_prep_async_create(struct inode *dir, struct dentry *dentry, + struct ceph_file_layout *lo, u64 *pino) +{ + struct ceph_inode_info *ci = ceph_inode(dir); + struct ceph_dentry_info *di = ceph_dentry(dentry); + int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_CREATE; + u64 ino; + + spin_lock(&ci->i_ceph_lock); + /* No auth cap means no chance for Dc caps */ + if (!ci->i_auth_cap) + goto no_async; + + /* Any delegated inos? */ + if (xa_empty(&ci->i_auth_cap->session->s_delegated_inos)) + goto no_async; + + if (!ceph_file_layout_is_valid(&ci->i_cached_layout)) + goto no_async; + + if ((__ceph_caps_issued(ci, NULL) & want) != want) + goto no_async; + + if (d_in_lookup(dentry)) { + if (!__ceph_dir_is_complete(ci)) + goto no_async; + spin_lock(&dentry->d_lock); + di->lease_shared_gen = atomic_read(&ci->i_shared_gen); + spin_unlock(&dentry->d_lock); + } else if (atomic_read(&ci->i_shared_gen) != + READ_ONCE(di->lease_shared_gen)) { + goto no_async; + } + + ino = ceph_get_deleg_ino(ci->i_auth_cap->session); + if (!ino) + goto no_async; + + *pino = ino; + ceph_take_cap_refs(ci, want, false); + memcpy(lo, &ci->i_cached_layout, sizeof(*lo)); + rcu_assign_pointer(lo->pool_ns, + ceph_try_get_string(ci->i_cached_layout.pool_ns)); + got = want; +no_async: + spin_unlock(&ci->i_ceph_lock); + return got; +} + +static void restore_deleg_ino(struct inode *dir, u64 ino) +{ + struct ceph_inode_info *ci = ceph_inode(dir); + struct ceph_mds_session *s = NULL; + + spin_lock(&ci->i_ceph_lock); + if (ci->i_auth_cap) + s = ceph_get_mds_session(ci->i_auth_cap->session); + spin_unlock(&ci->i_ceph_lock); + if (s) { + int err = ceph_restore_deleg_ino(s, ino); + if (err) + pr_warn("ceph: unable to restore delegated ino 0x%llx to session: %d\n", + ino, err); + ceph_put_mds_session(s); + } +} + +static void ceph_async_create_cb(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req) +{ + int result = req->r_err ? req->r_err : + le32_to_cpu(req->r_reply_info.head->result); + + if (result == -EJUKEBOX) + goto out; + + mapping_set_error(req->r_parent->i_mapping, result); + + if (result) { + struct dentry *dentry = req->r_dentry; + int pathlen = 0; + u64 base = 0; + char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen, + &base, 0); + + ceph_dir_clear_complete(req->r_parent); + if (!d_unhashed(dentry)) + d_drop(dentry); + + /* FIXME: start returning I/O errors on all accesses? */ + pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n", + base, IS_ERR(path) ? "<<bad>>" : path, result); + ceph_mdsc_free_path(path, pathlen); + } + + if (req->r_target_inode) { + struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); + u64 ino = ceph_vino(req->r_target_inode).ino; + + if (req->r_deleg_ino != ino) + pr_warn("%s: inode number mismatch! err=%d deleg_ino=0x%llx target=0x%llx\n", + __func__, req->r_err, req->r_deleg_ino, ino); + mapping_set_error(req->r_target_inode->i_mapping, result); + + spin_lock(&ci->i_ceph_lock); + if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) { + ci->i_ceph_flags &= ~CEPH_I_ASYNC_CREATE; + wake_up_bit(&ci->i_ceph_flags, CEPH_ASYNC_CREATE_BIT); + } + ceph_kick_flushing_inode_caps(req->r_session, ci); + spin_unlock(&ci->i_ceph_lock); + } else { + pr_warn("%s: no req->r_target_inode for 0x%llx\n", __func__, + req->r_deleg_ino); + } +out: + ceph_mdsc_release_dir_caps(req); +} + +static int ceph_finish_async_create(struct inode *dir, struct dentry *dentry, + struct file *file, umode_t mode, + struct ceph_mds_request *req, + struct ceph_acl_sec_ctx *as_ctx, + struct ceph_file_layout *lo) +{ + int ret; + char xattr_buf[4]; + struct ceph_mds_reply_inode in = { }; + struct ceph_mds_reply_info_in iinfo = { .in = &in }; + struct ceph_inode_info *ci = ceph_inode(dir); + struct inode *inode; + struct timespec64 now; + struct ceph_string *pool_ns; + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb); + struct ceph_vino vino = { .ino = req->r_deleg_ino, + .snap = CEPH_NOSNAP }; + + ktime_get_real_ts64(&now); + + inode = ceph_get_inode(dentry->d_sb, vino); + if (IS_ERR(inode)) + return PTR_ERR(inode); + + iinfo.inline_version = CEPH_INLINE_NONE; + iinfo.change_attr = 1; + ceph_encode_timespec64(&iinfo.btime, &now); + + if (req->r_pagelist) { + iinfo.xattr_len = req->r_pagelist->length; + iinfo.xattr_data = req->r_pagelist->mapped_tail; + } else { + /* fake it */ + iinfo.xattr_len = ARRAY_SIZE(xattr_buf); + iinfo.xattr_data = xattr_buf; + memset(iinfo.xattr_data, 0, iinfo.xattr_len); + } + + in.ino = cpu_to_le64(vino.ino); + in.snapid = cpu_to_le64(CEPH_NOSNAP); + in.version = cpu_to_le64(1); // ??? + in.cap.caps = in.cap.wanted = cpu_to_le32(CEPH_CAP_ALL_FILE); + in.cap.cap_id = cpu_to_le64(1); + in.cap.realm = cpu_to_le64(ci->i_snap_realm->ino); + in.cap.flags = CEPH_CAP_FLAG_AUTH; + in.ctime = in.mtime = in.atime = iinfo.btime; + in.truncate_seq = cpu_to_le32(1); + in.truncate_size = cpu_to_le64(-1ULL); + in.xattr_version = cpu_to_le64(1); + in.uid = cpu_to_le32(from_kuid(&init_user_ns, current_fsuid())); + if (dir->i_mode & S_ISGID) { + in.gid = cpu_to_le32(from_kgid(&init_user_ns, dir->i_gid)); + + /* Directories always inherit the setgid bit. */ + if (S_ISDIR(mode)) + mode |= S_ISGID; + else if ((mode & (S_ISGID | S_IXGRP)) == (S_ISGID | S_IXGRP) && + !in_group_p(dir->i_gid) && + !capable_wrt_inode_uidgid(dir, CAP_FSETID)) + mode &= ~S_ISGID; + } else { + in.gid = cpu_to_le32(from_kgid(&init_user_ns, current_fsgid())); + } + in.mode = cpu_to_le32((u32)mode); + + in.nlink = cpu_to_le32(1); + in.max_size = cpu_to_le64(lo->stripe_unit); + + ceph_file_layout_to_legacy(lo, &in.layout); + /* lo is private, so pool_ns can't change */ + pool_ns = rcu_dereference_raw(lo->pool_ns); + if (pool_ns) { + iinfo.pool_ns_len = pool_ns->len; + iinfo.pool_ns_data = pool_ns->str; + } + + down_read(&mdsc->snap_rwsem); + ret = ceph_fill_inode(inode, NULL, &iinfo, NULL, req->r_session, + req->r_fmode, NULL); + up_read(&mdsc->snap_rwsem); + if (ret) { + dout("%s failed to fill inode: %d\n", __func__, ret); + ceph_dir_clear_complete(dir); + if (!d_unhashed(dentry)) + d_drop(dentry); + if (inode->i_state & I_NEW) + discard_new_inode(inode); + } else { + struct dentry *dn; + + dout("%s d_adding new inode 0x%llx to 0x%llx/%s\n", __func__, + vino.ino, ceph_ino(dir), dentry->d_name.name); + ceph_dir_clear_ordered(dir); + ceph_init_inode_acls(inode, as_ctx); + if (inode->i_state & I_NEW) { + /* + * If it's not I_NEW, then someone created this before + * we got here. Assume the server is aware of it at + * that point and don't worry about setting + * CEPH_I_ASYNC_CREATE. + */ + ceph_inode(inode)->i_ceph_flags = CEPH_I_ASYNC_CREATE; + unlock_new_inode(inode); + } + if (d_in_lookup(dentry) || d_really_is_negative(dentry)) { + if (!d_unhashed(dentry)) + d_drop(dentry); + dn = d_splice_alias(inode, dentry); + WARN_ON_ONCE(dn && dn != dentry); + } + file->f_mode |= FMODE_CREATED; + ret = finish_open(file, dentry, ceph_open); + } + return ret; +} /* * Do a lookup + open with a single request. If we get a non-existent @@ -435,7 +691,8 @@ struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; struct dentry *dn; - struct ceph_acls_info acls = {}; + struct ceph_acl_sec_ctx as_ctx = {}; + bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS); int mask; int err; @@ -446,41 +703,79 @@ if (dentry->d_name.len > NAME_MAX) return -ENAMETOOLONG; + /* + * Do not truncate the file, since atomic_open is called before the + * permission check. The caller will do the truncation afterward. + */ + flags &= ~O_TRUNC; + if (flags & O_CREAT) { if (ceph_quota_is_max_files_exceeded(dir)) return -EDQUOT; - err = ceph_pre_init_acls(dir, &mode, &acls); + err = ceph_pre_init_acls(dir, &mode, &as_ctx); if (err < 0) return err; + err = ceph_security_init_secctx(dentry, mode, &as_ctx); + if (err < 0) + goto out_ctx; + /* Async create can't handle more than a page of xattrs */ + if (as_ctx.pagelist && + !list_is_singular(&as_ctx.pagelist->head)) + try_async = false; + } else if (!d_in_lookup(dentry)) { + /* If it's not being looked up, it's negative */ + return -ENOENT; } - +retry: /* do the open */ req = prepare_open_request(dir->i_sb, flags, mode); if (IS_ERR(req)) { err = PTR_ERR(req); - goto out_acl; + goto out_ctx; } req->r_dentry = dget(dentry); req->r_num_caps = 2; + mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED; + if (ceph_security_xattr_wanted(dir)) + mask |= CEPH_CAP_XATTR_SHARED; + req->r_args.open.mask = cpu_to_le32(mask); + req->r_parent = dir; + if (flags & O_CREAT) { + struct ceph_file_layout lo; + req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; - if (acls.pagelist) { - req->r_pagelist = acls.pagelist; - acls.pagelist = NULL; + if (as_ctx.pagelist) { + req->r_pagelist = as_ctx.pagelist; + as_ctx.pagelist = NULL; + } + if (try_async && + (req->r_dir_caps = + try_prep_async_create(dir, dentry, &lo, + &req->r_deleg_ino))) { + set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags); + req->r_args.open.flags |= cpu_to_le32(CEPH_O_EXCL); + req->r_callback = ceph_async_create_cb; + err = ceph_mdsc_submit_request(mdsc, dir, req); + if (!err) { + err = ceph_finish_async_create(dir, dentry, + file, mode, req, + &as_ctx, &lo); + } else if (err == -EJUKEBOX) { + restore_deleg_ino(dir, req->r_deleg_ino); + ceph_mdsc_put_request(req); + try_async = false; + ceph_put_string(rcu_dereference_raw(lo.pool_ns)); + goto retry; + } + ceph_put_string(rcu_dereference_raw(lo.pool_ns)); + goto out_req; } } - mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED; - if (ceph_security_xattr_wanted(dir)) - mask |= CEPH_CAP_XATTR_SHARED; - req->r_args.open.mask = cpu_to_le32(mask); - - req->r_parent = dir; set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); - err = ceph_mdsc_do_request(mdsc, - (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, - req); + err = ceph_mdsc_do_request(mdsc, (flags & O_CREAT) ? dir : NULL, req); err = ceph_handle_snapdir(req, dentry, err); if (err) goto out_req; @@ -505,17 +800,18 @@ } else { dout("atomic_open finish_open on dn %p\n", dn); if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) { - ceph_init_inode_acls(d_inode(dentry), &acls); + struct inode *newino = d_inode(dentry); + + cache_file_layout(dir, newino); + ceph_init_inode_acls(newino, &as_ctx); file->f_mode |= FMODE_CREATED; } err = finish_open(file, dentry, ceph_open); } out_req: - if (!req->r_err && req->r_target_inode) - ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode); ceph_mdsc_put_request(req); -out_acl: - ceph_release_acls_info(&acls); +out_ctx: + ceph_release_acl_sec_ctx(&as_ctx); dout("atomic_open result=%d\n", err); return err; } @@ -529,7 +825,7 @@ dout("release inode %p dir file %p\n", inode, file); WARN_ON(!list_empty(&dfi->file_info.rw_contexts)); - ceph_put_fmode(ci, dfi->file_info.fmode); + ceph_put_fmode(ci, dfi->file_info.fmode, 1); if (dfi->last_readdir) ceph_mdsc_put_request(dfi->last_readdir); @@ -541,7 +837,8 @@ dout("release inode %p regular file %p\n", inode, file); WARN_ON(!list_empty(&fi->rw_contexts)); - ceph_put_fmode(ci, fi->fmode); + ceph_put_fmode(ci, fi->fmode, 1); + kmem_cache_free(ceph_file_cachep, fi); } @@ -557,90 +854,26 @@ }; /* - * Read a range of bytes striped over one or more objects. Iterate over - * objects we stripe over. (That's not atomic, but good enough for now.) + * Completely synchronous read and write methods. Direct from __user + * buffer to osd, or directly to user pages (if O_DIRECT). + * + * If the read spans object boundary, just do multiple reads. (That's not + * atomic, but good enough for now.) * * If we get a short result from the OSD, check against i_size; we need to * only return a short read to the caller if we hit EOF. */ -static int striped_read(struct inode *inode, - u64 pos, u64 len, - struct page **pages, int num_pages, - int page_align, int *checkeof) -{ - struct ceph_fs_client *fsc = ceph_inode_to_client(inode); - struct ceph_inode_info *ci = ceph_inode(inode); - u64 this_len; - loff_t i_size; - int page_idx; - int ret, read = 0; - bool hit_stripe, was_short; - - /* - * we may need to do multiple reads. not atomic, unfortunately. - */ -more: - this_len = len; - page_idx = (page_align + read) >> PAGE_SHIFT; - ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), - &ci->i_layout, pos, &this_len, - ci->i_truncate_seq, ci->i_truncate_size, - pages + page_idx, num_pages - page_idx, - ((page_align + read) & ~PAGE_MASK)); - if (ret == -ENOENT) - ret = 0; - hit_stripe = this_len < len; - was_short = ret >= 0 && ret < this_len; - dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read, - ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); - - i_size = i_size_read(inode); - if (ret >= 0) { - if (was_short && (pos + ret < i_size)) { - int zlen = min(this_len - ret, i_size - pos - ret); - int zoff = page_align + read + ret; - dout(" zero gap %llu to %llu\n", - pos + ret, pos + ret + zlen); - ceph_zero_page_vector_range(zoff, zlen, pages); - ret += zlen; - } - - read += ret; - pos += ret; - len -= ret; - - /* hit stripe and need continue*/ - if (len && hit_stripe && pos < i_size) - goto more; - } - - if (read > 0) { - ret = read; - /* did we bounce off eof? */ - if (pos + len > i_size) - *checkeof = CHECK_EOF; - } - - dout("striped_read returns %d\n", ret); - return ret; -} - -/* - * Completely synchronous read and write methods. Direct from __user - * buffer to osd, or directly to user pages (if O_DIRECT). - * - * If the read spans object boundary, just do multiple reads. - */ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, - int *checkeof) + int *retry_op) { struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); - struct page **pages; - u64 off = iocb->ki_pos; - int num_pages; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_osd_client *osdc = &fsc->client->osdc; ssize_t ret; - size_t len = iov_iter_count(to); + u64 off = iocb->ki_pos; + u64 len = iov_iter_count(to); dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); @@ -653,61 +886,108 @@ * but it will at least behave sensibly when they are * in sequence. */ - ret = filemap_write_and_wait_range(inode->i_mapping, off, - off + len); + ret = filemap_write_and_wait_range(inode->i_mapping, + off, off + len - 1); if (ret < 0) return ret; - if (unlikely(to->type & ITER_PIPE)) { + ret = 0; + while ((len = iov_iter_count(to)) > 0) { + struct ceph_osd_request *req; + struct page **pages; + int num_pages; size_t page_off; - ret = iov_iter_get_pages_alloc(to, &pages, len, - &page_off); - if (ret <= 0) - return -ENOMEM; - num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE); + u64 i_size; + bool more; + int idx; + size_t left; - ret = striped_read(inode, off, ret, pages, num_pages, - page_off, checkeof); - if (ret > 0) { - iov_iter_advance(to, ret); - off += ret; - } else { - iov_iter_advance(to, 0); + req = ceph_osdc_new_request(osdc, &ci->i_layout, + ci->i_vino, off, &len, 0, 1, + CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, + NULL, ci->i_truncate_seq, + ci->i_truncate_size, false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + break; } - ceph_put_page_vector(pages, num_pages, false); - } else { + + more = len < iov_iter_count(to); + num_pages = calc_pages_for(off, len); + page_off = off & ~PAGE_MASK; pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); + if (IS_ERR(pages)) { + ceph_osdc_put_request(req); + ret = PTR_ERR(pages); + break; + } - ret = striped_read(inode, off, len, pages, num_pages, - (off & ~PAGE_MASK), checkeof); - if (ret > 0) { - int l, k = 0; - size_t left = ret; + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off, + false, false); + ret = ceph_osdc_start_request(osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(osdc, req); - while (left) { - size_t page_off = off & ~PAGE_MASK; - size_t copy = min_t(size_t, left, - PAGE_SIZE - page_off); - l = copy_page_to_iter(pages[k++], page_off, - copy, to); - off += l; - left -= l; - if (l < copy) - break; + ceph_update_read_latency(&fsc->mdsc->metric, + req->r_start_latency, + req->r_end_latency, + ret); + + ceph_osdc_put_request(req); + + i_size = i_size_read(inode); + dout("sync_read %llu~%llu got %zd i_size %llu%s\n", + off, len, ret, i_size, (more ? " MORE" : "")); + + if (ret == -ENOENT) + ret = 0; + if (ret >= 0 && ret < len && (off + ret < i_size)) { + int zlen = min(len - ret, i_size - off - ret); + int zoff = page_off + ret; + dout("sync_read zero gap %llu~%llu\n", + off + ret, off + ret + zlen); + ceph_zero_page_vector_range(zoff, zlen, pages); + ret += zlen; + } + + idx = 0; + left = ret > 0 ? ret : 0; + while (left > 0) { + size_t len, copied; + page_off = off & ~PAGE_MASK; + len = min_t(size_t, left, PAGE_SIZE - page_off); + SetPageUptodate(pages[idx]); + copied = copy_page_to_iter(pages[idx++], + page_off, len, to); + off += copied; + left -= copied; + if (copied < len) { + ret = -EFAULT; + break; } } ceph_release_page_vector(pages, num_pages); + + if (ret < 0) { + if (ret == -EBLOCKLISTED) + fsc->blocklisted = true; + break; + } + + if (off >= i_size || !more) + break; } if (off > iocb->ki_pos) { + if (ret >= 0 && + iov_iter_count(to) > 0 && off >= i_size_read(inode)) + *retry_op = CHECK_EOF; ret = off - iocb->ki_pos; iocb->ki_pos = off; } - dout("sync_read result %zd\n", ret); + dout("sync_read result %zd retry_op %d\n", ret, *retry_op); return ret; } @@ -739,6 +1019,9 @@ if (!atomic_dec_and_test(&aio_req->pending_reqs)) return; + + if (aio_req->iocb->ki_flags & IOCB_DIRECT) + inode_dio_end(inode); ret = aio_req->error; if (!ret) @@ -780,12 +1063,23 @@ struct inode *inode = req->r_inode; struct ceph_aio_request *aio_req = req->r_priv; struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0); + struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric; BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS); BUG_ON(!osd_data->num_bvecs); dout("ceph_aio_complete_req %p rc %d bytes %u\n", inode, rc, osd_data->bvec_pos.iter.bi_size); + + /* r_start_latency == 0 means the request was not submitted */ + if (req->r_start_latency) { + if (aio_req->write) + ceph_update_write_latency(metric, req->r_start_latency, + req->r_end_latency, rc); + else + ceph_update_read_latency(metric, req->r_start_latency, + req->r_end_latency, rc); + } if (rc == -EOLDSNAPC) { struct ceph_aio_work *aio_work; @@ -795,7 +1089,7 @@ if (aio_work) { INIT_WORK(&aio_work->work, ceph_aio_retry_work); aio_work->req = req; - queue_work(ceph_inode_to_client(inode)->wb_wq, + queue_work(ceph_inode_to_client(inode)->inode_wq, &aio_work->work); return; } @@ -821,7 +1115,7 @@ aio_req->total_len = rc + zlen; } - iov_iter_bvec(&i, ITER_BVEC, osd_data->bvec_pos.bvecs, + iov_iter_bvec(&i, READ, osd_data->bvec_pos.bvecs, osd_data->num_bvecs, osd_data->bvec_pos.iter.bi_size); iov_iter_advance(&i, rc); @@ -865,7 +1159,7 @@ } spin_unlock(&ci->i_ceph_lock); - req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2, + req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1, false, GFP_NOFS); if (!req) { ret = -ENOMEM; @@ -877,17 +1171,17 @@ ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc); ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid); + req->r_ops[0] = orig_req->r_ops[0]; + + req->r_mtime = aio_req->mtime; + req->r_data_offset = req->r_ops[0].extent.offset; + ret = ceph_osdc_alloc_messages(req, GFP_NOFS); if (ret) { ceph_osdc_put_request(req); req = orig_req; goto out; } - - req->r_ops[0] = orig_req->r_ops[0]; - - req->r_mtime = aio_req->mtime; - req->r_data_offset = req->r_ops[0].extent.offset; ceph_osdc_put_request(orig_req); @@ -915,13 +1209,14 @@ struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_client_metric *metric = &fsc->mdsc->metric; struct ceph_vino vino; struct ceph_osd_request *req; struct bio_vec *bvecs; struct ceph_aio_request *aio_req = NULL; int num_pages = 0; int flags; - int ret; + int ret = 0; struct timespec64 mtime = current_time(inode); size_t count = iov_iter_count(iter); loff_t pos = iocb->ki_pos; @@ -933,16 +1228,12 @@ dout("sync_direct_%s on file %p %lld~%u snapc %p seq %lld\n", (write ? "write" : "read"), file, pos, (unsigned)count, - snapc, snapc->seq); - - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); - if (ret < 0) - return ret; + snapc, snapc ? snapc->seq : 0); if (write) { int ret2 = invalidate_inode_pages2_range(inode->i_mapping, pos >> PAGE_SHIFT, - (pos + count) >> PAGE_SHIFT); + (pos + count - 1) >> PAGE_SHIFT); if (ret2 < 0) dout("invalidate_inode_pages2_range returned %d\n", ret2); @@ -1010,7 +1301,7 @@ * may block. */ truncate_inode_pages_range(inode->i_mapping, pos, - (pos+len) | (PAGE_SIZE - 1)); + PAGE_ALIGN(pos + len) - 1); req->r_mtime = mtime; } @@ -1025,7 +1316,7 @@ req->r_callback = ceph_aio_complete_req; req->r_inode = inode; req->r_priv = aio_req; - list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs); + list_add_tail(&req->r_private_item, &aio_req->osd_reqs); pos += len; continue; @@ -1034,6 +1325,13 @@ ret = ceph_osdc_start_request(req->r_osdc, req, false); if (!ret) ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + + if (write) + ceph_update_write_latency(metric, req->r_start_latency, + req->r_end_latency, ret); + else + ceph_update_read_latency(metric, req->r_start_latency, + req->r_end_latency, ret); size = i_size_read(inode); if (!write) { @@ -1044,8 +1342,7 @@ int zlen = min_t(size_t, len - ret, size - pos - ret); - iov_iter_bvec(&i, ITER_BVEC, bvecs, num_pages, - len); + iov_iter_bvec(&i, READ, bvecs, num_pages, len); iov_iter_advance(&i, ret); iov_iter_zero(zlen, &i); ret += zlen; @@ -1083,11 +1380,12 @@ CEPH_CAP_FILE_RD); list_splice(&aio_req->osd_reqs, &osd_reqs); + inode_dio_begin(inode); while (!list_empty(&osd_reqs)) { req = list_first_entry(&osd_reqs, struct ceph_osd_request, - r_unsafe_item); - list_del_init(&req->r_unsafe_item); + r_private_item); + list_del_init(&req->r_private_item); if (ret >= 0) ret = ceph_osdc_start_request(req->r_osdc, req, false); @@ -1139,13 +1437,14 @@ dout("sync_write on file %p %lld~%u snapc %p seq %lld\n", file, pos, (unsigned)count, snapc, snapc->seq); - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); + ret = filemap_write_and_wait_range(inode->i_mapping, + pos, pos + count - 1); if (ret < 0) return ret; ret = invalidate_inode_pages2_range(inode->i_mapping, pos >> PAGE_SHIFT, - (pos + count) >> PAGE_SHIFT); + (pos + count - 1) >> PAGE_SHIFT); if (ret < 0) dout("invalidate_inode_pages2_range returned %d\n", ret); @@ -1205,6 +1504,8 @@ if (!ret) ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency, + req->r_end_latency, ret); out: ceph_osdc_put_request(req); if (ret != 0) { @@ -1247,6 +1548,7 @@ struct inode *inode = file_inode(filp); struct ceph_inode_info *ci = ceph_inode(inode); struct page *pinned_page = NULL; + bool direct_lock = iocb->ki_flags & IOCB_DIRECT; ssize_t ret; int want, got = 0; int retry_op = 0, read = 0; @@ -1255,13 +1557,24 @@ dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode); + if (direct_lock) + ceph_start_io_direct(inode); + else + ceph_start_io_read(inode); + if (fi->fmode & CEPH_FILE_MODE_LAZY) want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; else want = CEPH_CAP_FILE_CACHE; - ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page); - if (ret < 0) + ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1, + &got, &pinned_page); + if (ret < 0) { + if (iocb->ki_flags & IOCB_DIRECT) + ceph_end_io_direct(inode); + else + ceph_end_io_read(inode); return ret; + } if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_flags & IOCB_DIRECT) || @@ -1292,6 +1605,7 @@ ret = generic_file_read_iter(iocb, to); ceph_del_rw_context(fi, &rw_ctx); } + dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); if (pinned_page) { @@ -1299,6 +1613,12 @@ pinned_page = NULL; } ceph_put_cap_refs(ci, got); + + if (direct_lock) + ceph_end_io_direct(inode); + else + ceph_end_io_read(inode); + if (retry_op > HAVE_RETRIED && ret >= 0) { int statret; struct page *page = NULL; @@ -1388,6 +1708,7 @@ struct ceph_cap_flush *prealloc_cf; ssize_t count, written = 0; int err, want, got; + bool direct_lock = false; u32 map_flags; u64 pool_flags; loff_t pos; @@ -1400,8 +1721,14 @@ if (!prealloc_cf) return -ENOMEM; + if ((iocb->ki_flags & (IOCB_DIRECT | IOCB_APPEND)) == IOCB_DIRECT) + direct_lock = true; + retry_snap: - inode_lock(inode); + if (direct_lock) + ceph_start_io_direct(inode); + else + ceph_start_io_write(inode); /* We can write back this queue in page reclaim */ current->backing_dev_info = inode_to_bdi(inode); @@ -1430,20 +1757,6 @@ goto out; } - err = file_remove_privs(file); - if (err) - goto out; - - err = file_update_time(file); - if (err) - goto out; - - if (ci->i_inline_version != CEPH_INLINE_NONE) { - err = ceph_uninline_data(file, NULL); - if (err < 0) - goto out; - } - down_read(&osdc->lock); map_flags = osdc->osdmap->flags; pool_flags = ceph_pg_pool_flags(osdc->osdmap, ci->i_layout.pool_id); @@ -1454,6 +1767,16 @@ goto out; } + err = file_remove_privs(file); + if (err) + goto out; + + if (ci->i_inline_version != CEPH_INLINE_NONE) { + err = ceph_uninline_data(file, NULL); + if (err < 0) + goto out; + } + dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n", inode, ceph_vinop(inode), pos, count, i_size_read(inode)); if (fi->fmode & CEPH_FILE_MODE_LAZY) @@ -1461,10 +1784,16 @@ else want = CEPH_CAP_FILE_BUFFER; got = 0; - err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count, + err = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, pos + count, &got, NULL); if (err < 0) goto out; + + err = file_update_time(file); + if (err) + goto out_caps; + + inode_inc_iversion_raw(inode); dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n", inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); @@ -1474,7 +1803,6 @@ (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) { struct ceph_snap_context *snapc; struct iov_iter data; - inode_unlock(inode); spin_lock(&ci->i_ceph_lock); if (__ceph_have_pending_cap_snap(ci)) { @@ -1496,6 +1824,10 @@ &prealloc_cf); else written = ceph_sync_write(iocb, &data, pos, snapc); + if (direct_lock) + ceph_end_io_direct(inode); + else + ceph_end_io_write(inode); if (written > 0) iov_iter_advance(from, written); ceph_put_snap_context(snapc); @@ -1510,7 +1842,7 @@ written = generic_perform_write(file, from, pos); if (likely(written >= 0)) iocb->ki_pos = pos + written; - inode_unlock(inode); + ceph_end_io_write(inode); } if (written >= 0) { @@ -1524,7 +1856,7 @@ if (dirty) __mark_inode_dirty(inode, dirty); if (ceph_quota_is_max_bytes_approaching(inode, iocb->ki_pos)) - ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL); + ceph_check_caps(ci, 0, NULL); } dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n", @@ -1546,9 +1878,13 @@ } goto out_unlocked; - +out_caps: + ceph_put_cap_refs(ci, got); out: - inode_unlock(inode); + if (direct_lock) + ceph_end_io_direct(inode); + else + ceph_end_io_write(inode); out_unlocked: ceph_free_cap_flush(prealloc_cf); current->backing_dev_info = NULL; @@ -1786,7 +2122,7 @@ else want = CEPH_CAP_FILE_BUFFER; - ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL); + ret = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, endoff, &got, NULL); if (ret < 0) goto unlock; @@ -1810,6 +2146,370 @@ return ret; } +/* + * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for + * src_ci. Two attempts are made to obtain both caps, and an error is return if + * this fails; zero is returned on success. + */ +static int get_rd_wr_caps(struct file *src_filp, int *src_got, + struct file *dst_filp, + loff_t dst_endoff, int *dst_got) +{ + int ret = 0; + bool retrying = false; + +retry_caps: + ret = ceph_get_caps(dst_filp, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, + dst_endoff, dst_got, NULL); + if (ret < 0) + return ret; + + /* + * Since we're already holding the FILE_WR capability for the dst file, + * we would risk a deadlock by using ceph_get_caps. Thus, we'll do some + * retry dance instead to try to get both capabilities. + */ + ret = ceph_try_get_caps(file_inode(src_filp), + CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED, + false, src_got); + if (ret <= 0) { + /* Start by dropping dst_ci caps and getting src_ci caps */ + ceph_put_cap_refs(ceph_inode(file_inode(dst_filp)), *dst_got); + if (retrying) { + if (!ret) + /* ceph_try_get_caps masks EAGAIN */ + ret = -EAGAIN; + return ret; + } + ret = ceph_get_caps(src_filp, CEPH_CAP_FILE_RD, + CEPH_CAP_FILE_SHARED, -1, src_got, NULL); + if (ret < 0) + return ret; + /*... drop src_ci caps too, and retry */ + ceph_put_cap_refs(ceph_inode(file_inode(src_filp)), *src_got); + retrying = true; + goto retry_caps; + } + return ret; +} + +static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got, + struct ceph_inode_info *dst_ci, int dst_got) +{ + ceph_put_cap_refs(src_ci, src_got); + ceph_put_cap_refs(dst_ci, dst_got); +} + +/* + * This function does several size-related checks, returning an error if: + * - source file is smaller than off+len + * - destination file size is not OK (inode_newsize_ok()) + * - max bytes quotas is exceeded + */ +static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode, + loff_t src_off, loff_t dst_off, size_t len) +{ + loff_t size, endoff; + + size = i_size_read(src_inode); + /* + * Don't copy beyond source file EOF. Instead of simply setting length + * to (size - src_off), just drop to VFS default implementation, as the + * local i_size may be stale due to other clients writing to the source + * inode. + */ + if (src_off + len > size) { + dout("Copy beyond EOF (%llu + %zu > %llu)\n", + src_off, len, size); + return -EOPNOTSUPP; + } + size = i_size_read(dst_inode); + + endoff = dst_off + len; + if (inode_newsize_ok(dst_inode, endoff)) + return -EOPNOTSUPP; + + if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) + return -EDQUOT; + + return 0; +} + +static ssize_t ceph_do_objects_copy(struct ceph_inode_info *src_ci, u64 *src_off, + struct ceph_inode_info *dst_ci, u64 *dst_off, + struct ceph_fs_client *fsc, + size_t len, unsigned int flags) +{ + struct ceph_object_locator src_oloc, dst_oloc; + struct ceph_object_id src_oid, dst_oid; + size_t bytes = 0; + u64 src_objnum, src_objoff, dst_objnum, dst_objoff; + u32 src_objlen, dst_objlen; + u32 object_size = src_ci->i_layout.object_size; + int ret; + + src_oloc.pool = src_ci->i_layout.pool_id; + src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns); + dst_oloc.pool = dst_ci->i_layout.pool_id; + dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns); + + while (len >= object_size) { + ceph_calc_file_object_mapping(&src_ci->i_layout, *src_off, + object_size, &src_objnum, + &src_objoff, &src_objlen); + ceph_calc_file_object_mapping(&dst_ci->i_layout, *dst_off, + object_size, &dst_objnum, + &dst_objoff, &dst_objlen); + ceph_oid_init(&src_oid); + ceph_oid_printf(&src_oid, "%llx.%08llx", + src_ci->i_vino.ino, src_objnum); + ceph_oid_init(&dst_oid); + ceph_oid_printf(&dst_oid, "%llx.%08llx", + dst_ci->i_vino.ino, dst_objnum); + /* Do an object remote copy */ + ret = ceph_osdc_copy_from(&fsc->client->osdc, + src_ci->i_vino.snap, 0, + &src_oid, &src_oloc, + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | + CEPH_OSD_OP_FLAG_FADVISE_NOCACHE, + &dst_oid, &dst_oloc, + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | + CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, + dst_ci->i_truncate_seq, + dst_ci->i_truncate_size, + CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ); + if (ret) { + if (ret == -EOPNOTSUPP) { + fsc->have_copy_from2 = false; + pr_notice("OSDs don't support copy-from2; disabling copy offload\n"); + } + dout("ceph_osdc_copy_from returned %d\n", ret); + if (!bytes) + bytes = ret; + goto out; + } + len -= object_size; + bytes += object_size; + *src_off += object_size; + *dst_off += object_size; + } + +out: + ceph_oloc_destroy(&src_oloc); + ceph_oloc_destroy(&dst_oloc); + return bytes; +} + +static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off, + struct file *dst_file, loff_t dst_off, + size_t len, unsigned int flags) +{ + struct inode *src_inode = file_inode(src_file); + struct inode *dst_inode = file_inode(dst_file); + struct ceph_inode_info *src_ci = ceph_inode(src_inode); + struct ceph_inode_info *dst_ci = ceph_inode(dst_inode); + struct ceph_cap_flush *prealloc_cf; + struct ceph_fs_client *src_fsc = ceph_inode_to_client(src_inode); + loff_t size; + ssize_t ret = -EIO, bytes; + u64 src_objnum, dst_objnum, src_objoff, dst_objoff; + u32 src_objlen, dst_objlen; + int src_got = 0, dst_got = 0, err, dirty; + + if (src_inode->i_sb != dst_inode->i_sb) { + struct ceph_fs_client *dst_fsc = ceph_inode_to_client(dst_inode); + + if (ceph_fsid_compare(&src_fsc->client->fsid, + &dst_fsc->client->fsid)) { + dout("Copying files across clusters: src: %pU dst: %pU\n", + &src_fsc->client->fsid, &dst_fsc->client->fsid); + return -EXDEV; + } + } + if (ceph_snap(dst_inode) != CEPH_NOSNAP) + return -EROFS; + + /* + * Some of the checks below will return -EOPNOTSUPP, which will force a + * fallback to the default VFS copy_file_range implementation. This is + * desirable in several cases (for ex, the 'len' is smaller than the + * size of the objects, or in cases where that would be more + * efficient). + */ + + if (ceph_test_mount_opt(src_fsc, NOCOPYFROM)) + return -EOPNOTSUPP; + + if (!src_fsc->have_copy_from2) + return -EOPNOTSUPP; + + /* + * Striped file layouts require that we copy partial objects, but the + * OSD copy-from operation only supports full-object copies. Limit + * this to non-striped file layouts for now. + */ + if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) || + (src_ci->i_layout.stripe_count != 1) || + (dst_ci->i_layout.stripe_count != 1) || + (src_ci->i_layout.object_size != dst_ci->i_layout.object_size)) { + dout("Invalid src/dst files layout\n"); + return -EOPNOTSUPP; + } + + if (len < src_ci->i_layout.object_size) + return -EOPNOTSUPP; /* no remote copy will be done */ + + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + + /* Start by sync'ing the source and destination files */ + ret = file_write_and_wait_range(src_file, src_off, (src_off + len)); + if (ret < 0) { + dout("failed to write src file (%zd)\n", ret); + goto out; + } + ret = file_write_and_wait_range(dst_file, dst_off, (dst_off + len)); + if (ret < 0) { + dout("failed to write dst file (%zd)\n", ret); + goto out; + } + + /* + * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other + * clients may have dirty data in their caches. And OSDs know nothing + * about caps, so they can't safely do the remote object copies. + */ + err = get_rd_wr_caps(src_file, &src_got, + dst_file, (dst_off + len), &dst_got); + if (err < 0) { + dout("get_rd_wr_caps returned %d\n", err); + ret = -EOPNOTSUPP; + goto out; + } + + ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len); + if (ret < 0) + goto out_caps; + + /* Drop dst file cached pages */ + ret = invalidate_inode_pages2_range(dst_inode->i_mapping, + dst_off >> PAGE_SHIFT, + (dst_off + len) >> PAGE_SHIFT); + if (ret < 0) { + dout("Failed to invalidate inode pages (%zd)\n", ret); + ret = 0; /* XXX */ + } + ceph_calc_file_object_mapping(&src_ci->i_layout, src_off, + src_ci->i_layout.object_size, + &src_objnum, &src_objoff, &src_objlen); + ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, + dst_ci->i_layout.object_size, + &dst_objnum, &dst_objoff, &dst_objlen); + /* object-level offsets need to the same */ + if (src_objoff != dst_objoff) { + ret = -EOPNOTSUPP; + goto out_caps; + } + + /* + * Do a manual copy if the object offset isn't object aligned. + * 'src_objlen' contains the bytes left until the end of the object, + * starting at the src_off + */ + if (src_objoff) { + dout("Initial partial copy of %u bytes\n", src_objlen); + + /* + * we need to temporarily drop all caps as we'll be calling + * {read,write}_iter, which will get caps again. + */ + put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); + ret = do_splice_direct(src_file, &src_off, dst_file, + &dst_off, src_objlen, flags); + /* Abort on short copies or on error */ + if (ret < src_objlen) { + dout("Failed partial copy (%zd)\n", ret); + goto out; + } + len -= ret; + err = get_rd_wr_caps(src_file, &src_got, + dst_file, (dst_off + len), &dst_got); + if (err < 0) + goto out; + err = is_file_size_ok(src_inode, dst_inode, + src_off, dst_off, len); + if (err < 0) + goto out_caps; + } + + size = i_size_read(dst_inode); + bytes = ceph_do_objects_copy(src_ci, &src_off, dst_ci, &dst_off, + src_fsc, len, flags); + if (bytes <= 0) { + if (!ret) + ret = bytes; + goto out_caps; + } + dout("Copied %zu bytes out of %zu\n", bytes, len); + len -= bytes; + ret += bytes; + + file_update_time(dst_file); + inode_inc_iversion_raw(dst_inode); + + if (dst_off > size) { + /* Let the MDS know about dst file size change */ + if (ceph_inode_set_size(dst_inode, dst_off) || + ceph_quota_is_max_bytes_approaching(dst_inode, dst_off)) + ceph_check_caps(dst_ci, CHECK_CAPS_AUTHONLY, NULL); + } + /* Mark Fw dirty */ + spin_lock(&dst_ci->i_ceph_lock); + dst_ci->i_inline_version = CEPH_INLINE_NONE; + dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf); + spin_unlock(&dst_ci->i_ceph_lock); + if (dirty) + __mark_inode_dirty(dst_inode, dirty); + +out_caps: + put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); + + /* + * Do the final manual copy if we still have some bytes left, unless + * there were errors in remote object copies (len >= object_size). + */ + if (len && (len < src_ci->i_layout.object_size)) { + dout("Final partial copy of %zu bytes\n", len); + bytes = do_splice_direct(src_file, &src_off, dst_file, + &dst_off, len, flags); + if (bytes > 0) + ret += bytes; + else + dout("Failed partial copy (%zd)\n", bytes); + } + +out: + ceph_free_cap_flush(prealloc_cf); + + return ret; +} + +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off, + struct file *dst_file, loff_t dst_off, + size_t len, unsigned int flags) +{ + ssize_t ret; + + ret = __ceph_copy_file_range(src_file, src_off, dst_file, dst_off, + len, flags); + + if (ret == -EOPNOTSUPP || ret == -EXDEV) + ret = generic_copy_file_range(src_file, src_off, dst_file, + dst_off, len, flags); + return ret; +} + const struct file_operations ceph_file_fops = { .open = ceph_open, .release = ceph_release, @@ -1824,7 +2524,7 @@ .splice_read = generic_file_splice_read, .splice_write = iter_file_splice_write, .unlocked_ioctl = ceph_ioctl, - .compat_ioctl = ceph_ioctl, + .compat_ioctl = compat_ptr_ioctl, .fallocate = ceph_fallocate, + .copy_file_range = ceph_copy_file_range, }; - -- Gitblit v1.6.2