hc
2024-05-10 9999e48639b3cecb08ffb37358bcba3b48161b29
kernel/fs/ceph/file.c
....@@ -1,5 +1,6 @@
11 // SPDX-License-Identifier: GPL-2.0
22 #include <linux/ceph/ceph_debug.h>
3
+#include <linux/ceph/striper.h>
34
45 #include <linux/module.h>
56 #include <linux/sched.h>
....@@ -9,10 +10,14 @@
910 #include <linux/namei.h>
1011 #include <linux/writeback.h>
1112 #include <linux/falloc.h>
13
+#include <linux/iversion.h>
14
+#include <linux/ktime.h>
1215
1316 #include "super.h"
1417 #include "mds_client.h"
1518 #include "cache.h"
19
+#include "io.h"
20
+#include "metric.h"
1621
1722 static __le32 ceph_flags_sys2wire(u32 flags)
1823 {
....@@ -177,8 +182,7 @@
177182 static struct ceph_mds_request *
178183 prepare_open_request(struct super_block *sb, int flags, int create_mode)
179184 {
180
- struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
181
- struct ceph_mds_client *mdsc = fsc->mdsc;
185
+ struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(sb);
182186 struct ceph_mds_request *req;
183187 int want_auth = USE_ANY_MDS;
184188 int op = (flags & O_CREAT) ? CEPH_MDS_OP_CREATE : CEPH_MDS_OP_OPEN;
....@@ -199,6 +203,7 @@
199203 static int ceph_init_file_info(struct inode *inode, struct file *file,
200204 int fmode, bool isdir)
201205 {
206
+ struct ceph_inode_info *ci = ceph_inode(inode);
202207 struct ceph_file_info *fi;
203208
204209 dout("%s %p %p 0%o (%s)\n", __func__, inode, file,
....@@ -208,10 +213,8 @@
208213 if (isdir) {
209214 struct ceph_dir_file_info *dfi =
210215 kmem_cache_zalloc(ceph_dir_file_cachep, GFP_KERNEL);
211
- if (!dfi) {
212
- ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
216
+ if (!dfi)
213217 return -ENOMEM;
214
- }
215218
216219 file->private_data = dfi;
217220 fi = &dfi->file_info;
....@@ -219,17 +222,18 @@
219222 dfi->readdir_cache_idx = -1;
220223 } else {
221224 fi = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
222
- if (!fi) {
223
- ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
225
+ if (!fi)
224226 return -ENOMEM;
225
- }
226227
227228 file->private_data = fi;
228229 }
229230
231
+ ceph_get_fmode(ci, fmode, 1);
230232 fi->fmode = fmode;
233
+
231234 spin_lock_init(&fi->rw_contexts_lock);
232235 INIT_LIST_HEAD(&fi->rw_contexts);
236
+ fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen);
233237
234238 return 0;
235239 }
....@@ -246,17 +250,15 @@
246250 case S_IFREG:
247251 ceph_fscache_register_inode_cookie(inode);
248252 ceph_fscache_file_set_cookie(inode, file);
253
+ fallthrough;
249254 case S_IFDIR:
250255 ret = ceph_init_file_info(inode, file, fmode,
251256 S_ISDIR(inode->i_mode));
252
- if (ret)
253
- return ret;
254257 break;
255258
256259 case S_IFLNK:
257260 dout("init_file %p %p 0%o (symlink)\n", inode, file,
258261 inode->i_mode);
259
- ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
260262 break;
261263
262264 default:
....@@ -266,7 +268,6 @@
266268 * we need to drop the open ref now, since we don't
267269 * have .release set to ceph_release.
268270 */
269
- ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
270271 BUG_ON(inode->i_fop->release == ceph_release);
271272
272273 /* call the proper open fop */
....@@ -278,14 +279,15 @@
278279 /*
279280 * try renew caps after session gets killed.
280281 */
281
-int ceph_renew_caps(struct inode *inode)
282
+int ceph_renew_caps(struct inode *inode, int fmode)
282283 {
283
- struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
284
+ struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
284285 struct ceph_inode_info *ci = ceph_inode(inode);
285286 struct ceph_mds_request *req;
286287 int err, flags, wanted;
287288
288289 spin_lock(&ci->i_ceph_lock);
290
+ __ceph_touch_fmode(ci, mdsc, fmode);
289291 wanted = __ceph_caps_file_wanted(ci);
290292 if (__ceph_is_any_real_caps(ci) &&
291293 (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) {
....@@ -319,7 +321,6 @@
319321 req->r_inode = inode;
320322 ihold(inode);
321323 req->r_num_caps = 1;
322
- req->r_fmode = -1;
323324
324325 err = ceph_mdsc_do_request(mdsc, NULL, req);
325326 ceph_mdsc_put_request(req);
....@@ -365,9 +366,6 @@
365366
366367 /* trivially open snapdir */
367368 if (ceph_snap(inode) == CEPH_SNAPDIR) {
368
- spin_lock(&ci->i_ceph_lock);
369
- __ceph_get_fmode(ci, fmode);
370
- spin_unlock(&ci->i_ceph_lock);
371369 return ceph_init_file(inode, file, fmode);
372370 }
373371
....@@ -385,7 +383,7 @@
385383 dout("open %p fmode %d want %s issued %s using existing\n",
386384 inode, fmode, ceph_cap_string(wanted),
387385 ceph_cap_string(issued));
388
- __ceph_get_fmode(ci, fmode);
386
+ __ceph_touch_fmode(ci, mdsc, fmode);
389387 spin_unlock(&ci->i_ceph_lock);
390388
391389 /* adjust wanted? */
....@@ -397,7 +395,7 @@
397395 return ceph_init_file(inode, file, fmode);
398396 } else if (ceph_snap(inode) != CEPH_NOSNAP &&
399397 (ci->i_snap_caps & wanted) == wanted) {
400
- __ceph_get_fmode(ci, fmode);
398
+ __ceph_touch_fmode(ci, mdsc, fmode);
401399 spin_unlock(&ci->i_ceph_lock);
402400 return ceph_init_file(inode, file, fmode);
403401 }
....@@ -423,6 +421,264 @@
423421 return err;
424422 }
425423
424
+/* Clone the layout from a synchronous create, if the dir now has Dc caps */
425
+static void
426
+cache_file_layout(struct inode *dst, struct inode *src)
427
+{
428
+ struct ceph_inode_info *cdst = ceph_inode(dst);
429
+ struct ceph_inode_info *csrc = ceph_inode(src);
430
+
431
+ spin_lock(&cdst->i_ceph_lock);
432
+ if ((__ceph_caps_issued(cdst, NULL) & CEPH_CAP_DIR_CREATE) &&
433
+ !ceph_file_layout_is_valid(&cdst->i_cached_layout)) {
434
+ memcpy(&cdst->i_cached_layout, &csrc->i_layout,
435
+ sizeof(cdst->i_cached_layout));
436
+ rcu_assign_pointer(cdst->i_cached_layout.pool_ns,
437
+ ceph_try_get_string(csrc->i_layout.pool_ns));
438
+ }
439
+ spin_unlock(&cdst->i_ceph_lock);
440
+}
441
+
442
+/*
443
+ * Try to set up an async create. We need caps, a file layout, and inode number,
444
+ * and either a lease on the dentry or complete dir info. If any of those
445
+ * criteria are not satisfied, then return false and the caller can go
446
+ * synchronous.
447
+ */
448
+static int try_prep_async_create(struct inode *dir, struct dentry *dentry,
449
+ struct ceph_file_layout *lo, u64 *pino)
450
+{
451
+ struct ceph_inode_info *ci = ceph_inode(dir);
452
+ struct ceph_dentry_info *di = ceph_dentry(dentry);
453
+ int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_CREATE;
454
+ u64 ino;
455
+
456
+ spin_lock(&ci->i_ceph_lock);
457
+ /* No auth cap means no chance for Dc caps */
458
+ if (!ci->i_auth_cap)
459
+ goto no_async;
460
+
461
+ /* Any delegated inos? */
462
+ if (xa_empty(&ci->i_auth_cap->session->s_delegated_inos))
463
+ goto no_async;
464
+
465
+ if (!ceph_file_layout_is_valid(&ci->i_cached_layout))
466
+ goto no_async;
467
+
468
+ if ((__ceph_caps_issued(ci, NULL) & want) != want)
469
+ goto no_async;
470
+
471
+ if (d_in_lookup(dentry)) {
472
+ if (!__ceph_dir_is_complete(ci))
473
+ goto no_async;
474
+ spin_lock(&dentry->d_lock);
475
+ di->lease_shared_gen = atomic_read(&ci->i_shared_gen);
476
+ spin_unlock(&dentry->d_lock);
477
+ } else if (atomic_read(&ci->i_shared_gen) !=
478
+ READ_ONCE(di->lease_shared_gen)) {
479
+ goto no_async;
480
+ }
481
+
482
+ ino = ceph_get_deleg_ino(ci->i_auth_cap->session);
483
+ if (!ino)
484
+ goto no_async;
485
+
486
+ *pino = ino;
487
+ ceph_take_cap_refs(ci, want, false);
488
+ memcpy(lo, &ci->i_cached_layout, sizeof(*lo));
489
+ rcu_assign_pointer(lo->pool_ns,
490
+ ceph_try_get_string(ci->i_cached_layout.pool_ns));
491
+ got = want;
492
+no_async:
493
+ spin_unlock(&ci->i_ceph_lock);
494
+ return got;
495
+}
496
+
497
+static void restore_deleg_ino(struct inode *dir, u64 ino)
498
+{
499
+ struct ceph_inode_info *ci = ceph_inode(dir);
500
+ struct ceph_mds_session *s = NULL;
501
+
502
+ spin_lock(&ci->i_ceph_lock);
503
+ if (ci->i_auth_cap)
504
+ s = ceph_get_mds_session(ci->i_auth_cap->session);
505
+ spin_unlock(&ci->i_ceph_lock);
506
+ if (s) {
507
+ int err = ceph_restore_deleg_ino(s, ino);
508
+ if (err)
509
+ pr_warn("ceph: unable to restore delegated ino 0x%llx to session: %d\n",
510
+ ino, err);
511
+ ceph_put_mds_session(s);
512
+ }
513
+}
514
+
515
+static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
516
+ struct ceph_mds_request *req)
517
+{
518
+ int result = req->r_err ? req->r_err :
519
+ le32_to_cpu(req->r_reply_info.head->result);
520
+
521
+ if (result == -EJUKEBOX)
522
+ goto out;
523
+
524
+ mapping_set_error(req->r_parent->i_mapping, result);
525
+
526
+ if (result) {
527
+ struct dentry *dentry = req->r_dentry;
528
+ int pathlen = 0;
529
+ u64 base = 0;
530
+ char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
531
+ &base, 0);
532
+
533
+ ceph_dir_clear_complete(req->r_parent);
534
+ if (!d_unhashed(dentry))
535
+ d_drop(dentry);
536
+
537
+ /* FIXME: start returning I/O errors on all accesses? */
538
+ pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
539
+ base, IS_ERR(path) ? "<<bad>>" : path, result);
540
+ ceph_mdsc_free_path(path, pathlen);
541
+ }
542
+
543
+ if (req->r_target_inode) {
544
+ struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
545
+ u64 ino = ceph_vino(req->r_target_inode).ino;
546
+
547
+ if (req->r_deleg_ino != ino)
548
+ pr_warn("%s: inode number mismatch! err=%d deleg_ino=0x%llx target=0x%llx\n",
549
+ __func__, req->r_err, req->r_deleg_ino, ino);
550
+ mapping_set_error(req->r_target_inode->i_mapping, result);
551
+
552
+ spin_lock(&ci->i_ceph_lock);
553
+ if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
554
+ ci->i_ceph_flags &= ~CEPH_I_ASYNC_CREATE;
555
+ wake_up_bit(&ci->i_ceph_flags, CEPH_ASYNC_CREATE_BIT);
556
+ }
557
+ ceph_kick_flushing_inode_caps(req->r_session, ci);
558
+ spin_unlock(&ci->i_ceph_lock);
559
+ } else {
560
+ pr_warn("%s: no req->r_target_inode for 0x%llx\n", __func__,
561
+ req->r_deleg_ino);
562
+ }
563
+out:
564
+ ceph_mdsc_release_dir_caps(req);
565
+}
566
+
567
+static int ceph_finish_async_create(struct inode *dir, struct dentry *dentry,
568
+ struct file *file, umode_t mode,
569
+ struct ceph_mds_request *req,
570
+ struct ceph_acl_sec_ctx *as_ctx,
571
+ struct ceph_file_layout *lo)
572
+{
573
+ int ret;
574
+ char xattr_buf[4];
575
+ struct ceph_mds_reply_inode in = { };
576
+ struct ceph_mds_reply_info_in iinfo = { .in = &in };
577
+ struct ceph_inode_info *ci = ceph_inode(dir);
578
+ struct inode *inode;
579
+ struct timespec64 now;
580
+ struct ceph_string *pool_ns;
581
+ struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
582
+ struct ceph_vino vino = { .ino = req->r_deleg_ino,
583
+ .snap = CEPH_NOSNAP };
584
+
585
+ ktime_get_real_ts64(&now);
586
+
587
+ inode = ceph_get_inode(dentry->d_sb, vino);
588
+ if (IS_ERR(inode))
589
+ return PTR_ERR(inode);
590
+
591
+ iinfo.inline_version = CEPH_INLINE_NONE;
592
+ iinfo.change_attr = 1;
593
+ ceph_encode_timespec64(&iinfo.btime, &now);
594
+
595
+ if (req->r_pagelist) {
596
+ iinfo.xattr_len = req->r_pagelist->length;
597
+ iinfo.xattr_data = req->r_pagelist->mapped_tail;
598
+ } else {
599
+ /* fake it */
600
+ iinfo.xattr_len = ARRAY_SIZE(xattr_buf);
601
+ iinfo.xattr_data = xattr_buf;
602
+ memset(iinfo.xattr_data, 0, iinfo.xattr_len);
603
+ }
604
+
605
+ in.ino = cpu_to_le64(vino.ino);
606
+ in.snapid = cpu_to_le64(CEPH_NOSNAP);
607
+ in.version = cpu_to_le64(1); // ???
608
+ in.cap.caps = in.cap.wanted = cpu_to_le32(CEPH_CAP_ALL_FILE);
609
+ in.cap.cap_id = cpu_to_le64(1);
610
+ in.cap.realm = cpu_to_le64(ci->i_snap_realm->ino);
611
+ in.cap.flags = CEPH_CAP_FLAG_AUTH;
612
+ in.ctime = in.mtime = in.atime = iinfo.btime;
613
+ in.truncate_seq = cpu_to_le32(1);
614
+ in.truncate_size = cpu_to_le64(-1ULL);
615
+ in.xattr_version = cpu_to_le64(1);
616
+ in.uid = cpu_to_le32(from_kuid(&init_user_ns, current_fsuid()));
617
+ if (dir->i_mode & S_ISGID) {
618
+ in.gid = cpu_to_le32(from_kgid(&init_user_ns, dir->i_gid));
619
+
620
+ /* Directories always inherit the setgid bit. */
621
+ if (S_ISDIR(mode))
622
+ mode |= S_ISGID;
623
+ else if ((mode & (S_ISGID | S_IXGRP)) == (S_ISGID | S_IXGRP) &&
624
+ !in_group_p(dir->i_gid) &&
625
+ !capable_wrt_inode_uidgid(dir, CAP_FSETID))
626
+ mode &= ~S_ISGID;
627
+ } else {
628
+ in.gid = cpu_to_le32(from_kgid(&init_user_ns, current_fsgid()));
629
+ }
630
+ in.mode = cpu_to_le32((u32)mode);
631
+
632
+ in.nlink = cpu_to_le32(1);
633
+ in.max_size = cpu_to_le64(lo->stripe_unit);
634
+
635
+ ceph_file_layout_to_legacy(lo, &in.layout);
636
+ /* lo is private, so pool_ns can't change */
637
+ pool_ns = rcu_dereference_raw(lo->pool_ns);
638
+ if (pool_ns) {
639
+ iinfo.pool_ns_len = pool_ns->len;
640
+ iinfo.pool_ns_data = pool_ns->str;
641
+ }
642
+
643
+ down_read(&mdsc->snap_rwsem);
644
+ ret = ceph_fill_inode(inode, NULL, &iinfo, NULL, req->r_session,
645
+ req->r_fmode, NULL);
646
+ up_read(&mdsc->snap_rwsem);
647
+ if (ret) {
648
+ dout("%s failed to fill inode: %d\n", __func__, ret);
649
+ ceph_dir_clear_complete(dir);
650
+ if (!d_unhashed(dentry))
651
+ d_drop(dentry);
652
+ if (inode->i_state & I_NEW)
653
+ discard_new_inode(inode);
654
+ } else {
655
+ struct dentry *dn;
656
+
657
+ dout("%s d_adding new inode 0x%llx to 0x%llx/%s\n", __func__,
658
+ vino.ino, ceph_ino(dir), dentry->d_name.name);
659
+ ceph_dir_clear_ordered(dir);
660
+ ceph_init_inode_acls(inode, as_ctx);
661
+ if (inode->i_state & I_NEW) {
662
+ /*
663
+ * If it's not I_NEW, then someone created this before
664
+ * we got here. Assume the server is aware of it at
665
+ * that point and don't worry about setting
666
+ * CEPH_I_ASYNC_CREATE.
667
+ */
668
+ ceph_inode(inode)->i_ceph_flags = CEPH_I_ASYNC_CREATE;
669
+ unlock_new_inode(inode);
670
+ }
671
+ if (d_in_lookup(dentry) || d_really_is_negative(dentry)) {
672
+ if (!d_unhashed(dentry))
673
+ d_drop(dentry);
674
+ dn = d_splice_alias(inode, dentry);
675
+ WARN_ON_ONCE(dn && dn != dentry);
676
+ }
677
+ file->f_mode |= FMODE_CREATED;
678
+ ret = finish_open(file, dentry, ceph_open);
679
+ }
680
+ return ret;
681
+}
426682
427683 /*
428684 * Do a lookup + open with a single request. If we get a non-existent
....@@ -435,7 +691,8 @@
435691 struct ceph_mds_client *mdsc = fsc->mdsc;
436692 struct ceph_mds_request *req;
437693 struct dentry *dn;
438
- struct ceph_acls_info acls = {};
694
+ struct ceph_acl_sec_ctx as_ctx = {};
695
+ bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS);
439696 int mask;
440697 int err;
441698
....@@ -446,41 +703,79 @@
446703 if (dentry->d_name.len > NAME_MAX)
447704 return -ENAMETOOLONG;
448705
706
+ /*
707
+ * Do not truncate the file, since atomic_open is called before the
708
+ * permission check. The caller will do the truncation afterward.
709
+ */
710
+ flags &= ~O_TRUNC;
711
+
449712 if (flags & O_CREAT) {
450713 if (ceph_quota_is_max_files_exceeded(dir))
451714 return -EDQUOT;
452
- err = ceph_pre_init_acls(dir, &mode, &acls);
715
+ err = ceph_pre_init_acls(dir, &mode, &as_ctx);
453716 if (err < 0)
454717 return err;
718
+ err = ceph_security_init_secctx(dentry, mode, &as_ctx);
719
+ if (err < 0)
720
+ goto out_ctx;
721
+ /* Async create can't handle more than a page of xattrs */
722
+ if (as_ctx.pagelist &&
723
+ !list_is_singular(&as_ctx.pagelist->head))
724
+ try_async = false;
725
+ } else if (!d_in_lookup(dentry)) {
726
+ /* If it's not being looked up, it's negative */
727
+ return -ENOENT;
455728 }
456
-
729
+retry:
457730 /* do the open */
458731 req = prepare_open_request(dir->i_sb, flags, mode);
459732 if (IS_ERR(req)) {
460733 err = PTR_ERR(req);
461
- goto out_acl;
734
+ goto out_ctx;
462735 }
463736 req->r_dentry = dget(dentry);
464737 req->r_num_caps = 2;
738
+ mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
739
+ if (ceph_security_xattr_wanted(dir))
740
+ mask |= CEPH_CAP_XATTR_SHARED;
741
+ req->r_args.open.mask = cpu_to_le32(mask);
742
+ req->r_parent = dir;
743
+
465744 if (flags & O_CREAT) {
745
+ struct ceph_file_layout lo;
746
+
466747 req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
467748 req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
468
- if (acls.pagelist) {
469
- req->r_pagelist = acls.pagelist;
470
- acls.pagelist = NULL;
749
+ if (as_ctx.pagelist) {
750
+ req->r_pagelist = as_ctx.pagelist;
751
+ as_ctx.pagelist = NULL;
752
+ }
753
+ if (try_async &&
754
+ (req->r_dir_caps =
755
+ try_prep_async_create(dir, dentry, &lo,
756
+ &req->r_deleg_ino))) {
757
+ set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
758
+ req->r_args.open.flags |= cpu_to_le32(CEPH_O_EXCL);
759
+ req->r_callback = ceph_async_create_cb;
760
+ err = ceph_mdsc_submit_request(mdsc, dir, req);
761
+ if (!err) {
762
+ err = ceph_finish_async_create(dir, dentry,
763
+ file, mode, req,
764
+ &as_ctx, &lo);
765
+ } else if (err == -EJUKEBOX) {
766
+ restore_deleg_ino(dir, req->r_deleg_ino);
767
+ ceph_mdsc_put_request(req);
768
+ try_async = false;
769
+ ceph_put_string(rcu_dereference_raw(lo.pool_ns));
770
+ goto retry;
771
+ }
772
+ ceph_put_string(rcu_dereference_raw(lo.pool_ns));
773
+ goto out_req;
471774 }
472775 }
473776
474
- mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
475
- if (ceph_security_xattr_wanted(dir))
476
- mask |= CEPH_CAP_XATTR_SHARED;
477
- req->r_args.open.mask = cpu_to_le32(mask);
478
-
479
- req->r_parent = dir;
480777 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
481
- err = ceph_mdsc_do_request(mdsc,
482
- (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
483
- req);
778
+ err = ceph_mdsc_do_request(mdsc, (flags & O_CREAT) ? dir : NULL, req);
484779 err = ceph_handle_snapdir(req, dentry, err);
485780 if (err)
486781 goto out_req;
....@@ -505,17 +800,18 @@
505800 } else {
506801 dout("atomic_open finish_open on dn %p\n", dn);
507802 if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
508
- ceph_init_inode_acls(d_inode(dentry), &acls);
803
+ struct inode *newino = d_inode(dentry);
804
+
805
+ cache_file_layout(dir, newino);
806
+ ceph_init_inode_acls(newino, &as_ctx);
509807 file->f_mode |= FMODE_CREATED;
510808 }
511809 err = finish_open(file, dentry, ceph_open);
512810 }
513811 out_req:
514
- if (!req->r_err && req->r_target_inode)
515
- ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
516812 ceph_mdsc_put_request(req);
517
-out_acl:
518
- ceph_release_acls_info(&acls);
813
+out_ctx:
814
+ ceph_release_acl_sec_ctx(&as_ctx);
519815 dout("atomic_open result=%d\n", err);
520816 return err;
521817 }
....@@ -529,7 +825,7 @@
529825 dout("release inode %p dir file %p\n", inode, file);
530826 WARN_ON(!list_empty(&dfi->file_info.rw_contexts));
531827
532
- ceph_put_fmode(ci, dfi->file_info.fmode);
828
+ ceph_put_fmode(ci, dfi->file_info.fmode, 1);
533829
534830 if (dfi->last_readdir)
535831 ceph_mdsc_put_request(dfi->last_readdir);
....@@ -541,7 +837,8 @@
541837 dout("release inode %p regular file %p\n", inode, file);
542838 WARN_ON(!list_empty(&fi->rw_contexts));
543839
544
- ceph_put_fmode(ci, fi->fmode);
840
+ ceph_put_fmode(ci, fi->fmode, 1);
841
+
545842 kmem_cache_free(ceph_file_cachep, fi);
546843 }
547844
....@@ -557,90 +854,26 @@
557854 };
558855
559856 /*
560
- * Read a range of bytes striped over one or more objects. Iterate over
561
- * objects we stripe over. (That's not atomic, but good enough for now.)
857
+ * Completely synchronous read and write methods. Direct from __user
858
+ * buffer to osd, or directly to user pages (if O_DIRECT).
859
+ *
860
+ * If the read spans object boundary, just do multiple reads. (That's not
861
+ * atomic, but good enough for now.)
562862 *
563863 * If we get a short result from the OSD, check against i_size; we need to
564864 * only return a short read to the caller if we hit EOF.
565865 */
566
-static int striped_read(struct inode *inode,
567
- u64 pos, u64 len,
568
- struct page **pages, int num_pages,
569
- int page_align, int *checkeof)
570
-{
571
- struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
572
- struct ceph_inode_info *ci = ceph_inode(inode);
573
- u64 this_len;
574
- loff_t i_size;
575
- int page_idx;
576
- int ret, read = 0;
577
- bool hit_stripe, was_short;
578
-
579
- /*
580
- * we may need to do multiple reads. not atomic, unfortunately.
581
- */
582
-more:
583
- this_len = len;
584
- page_idx = (page_align + read) >> PAGE_SHIFT;
585
- ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
586
- &ci->i_layout, pos, &this_len,
587
- ci->i_truncate_seq, ci->i_truncate_size,
588
- pages + page_idx, num_pages - page_idx,
589
- ((page_align + read) & ~PAGE_MASK));
590
- if (ret == -ENOENT)
591
- ret = 0;
592
- hit_stripe = this_len < len;
593
- was_short = ret >= 0 && ret < this_len;
594
- dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
595
- ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
596
-
597
- i_size = i_size_read(inode);
598
- if (ret >= 0) {
599
- if (was_short && (pos + ret < i_size)) {
600
- int zlen = min(this_len - ret, i_size - pos - ret);
601
- int zoff = page_align + read + ret;
602
- dout(" zero gap %llu to %llu\n",
603
- pos + ret, pos + ret + zlen);
604
- ceph_zero_page_vector_range(zoff, zlen, pages);
605
- ret += zlen;
606
- }
607
-
608
- read += ret;
609
- pos += ret;
610
- len -= ret;
611
-
612
- /* hit stripe and need continue*/
613
- if (len && hit_stripe && pos < i_size)
614
- goto more;
615
- }
616
-
617
- if (read > 0) {
618
- ret = read;
619
- /* did we bounce off eof? */
620
- if (pos + len > i_size)
621
- *checkeof = CHECK_EOF;
622
- }
623
-
624
- dout("striped_read returns %d\n", ret);
625
- return ret;
626
-}
627
-
628
-/*
629
- * Completely synchronous read and write methods. Direct from __user
630
- * buffer to osd, or directly to user pages (if O_DIRECT).
631
- *
632
- * If the read spans object boundary, just do multiple reads.
633
- */
634866 static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
635
- int *checkeof)
867
+ int *retry_op)
636868 {
637869 struct file *file = iocb->ki_filp;
638870 struct inode *inode = file_inode(file);
639
- struct page **pages;
640
- u64 off = iocb->ki_pos;
641
- int num_pages;
871
+ struct ceph_inode_info *ci = ceph_inode(inode);
872
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
873
+ struct ceph_osd_client *osdc = &fsc->client->osdc;
642874 ssize_t ret;
643
- size_t len = iov_iter_count(to);
875
+ u64 off = iocb->ki_pos;
876
+ u64 len = iov_iter_count(to);
644877
645878 dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
646879 (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
....@@ -653,61 +886,108 @@
653886 * but it will at least behave sensibly when they are
654887 * in sequence.
655888 */
656
- ret = filemap_write_and_wait_range(inode->i_mapping, off,
657
- off + len);
889
+ ret = filemap_write_and_wait_range(inode->i_mapping,
890
+ off, off + len - 1);
658891 if (ret < 0)
659892 return ret;
660893
661
- if (unlikely(to->type & ITER_PIPE)) {
894
+ ret = 0;
895
+ while ((len = iov_iter_count(to)) > 0) {
896
+ struct ceph_osd_request *req;
897
+ struct page **pages;
898
+ int num_pages;
662899 size_t page_off;
663
- ret = iov_iter_get_pages_alloc(to, &pages, len,
664
- &page_off);
665
- if (ret <= 0)
666
- return -ENOMEM;
667
- num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
900
+ u64 i_size;
901
+ bool more;
902
+ int idx;
903
+ size_t left;
668904
669
- ret = striped_read(inode, off, ret, pages, num_pages,
670
- page_off, checkeof);
671
- if (ret > 0) {
672
- iov_iter_advance(to, ret);
673
- off += ret;
674
- } else {
675
- iov_iter_advance(to, 0);
905
+ req = ceph_osdc_new_request(osdc, &ci->i_layout,
906
+ ci->i_vino, off, &len, 0, 1,
907
+ CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
908
+ NULL, ci->i_truncate_seq,
909
+ ci->i_truncate_size, false);
910
+ if (IS_ERR(req)) {
911
+ ret = PTR_ERR(req);
912
+ break;
676913 }
677
- ceph_put_page_vector(pages, num_pages, false);
678
- } else {
914
+
915
+ more = len < iov_iter_count(to);
916
+
679917 num_pages = calc_pages_for(off, len);
918
+ page_off = off & ~PAGE_MASK;
680919 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
681
- if (IS_ERR(pages))
682
- return PTR_ERR(pages);
920
+ if (IS_ERR(pages)) {
921
+ ceph_osdc_put_request(req);
922
+ ret = PTR_ERR(pages);
923
+ break;
924
+ }
683925
684
- ret = striped_read(inode, off, len, pages, num_pages,
685
- (off & ~PAGE_MASK), checkeof);
686
- if (ret > 0) {
687
- int l, k = 0;
688
- size_t left = ret;
926
+ osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
927
+ false, false);
928
+ ret = ceph_osdc_start_request(osdc, req, false);
929
+ if (!ret)
930
+ ret = ceph_osdc_wait_request(osdc, req);
689931
690
- while (left) {
691
- size_t page_off = off & ~PAGE_MASK;
692
- size_t copy = min_t(size_t, left,
693
- PAGE_SIZE - page_off);
694
- l = copy_page_to_iter(pages[k++], page_off,
695
- copy, to);
696
- off += l;
697
- left -= l;
698
- if (l < copy)
699
- break;
932
+ ceph_update_read_latency(&fsc->mdsc->metric,
933
+ req->r_start_latency,
934
+ req->r_end_latency,
935
+ ret);
936
+
937
+ ceph_osdc_put_request(req);
938
+
939
+ i_size = i_size_read(inode);
940
+ dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
941
+ off, len, ret, i_size, (more ? " MORE" : ""));
942
+
943
+ if (ret == -ENOENT)
944
+ ret = 0;
945
+ if (ret >= 0 && ret < len && (off + ret < i_size)) {
946
+ int zlen = min(len - ret, i_size - off - ret);
947
+ int zoff = page_off + ret;
948
+ dout("sync_read zero gap %llu~%llu\n",
949
+ off + ret, off + ret + zlen);
950
+ ceph_zero_page_vector_range(zoff, zlen, pages);
951
+ ret += zlen;
952
+ }
953
+
954
+ idx = 0;
955
+ left = ret > 0 ? ret : 0;
956
+ while (left > 0) {
957
+ size_t len, copied;
958
+ page_off = off & ~PAGE_MASK;
959
+ len = min_t(size_t, left, PAGE_SIZE - page_off);
960
+ SetPageUptodate(pages[idx]);
961
+ copied = copy_page_to_iter(pages[idx++],
962
+ page_off, len, to);
963
+ off += copied;
964
+ left -= copied;
965
+ if (copied < len) {
966
+ ret = -EFAULT;
967
+ break;
700968 }
701969 }
702970 ceph_release_page_vector(pages, num_pages);
971
+
972
+ if (ret < 0) {
973
+ if (ret == -EBLOCKLISTED)
974
+ fsc->blocklisted = true;
975
+ break;
976
+ }
977
+
978
+ if (off >= i_size || !more)
979
+ break;
703980 }
704981
705982 if (off > iocb->ki_pos) {
983
+ if (ret >= 0 &&
984
+ iov_iter_count(to) > 0 && off >= i_size_read(inode))
985
+ *retry_op = CHECK_EOF;
706986 ret = off - iocb->ki_pos;
707987 iocb->ki_pos = off;
708988 }
709989
710
- dout("sync_read result %zd\n", ret);
990
+ dout("sync_read result %zd retry_op %d\n", ret, *retry_op);
711991 return ret;
712992 }
713993
....@@ -739,6 +1019,9 @@
7391019
7401020 if (!atomic_dec_and_test(&aio_req->pending_reqs))
7411021 return;
1022
+
1023
+ if (aio_req->iocb->ki_flags & IOCB_DIRECT)
1024
+ inode_dio_end(inode);
7421025
7431026 ret = aio_req->error;
7441027 if (!ret)
....@@ -780,12 +1063,23 @@
7801063 struct inode *inode = req->r_inode;
7811064 struct ceph_aio_request *aio_req = req->r_priv;
7821065 struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
1066
+ struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
7831067
7841068 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
7851069 BUG_ON(!osd_data->num_bvecs);
7861070
7871071 dout("ceph_aio_complete_req %p rc %d bytes %u\n",
7881072 inode, rc, osd_data->bvec_pos.iter.bi_size);
1073
+
1074
+ /* r_start_latency == 0 means the request was not submitted */
1075
+ if (req->r_start_latency) {
1076
+ if (aio_req->write)
1077
+ ceph_update_write_latency(metric, req->r_start_latency,
1078
+ req->r_end_latency, rc);
1079
+ else
1080
+ ceph_update_read_latency(metric, req->r_start_latency,
1081
+ req->r_end_latency, rc);
1082
+ }
7891083
7901084 if (rc == -EOLDSNAPC) {
7911085 struct ceph_aio_work *aio_work;
....@@ -795,7 +1089,7 @@
7951089 if (aio_work) {
7961090 INIT_WORK(&aio_work->work, ceph_aio_retry_work);
7971091 aio_work->req = req;
798
- queue_work(ceph_inode_to_client(inode)->wb_wq,
1092
+ queue_work(ceph_inode_to_client(inode)->inode_wq,
7991093 &aio_work->work);
8001094 return;
8011095 }
....@@ -821,7 +1115,7 @@
8211115 aio_req->total_len = rc + zlen;
8221116 }
8231117
824
- iov_iter_bvec(&i, ITER_BVEC, osd_data->bvec_pos.bvecs,
1118
+ iov_iter_bvec(&i, READ, osd_data->bvec_pos.bvecs,
8251119 osd_data->num_bvecs,
8261120 osd_data->bvec_pos.iter.bi_size);
8271121 iov_iter_advance(&i, rc);
....@@ -865,7 +1159,7 @@
8651159 }
8661160 spin_unlock(&ci->i_ceph_lock);
8671161
868
- req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
1162
+ req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1,
8691163 false, GFP_NOFS);
8701164 if (!req) {
8711165 ret = -ENOMEM;
....@@ -877,17 +1171,17 @@
8771171 ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
8781172 ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
8791173
1174
+ req->r_ops[0] = orig_req->r_ops[0];
1175
+
1176
+ req->r_mtime = aio_req->mtime;
1177
+ req->r_data_offset = req->r_ops[0].extent.offset;
1178
+
8801179 ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
8811180 if (ret) {
8821181 ceph_osdc_put_request(req);
8831182 req = orig_req;
8841183 goto out;
8851184 }
886
-
887
- req->r_ops[0] = orig_req->r_ops[0];
888
-
889
- req->r_mtime = aio_req->mtime;
890
- req->r_data_offset = req->r_ops[0].extent.offset;
8911185
8921186 ceph_osdc_put_request(orig_req);
8931187
....@@ -915,13 +1209,14 @@
9151209 struct inode *inode = file_inode(file);
9161210 struct ceph_inode_info *ci = ceph_inode(inode);
9171211 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1212
+ struct ceph_client_metric *metric = &fsc->mdsc->metric;
9181213 struct ceph_vino vino;
9191214 struct ceph_osd_request *req;
9201215 struct bio_vec *bvecs;
9211216 struct ceph_aio_request *aio_req = NULL;
9221217 int num_pages = 0;
9231218 int flags;
924
- int ret;
1219
+ int ret = 0;
9251220 struct timespec64 mtime = current_time(inode);
9261221 size_t count = iov_iter_count(iter);
9271222 loff_t pos = iocb->ki_pos;
....@@ -933,16 +1228,12 @@
9331228
9341229 dout("sync_direct_%s on file %p %lld~%u snapc %p seq %lld\n",
9351230 (write ? "write" : "read"), file, pos, (unsigned)count,
936
- snapc, snapc->seq);
937
-
938
- ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
939
- if (ret < 0)
940
- return ret;
1231
+ snapc, snapc ? snapc->seq : 0);
9411232
9421233 if (write) {
9431234 int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
9441235 pos >> PAGE_SHIFT,
945
- (pos + count) >> PAGE_SHIFT);
1236
+ (pos + count - 1) >> PAGE_SHIFT);
9461237 if (ret2 < 0)
9471238 dout("invalidate_inode_pages2_range returned %d\n", ret2);
9481239
....@@ -1010,7 +1301,7 @@
10101301 * may block.
10111302 */
10121303 truncate_inode_pages_range(inode->i_mapping, pos,
1013
- (pos+len) | (PAGE_SIZE - 1));
1304
+ PAGE_ALIGN(pos + len) - 1);
10141305
10151306 req->r_mtime = mtime;
10161307 }
....@@ -1025,7 +1316,7 @@
10251316 req->r_callback = ceph_aio_complete_req;
10261317 req->r_inode = inode;
10271318 req->r_priv = aio_req;
1028
- list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);
1319
+ list_add_tail(&req->r_private_item, &aio_req->osd_reqs);
10291320
10301321 pos += len;
10311322 continue;
....@@ -1034,6 +1325,13 @@
10341325 ret = ceph_osdc_start_request(req->r_osdc, req, false);
10351326 if (!ret)
10361327 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
1328
+
1329
+ if (write)
1330
+ ceph_update_write_latency(metric, req->r_start_latency,
1331
+ req->r_end_latency, ret);
1332
+ else
1333
+ ceph_update_read_latency(metric, req->r_start_latency,
1334
+ req->r_end_latency, ret);
10371335
10381336 size = i_size_read(inode);
10391337 if (!write) {
....@@ -1044,8 +1342,7 @@
10441342 int zlen = min_t(size_t, len - ret,
10451343 size - pos - ret);
10461344
1047
- iov_iter_bvec(&i, ITER_BVEC, bvecs, num_pages,
1048
- len);
1345
+ iov_iter_bvec(&i, READ, bvecs, num_pages, len);
10491346 iov_iter_advance(&i, ret);
10501347 iov_iter_zero(zlen, &i);
10511348 ret += zlen;
....@@ -1083,11 +1380,12 @@
10831380 CEPH_CAP_FILE_RD);
10841381
10851382 list_splice(&aio_req->osd_reqs, &osd_reqs);
1383
+ inode_dio_begin(inode);
10861384 while (!list_empty(&osd_reqs)) {
10871385 req = list_first_entry(&osd_reqs,
10881386 struct ceph_osd_request,
1089
- r_unsafe_item);
1090
- list_del_init(&req->r_unsafe_item);
1387
+ r_private_item);
1388
+ list_del_init(&req->r_private_item);
10911389 if (ret >= 0)
10921390 ret = ceph_osdc_start_request(req->r_osdc,
10931391 req, false);
....@@ -1139,13 +1437,14 @@
11391437 dout("sync_write on file %p %lld~%u snapc %p seq %lld\n",
11401438 file, pos, (unsigned)count, snapc, snapc->seq);
11411439
1142
- ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
1440
+ ret = filemap_write_and_wait_range(inode->i_mapping,
1441
+ pos, pos + count - 1);
11431442 if (ret < 0)
11441443 return ret;
11451444
11461445 ret = invalidate_inode_pages2_range(inode->i_mapping,
11471446 pos >> PAGE_SHIFT,
1148
- (pos + count) >> PAGE_SHIFT);
1447
+ (pos + count - 1) >> PAGE_SHIFT);
11491448 if (ret < 0)
11501449 dout("invalidate_inode_pages2_range returned %d\n", ret);
11511450
....@@ -1205,6 +1504,8 @@
12051504 if (!ret)
12061505 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
12071506
1507
+ ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
1508
+ req->r_end_latency, ret);
12081509 out:
12091510 ceph_osdc_put_request(req);
12101511 if (ret != 0) {
....@@ -1247,6 +1548,7 @@
12471548 struct inode *inode = file_inode(filp);
12481549 struct ceph_inode_info *ci = ceph_inode(inode);
12491550 struct page *pinned_page = NULL;
1551
+ bool direct_lock = iocb->ki_flags & IOCB_DIRECT;
12501552 ssize_t ret;
12511553 int want, got = 0;
12521554 int retry_op = 0, read = 0;
....@@ -1255,13 +1557,24 @@
12551557 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
12561558 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
12571559
1560
+ if (direct_lock)
1561
+ ceph_start_io_direct(inode);
1562
+ else
1563
+ ceph_start_io_read(inode);
1564
+
12581565 if (fi->fmode & CEPH_FILE_MODE_LAZY)
12591566 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
12601567 else
12611568 want = CEPH_CAP_FILE_CACHE;
1262
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
1263
- if (ret < 0)
1569
+ ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1,
1570
+ &got, &pinned_page);
1571
+ if (ret < 0) {
1572
+ if (iocb->ki_flags & IOCB_DIRECT)
1573
+ ceph_end_io_direct(inode);
1574
+ else
1575
+ ceph_end_io_read(inode);
12641576 return ret;
1577
+ }
12651578
12661579 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
12671580 (iocb->ki_flags & IOCB_DIRECT) ||
....@@ -1292,6 +1605,7 @@
12921605 ret = generic_file_read_iter(iocb, to);
12931606 ceph_del_rw_context(fi, &rw_ctx);
12941607 }
1608
+
12951609 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
12961610 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
12971611 if (pinned_page) {
....@@ -1299,6 +1613,12 @@
12991613 pinned_page = NULL;
13001614 }
13011615 ceph_put_cap_refs(ci, got);
1616
+
1617
+ if (direct_lock)
1618
+ ceph_end_io_direct(inode);
1619
+ else
1620
+ ceph_end_io_read(inode);
1621
+
13021622 if (retry_op > HAVE_RETRIED && ret >= 0) {
13031623 int statret;
13041624 struct page *page = NULL;
....@@ -1388,6 +1708,7 @@
13881708 struct ceph_cap_flush *prealloc_cf;
13891709 ssize_t count, written = 0;
13901710 int err, want, got;
1711
+ bool direct_lock = false;
13911712 u32 map_flags;
13921713 u64 pool_flags;
13931714 loff_t pos;
....@@ -1400,8 +1721,14 @@
14001721 if (!prealloc_cf)
14011722 return -ENOMEM;
14021723
1724
+ if ((iocb->ki_flags & (IOCB_DIRECT | IOCB_APPEND)) == IOCB_DIRECT)
1725
+ direct_lock = true;
1726
+
14031727 retry_snap:
1404
- inode_lock(inode);
1728
+ if (direct_lock)
1729
+ ceph_start_io_direct(inode);
1730
+ else
1731
+ ceph_start_io_write(inode);
14051732
14061733 /* We can write back this queue in page reclaim */
14071734 current->backing_dev_info = inode_to_bdi(inode);
....@@ -1430,20 +1757,6 @@
14301757 goto out;
14311758 }
14321759
1433
- err = file_remove_privs(file);
1434
- if (err)
1435
- goto out;
1436
-
1437
- err = file_update_time(file);
1438
- if (err)
1439
- goto out;
1440
-
1441
- if (ci->i_inline_version != CEPH_INLINE_NONE) {
1442
- err = ceph_uninline_data(file, NULL);
1443
- if (err < 0)
1444
- goto out;
1445
- }
1446
-
14471760 down_read(&osdc->lock);
14481761 map_flags = osdc->osdmap->flags;
14491762 pool_flags = ceph_pg_pool_flags(osdc->osdmap, ci->i_layout.pool_id);
....@@ -1454,6 +1767,16 @@
14541767 goto out;
14551768 }
14561769
1770
+ err = file_remove_privs(file);
1771
+ if (err)
1772
+ goto out;
1773
+
1774
+ if (ci->i_inline_version != CEPH_INLINE_NONE) {
1775
+ err = ceph_uninline_data(file, NULL);
1776
+ if (err < 0)
1777
+ goto out;
1778
+ }
1779
+
14571780 dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
14581781 inode, ceph_vinop(inode), pos, count, i_size_read(inode));
14591782 if (fi->fmode & CEPH_FILE_MODE_LAZY)
....@@ -1461,10 +1784,16 @@
14611784 else
14621785 want = CEPH_CAP_FILE_BUFFER;
14631786 got = 0;
1464
- err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
1787
+ err = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, pos + count,
14651788 &got, NULL);
14661789 if (err < 0)
14671790 goto out;
1791
+
1792
+ err = file_update_time(file);
1793
+ if (err)
1794
+ goto out_caps;
1795
+
1796
+ inode_inc_iversion_raw(inode);
14681797
14691798 dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
14701799 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
....@@ -1474,7 +1803,6 @@
14741803 (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) {
14751804 struct ceph_snap_context *snapc;
14761805 struct iov_iter data;
1477
- inode_unlock(inode);
14781806
14791807 spin_lock(&ci->i_ceph_lock);
14801808 if (__ceph_have_pending_cap_snap(ci)) {
....@@ -1496,6 +1824,10 @@
14961824 &prealloc_cf);
14971825 else
14981826 written = ceph_sync_write(iocb, &data, pos, snapc);
1827
+ if (direct_lock)
1828
+ ceph_end_io_direct(inode);
1829
+ else
1830
+ ceph_end_io_write(inode);
14991831 if (written > 0)
15001832 iov_iter_advance(from, written);
15011833 ceph_put_snap_context(snapc);
....@@ -1510,7 +1842,7 @@
15101842 written = generic_perform_write(file, from, pos);
15111843 if (likely(written >= 0))
15121844 iocb->ki_pos = pos + written;
1513
- inode_unlock(inode);
1845
+ ceph_end_io_write(inode);
15141846 }
15151847
15161848 if (written >= 0) {
....@@ -1524,7 +1856,7 @@
15241856 if (dirty)
15251857 __mark_inode_dirty(inode, dirty);
15261858 if (ceph_quota_is_max_bytes_approaching(inode, iocb->ki_pos))
1527
- ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL);
1859
+ ceph_check_caps(ci, 0, NULL);
15281860 }
15291861
15301862 dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n",
....@@ -1546,9 +1878,13 @@
15461878 }
15471879
15481880 goto out_unlocked;
1549
-
1881
+out_caps:
1882
+ ceph_put_cap_refs(ci, got);
15501883 out:
1551
- inode_unlock(inode);
1884
+ if (direct_lock)
1885
+ ceph_end_io_direct(inode);
1886
+ else
1887
+ ceph_end_io_write(inode);
15521888 out_unlocked:
15531889 ceph_free_cap_flush(prealloc_cf);
15541890 current->backing_dev_info = NULL;
....@@ -1786,7 +2122,7 @@
17862122 else
17872123 want = CEPH_CAP_FILE_BUFFER;
17882124
1789
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
2125
+ ret = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
17902126 if (ret < 0)
17912127 goto unlock;
17922128
....@@ -1810,6 +2146,370 @@
18102146 return ret;
18112147 }
18122148
2149
+/*
2150
+ * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
2151
+ * src_ci. Two attempts are made to obtain both caps, and an error is returned if
2152
+ * this fails; zero is returned on success.
2153
+ */
2154
+static int get_rd_wr_caps(struct file *src_filp, int *src_got,
2155
+ struct file *dst_filp,
2156
+ loff_t dst_endoff, int *dst_got)
2157
+{
2158
+ int ret = 0;
2159
+ bool retrying = false;
2160
+
2161
+retry_caps:
2162
+ ret = ceph_get_caps(dst_filp, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
2163
+ dst_endoff, dst_got, NULL);
2164
+ if (ret < 0)
2165
+ return ret;
2166
+
2167
+ /*
2168
+ * Since we're already holding the FILE_WR capability for the dst file,
2169
+ * we would risk a deadlock by using ceph_get_caps. Thus, we'll do some
2170
+ * retry dance instead to try to get both capabilities.
2171
+ */
2172
+ ret = ceph_try_get_caps(file_inode(src_filp),
2173
+ CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
2174
+ false, src_got);
2175
+ if (ret <= 0) {
2176
+ /* Start by dropping dst_ci caps and getting src_ci caps */
2177
+ ceph_put_cap_refs(ceph_inode(file_inode(dst_filp)), *dst_got);
2178
+ if (retrying) {
2179
+ if (!ret)
2180
+ /* ceph_try_get_caps masks EAGAIN */
2181
+ ret = -EAGAIN;
2182
+ return ret;
2183
+ }
2184
+ ret = ceph_get_caps(src_filp, CEPH_CAP_FILE_RD,
2185
+ CEPH_CAP_FILE_SHARED, -1, src_got, NULL);
2186
+ if (ret < 0)
2187
+ return ret;
2188
+ /*... drop src_ci caps too, and retry */
2189
+ ceph_put_cap_refs(ceph_inode(file_inode(src_filp)), *src_got);
2190
+ retrying = true;
2191
+ goto retry_caps;
2192
+ }
2193
+ return ret;
2194
+}
2195
+
2196
+static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
2197
+ struct ceph_inode_info *dst_ci, int dst_got)
2198
+{
2199
+ ceph_put_cap_refs(src_ci, src_got);
2200
+ ceph_put_cap_refs(dst_ci, dst_got);
2201
+}
2202
+
2203
+/*
2204
+ * This function does several size-related checks, returning an error if:
2205
+ * - source file is smaller than off+len
2206
+ * - destination file size is not OK (inode_newsize_ok())
2207
+ * - max bytes quota is exceeded
2208
+ */
2209
+static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
2210
+ loff_t src_off, loff_t dst_off, size_t len)
2211
+{
2212
+ loff_t size, endoff;
2213
+
2214
+ size = i_size_read(src_inode);
2215
+ /*
2216
+ * Don't copy beyond source file EOF. Instead of simply setting length
2217
+ * to (size - src_off), just drop to VFS default implementation, as the
2218
+ * local i_size may be stale due to other clients writing to the source
2219
+ * inode.
2220
+ */
2221
+ if (src_off + len > size) {
2222
+ dout("Copy beyond EOF (%llu + %zu > %llu)\n",
2223
+ src_off, len, size);
2224
+ return -EOPNOTSUPP;
2225
+ }
2226
+ size = i_size_read(dst_inode);
2227
+
2228
+ endoff = dst_off + len;
2229
+ if (inode_newsize_ok(dst_inode, endoff))
2230
+ return -EOPNOTSUPP;
2231
+
2232
+ if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
2233
+ return -EDQUOT;
2234
+
2235
+ return 0;
2236
+}
2237
+
2238
+static ssize_t ceph_do_objects_copy(struct ceph_inode_info *src_ci, u64 *src_off,
2239
+ struct ceph_inode_info *dst_ci, u64 *dst_off,
2240
+ struct ceph_fs_client *fsc,
2241
+ size_t len, unsigned int flags)
2242
+{
2243
+ struct ceph_object_locator src_oloc, dst_oloc;
2244
+ struct ceph_object_id src_oid, dst_oid;
2245
+ size_t bytes = 0;
2246
+ u64 src_objnum, src_objoff, dst_objnum, dst_objoff;
2247
+ u32 src_objlen, dst_objlen;
2248
+ u32 object_size = src_ci->i_layout.object_size;
2249
+ int ret;
2250
+
2251
+ src_oloc.pool = src_ci->i_layout.pool_id;
2252
+ src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
2253
+ dst_oloc.pool = dst_ci->i_layout.pool_id;
2254
+ dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
2255
+
2256
+ while (len >= object_size) {
2257
+ ceph_calc_file_object_mapping(&src_ci->i_layout, *src_off,
2258
+ object_size, &src_objnum,
2259
+ &src_objoff, &src_objlen);
2260
+ ceph_calc_file_object_mapping(&dst_ci->i_layout, *dst_off,
2261
+ object_size, &dst_objnum,
2262
+ &dst_objoff, &dst_objlen);
2263
+ ceph_oid_init(&src_oid);
2264
+ ceph_oid_printf(&src_oid, "%llx.%08llx",
2265
+ src_ci->i_vino.ino, src_objnum);
2266
+ ceph_oid_init(&dst_oid);
2267
+ ceph_oid_printf(&dst_oid, "%llx.%08llx",
2268
+ dst_ci->i_vino.ino, dst_objnum);
2269
+ /* Do an object remote copy */
2270
+ ret = ceph_osdc_copy_from(&fsc->client->osdc,
2271
+ src_ci->i_vino.snap, 0,
2272
+ &src_oid, &src_oloc,
2273
+ CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
2274
+ CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
2275
+ &dst_oid, &dst_oloc,
2276
+ CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
2277
+ CEPH_OSD_OP_FLAG_FADVISE_DONTNEED,
2278
+ dst_ci->i_truncate_seq,
2279
+ dst_ci->i_truncate_size,
2280
+ CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ);
2281
+ if (ret) {
2282
+ if (ret == -EOPNOTSUPP) {
2283
+ fsc->have_copy_from2 = false;
2284
+ pr_notice("OSDs don't support copy-from2; disabling copy offload\n");
2285
+ }
2286
+ dout("ceph_osdc_copy_from returned %d\n", ret);
2287
+ if (!bytes)
2288
+ bytes = ret;
2289
+ goto out;
2290
+ }
2291
+ len -= object_size;
2292
+ bytes += object_size;
2293
+ *src_off += object_size;
2294
+ *dst_off += object_size;
2295
+ }
2296
+
2297
+out:
2298
+ ceph_oloc_destroy(&src_oloc);
2299
+ ceph_oloc_destroy(&dst_oloc);
2300
+ return bytes;
2301
+}
2302
+
2303
+static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
2304
+ struct file *dst_file, loff_t dst_off,
2305
+ size_t len, unsigned int flags)
2306
+{
2307
+ struct inode *src_inode = file_inode(src_file);
2308
+ struct inode *dst_inode = file_inode(dst_file);
2309
+ struct ceph_inode_info *src_ci = ceph_inode(src_inode);
2310
+ struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
2311
+ struct ceph_cap_flush *prealloc_cf;
2312
+ struct ceph_fs_client *src_fsc = ceph_inode_to_client(src_inode);
2313
+ loff_t size;
2314
+ ssize_t ret = -EIO, bytes;
2315
+ u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
2316
+ u32 src_objlen, dst_objlen;
2317
+ int src_got = 0, dst_got = 0, err, dirty;
2318
+
2319
+ if (src_inode->i_sb != dst_inode->i_sb) {
2320
+ struct ceph_fs_client *dst_fsc = ceph_inode_to_client(dst_inode);
2321
+
2322
+ if (ceph_fsid_compare(&src_fsc->client->fsid,
2323
+ &dst_fsc->client->fsid)) {
2324
+ dout("Copying files across clusters: src: %pU dst: %pU\n",
2325
+ &src_fsc->client->fsid, &dst_fsc->client->fsid);
2326
+ return -EXDEV;
2327
+ }
2328
+ }
2329
+ if (ceph_snap(dst_inode) != CEPH_NOSNAP)
2330
+ return -EROFS;
2331
+
2332
+ /*
2333
+ * Some of the checks below will return -EOPNOTSUPP, which will force a
2334
+ * fallback to the default VFS copy_file_range implementation. This is
2335
+ * desirable in several cases (e.g. when 'len' is smaller than the
2336
+ * size of the objects, or in cases where that would be more
2337
+ * efficient).
2338
+ */
2339
+
2340
+ if (ceph_test_mount_opt(src_fsc, NOCOPYFROM))
2341
+ return -EOPNOTSUPP;
2342
+
2343
+ if (!src_fsc->have_copy_from2)
2344
+ return -EOPNOTSUPP;
2345
+
2346
+ /*
2347
+ * Striped file layouts require that we copy partial objects, but the
2348
+ * OSD copy-from operation only supports full-object copies. Limit
2349
+ * this to non-striped file layouts for now.
2350
+ */
2351
+ if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
2352
+ (src_ci->i_layout.stripe_count != 1) ||
2353
+ (dst_ci->i_layout.stripe_count != 1) ||
2354
+ (src_ci->i_layout.object_size != dst_ci->i_layout.object_size)) {
2355
+ dout("Invalid src/dst files layout\n");
2356
+ return -EOPNOTSUPP;
2357
+ }
2358
+
2359
+ if (len < src_ci->i_layout.object_size)
2360
+ return -EOPNOTSUPP; /* no remote copy will be done */
2361
+
2362
+ prealloc_cf = ceph_alloc_cap_flush();
2363
+ if (!prealloc_cf)
2364
+ return -ENOMEM;
2365
+
2366
+ /* Start by sync'ing the source and destination files */
2367
+ ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
2368
+ if (ret < 0) {
2369
+ dout("failed to write src file (%zd)\n", ret);
2370
+ goto out;
2371
+ }
2372
+ ret = file_write_and_wait_range(dst_file, dst_off, (dst_off + len));
2373
+ if (ret < 0) {
2374
+ dout("failed to write dst file (%zd)\n", ret);
2375
+ goto out;
2376
+ }
2377
+
2378
+ /*
2379
+ * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
2380
+ * clients may have dirty data in their caches. And OSDs know nothing
2381
+ * about caps, so they can't safely do the remote object copies.
2382
+ */
2383
+ err = get_rd_wr_caps(src_file, &src_got,
2384
+ dst_file, (dst_off + len), &dst_got);
2385
+ if (err < 0) {
2386
+ dout("get_rd_wr_caps returned %d\n", err);
2387
+ ret = -EOPNOTSUPP;
2388
+ goto out;
2389
+ }
2390
+
2391
+ ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
2392
+ if (ret < 0)
2393
+ goto out_caps;
2394
+
2395
+ /* Drop dst file cached pages */
2396
+ ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
2397
+ dst_off >> PAGE_SHIFT,
2398
+ (dst_off + len) >> PAGE_SHIFT);
2399
+ if (ret < 0) {
2400
+ dout("Failed to invalidate inode pages (%zd)\n", ret);
2401
+ ret = 0; /* XXX */
2402
+ }
2403
+ ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
2404
+ src_ci->i_layout.object_size,
2405
+ &src_objnum, &src_objoff, &src_objlen);
2406
+ ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
2407
+ dst_ci->i_layout.object_size,
2408
+ &dst_objnum, &dst_objoff, &dst_objlen);
2409
+ /* object-level offsets need to be the same */
2410
+ if (src_objoff != dst_objoff) {
2411
+ ret = -EOPNOTSUPP;
2412
+ goto out_caps;
2413
+ }
2414
+
2415
+ /*
2416
+ * Do a manual copy if the object offset isn't object aligned.
2417
+ * 'src_objlen' contains the bytes left until the end of the object,
2418
+ * starting at the src_off
2419
+ */
2420
+ if (src_objoff) {
2421
+ dout("Initial partial copy of %u bytes\n", src_objlen);
2422
+
2423
+ /*
2424
+ * we need to temporarily drop all caps as we'll be calling
2425
+ * {read,write}_iter, which will get caps again.
2426
+ */
2427
+ put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
2428
+ ret = do_splice_direct(src_file, &src_off, dst_file,
2429
+ &dst_off, src_objlen, flags);
2430
+ /* Abort on short copies or on error */
2431
+ if (ret < src_objlen) {
2432
+ dout("Failed partial copy (%zd)\n", ret);
2433
+ goto out;
2434
+ }
2435
+ len -= ret;
2436
+ err = get_rd_wr_caps(src_file, &src_got,
2437
+ dst_file, (dst_off + len), &dst_got);
2438
+ if (err < 0)
2439
+ goto out;
2440
+ err = is_file_size_ok(src_inode, dst_inode,
2441
+ src_off, dst_off, len);
2442
+ if (err < 0)
2443
+ goto out_caps;
2444
+ }
2445
+
2446
+ size = i_size_read(dst_inode);
2447
+ bytes = ceph_do_objects_copy(src_ci, &src_off, dst_ci, &dst_off,
2448
+ src_fsc, len, flags);
2449
+ if (bytes <= 0) {
2450
+ if (!ret)
2451
+ ret = bytes;
2452
+ goto out_caps;
2453
+ }
2454
+ dout("Copied %zu bytes out of %zu\n", bytes, len);
2455
+ len -= bytes;
2456
+ ret += bytes;
2457
+
2458
+ file_update_time(dst_file);
2459
+ inode_inc_iversion_raw(dst_inode);
2460
+
2461
+ if (dst_off > size) {
2462
+ /* Let the MDS know about dst file size change */
2463
+ if (ceph_inode_set_size(dst_inode, dst_off) ||
2464
+ ceph_quota_is_max_bytes_approaching(dst_inode, dst_off))
2465
+ ceph_check_caps(dst_ci, CHECK_CAPS_AUTHONLY, NULL);
2466
+ }
2467
+ /* Mark Fw dirty */
2468
+ spin_lock(&dst_ci->i_ceph_lock);
2469
+ dst_ci->i_inline_version = CEPH_INLINE_NONE;
2470
+ dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
2471
+ spin_unlock(&dst_ci->i_ceph_lock);
2472
+ if (dirty)
2473
+ __mark_inode_dirty(dst_inode, dirty);
2474
+
2475
+out_caps:
2476
+ put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
2477
+
2478
+ /*
2479
+ * Do the final manual copy if we still have some bytes left, unless
2480
+ * there were errors in remote object copies (len >= object_size).
2481
+ */
2482
+ if (len && (len < src_ci->i_layout.object_size)) {
2483
+ dout("Final partial copy of %zu bytes\n", len);
2484
+ bytes = do_splice_direct(src_file, &src_off, dst_file,
2485
+ &dst_off, len, flags);
2486
+ if (bytes > 0)
2487
+ ret += bytes;
2488
+ else
2489
+ dout("Failed partial copy (%zd)\n", bytes);
2490
+ }
2491
+
2492
+out:
2493
+ ceph_free_cap_flush(prealloc_cf);
2494
+
2495
+ return ret;
2496
+}
2497
+
2498
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
2499
+ struct file *dst_file, loff_t dst_off,
2500
+ size_t len, unsigned int flags)
2501
+{
2502
+ ssize_t ret;
2503
+
2504
+ ret = __ceph_copy_file_range(src_file, src_off, dst_file, dst_off,
2505
+ len, flags);
2506
+
2507
+ if (ret == -EOPNOTSUPP || ret == -EXDEV)
2508
+ ret = generic_copy_file_range(src_file, src_off, dst_file,
2509
+ dst_off, len, flags);
2510
+ return ret;
2511
+}
2512
+
18132513 const struct file_operations ceph_file_fops = {
18142514 .open = ceph_open,
18152515 .release = ceph_release,
....@@ -1824,7 +2524,7 @@
18242524 .splice_read = generic_file_splice_read,
18252525 .splice_write = iter_file_splice_write,
18262526 .unlocked_ioctl = ceph_ioctl,
1827
- .compat_ioctl = ceph_ioctl,
2527
+ .compat_ioctl = compat_ptr_ioctl,
18282528 .fallocate = ceph_fallocate,
2529
+ .copy_file_range = ceph_copy_file_range,
18292530 };
1830
-