From cde9070d9970eef1f7ec2360586c802a16230ad8 Mon Sep 17 00:00:00 2001 From: hc <hc@nodka.com> Date: Fri, 10 May 2024 07:43:50 +0000 Subject: [PATCH] rtl88x2CE_WiFi_linux driver --- kernel/fs/ceph/caps.c | 1653 ++++++++++++++++++++++++++++++++++++---------------------- 1 files changed, 1,025 insertions(+), 628 deletions(-) diff --git a/kernel/fs/ceph/caps.c b/kernel/fs/ceph/caps.c index 6443ba1..432dc2a 100644 --- a/kernel/fs/ceph/caps.c +++ b/kernel/fs/ceph/caps.c @@ -8,6 +8,7 @@ #include <linux/vmalloc.h> #include <linux/wait.h> #include <linux/writeback.h> +#include <linux/iversion.h> #include "super.h" #include "mds_client.h" @@ -148,11 +149,17 @@ spin_unlock(&mdsc->caps_list_lock); } -void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta) +void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc, + struct ceph_mount_options *fsopt) { spin_lock(&mdsc->caps_list_lock); - mdsc->caps_min_count += delta; - BUG_ON(mdsc->caps_min_count < 0); + mdsc->caps_min_count = fsopt->max_readdir; + if (mdsc->caps_min_count < 1024) + mdsc->caps_min_count = 1024; + mdsc->caps_use_max = fsopt->caps_max; + if (mdsc->caps_use_max > 0 && + mdsc->caps_use_max < mdsc->caps_min_count) + mdsc->caps_use_max = mdsc->caps_min_count; spin_unlock(&mdsc->caps_list_lock); } @@ -272,6 +279,7 @@ if (!err) { BUG_ON(have + alloc != need); ctx->count = need; + ctx->used = 0; } spin_lock(&mdsc->caps_list_lock); @@ -295,13 +303,24 @@ } void ceph_unreserve_caps(struct ceph_mds_client *mdsc, - struct ceph_cap_reservation *ctx) + struct ceph_cap_reservation *ctx) { + bool reclaim = false; + if (!ctx->count) + return; + dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count); spin_lock(&mdsc->caps_list_lock); __ceph_unreserve_caps(mdsc, ctx->count); ctx->count = 0; + + if (mdsc->caps_use_max > 0 && + mdsc->caps_use_count > mdsc->caps_use_max) + reclaim = true; spin_unlock(&mdsc->caps_list_lock); + + if (reclaim) + ceph_reclaim_caps_nr(mdsc, ctx->used); } struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc, @@ -346,6 +365,7 @@ BUG_ON(list_empty(&mdsc->caps_list)); ctx->count--; + ctx->used++; mdsc->caps_reserve_count--; mdsc->caps_use_count++; @@ -438,37 +458,6 @@ } /* - * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1. - */ -static int __ceph_get_cap_mds(struct ceph_inode_info *ci) -{ - struct ceph_cap *cap; - int mds = -1; - struct rb_node *p; - - /* prefer mds with WR|BUFFER|EXCL caps */ - for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { - cap = rb_entry(p, struct ceph_cap, ci_node); - mds = cap->mds; - if (cap->issued & (CEPH_CAP_FILE_WR | - CEPH_CAP_FILE_BUFFER | - CEPH_CAP_FILE_EXCL)) - break; - } - return mds; -} - -int ceph_get_cap_mds(struct inode *inode) -{ - struct ceph_inode_info *ci = ceph_inode(inode); - int mds; - spin_lock(&ci->i_ceph_lock); - mds = __ceph_get_cap_mds(ceph_inode(inode)); - spin_unlock(&ci->i_ceph_lock); - return mds; -} - -/* * Called under i_ceph_lock. */ static void __insert_cap_node(struct ceph_inode_info *ci, @@ -500,14 +489,11 @@ static void __cap_set_timeouts(struct ceph_mds_client *mdsc, struct ceph_inode_info *ci) { - struct ceph_mount_options *ma = mdsc->fsc->mount_options; - - ci->i_hold_caps_min = round_jiffies(jiffies + - ma->caps_wanted_delay_min * HZ); + struct ceph_mount_options *opt = mdsc->fsc->mount_options; ci->i_hold_caps_max = round_jiffies(jiffies + - ma->caps_wanted_delay_max * HZ); - dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode, - ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies); + opt->caps_wanted_delay_max * HZ); + dout("__cap_set_timeouts %p %lu\n", &ci->vfs_inode, + ci->i_hold_caps_max - jiffies); } /* @@ -521,8 +507,7 @@ static void __cap_delay_requeue(struct ceph_mds_client *mdsc, struct ceph_inode_info *ci) { - __cap_set_timeouts(mdsc, ci); - dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode, + dout("__cap_delay_requeue %p flags 0x%lx at %lu\n", &ci->vfs_inode, ci->i_ceph_flags, ci->i_hold_caps_max); if (!mdsc->stopping) { spin_lock(&mdsc->cap_delay_lock); @@ -531,6 +516,7 @@ goto no_change; list_del_init(&ci->i_cap_delay_list); } + __cap_set_timeouts(mdsc, ci); list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list); no_change: spin_unlock(&mdsc->cap_delay_lock); @@ -570,19 +556,20 @@ spin_unlock(&mdsc->cap_delay_lock); } -/* - * Common issue checks for add_cap, handle_cap_grant. - */ +/* Common issue checks for add_cap, handle_cap_grant. */ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, unsigned issued) { unsigned had = __ceph_caps_issued(ci, NULL); + lockdep_assert_held(&ci->i_ceph_lock); + /* * Each time we receive FILE_CACHE anew, we increment * i_rdcache_gen. */ - if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && + if (S_ISREG(ci->vfs_inode.i_mode) && + (issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) { ci->i_rdcache_gen++; } @@ -601,12 +588,40 @@ __ceph_dir_clear_complete(ci); } } + + /* Wipe saved layout if we're losing DIR_CREATE caps */ + if (S_ISDIR(ci->vfs_inode.i_mode) && (had & CEPH_CAP_DIR_CREATE) && + !(issued & CEPH_CAP_DIR_CREATE)) { + ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); + memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); + } +} + +/** + * change_auth_cap_ses - move inode to appropriate lists when auth caps change + * @ci: inode to be moved + * @session: new auth caps session + */ +static void change_auth_cap_ses(struct ceph_inode_info *ci, + struct ceph_mds_session *session) +{ + lockdep_assert_held(&ci->i_ceph_lock); + + if (list_empty(&ci->i_dirty_item) && list_empty(&ci->i_flushing_item)) + return; + + spin_lock(&session->s_mdsc->cap_dirty_lock); + if (!list_empty(&ci->i_dirty_item)) + list_move(&ci->i_dirty_item, &session->s_cap_dirty); + if (!list_empty(&ci->i_flushing_item)) + list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing); + spin_unlock(&session->s_mdsc->cap_dirty_lock); } /* * Add a capability under the given MDS session. * - * Caller should hold session snap_rwsem (read) and s_mutex. + * Caller should hold session snap_rwsem (read) and ci->i_ceph_lock * * @fmode is the open file mode, if we are opening a file, otherwise * it is < 0. (This is so we can atomically add the cap and add an @@ -614,7 +629,7 @@ */ void ceph_add_cap(struct inode *inode, struct ceph_mds_session *session, u64 cap_id, - int fmode, unsigned issued, unsigned wanted, + unsigned issued, unsigned wanted, unsigned seq, unsigned mseq, u64 realmino, int flags, struct ceph_cap **new_cap) { @@ -623,16 +638,16 @@ struct ceph_cap *cap; int mds = session->s_mds; int actual_wanted; + u32 gen; + + lockdep_assert_held(&ci->i_ceph_lock); dout("add_cap %p mds%d cap %llx %s seq %d\n", inode, session->s_mds, cap_id, ceph_cap_string(issued), seq); - /* - * If we are opening the file, include file mode wanted bits - * in wanted. - */ - if (fmode >= 0) - wanted |= ceph_caps_for_mode(fmode); + spin_lock(&session->s_gen_ttl_lock); + gen = session->s_cap_gen; + spin_unlock(&session->s_gen_ttl_lock); cap = __get_cap_for_mds(ci, mds); if (!cap) { @@ -653,8 +668,16 @@ spin_lock(&session->s_cap_lock); list_add_tail(&cap->session_caps, &session->s_caps); session->s_nr_caps++; + atomic64_inc(&mdsc->metric.total_caps); spin_unlock(&session->s_cap_lock); } else { + spin_lock(&session->s_cap_lock); + list_move_tail(&cap->session_caps, &session->s_caps); + spin_unlock(&session->s_cap_lock); + + if (cap->cap_gen < gen) + cap->issued = cap->implemented = CEPH_CAP_PIN; + /* * auth mds of the inode changed. we received the cap export * message, but still haven't received the cap import message. @@ -726,6 +749,9 @@ if (flags & CEPH_CAP_FLAG_AUTH) { if (!ci->i_auth_cap || ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) { + if (ci->i_auth_cap && + ci->i_auth_cap->session != cap->session) + change_auth_cap_ses(ci, cap->session); ci->i_auth_cap = cap; cap->mds_wanted = wanted; } @@ -746,10 +772,7 @@ cap->seq = seq; cap->issue_seq = seq; cap->mseq = mseq; - cap->cap_gen = session->s_cap_gen; - - if (fmode >= 0) - __ceph_get_fmode(ci, fmode); + cap->cap_gen = gen; } /* @@ -864,8 +887,8 @@ int have = ci->i_snap_caps; if ((have & mask) == mask) { - dout("__ceph_caps_issued_mask %p snap issued %s" - " (mask %s)\n", &ci->vfs_inode, + dout("__ceph_caps_issued_mask ino 0x%llx snap issued %s" + " (mask %s)\n", ceph_ino(&ci->vfs_inode), ceph_cap_string(have), ceph_cap_string(mask)); return 1; @@ -876,8 +899,8 @@ if (!__cap_is_valid(cap)) continue; if ((cap->issued & mask) == mask) { - dout("__ceph_caps_issued_mask %p cap %p issued %s" - " (mask %s)\n", &ci->vfs_inode, cap, + dout("__ceph_caps_issued_mask ino 0x%llx cap %p issued %s" + " (mask %s)\n", ceph_ino(&ci->vfs_inode), cap, ceph_cap_string(cap->issued), ceph_cap_string(mask)); if (touch) @@ -888,8 +911,8 @@ /* does a combination of caps satisfy mask? */ have |= cap->issued; if ((have & mask) == mask) { - dout("__ceph_caps_issued_mask %p combo issued %s" - " (mask %s)\n", &ci->vfs_inode, + dout("__ceph_caps_issued_mask ino 0x%llx combo issued %s" + " (mask %s)\n", ceph_ino(&ci->vfs_inode), ceph_cap_string(cap->issued), ceph_cap_string(mask)); if (touch) { @@ -903,7 +926,8 @@ ci_node); if (!__cap_is_valid(cap)) continue; - __touch_cap(cap); + if (cap->issued & mask) + __touch_cap(cap); } } return 1; @@ -911,6 +935,20 @@ } return 0; +} + +int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask, + int touch) +{ + struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb); + int r; + + r = __ceph_caps_issued_mask(ci, mask, touch); + if (r) + ceph_update_cap_hit(&fsc->mdsc->metric); + else + ceph_update_cap_mis(&fsc->mdsc->metric); + return r; } /* @@ -952,29 +990,97 @@ if (ci->i_rd_ref) used |= CEPH_CAP_FILE_RD; if (ci->i_rdcache_ref || - (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */ + (S_ISREG(ci->vfs_inode.i_mode) && ci->vfs_inode.i_data.nrpages)) used |= CEPH_CAP_FILE_CACHE; if (ci->i_wr_ref) used |= CEPH_CAP_FILE_WR; if (ci->i_wb_ref || ci->i_wrbuffer_ref) used |= CEPH_CAP_FILE_BUFFER; + if (ci->i_fx_ref) + used |= CEPH_CAP_FILE_EXCL; return used; } + +#define FMODE_WAIT_BIAS 1000 /* * wanted, by virtue of open file modes */ int __ceph_caps_file_wanted(struct ceph_inode_info *ci) { - int i, bits = 0; - for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { - if (ci->i_nr_by_mode[i]) - bits |= 1 << i; + const int PIN_SHIFT = ffs(CEPH_FILE_MODE_PIN); + const int RD_SHIFT = ffs(CEPH_FILE_MODE_RD); + const int WR_SHIFT = ffs(CEPH_FILE_MODE_WR); + const int LAZY_SHIFT = ffs(CEPH_FILE_MODE_LAZY); + struct ceph_mount_options *opt = + ceph_inode_to_client(&ci->vfs_inode)->mount_options; + unsigned long used_cutoff = jiffies - opt->caps_wanted_delay_max * HZ; + unsigned long idle_cutoff = jiffies - opt->caps_wanted_delay_min * HZ; + + if (S_ISDIR(ci->vfs_inode.i_mode)) { + int want = 0; + + /* use used_cutoff here, to keep dir's wanted caps longer */ + if (ci->i_nr_by_mode[RD_SHIFT] > 0 || + time_after(ci->i_last_rd, used_cutoff)) + want |= CEPH_CAP_ANY_SHARED; + + if (ci->i_nr_by_mode[WR_SHIFT] > 0 || + time_after(ci->i_last_wr, used_cutoff)) { + want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL; + if (opt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS) + want |= CEPH_CAP_ANY_DIR_OPS; + } + + if (want || ci->i_nr_by_mode[PIN_SHIFT] > 0) + want |= CEPH_CAP_PIN; + + return want; + } else { + int bits = 0; + + if (ci->i_nr_by_mode[RD_SHIFT] > 0) { + if (ci->i_nr_by_mode[RD_SHIFT] >= FMODE_WAIT_BIAS || + time_after(ci->i_last_rd, used_cutoff)) + bits |= 1 << RD_SHIFT; + } else if (time_after(ci->i_last_rd, idle_cutoff)) { + bits |= 1 << RD_SHIFT; + } + + if (ci->i_nr_by_mode[WR_SHIFT] > 0) { + if (ci->i_nr_by_mode[WR_SHIFT] >= FMODE_WAIT_BIAS || + time_after(ci->i_last_wr, used_cutoff)) + bits |= 1 << WR_SHIFT; + } else if (time_after(ci->i_last_wr, idle_cutoff)) { + bits |= 1 << WR_SHIFT; + } + + /* check lazyio only when read/write is wanted */ + if ((bits & (CEPH_FILE_MODE_RDWR << 1)) && + ci->i_nr_by_mode[LAZY_SHIFT] > 0) + bits |= 1 << LAZY_SHIFT; + + return bits ? ceph_caps_for_mode(bits >> 1) : 0; } - if (bits == 0) - return 0; - return ceph_caps_for_mode(bits >> 1); +} + +/* + * wanted, by virtue of open file modes AND cap refs (buffered/cached data) + */ +int __ceph_caps_wanted(struct ceph_inode_info *ci) +{ + int w = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci); + if (S_ISDIR(ci->vfs_inode.i_mode)) { + /* we want EXCL if holding caps of dir ops */ + if (w & CEPH_CAP_ANY_DIR_OPS) + w |= CEPH_CAP_FILE_EXCL; + } else { + /* we want EXCL if dirty data */ + if (w & CEPH_CAP_FILE_BUFFER) + w |= CEPH_CAP_FILE_EXCL; + } + return w; } /* @@ -998,26 +1104,13 @@ return mds_wanted; } -/* - * called under i_ceph_lock - */ -static int __ceph_is_single_caps(struct ceph_inode_info *ci) -{ - return rb_first(&ci->i_caps) == rb_last(&ci->i_caps); -} - -static int __ceph_is_any_caps(struct ceph_inode_info *ci) -{ - return !RB_EMPTY_ROOT(&ci->i_caps); -} - int ceph_is_any_caps(struct inode *inode) { struct ceph_inode_info *ci = ceph_inode(inode); int ret; spin_lock(&ci->i_ceph_lock); - ret = __ceph_is_any_caps(ci); + ret = __ceph_is_any_real_caps(ci); spin_unlock(&ci->i_ceph_lock); return ret; @@ -1062,8 +1155,10 @@ /* remove from inode's cap rbtree, and clear auth cap */ rb_erase(&cap->ci_node, &ci->i_caps); - if (ci->i_auth_cap == cap) + if (ci->i_auth_cap == cap) { + WARN_ON_ONCE(!list_empty(&ci->i_dirty_item)); ci->i_auth_cap = NULL; + } /* remove from session list */ spin_lock(&session->s_cap_lock); @@ -1074,6 +1169,7 @@ } else { list_del_init(&cap->session_caps); session->s_nr_caps--; + atomic64_dec(&mdsc->metric.total_caps); cap->session = NULL; removed = 1; } @@ -1088,9 +1184,7 @@ (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) { cap->queue_release = 1; if (removed) { - list_add_tail(&cap->session_caps, - &session->s_cap_releases); - session->s_num_cap_releases++; + __ceph_queue_cap_release(session, cap); removed = 0; } } else { @@ -1103,15 +1197,16 @@ if (removed) ceph_put_cap(mdsc, cap); - /* when reconnect denied, we remove session caps forcibly, - * i_wr_ref can be non-zero. If there are ongoing write, - * keep i_snap_realm. - */ - if (!__ceph_is_any_caps(ci) && ci->i_wr_ref == 0 && ci->i_snap_realm) - drop_inode_snap_realm(ci); + if (!__ceph_is_any_real_caps(ci)) { + /* when reconnect denied, we remove session caps forcibly, + * i_wr_ref can be non-zero. If there are ongoing write, + * keep i_snap_realm. + */ + if (ci->i_wr_ref == 0 && ci->i_snap_realm) + drop_inode_snap_realm(ci); - if (!__ceph_is_any_real_caps(ci)) __cap_delay_cancel(mdsc, ci); + } } struct cap_msg_args { @@ -1119,8 +1214,10 @@ u64 ino, cid, follows; u64 flush_tid, oldest_flush_tid, size, max_size; u64 xattr_version; + u64 change_attr; struct ceph_buffer *xattr_buf; - struct timespec64 atime, mtime, ctime; + struct ceph_buffer *old_xattr_buf; + struct timespec64 atime, mtime, ctime, btime; int op, caps, wanted, dirty; u32 seq, issue_seq, mseq, time_warp_seq; u32 flags; @@ -1128,39 +1225,30 @@ kgid_t gid; umode_t mode; bool inline_data; + bool wake; }; /* - * Build and send a cap message to the given MDS. - * - * Caller should be holding s_mutex. + * cap struct size + flock buffer size + inline version + inline data size + + * osd_epoch_barrier + oldest_flush_tid */ -static int send_cap_msg(struct cap_msg_args *arg) +#define CAP_MSG_SIZE (sizeof(struct ceph_mds_caps) + \ + 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4) + +/* Marshal up the cap msg to the MDS */ +static void encode_cap_msg(struct ceph_msg *msg, struct cap_msg_args *arg) { struct ceph_mds_caps *fc; - struct ceph_msg *msg; void *p; - size_t extra_len; - struct timespec64 zerotime = {0}; struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc; - dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" - " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu" - " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op), - arg->cid, arg->ino, ceph_cap_string(arg->caps), - ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty), - arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid, - arg->mseq, arg->follows, arg->size, arg->max_size, - arg->xattr_version, + dout("%s %s %llx %llx caps %s wanted %s dirty %s seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu xattr_ver %llu xattr_len %d\n", + __func__, ceph_cap_op_name(arg->op), arg->cid, arg->ino, + ceph_cap_string(arg->caps), ceph_cap_string(arg->wanted), + ceph_cap_string(arg->dirty), arg->seq, arg->issue_seq, + arg->flush_tid, arg->oldest_flush_tid, arg->mseq, arg->follows, + arg->size, arg->max_size, arg->xattr_version, arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0); - - /* flock buffer size + inline version + inline data size + - * osd_epoch_barrier + oldest_flush_tid */ - extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4; - msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len, - GFP_NOFS, false); - if (!msg) - return -ENOMEM; msg->hdr.version = cpu_to_le16(10); msg->hdr.tid = cpu_to_le64(arg->flush_tid); @@ -1226,29 +1314,20 @@ /* pool namespace (version 8) (mds always ignores this) */ ceph_encode_32(&p, 0); - /* - * btime and change_attr (version 9) - * - * We just zero these out for now, as the MDS ignores them unless - * the requisite feature flags are set (which we don't do yet). - */ - ceph_encode_timespec64(p, &zerotime); + /* btime and change_attr (version 9) */ + ceph_encode_timespec64(p, &arg->btime); p += sizeof(struct ceph_timespec); - ceph_encode_64(&p, 0); + ceph_encode_64(&p, arg->change_attr); /* Advisory flags (version 10) */ ceph_encode_32(&p, arg->flags); - - ceph_con_send(&arg->session->s_con, msg); - return 0; } /* * Queue cap releases when an inode is dropped from our cache. */ -void ceph_queue_caps_release(struct inode *inode) +void __ceph_remove_caps(struct ceph_inode_info *ci) { - struct ceph_inode_info *ci = ceph_inode(inode); struct rb_node *p; /* lock i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU) @@ -1264,141 +1343,133 @@ } /* - * Send a cap msg on the given inode. Update our caps state, then - * drop i_ceph_lock and send the message. + * Prepare to send a cap message to an MDS. Update the cap state, and populate + * the arg struct with the parameters that will need to be sent. This should + * be done under the i_ceph_lock to guard against changes to cap state. * * Make note of max_size reported/requested from mds, revoked caps * that have now been implemented. - * - * Make half-hearted attempt ot to invalidate page cache if we are - * dropping RDCACHE. Note that this will leave behind locked pages - * that we'll then need to deal with elsewhere. - * - * Return non-zero if delayed release, or we experienced an error - * such that the caller should requeue + retry later. - * - * called with i_ceph_lock, then drops it. - * caller should hold snap_rwsem (read), s_mutex. */ -static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, - int op, bool sync, int used, int want, int retain, - int flushing, u64 flush_tid, u64 oldest_flush_tid) - __releases(cap->ci->i_ceph_lock) +static void __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap, + int op, int flags, int used, int want, int retain, + int flushing, u64 flush_tid, u64 oldest_flush_tid) { struct ceph_inode_info *ci = cap->ci; struct inode *inode = &ci->vfs_inode; - struct ceph_buffer *old_blob = NULL; - struct cap_msg_args arg; int held, revoking; - int wake = 0; - int delayed = 0; - int ret; + + lockdep_assert_held(&ci->i_ceph_lock); held = cap->issued | cap->implemented; revoking = cap->implemented & ~cap->issued; retain &= ~revoking; - dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n", - inode, cap, cap->session, + dout("%s %p cap %p session %p %s -> %s (revoking %s)\n", + __func__, inode, cap, cap->session, ceph_cap_string(held), ceph_cap_string(held & retain), ceph_cap_string(revoking)); BUG_ON((retain & CEPH_CAP_PIN) == 0); - arg.session = cap->session; - - /* don't release wanted unless we've waited a bit. */ - if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 && - time_before(jiffies, ci->i_hold_caps_min)) { - dout(" delaying issued %s -> %s, wanted %s -> %s on send\n", - ceph_cap_string(cap->issued), - ceph_cap_string(cap->issued & retain), - ceph_cap_string(cap->mds_wanted), - ceph_cap_string(want)); - want |= cap->mds_wanted; - retain |= cap->issued; - delayed = 1; - } - ci->i_ceph_flags &= ~(CEPH_I_NODELAY | CEPH_I_FLUSH); - if (want & ~cap->mds_wanted) { - /* user space may open/close single file frequently. - * This avoids droping mds_wanted immediately after - * requesting new mds_wanted. - */ - __cap_set_timeouts(mdsc, ci); - } + ci->i_ceph_flags &= ~CEPH_I_FLUSH; cap->issued &= retain; /* drop bits we don't want */ - if (cap->implemented & ~cap->issued) { - /* - * Wake up any waiters on wanted -> needed transition. - * This is due to the weird transition from buffered - * to sync IO... we need to flush dirty pages _before_ - * allowing sync writes to avoid reordering. - */ - wake = 1; - } + /* + * Wake up any waiters on wanted -> needed transition. This is due to + * the weird transition from buffered to sync IO... we need to flush + * dirty pages _before_ allowing sync writes to avoid reordering. + */ + arg->wake = cap->implemented & ~cap->issued; cap->implemented &= cap->issued | used; cap->mds_wanted = want; - arg.ino = ceph_vino(inode).ino; - arg.cid = cap->cap_id; - arg.follows = flushing ? ci->i_head_snapc->seq : 0; - arg.flush_tid = flush_tid; - arg.oldest_flush_tid = oldest_flush_tid; + arg->session = cap->session; + arg->ino = ceph_vino(inode).ino; + arg->cid = cap->cap_id; + arg->follows = flushing ? ci->i_head_snapc->seq : 0; + arg->flush_tid = flush_tid; + arg->oldest_flush_tid = oldest_flush_tid; - arg.size = inode->i_size; - ci->i_reported_size = arg.size; - arg.max_size = ci->i_wanted_max_size; - ci->i_requested_max_size = arg.max_size; + arg->size = inode->i_size; + ci->i_reported_size = arg->size; + arg->max_size = ci->i_wanted_max_size; + if (cap == ci->i_auth_cap) { + if (want & CEPH_CAP_ANY_FILE_WR) + ci->i_requested_max_size = arg->max_size; + else + ci->i_requested_max_size = 0; + } if (flushing & CEPH_CAP_XATTR_EXCL) { - old_blob = __ceph_build_xattrs_blob(ci); - arg.xattr_version = ci->i_xattrs.version; - arg.xattr_buf = ci->i_xattrs.blob; + arg->old_xattr_buf = __ceph_build_xattrs_blob(ci); + arg->xattr_version = ci->i_xattrs.version; + arg->xattr_buf = ci->i_xattrs.blob; } else { - arg.xattr_buf = NULL; + arg->xattr_buf = NULL; + arg->old_xattr_buf = NULL; } - arg.mtime = inode->i_mtime; - arg.atime = inode->i_atime; - arg.ctime = inode->i_ctime; + arg->mtime = inode->i_mtime; + arg->atime = inode->i_atime; + arg->ctime = inode->i_ctime; + arg->btime = ci->i_btime; + arg->change_attr = inode_peek_iversion_raw(inode); - arg.op = op; - arg.caps = cap->implemented; - arg.wanted = want; - arg.dirty = flushing; + arg->op = op; + arg->caps = cap->implemented; + arg->wanted = want; + arg->dirty = flushing; - arg.seq = cap->seq; - arg.issue_seq = cap->issue_seq; - arg.mseq = cap->mseq; - arg.time_warp_seq = ci->i_time_warp_seq; + arg->seq = cap->seq; + arg->issue_seq = cap->issue_seq; + arg->mseq = cap->mseq; + arg->time_warp_seq = ci->i_time_warp_seq; - arg.uid = inode->i_uid; - arg.gid = inode->i_gid; - arg.mode = inode->i_mode; + arg->uid = inode->i_uid; + arg->gid = inode->i_gid; + arg->mode = inode->i_mode; - arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE; - if (list_empty(&ci->i_cap_snaps)) - arg.flags = CEPH_CLIENT_CAPS_NO_CAPSNAP; - else - arg.flags = CEPH_CLIENT_CAPS_PENDING_CAPSNAP; - if (sync) - arg.flags |= CEPH_CLIENT_CAPS_SYNC; + arg->inline_data = ci->i_inline_version != CEPH_INLINE_NONE; + if (!(flags & CEPH_CLIENT_CAPS_PENDING_CAPSNAP) && + !list_empty(&ci->i_cap_snaps)) { + struct ceph_cap_snap *capsnap; + list_for_each_entry_reverse(capsnap, &ci->i_cap_snaps, ci_item) { + if (capsnap->cap_flush.tid) + break; + if (capsnap->need_flush) { + flags |= CEPH_CLIENT_CAPS_PENDING_CAPSNAP; + break; + } + } + } + arg->flags = flags; +} - spin_unlock(&ci->i_ceph_lock); +/* + * Send a cap msg on the given inode. + * + * Caller should hold snap_rwsem (read), s_mutex. + */ +static void __send_cap(struct cap_msg_args *arg, struct ceph_inode_info *ci) +{ + struct ceph_msg *msg; + struct inode *inode = &ci->vfs_inode; - ceph_buffer_put(old_blob); - - ret = send_cap_msg(&arg); - if (ret < 0) { - dout("error sending cap msg, must requeue %p\n", inode); - delayed = 1; + msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, CAP_MSG_SIZE, GFP_NOFS, false); + if (!msg) { + pr_err("error allocating cap msg: ino (%llx.%llx) flushing %s tid %llu, requeuing cap.\n", + ceph_vinop(inode), ceph_cap_string(arg->dirty), + arg->flush_tid); + spin_lock(&ci->i_ceph_lock); + __cap_delay_requeue(arg->session->s_mdsc, ci); + spin_unlock(&ci->i_ceph_lock); + return; } - if (wake) + encode_cap_msg(msg, arg); + ceph_con_send(&arg->session->s_con, msg); + ceph_buffer_put(arg->old_xattr_buf); + if (arg->wake) wake_up_all(&ci->i_cap_wq); - - return delayed; } static inline int __send_flush_snap(struct inode *inode, @@ -1407,6 +1478,11 @@ u32 mseq, u64 oldest_flush_tid) { struct cap_msg_args arg; + struct ceph_msg *msg; + + msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, CAP_MSG_SIZE, GFP_NOFS, false); + if (!msg) + return -ENOMEM; arg.session = session; arg.ino = ceph_vino(inode).ino; @@ -1419,10 +1495,13 @@ arg.max_size = 0; arg.xattr_version = capsnap->xattr_version; arg.xattr_buf = capsnap->xattr_blob; + arg.old_xattr_buf = NULL; arg.atime = capsnap->atime; arg.mtime = capsnap->mtime; arg.ctime = capsnap->ctime; + arg.btime = capsnap->btime; + arg.change_attr = capsnap->change_attr; arg.op = CEPH_CAP_OP_FLUSHSNAP; arg.caps = capsnap->issued; @@ -1440,8 +1519,11 @@ arg.inline_data = capsnap->inline_data; arg.flags = 0; + arg.wake = false; - return send_cap_msg(&arg); + encode_cap_msg(msg, &arg); + ceph_con_send(&arg.session->s_con, msg); + return 0; } /* @@ -1554,6 +1636,7 @@ struct inode *inode = &ci->vfs_inode; struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; struct ceph_mds_session *session = NULL; + bool need_put = false; int mds; dout("ceph_flush_snaps %p\n", inode); @@ -1590,10 +1673,8 @@ } // make sure flushsnap messages are sent in proper order. - if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { + if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) __kick_flushing_caps(mdsc, session, ci, 0); - ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; - } __ceph_flush_snaps(ci, session); out: @@ -1607,8 +1688,13 @@ } /* we flushed them all; remove this inode from the queue */ spin_lock(&mdsc->snap_flush_lock); + if (!list_empty(&ci->i_snap_flush_item)) + need_put = true; list_del_init(&ci->i_snap_flush_item); spin_unlock(&mdsc->snap_flush_lock); + + if (need_put) + iput(inode); } /* @@ -1625,6 +1711,8 @@ int was = ci->i_dirty_caps; int dirty = 0; + lockdep_assert_held(&ci->i_ceph_lock); + if (!ci->i_auth_cap) { pr_warn("__mark_dirty_caps %p %llx mask %s, " "but no auth cap (session was closed?)\n", @@ -1637,6 +1725,8 @@ ceph_cap_string(was | mask)); ci->i_dirty_caps |= mask; if (was == 0) { + struct ceph_mds_session *session = ci->i_auth_cap->session; + WARN_ON_ONCE(ci->i_prealloc_cap_flush); swap(ci->i_prealloc_cap_flush, *pcf); @@ -1649,7 +1739,7 @@ &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); BUG_ON(!list_empty(&ci->i_dirty_item)); spin_lock(&mdsc->cap_dirty_lock); - list_add(&ci->i_dirty_item, &mdsc->cap_dirty); + list_add(&ci->i_dirty_item, &session->s_cap_dirty); spin_unlock(&mdsc->cap_dirty_lock); if (ci->i_flushing_caps == 0) { ihold(inode); @@ -1668,7 +1758,14 @@ struct ceph_cap_flush *ceph_alloc_cap_flush(void) { - return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL); + struct ceph_cap_flush *cf; + + cf = kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL); + if (!cf) + return NULL; + + cf->is_capsnap = false; + return cf; } void ceph_free_cap_flush(struct ceph_cap_flush *cf) @@ -1692,30 +1789,33 @@ * Remove cap_flush from the mdsc's or inode's flushing cap list. * Return true if caller needs to wake up flush waiters. */ -static bool __finish_cap_flush(struct ceph_mds_client *mdsc, - struct ceph_inode_info *ci, - struct ceph_cap_flush *cf) +static bool __detach_cap_flush_from_mdsc(struct ceph_mds_client *mdsc, + struct ceph_cap_flush *cf) { struct ceph_cap_flush *prev; bool wake = cf->wake; - if (mdsc) { - /* are there older pending cap flushes? */ - if (wake && cf->g_list.prev != &mdsc->cap_flush_list) { - prev = list_prev_entry(cf, g_list); - prev->wake = true; - wake = false; - } - list_del(&cf->g_list); - } else if (ci) { - if (wake && cf->i_list.prev != &ci->i_cap_flush_list) { - prev = list_prev_entry(cf, i_list); - prev->wake = true; - wake = false; - } - list_del(&cf->i_list); - } else { - BUG_ON(1); + + if (wake && cf->g_list.prev != &mdsc->cap_flush_list) { + prev = list_prev_entry(cf, g_list); + prev->wake = true; + wake = false; } + list_del_init(&cf->g_list); + return wake; +} + +static bool __detach_cap_flush_from_ci(struct ceph_inode_info *ci, + struct ceph_cap_flush *cf) +{ + struct ceph_cap_flush *prev; + bool wake = cf->wake; + + if (wake && cf->i_list.prev != &ci->i_cap_flush_list) { + prev = list_prev_entry(cf, i_list); + prev->wake = true; + wake = false; + } + list_del_init(&cf->i_list); return wake; } @@ -1723,17 +1823,18 @@ * Add dirty inode to the flushing list. Assigned a seq number so we * can wait for caps to flush without starving. * - * Called under i_ceph_lock. + * Called under i_ceph_lock. Returns the flush tid. */ -static int __mark_caps_flushing(struct inode *inode, +static u64 __mark_caps_flushing(struct inode *inode, struct ceph_mds_session *session, bool wake, - u64 *flush_tid, u64 *oldest_flush_tid) + u64 *oldest_flush_tid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap_flush *cf = NULL; int flushing; + lockdep_assert_held(&ci->i_ceph_lock); BUG_ON(ci->i_dirty_caps == 0); BUG_ON(list_empty(&ci->i_dirty_item)); BUG_ON(!ci->i_prealloc_cap_flush); @@ -1766,8 +1867,7 @@ list_add_tail(&cf->i_list, &ci->i_cap_flush_list); - *flush_tid = cf->tid; - return flushing; + return cf->tid; } /* @@ -1817,8 +1917,6 @@ * versus held caps. Release, flush, ack revoked caps to mds as * appropriate. * - * CHECK_CAPS_NODELAY - caller is delayed work and we should not delay - * cap release further. * CHECK_CAPS_AUTHONLY - we should only check the auth cap * CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without * further delay. @@ -1826,9 +1924,8 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_session *session) { - struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode); - struct ceph_mds_client *mdsc = fsc->mdsc; struct inode *inode = &ci->vfs_inode; + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); struct ceph_cap *cap; u64 flush_tid, oldest_flush_tid; int file_wanted, used, cap_used; @@ -1837,48 +1934,53 @@ int mds = -1; /* keep track of how far we've gone through i_caps list to avoid an infinite loop on retry */ struct rb_node *p; - int delayed = 0, sent = 0; - bool no_delay = flags & CHECK_CAPS_NODELAY; bool queue_invalidate = false; bool tried_invalidate = false; - /* if we are unmounting, flush any unused caps immediately. */ - if (mdsc->stopping) - no_delay = true; - spin_lock(&ci->i_ceph_lock); - if (ci->i_ceph_flags & CEPH_I_FLUSH) flags |= CHECK_CAPS_FLUSH; - - if (!(flags & CHECK_CAPS_AUTHONLY) || - (ci->i_auth_cap && __ceph_is_single_caps(ci))) - __cap_delay_cancel(mdsc, ci); goto retry_locked; retry: spin_lock(&ci->i_ceph_lock); retry_locked: + /* Caps wanted by virtue of active open files. */ file_wanted = __ceph_caps_file_wanted(ci); + + /* Caps which have active references against them */ used = __ceph_caps_used(ci); + + /* + * "issued" represents the current caps that the MDS wants us to have. + * "implemented" is the set that we have been granted, and includes the + * ones that have not yet been returned to the MDS (the "revoking" set, + * usually because they have outstanding references). + */ issued = __ceph_caps_issued(ci, &implemented); revoking = implemented & ~issued; want = file_wanted; + + /* The ones we currently want to retain (may be adjusted below) */ retain = file_wanted | used | CEPH_CAP_PIN; if (!mdsc->stopping && inode->i_nlink > 0) { if (file_wanted) { retain |= CEPH_CAP_ANY; /* be greedy */ } else if (S_ISDIR(inode->i_mode) && (issued & CEPH_CAP_FILE_SHARED) && - __ceph_dir_is_complete(ci)) { + __ceph_dir_is_complete(ci)) { /* * If a directory is complete, we want to keep * the exclusive cap. So that MDS does not end up * revoking the shared cap on every create/unlink * operation. */ - want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL; + if (IS_RDONLY(inode)) { + want = CEPH_CAP_ANY_SHARED; + } else { + want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL; + } retain |= want; } else { @@ -1894,14 +1996,13 @@ } dout("check_caps %p file_want %s used %s dirty %s flushing %s" - " issued %s revoking %s retain %s %s%s%s\n", inode, + " issued %s revoking %s retain %s %s%s\n", inode, ceph_cap_string(file_wanted), ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps), ceph_cap_string(ci->i_flushing_caps), ceph_cap_string(issued), ceph_cap_string(revoking), ceph_cap_string(retain), (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "", - (flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "", (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : ""); /* @@ -1909,8 +2010,8 @@ * have cached pages, but don't want them, then try to invalidate. * If we fail, it's because pages are locked.... try again later. */ - if ((!no_delay || mdsc->stopping) && - !S_ISDIR(inode->i_mode) && /* ignore readdir cache */ + if ((!(flags & CHECK_CAPS_NOINVAL) || mdsc->stopping) && + S_ISREG(inode->i_mode) && !(ci->i_wb_ref || ci->i_wrbuffer_ref) && /* no dirty pages... */ inode->i_data.nrpages && /* have cached pages */ (revoking & (CEPH_CAP_FILE_CACHE| @@ -1927,6 +2028,9 @@ } for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { + int mflags = 0; + struct cap_msg_args arg; + cap = rb_entry(p, struct ceph_cap, ci_node); /* avoid looping forever */ @@ -1936,6 +2040,10 @@ /* NOTE: no side-effects allowed, until we take s_mutex */ + /* + * If we have an auth cap, we don't need to consider any + * overlapping caps as used. + */ cap_used = used; if (ci->i_auth_cap && cap != ci->i_auth_cap) cap_used &= ~ci->i_auth_cap->issued; @@ -1990,31 +2098,10 @@ } /* things we might delay */ - if ((cap->issued & ~retain) == 0 && - cap->mds_wanted == want) + if ((cap->issued & ~retain) == 0) continue; /* nope, all good */ - if (no_delay) - goto ack; - - /* delay? */ - if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 && - time_before(jiffies, ci->i_hold_caps_max)) { - dout(" delaying issued %s -> %s, wanted %s -> %s\n", - ceph_cap_string(cap->issued), - ceph_cap_string(cap->issued & retain), - ceph_cap_string(cap->mds_wanted), - ceph_cap_string(want)); - delayed++; - continue; - } - ack: - if (ci->i_ceph_flags & CEPH_I_NOFLUSH) { - dout(" skipping %p I_NOFLUSH set\n", inode); - continue; - } - if (session && session != cap->session) { dout("oops, wrong session %p mutex\n", session); mutex_unlock(&session->s_mutex); @@ -2052,10 +2139,8 @@ if (cap == ci->i_auth_cap && (ci->i_ceph_flags & (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) { - if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { + if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) __kick_flushing_caps(mdsc, session, ci, 0); - ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; - } if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) __ceph_flush_snaps(ci, session); @@ -2076,9 +2161,12 @@ } if (cap == ci->i_auth_cap && ci->i_dirty_caps) { - flushing = __mark_caps_flushing(inode, session, false, - &flush_tid, - &oldest_flush_tid); + flushing = ci->i_dirty_caps; + flush_tid = __mark_caps_flushing(inode, session, false, + &oldest_flush_tid); + if (flags & CHECK_CAPS_FLUSH && + list_empty(&session->s_cap_dirty)) + mflags |= CEPH_CLIENT_CAPS_SYNC; } else { flushing = 0; flush_tid = 0; @@ -2088,18 +2176,23 @@ } mds = cap->mds; /* remember mds, so we don't repeat */ - sent++; - /* __send_cap drops i_ceph_lock */ - delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false, - cap_used, want, retain, flushing, - flush_tid, oldest_flush_tid); + __prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, mflags, cap_used, + want, retain, flushing, flush_tid, oldest_flush_tid); + spin_unlock(&ci->i_ceph_lock); + + __send_cap(&arg, ci); + goto retry; /* retake i_ceph_lock and restart our cap scan. */ } - /* Reschedule delayed caps release if we delayed anything */ - if (delayed) + /* periodically re-calculate caps wanted by open files */ + if (__ceph_is_any_real_caps(ci) && + list_empty(&ci->i_cap_delay_list) && + (file_wanted & ~CEPH_CAP_PIN) && + !(used & (CEPH_CAP_FILE_RD | CEPH_CAP_ANY_FILE_WR))) { __cap_delay_requeue(mdsc, ci); + } spin_unlock(&ci->i_ceph_lock); @@ -2125,18 +2218,12 @@ retry: spin_lock(&ci->i_ceph_lock); - if (ci->i_ceph_flags & CEPH_I_NOFLUSH) { - spin_unlock(&ci->i_ceph_lock); - dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode); - goto out; - } +retry_locked: if (ci->i_dirty_caps && ci->i_auth_cap) { struct ceph_cap *cap = ci->i_auth_cap; - int used = __ceph_caps_used(ci); - int want = __ceph_caps_wanted(ci); - int delayed; + struct cap_msg_args arg; - if (!session || session != cap->session) { + if (session != cap->session) { spin_unlock(&ci->i_ceph_lock); if (session) mutex_unlock(&session->s_mutex); @@ -2149,19 +2236,26 @@ goto out; } - flushing = __mark_caps_flushing(inode, session, true, - &flush_tid, &oldest_flush_tid); - - /* __send_cap drops i_ceph_lock */ - delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true, - used, want, (cap->issued | cap->implemented), - flushing, flush_tid, oldest_flush_tid); - - if (delayed) { - spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); - spin_unlock(&ci->i_ceph_lock); + if (ci->i_ceph_flags & + (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS)) { + if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) + __kick_flushing_caps(mdsc, session, ci, 0); + if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) + __ceph_flush_snaps(ci, session); + goto retry_locked; } + + flushing = ci->i_dirty_caps; + flush_tid = __mark_caps_flushing(inode, session, true, + &oldest_flush_tid); + + __prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC, + __ceph_caps_used(ci), __ceph_caps_wanted(ci), + (cap->issued | cap->implemented), + flushing, flush_tid, oldest_flush_tid); + spin_unlock(&ci->i_ceph_lock); + + __send_cap(&arg, ci); } else { if (!list_empty(&ci->i_cap_flush_list)) { struct ceph_cap_flush *cf = @@ -2206,6 +2300,7 @@ */ static int unsafe_request_wait(struct inode *inode) { + struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_request *req1 = NULL, *req2 = NULL; int ret, err = 0; @@ -2225,6 +2320,76 @@ } spin_unlock(&ci->i_unsafe_lock); + /* + * Trigger to flush the journal logs in all the relevant MDSes + * manually, or in the worst case we must wait at most 5 seconds + * to wait the journal logs to be flushed by the MDSes periodically. + */ + if (req1 || req2) { + struct ceph_mds_request *req; + struct ceph_mds_session **sessions; + struct ceph_mds_session *s; + unsigned int max_sessions; + int i; + + mutex_lock(&mdsc->mutex); + max_sessions = mdsc->max_sessions; + + sessions = kcalloc(max_sessions, sizeof(s), GFP_KERNEL); + if (!sessions) { + mutex_unlock(&mdsc->mutex); + err = -ENOMEM; + goto out; + } + + spin_lock(&ci->i_unsafe_lock); + if (req1) { + list_for_each_entry(req, &ci->i_unsafe_dirops, + r_unsafe_dir_item) { + s = req->r_session; + if (!s) + continue; + if (!sessions[s->s_mds]) { + s = ceph_get_mds_session(s); + sessions[s->s_mds] = s; + } + } + } + if (req2) { + list_for_each_entry(req, &ci->i_unsafe_iops, + r_unsafe_target_item) { + s = req->r_session; + if (!s) + continue; + if (!sessions[s->s_mds]) { + s = ceph_get_mds_session(s); + sessions[s->s_mds] = s; + } + } + } + spin_unlock(&ci->i_unsafe_lock); + + /* the auth MDS */ + spin_lock(&ci->i_ceph_lock); + if (ci->i_auth_cap) { + s = ci->i_auth_cap->session; + if (!sessions[s->s_mds]) + sessions[s->s_mds] = ceph_get_mds_session(s); + } + spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&mdsc->mutex); + + /* send flush mdlog request to MDSes */ + for (i = 0; i < max_sessions; i++) { + s = sessions[i]; + if (s) { + send_flush_mdlog(s); + ceph_put_mds_session(s); + } + } + kfree(sessions); + } + dout("unsafe_request_wait %p wait on tid %llu %llu\n", inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL); if (req1) { @@ -2232,15 +2397,19 @@ ceph_timeout_jiffies(req1->r_timeout)); if (ret) err = -EIO; - ceph_mdsc_put_request(req1); } if (req2) { ret = !wait_for_completion_timeout(&req2->r_safe_completion, ceph_timeout_jiffies(req2->r_timeout)); if (ret) err = -EIO; - ceph_mdsc_put_request(req2); } + +out: + if (req1) + ceph_mdsc_put_request(req1); + if (req2) + ceph_mdsc_put_request(req2); return err; } @@ -2249,35 +2418,40 @@ struct inode *inode = file->f_mapping->host; struct ceph_inode_info *ci = ceph_inode(inode); u64 flush_tid; - int ret; + int ret, err; int dirty; dout("fsync %p%s\n", inode, datasync ? " datasync" : ""); ret = file_write_and_wait_range(file, start, end); - if (ret < 0) - goto out; - if (datasync) goto out; - inode_lock(inode); + ret = ceph_wait_on_async_create(inode); + if (ret) + goto out; dirty = try_flush_caps(inode, &flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); - ret = unsafe_request_wait(inode); + err = unsafe_request_wait(inode); /* * only wait on non-file metadata writeback (the mds * can recover size and mtime, so we don't need to * wait for that) */ - if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { - ret = wait_event_interruptible(ci->i_cap_wq, + if (!err && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { + err = wait_event_interruptible(ci->i_cap_wq, caps_are_flushed(inode, flush_tid)); } - inode_unlock(inode); + + if (err < 0) + ret = err; + + err = file_check_and_advance_wb_err(file); + if (err < 0) + ret = err; out: dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret); return ret; @@ -2327,6 +2501,16 @@ struct ceph_cap_flush *cf; int ret; u64 first_tid = 0; + u64 last_snap_flush = 0; + + ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; + + list_for_each_entry_reverse(cf, &ci->i_cap_flush_list, i_list) { + if (cf->is_capsnap) { + last_snap_flush = cf->tid; + break; + } + } list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) { if (cf->tid < first_tid) @@ -2341,22 +2525,20 @@ first_tid = cf->tid + 1; - if (cf->caps) { + if (!cf->is_capsnap) { + struct cap_msg_args arg; + dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode, cap, cf->tid, ceph_cap_string(cf->caps)); - ci->i_ceph_flags |= CEPH_I_NODELAY; - ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - false, __ceph_caps_used(ci), + __prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH, + (cf->tid < last_snap_flush ? + CEPH_CLIENT_CAPS_PENDING_CAPSNAP : 0), + __ceph_caps_used(ci), __ceph_caps_wanted(ci), - cap->issued | cap->implemented, + (cap->issued | cap->implemented), cf->caps, cf->tid, oldest_flush_tid); - if (ret) { - pr_err("kick_flushing_caps: error sending " - "cap flush, ino (%llx.%llx) " - "tid %llu flushing %s\n", - ceph_vinop(inode), cf->tid, - ceph_cap_string(cf->caps)); - } + spin_unlock(&ci->i_ceph_lock); + __send_cap(&arg, ci); } else { struct ceph_cap_snap *capsnap = container_of(cf, struct ceph_cap_snap, @@ -2417,7 +2599,12 @@ */ if ((cap->issued & ci->i_flushing_caps) != ci->i_flushing_caps) { - ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; + /* encode_caps_cb() also will reset these sequence + * numbers. make sure sequence numbers in cap flush + * message match later reconnect message */ + cap->seq = 0; + cap->issue_seq = 0; + cap->mseq = 0; __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); } else { @@ -2435,6 +2622,8 @@ struct ceph_cap *cap; u64 oldest_flush_tid; + lockdep_assert_held(&session->s_mutex); + dout("kick_flushing_caps mds%d\n", session->s_mds); spin_lock(&mdsc->cap_dirty_lock); @@ -2451,7 +2640,6 @@ continue; } if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { - ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); } @@ -2459,16 +2647,15 @@ } } -static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session, - struct inode *inode) - __releases(ci->i_ceph_lock) +void ceph_kick_flushing_inode_caps(struct ceph_mds_session *session, + struct ceph_inode_info *ci) { - struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_cap *cap; + struct ceph_mds_client *mdsc = session->s_mdsc; + struct ceph_cap *cap = ci->i_auth_cap; - cap = ci->i_auth_cap; - dout("kick_flushing_inode_caps %p flushing %s\n", inode, + lockdep_assert_held(&ci->i_ceph_lock); + + dout("%s %p flushing %s\n", __func__, &ci->vfs_inode, ceph_cap_string(ci->i_flushing_caps)); if (!list_empty(&ci->i_cap_flush_list)) { @@ -2479,11 +2666,7 @@ oldest_flush_tid = __get_oldest_flush_tid(mdsc); spin_unlock(&mdsc->cap_dirty_lock); - ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); - spin_unlock(&ci->i_ceph_lock); - } else { - spin_unlock(&ci->i_ceph_lock); } } @@ -2491,18 +2674,20 @@ /* * Take references to capabilities we hold, so that we don't release * them to the MDS prematurely. - * - * Protected by i_ceph_lock. */ -static void __take_cap_refs(struct ceph_inode_info *ci, int got, +void ceph_take_cap_refs(struct ceph_inode_info *ci, int got, bool snap_rwsem_locked) { + lockdep_assert_held(&ci->i_ceph_lock); + if (got & CEPH_CAP_PIN) ci->i_pin_ref++; if (got & CEPH_CAP_FILE_RD) ci->i_rd_ref++; if (got & CEPH_CAP_FILE_CACHE) ci->i_rdcache_ref++; + if (got & CEPH_CAP_FILE_EXCL) + ci->i_fx_ref++; if (got & CEPH_CAP_FILE_WR) { if (ci->i_wr_ref == 0 && !ci->i_head_snapc) { BUG_ON(!snap_rwsem_locked); @@ -2515,7 +2700,7 @@ if (ci->i_wb_ref == 0) ihold(&ci->vfs_inode); ci->i_wb_ref++; - dout("__take_cap_refs %p wb %d -> %d (?)\n", + dout("%s %p wb %d -> %d (?)\n", __func__, &ci->vfs_inode, ci->i_wb_ref-1, ci->i_wb_ref); } } @@ -2526,15 +2711,26 @@ * to (when applicable), and check against max_size here as well. * Note that caller is responsible for ensuring max_size increases are * requested from the MDS. + * + * Returns 0 if caps were not able to be acquired (yet), 1 if succeed, + * or a negative error code. There are 3 speical error codes: + * -EAGAIN: need to sleep but non-blocking is specified + * -EFBIG: ask caller to call check_max_size() and try again. + * -ESTALE: ask caller to call ceph_renew_caps() and try again. */ -static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, - loff_t endoff, bool nonblock, int *got, int *err) +enum { + /* first 8 bits are reserved for CEPH_FILE_MODE_FOO */ + NON_BLOCKING = (1 << 8), + CHECK_FILELOCK = (1 << 9), +}; + +static int try_get_cap_refs(struct inode *inode, int need, int want, + loff_t endoff, int flags, int *got) { - struct inode *inode = &ci->vfs_inode; + struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; int ret = 0; int have, implemented; - int file_wanted; bool snap_rwsem_locked = false; dout("get_cap_refs %p need %s want %s\n", inode, @@ -2543,13 +2739,10 @@ again: spin_lock(&ci->i_ceph_lock); - /* make sure file is actually open */ - file_wanted = __ceph_caps_file_wanted(ci); - if ((file_wanted & need) != need) { - dout("try_get_cap_refs need %s file_wanted %s, EBADF\n", - ceph_cap_string(need), ceph_cap_string(file_wanted)); - *err = -EBADF; - ret = 1; + if ((flags & CHECK_FILELOCK) && + (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK)) { + dout("try_get_cap_refs %p error filelock\n", inode); + ret = -EIO; goto out_unlock; } @@ -2570,10 +2763,8 @@ if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) { dout("get_cap_refs %p endoff %llu > maxsize %llu\n", inode, endoff, ci->i_max_size); - if (endoff > ci->i_requested_max_size) { - *err = -EAGAIN; - ret = 1; - } + if (endoff > ci->i_requested_max_size) + ret = ci->i_auth_cap ? -EFBIG : -ESTALE; goto out_unlock; } /* @@ -2607,9 +2798,8 @@ * we can not call down_read() when * task isn't in TASK_RUNNING state */ - if (nonblock) { - *err = -EAGAIN; - ret = 1; + if (flags & NON_BLOCKING) { + ret = -EAGAIN; goto out_unlock; } @@ -2620,57 +2810,63 @@ } snap_rwsem_locked = true; } - *got = need | (have & want); - if ((need & CEPH_CAP_FILE_RD) && + if ((have & want) == want) + *got = need | want; + else + *got = need; + if (S_ISREG(inode->i_mode) && + (need & CEPH_CAP_FILE_RD) && !(*got & CEPH_CAP_FILE_CACHE)) ceph_disable_fscache_readpage(ci); - __take_cap_refs(ci, *got, true); + ceph_take_cap_refs(ci, *got, true); ret = 1; } } else { int session_readonly = false; - if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) { + int mds_wanted; + if (ci->i_auth_cap && + (need & (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_EXCL))) { struct ceph_mds_session *s = ci->i_auth_cap->session; spin_lock(&s->s_cap_lock); session_readonly = s->s_readonly; spin_unlock(&s->s_cap_lock); } if (session_readonly) { - dout("get_cap_refs %p needed %s but mds%d readonly\n", + dout("get_cap_refs %p need %s but mds%d readonly\n", inode, ceph_cap_string(need), ci->i_auth_cap->mds); - *err = -EROFS; - ret = 1; + ret = -EROFS; goto out_unlock; } - if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) { - int mds_wanted; - if (READ_ONCE(mdsc->fsc->mount_state) == - CEPH_MOUNT_SHUTDOWN) { - dout("get_cap_refs %p forced umount\n", inode); - *err = -EIO; - ret = 1; - goto out_unlock; - } - mds_wanted = __ceph_caps_mds_wanted(ci, false); - if (need & ~(mds_wanted & need)) { - dout("get_cap_refs %p caps were dropped" - " (session killed?)\n", inode); - *err = -ESTALE; - ret = 1; - goto out_unlock; - } - if (!(file_wanted & ~mds_wanted)) - ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED; + if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { + dout("get_cap_refs %p forced umount\n", inode); + ret = -EIO; + goto out_unlock; + } + mds_wanted = __ceph_caps_mds_wanted(ci, false); + if (need & ~mds_wanted) { + dout("get_cap_refs %p need %s > mds_wanted %s\n", + inode, ceph_cap_string(need), + ceph_cap_string(mds_wanted)); + ret = -ESTALE; + goto out_unlock; } - dout("get_cap_refs %p have %s needed %s\n", inode, + dout("get_cap_refs %p have %s need %s\n", inode, ceph_cap_string(have), ceph_cap_string(need)); } out_unlock: + + __ceph_touch_fmode(ci, mdsc, flags); + spin_unlock(&ci->i_ceph_lock); if (snap_rwsem_locked) up_read(&mdsc->snap_rwsem); + + if (!ret) + ceph_update_cap_mis(&mdsc->metric); + else if (ret == 1) + ceph_update_cap_hit(&mdsc->metric); dout("get_cap_refs %p ret %d got %s\n", inode, ret, ceph_cap_string(*got)); @@ -2705,24 +2901,39 @@ ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); } -int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got) +static inline int get_used_fmode(int caps) { - int ret, err = 0; + int fmode = 0; + if (caps & CEPH_CAP_FILE_RD) + fmode |= CEPH_FILE_MODE_RD; + if (caps & CEPH_CAP_FILE_WR) + fmode |= CEPH_FILE_MODE_WR; + return fmode; +} + +int ceph_try_get_caps(struct inode *inode, int need, int want, + bool nonblock, int *got) +{ + int ret, flags; BUG_ON(need & ~CEPH_CAP_FILE_RD); - BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)); - ret = ceph_pool_perm_check(ci, need); - if (ret < 0) - return ret; - - ret = try_get_cap_refs(ci, need, want, 0, true, got, &err); - if (ret) { - if (err == -EAGAIN) { - ret = 0; - } else if (err < 0) { - ret = err; - } + BUG_ON(want & ~(CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO | + CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL | + CEPH_CAP_ANY_DIR_OPS)); + if (need) { + ret = ceph_pool_perm_check(inode, need); + if (ret < 0) + return ret; } + + flags = get_used_fmode(need | want); + if (nonblock) + flags |= NON_BLOCKING; + + ret = try_get_cap_refs(inode, need, want, 0, flags, got); + /* three special error codes */ + if (ret == -EAGAIN || ret == -EFBIG || ret == -ESTALE) + ret = 0; return ret; } @@ -2731,34 +2942,54 @@ * due to a small max_size, make sure we check_max_size (and possibly * ask the mds) so we don't get hung up indefinitely. */ -int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, +int ceph_get_caps(struct file *filp, int need, int want, loff_t endoff, int *got, struct page **pinned_page) { - int _got, ret, err = 0; + struct ceph_file_info *fi = filp->private_data; + struct inode *inode = file_inode(filp); + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + int ret, _got, flags; - ret = ceph_pool_perm_check(ci, need); + ret = ceph_pool_perm_check(inode, need); if (ret < 0) return ret; - while (true) { - if (endoff > 0) - check_max_size(&ci->vfs_inode, endoff); + if ((fi->fmode & CEPH_FILE_MODE_WR) && + fi->filp_gen != READ_ONCE(fsc->filp_gen)) + return -EBADF; - err = 0; + flags = get_used_fmode(need | want); + + while (true) { + flags &= CEPH_FILE_MODE_MASK; + if (vfs_inode_has_locks(inode)) + flags |= CHECK_FILELOCK; _got = 0; - ret = try_get_cap_refs(ci, need, want, endoff, - false, &_got, &err); - if (ret) { - if (err == -EAGAIN) - continue; - if (err < 0) - ret = err; - } else { + ret = try_get_cap_refs(inode, need, want, endoff, + flags, &_got); + WARN_ON_ONCE(ret == -EAGAIN); + if (!ret) { + struct ceph_mds_client *mdsc = fsc->mdsc; + struct cap_wait cw; DEFINE_WAIT_FUNC(wait, woken_wake_function); + + cw.ino = ceph_ino(inode); + cw.tgid = current->tgid; + cw.need = need; + cw.want = want; + + spin_lock(&mdsc->caps_list_lock); + list_add(&cw.list, &mdsc->cap_wait_list); + spin_unlock(&mdsc->caps_list_lock); + + /* make sure used fmode not timeout */ + ceph_get_fmode(ci, flags, FMODE_WAIT_BIAS); add_wait_queue(&ci->i_cap_wq, &wait); - while (!try_get_cap_refs(ci, need, want, endoff, - true, &_got, &err)) { + flags |= NON_BLOCKING; + while (!(ret = try_get_cap_refs(inode, need, want, + endoff, flags, &_got))) { if (signal_pending(current)) { ret = -ERESTARTSYS; break; @@ -2767,27 +2998,48 @@ } remove_wait_queue(&ci->i_cap_wq, &wait); + ceph_put_fmode(ci, flags, FMODE_WAIT_BIAS); - if (err == -EAGAIN) + spin_lock(&mdsc->caps_list_lock); + list_del(&cw.list); + spin_unlock(&mdsc->caps_list_lock); + + if (ret == -EAGAIN) continue; - if (err < 0) - ret = err; } + + if ((fi->fmode & CEPH_FILE_MODE_WR) && + fi->filp_gen != READ_ONCE(fsc->filp_gen)) { + if (ret >= 0 && _got) + ceph_put_cap_refs(ci, _got); + return -EBADF; + } + if (ret < 0) { - if (err == -ESTALE) { + if (ret == -EFBIG || ret == -ESTALE) { + int ret2 = ceph_wait_on_async_create(inode); + if (ret2 < 0) + return ret2; + } + if (ret == -EFBIG) { + check_max_size(inode, endoff); + continue; + } + if (ret == -ESTALE) { /* session was killed, try renew caps */ - ret = ceph_renew_caps(&ci->vfs_inode); + ret = ceph_renew_caps(inode, flags); if (ret == 0) continue; } return ret; } - if (ci->i_inline_version != CEPH_INLINE_NONE && + if (S_ISREG(ci->vfs_inode.i_mode) && + ci->i_inline_version != CEPH_INLINE_NONE && (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && - i_size_read(&ci->vfs_inode) > 0) { + i_size_read(inode) > 0) { struct page *page = - find_get_page(ci->vfs_inode.i_mapping, 0); + find_get_page(inode->i_mapping, 0); if (page) { if (PageUptodate(page)) { *pinned_page = page; @@ -2806,7 +3058,7 @@ * getattr request will bring inline data into * page cache */ - ret = __ceph_do_getattr(&ci->vfs_inode, NULL, + ret = __ceph_do_getattr(inode, NULL, CEPH_STAT_CAP_INLINE_DATA, true); if (ret < 0) @@ -2816,7 +3068,8 @@ break; } - if ((_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE)) + if (S_ISREG(ci->vfs_inode.i_mode) && + (_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE)) ceph_fscache_revalidate_cookie(ci); *got = _got; @@ -2830,7 +3083,7 @@ void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps) { spin_lock(&ci->i_ceph_lock); - __take_cap_refs(ci, caps, false); + ceph_take_cap_refs(ci, caps, false); spin_unlock(&ci->i_ceph_lock); } @@ -2867,7 +3120,8 @@ * If we are releasing a WR cap (from a sync write), finalize any affected * cap_snap, and wake up any waiters. */ -void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) +static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had, + bool skip_checking_caps) { struct inode *inode = &ci->vfs_inode; int last = 0, put = 0, flushsnaps = 0, wake = 0; @@ -2880,6 +3134,9 @@ last++; if (had & CEPH_CAP_FILE_CACHE) if (--ci->i_rdcache_ref == 0) + last++; + if (had & CEPH_CAP_FILE_EXCL) + if (--ci->i_fx_ref == 0) last++; if (had & CEPH_CAP_FILE_BUFFER) { if (--ci->i_wb_ref == 0) { @@ -2912,7 +3169,7 @@ ci->i_head_snapc = NULL; } /* see comment in __ceph_remove_cap() */ - if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) + if (!__ceph_is_any_real_caps(ci) && ci->i_snap_realm) drop_inode_snap_realm(ci); } spin_unlock(&ci->i_ceph_lock); @@ -2920,14 +3177,26 @@ dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had), last ? " last" : "", put ? " put" : ""); - if (last && !flushsnaps) - ceph_check_caps(ci, 0, NULL); - else if (flushsnaps) - ceph_flush_snaps(ci, NULL); + if (!skip_checking_caps) { + if (last) + ceph_check_caps(ci, 0, NULL); + else if (flushsnaps) + ceph_flush_snaps(ci, NULL); + } if (wake) wake_up_all(&ci->i_cap_wq); while (put-- > 0) iput(inode); +} + +void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) +{ + __ceph_put_cap_refs(ci, had, false); +} + +void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci, int had) +{ + __ceph_put_cap_refs(ci, had, true); } /* @@ -2977,7 +3246,16 @@ break; } } - BUG_ON(!found); + + if (!found) { + /* + * The capsnap should already be removed when removing + * auth cap in the case of a forced unmount. + */ + WARN_ON_ONCE(ci->i_auth_cap); + goto unlock; + } + capsnap->dirty_pages -= nr; if (capsnap->dirty_pages == 0) { complete_capsnap = true; @@ -2999,17 +3277,20 @@ complete_capsnap ? " (complete capsnap)" : ""); } +unlock: spin_unlock(&ci->i_ceph_lock); if (last) { - ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); + ceph_check_caps(ci, 0, NULL); } else if (flush_snaps) { ceph_flush_snaps(ci, NULL); } if (complete_capsnap) wake_up_all(&ci->i_cap_wq); - while (put-- > 0) - iput(inode); + while (put-- > 0) { + /* avoid calling iput_final() in osd dispatch threads */ + ceph_async_iput(inode); + } } /* @@ -3054,8 +3335,10 @@ bool dirstat_valid; u64 nfiles; u64 nsubdirs; + u64 change_attr; /* currently issued */ int issued; + struct timespec64 btime; }; /* @@ -3079,7 +3362,8 @@ int used, wanted, dirty; u64 size = le64_to_cpu(grant->size); u64 max_size = le64_to_cpu(grant->max_size); - int check_caps = 0; + unsigned char check_caps = 0; + bool was_stale = cap->cap_gen < session->s_cap_gen; bool wake = false; bool writeback = false; bool queue_trunc = false; @@ -3092,6 +3376,28 @@ dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, inode->i_size); + + /* + * If CACHE is being revoked, and we have no dirty buffers, + * try to invalidate (once). (If there are dirty buffers, we + * will invalidate _after_ writeback.) + */ + if (S_ISREG(inode->i_mode) && /* don't invalidate readdir cache */ + ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && + (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && + !(ci->i_wrbuffer_ref || ci->i_wb_ref)) { + if (try_nonblocking_invalidate(inode)) { + /* there were locked pages.. invalidate later + in a separate thread. */ + if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { + queue_invalidate = true; + ci->i_rdcache_revoking = ci->i_rdcache_gen; + } + } + } + + if (was_stale) + cap->issued = cap->implemented = CEPH_CAP_PIN; /* * auth mds of the inode changed. we received the cap export message, @@ -3108,36 +3414,20 @@ newcaps |= cap->issued; } - /* - * If CACHE is being revoked, and we have no dirty buffers, - * try to invalidate (once). (If there are dirty buffers, we - * will invalidate _after_ writeback.) - */ - if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */ - ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && - (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && - !(ci->i_wrbuffer_ref || ci->i_wb_ref)) { - if (try_nonblocking_invalidate(inode)) { - /* there were locked pages.. invalidate later - in a separate thread. */ - if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { - queue_invalidate = true; - ci->i_rdcache_revoking = ci->i_rdcache_gen; - } - } - } - /* side effects now are allowed */ cap->cap_gen = session->s_cap_gen; cap->seq = seq; __check_cap_issue(ci, cap, newcaps); + inode_set_max_iversion_raw(inode, extra_info->change_attr); + if ((newcaps & CEPH_CAP_AUTH_SHARED) && (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) { inode->i_mode = le32_to_cpu(grant->mode); inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid)); inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid)); + ci->i_btime = extra_info->btime; dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode, from_kuid(&init_user_ns, inode->i_uid), from_kgid(&init_user_ns, inode->i_gid)); @@ -3164,6 +3454,7 @@ ci->i_xattrs.blob = ceph_buffer_get(xattr_buf); ci->i_xattrs.version = version; ceph_forget_all_cached_acls(inode); + ceph_security_invalidate_secctx(inode); } } @@ -3216,10 +3507,6 @@ ci->i_requested_max_size = 0; } wake = true; - } else if (ci->i_wanted_max_size > ci->i_max_size && - ci->i_wanted_max_size > ci->i_requested_max_size) { - /* CEPH_CAP_OP_IMPORT */ - wake = true; } } @@ -3231,13 +3518,20 @@ ceph_cap_string(wanted), ceph_cap_string(used), ceph_cap_string(dirty)); - if (wanted != le32_to_cpu(grant->wanted)) { - dout("mds wanted %s -> %s\n", - ceph_cap_string(le32_to_cpu(grant->wanted)), - ceph_cap_string(wanted)); - /* imported cap may not have correct mds_wanted */ - if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) - check_caps = 1; + + if ((was_stale || le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) && + (wanted & ~(cap->mds_wanted | newcaps))) { + /* + * If mds is importing cap, prior cap messages that update + * 'wanted' may get dropped by mds (migrate seq mismatch). + * + * We don't send cap message to update 'wanted' if what we + * want are already issued. If mds revokes caps, cap message + * that releases caps also tells mds what we want. But if + * caps got revoked by mds forcedly (session stale). We may + * haven't told mds what we want. + */ + check_caps = 1; } /* revocation, grant, or no-op? */ @@ -3248,11 +3542,12 @@ ceph_cap_string(cap->issued), ceph_cap_string(newcaps), ceph_cap_string(revoking)); - if (revoking & used & CEPH_CAP_FILE_BUFFER) + if (S_ISREG(inode->i_mode) && + (revoking & used & CEPH_CAP_FILE_BUFFER)) writeback = true; /* initiate writeback; will delay ack */ - else if (revoking == CEPH_CAP_FILE_CACHE && - (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && - queue_invalidate) + else if (queue_invalidate && + revoking == CEPH_CAP_FILE_CACHE && + (newcaps & CEPH_CAP_FILE_LAZYIO) == 0) ; /* do nothing yet, invalidation will be queued */ else if (cap == ci->i_auth_cap) check_caps = 1; /* check auth cap only */ @@ -3279,6 +3574,15 @@ } BUG_ON(cap->issued & ~cap->implemented); + /* don't let check_caps skip sending a response to MDS for revoke msgs */ + if (le32_to_cpu(grant->op) == CEPH_CAP_OP_REVOKE) { + cap->mds_wanted = 0; + if (cap == ci->i_auth_cap) + check_caps = 1; /* check auth cap only */ + else + check_caps = 2; /* check all caps */ + } + if (extra_info->inline_version > 0 && extra_info->inline_version >= ci->i_inline_version) { ci->i_inline_version = extra_info->inline_version; @@ -3288,13 +3592,22 @@ } if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { - if (newcaps & ~extra_info->issued) - wake = true; - kick_flushing_inode_caps(session->s_mdsc, session, inode); + if (ci->i_auth_cap == cap) { + if (newcaps & ~extra_info->issued) + wake = true; + + if (ci->i_requested_max_size > max_size || + !(le32_to_cpu(grant->wanted) & CEPH_CAP_ANY_FILE_WR)) { + /* re-request max_size if necessary */ + ci->i_requested_max_size = 0; + wake = true; + } + + ceph_kick_flushing_inode_caps(session, ci); + } up_read(&session->s_mdsc->snap_rwsem); - } else { - spin_unlock(&ci->i_ceph_lock); } + spin_unlock(&ci->i_ceph_lock); if (fill_inline) ceph_fill_inline_data(inode, NULL, extra_info->inline_data, @@ -3318,10 +3631,10 @@ wake_up_all(&ci->i_cap_wq); if (check_caps == 1) - ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY, + ceph_check_caps(ci, CHECK_CAPS_AUTHONLY | CHECK_CAPS_NOINVAL, session); else if (check_caps == 2) - ceph_check_caps(ci, CHECK_CAPS_NODELAY, session); + ceph_check_caps(ci, CHECK_CAPS_NOINVAL, session); else mutex_unlock(&session->s_mutex); } @@ -3348,15 +3661,26 @@ bool wake_mdsc = false; list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) { + /* Is this the one that was flushed? */ if (cf->tid == flush_tid) cleaned = cf->caps; - if (cf->caps == 0) /* capsnap */ + + /* Is this a capsnap? */ + if (cf->is_capsnap) continue; + if (cf->tid <= flush_tid) { - if (__finish_cap_flush(NULL, ci, cf)) - wake_ci = true; + /* + * An earlier or current tid. The FLUSH_ACK should + * represent a superset of this flush's caps. + */ + wake_ci |= __detach_cap_flush_from_ci(ci, cf); list_add_tail(&cf->i_list, &to_remove); } else { + /* + * This is a later one. Any caps in it are still dirty + * so don't count them as cleaned. + */ cleaned &= ~cf->caps; if (!cleaned) break; @@ -3376,10 +3700,8 @@ spin_lock(&mdsc->cap_dirty_lock); - list_for_each_entry(cf, &to_remove, i_list) { - if (__finish_cap_flush(mdsc, NULL, cf)) - wake_mdsc = true; - } + list_for_each_entry(cf, &to_remove, i_list) + wake_mdsc |= __detach_cap_flush_from_mdsc(mdsc, cf); if (ci->i_flushing_caps == 0) { if (list_empty(&ci->i_cap_flush_list)) { @@ -3417,8 +3739,9 @@ while (!list_empty(&to_remove)) { cf = list_first_entry(&to_remove, struct ceph_cap_flush, i_list); - list_del(&cf->i_list); - ceph_free_cap_flush(cf); + list_del_init(&cf->i_list); + if (!cf->is_capsnap) + ceph_free_cap_flush(cf); } if (wake_ci) @@ -3427,6 +3750,43 @@ wake_up_all(&mdsc->cap_flushing_wq); if (drop) iput(inode); +} + +void __ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap, + bool *wake_ci, bool *wake_mdsc) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; + bool ret; + + lockdep_assert_held(&ci->i_ceph_lock); + + dout("removing capsnap %p, inode %p ci %p\n", capsnap, inode, ci); + + list_del_init(&capsnap->ci_item); + ret = __detach_cap_flush_from_ci(ci, &capsnap->cap_flush); + if (wake_ci) + *wake_ci = ret; + + spin_lock(&mdsc->cap_dirty_lock); + if (list_empty(&ci->i_cap_flush_list)) + list_del_init(&ci->i_flushing_item); + + ret = __detach_cap_flush_from_mdsc(mdsc, &capsnap->cap_flush); + if (wake_mdsc) + *wake_mdsc = ret; + spin_unlock(&mdsc->cap_dirty_lock); +} + +void ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap, + bool *wake_ci, bool *wake_mdsc) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + + lockdep_assert_held(&ci->i_ceph_lock); + + WARN_ON_ONCE(capsnap->dirty_pages || capsnap->writing); + __ceph_remove_capsnap(inode, capsnap, wake_ci, wake_mdsc); } /* @@ -3466,25 +3826,10 @@ capsnap, capsnap->follows); } } - if (flushed) { - WARN_ON(capsnap->dirty_pages || capsnap->writing); - dout(" removing %p cap_snap %p follows %lld\n", - inode, capsnap, follows); - list_del(&capsnap->ci_item); - if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush)) - wake_ci = true; - - spin_lock(&mdsc->cap_dirty_lock); - - if (list_empty(&ci->i_cap_flush_list)) - list_del_init(&ci->i_flushing_item); - - if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush)) - wake_mdsc = true; - - spin_unlock(&mdsc->cap_dirty_lock); - } + if (flushed) + ceph_remove_capsnap(inode, capsnap, &wake_ci, &wake_mdsc); spin_unlock(&ci->i_ceph_lock); + if (flushed) { ceph_put_snap_context(capsnap->context); ceph_put_cap_snap(capsnap); @@ -3501,10 +3846,9 @@ * * caller hold s_mutex. */ -static void handle_cap_trunc(struct inode *inode, +static bool handle_cap_trunc(struct inode *inode, struct ceph_mds_caps *trunc, struct ceph_mds_session *session) - __releases(ci->i_ceph_lock) { struct ceph_inode_info *ci = ceph_inode(inode); int mds = session->s_mds; @@ -3515,7 +3859,9 @@ int implemented = 0; int dirty = __ceph_caps_dirty(ci); int issued = __ceph_caps_issued(ceph_inode(inode), &implemented); - int queue_trunc = 0; + bool queue_trunc = false; + + lockdep_assert_held(&ci->i_ceph_lock); issued |= implemented | dirty; @@ -3523,10 +3869,7 @@ inode, mds, seq, truncate_size, truncate_seq); queue_trunc = ceph_fill_file_size(inode, issued, truncate_seq, truncate_size, size); - spin_unlock(&ci->i_ceph_lock); - - if (queue_trunc) - ceph_queue_vmtruncate(inode); + return queue_trunc; } /* @@ -3571,8 +3914,6 @@ if (target < 0) { __ceph_remove_cap(cap, false); - if (!ci->i_auth_cap) - ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; goto out_unlock; } @@ -3602,15 +3943,9 @@ tcap->issue_seq = t_seq - 1; tcap->issued |= issued; tcap->implemented |= issued; - if (cap == ci->i_auth_cap) + if (cap == ci->i_auth_cap) { ci->i_auth_cap = tcap; - - if (!list_empty(&ci->i_cap_flush_list) && - ci->i_auth_cap == tcap) { - spin_lock(&mdsc->cap_dirty_lock); - list_move_tail(&ci->i_flushing_item, - &tcap->session->s_cap_flushing); - spin_unlock(&mdsc->cap_dirty_lock); + change_auth_cap_ses(ci, tcap->session); } } __ceph_remove_cap(cap, false); @@ -3619,7 +3954,7 @@ /* add placeholder for the export tagert */ int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0; tcap = new_cap; - ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, + ceph_add_cap(inode, tsession, t_cap_id, issued, 0, t_seq - 1, t_mseq, (u64)-1, flag, &new_cap); if (!list_empty(&ci->i_cap_flush_list) && @@ -3679,7 +4014,6 @@ struct ceph_mds_cap_peer *ph, struct ceph_mds_session *session, struct ceph_cap **target_cap, int *old_issued) - __acquires(ci->i_ceph_lock) { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap *cap, *ocap, *new_cap = NULL; @@ -3704,14 +4038,13 @@ dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n", inode, ci, mds, mseq, peer); - retry: - spin_lock(&ci->i_ceph_lock); cap = __get_cap_for_mds(ci, mds); if (!cap) { if (!new_cap) { spin_unlock(&ci->i_ceph_lock); new_cap = ceph_get_cap(mdsc, NULL); + spin_lock(&ci->i_ceph_lock); goto retry; } cap = new_cap; @@ -3725,7 +4058,7 @@ __ceph_caps_issued(ci, &issued); issued |= __ceph_caps_dirty(ci); - ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq, + ceph_add_cap(inode, session, cap_id, caps, wanted, seq, mseq, realmino, CEPH_CAP_FLAG_AUTH, &new_cap); ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL; @@ -3745,9 +4078,6 @@ } __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); } - - /* make sure we re-request max_size, if necessary */ - ci->i_requested_max_size = 0; *old_issued = issued; *target_cap = cap; @@ -3777,6 +4107,7 @@ size_t snaptrace_len; void *p, *end; struct cap_extra_info extra_info = {}; + bool queue_trunc; dout("handle_caps from mds%d\n", session->s_mds); @@ -3852,17 +4183,19 @@ } } - if (msg_version >= 11) { + if (msg_version >= 9) { struct ceph_timespec *btime; - u64 change_attr; - u32 flags; - /* version >= 9 */ if (p + sizeof(*btime) > end) goto bad; btime = p; + ceph_decode_timespec64(&extra_info.btime, btime); p += sizeof(*btime); - ceph_decode_64_safe(&p, end, change_attr, bad); + ceph_decode_64_safe(&p, end, extra_info.change_attr, bad); + } + + if (msg_version >= 11) { + u32 flags; /* version >= 10 */ ceph_decode_32_safe(&p, end, flags, bad); /* version >= 11 */ @@ -3878,7 +4211,7 @@ vino.snap, inode); mutex_lock(&session->s_mutex); - session->s_seq++; + inc_session_sequence(session); dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, (unsigned)seq); @@ -3894,9 +4227,7 @@ cap->seq = seq; cap->issue_seq = seq; spin_lock(&session->s_cap_lock); - list_add_tail(&cap->session_caps, - &session->s_cap_releases); - session->s_num_cap_releases++; + __ceph_queue_cap_release(session, cap); spin_unlock(&session->s_cap_lock); } goto flush_cap_releases; @@ -3924,6 +4255,7 @@ } else { down_read(&mdsc->snap_rwsem); } + spin_lock(&ci->i_ceph_lock); handle_cap_import(mdsc, inode, h, peer, session, &cap, &extra_info.issued); handle_cap_grant(inode, session, cap, @@ -3960,7 +4292,10 @@ break; case CEPH_CAP_OP_TRUNC: - handle_cap_trunc(inode, h, session); + queue_trunc = handle_cap_trunc(inode, h, session); + spin_unlock(&ci->i_ceph_lock); + if (queue_trunc) + ceph_queue_vmtruncate(inode); break; default: @@ -3969,7 +4304,13 @@ ceph_cap_op_name(op)); } - goto done; +done: + mutex_unlock(&session->s_mutex); +done_unlocked: + ceph_put_string(extra_info.pool_ns); + /* avoid calling iput_final() in mds dispatch threads */ + ceph_async_iput(inode); + return; flush_cap_releases: /* @@ -3977,14 +4318,8 @@ * along for the mds (who clearly thinks we still have this * cap). */ - ceph_send_cap_releases(mdsc, session); - -done: - mutex_unlock(&session->s_mutex); -done_unlocked: - iput(inode); - ceph_put_string(extra_info.pool_ns); - return; + ceph_flush_cap_releases(mdsc, session); + goto done; bad: pr_err("ceph_handle_caps: corrupt message\n"); @@ -3994,56 +4329,70 @@ /* * Delayed work handler to process end of delayed cap release LRU list. + * + * If new caps are added to the list while processing it, these won't get + * processed in this run. In this case, the ci->i_hold_caps_max will be + * returned so that the work can be scheduled accordingly. */ -void ceph_check_delayed_caps(struct ceph_mds_client *mdsc) +unsigned long ceph_check_delayed_caps(struct ceph_mds_client *mdsc) { struct inode *inode; struct ceph_inode_info *ci; - int flags = CHECK_CAPS_NODELAY; + struct ceph_mount_options *opt = mdsc->fsc->mount_options; + unsigned long delay_max = opt->caps_wanted_delay_max * HZ; + unsigned long loop_start = jiffies; + unsigned long delay = 0; dout("check_delayed_caps\n"); - while (1) { - spin_lock(&mdsc->cap_delay_lock); - if (list_empty(&mdsc->cap_delay_list)) - break; + spin_lock(&mdsc->cap_delay_lock); + while (!list_empty(&mdsc->cap_delay_list)) { ci = list_first_entry(&mdsc->cap_delay_list, struct ceph_inode_info, i_cap_delay_list); + if (time_before(loop_start, ci->i_hold_caps_max - delay_max)) { + dout("%s caps added recently. Exiting loop", __func__); + delay = ci->i_hold_caps_max; + break; + } if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 && time_before(jiffies, ci->i_hold_caps_max)) break; list_del_init(&ci->i_cap_delay_list); inode = igrab(&ci->vfs_inode); - spin_unlock(&mdsc->cap_delay_lock); - if (inode) { + spin_unlock(&mdsc->cap_delay_lock); dout("check_delayed_caps on %p\n", inode); - ceph_check_caps(ci, flags, NULL); - iput(inode); + ceph_check_caps(ci, 0, NULL); + /* avoid calling iput_final() in tick thread */ + ceph_async_iput(inode); + spin_lock(&mdsc->cap_delay_lock); } } spin_unlock(&mdsc->cap_delay_lock); + + return delay; } /* * Flush all dirty caps to the mds */ -void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc) +static void flush_dirty_session_caps(struct ceph_mds_session *s) { + struct ceph_mds_client *mdsc = s->s_mdsc; struct ceph_inode_info *ci; struct inode *inode; dout("flush_dirty_caps\n"); spin_lock(&mdsc->cap_dirty_lock); - while (!list_empty(&mdsc->cap_dirty)) { - ci = list_first_entry(&mdsc->cap_dirty, struct ceph_inode_info, + while (!list_empty(&s->s_cap_dirty)) { + ci = list_first_entry(&s->s_cap_dirty, struct ceph_inode_info, i_dirty_item); inode = &ci->vfs_inode; ihold(inode); dout("flush_dirty_caps %p\n", inode); spin_unlock(&mdsc->cap_dirty_lock); - ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH, NULL); + ceph_check_caps(ci, CHECK_CAPS_FLUSH, NULL); iput(inode); spin_lock(&mdsc->cap_dirty_lock); } @@ -4051,14 +4400,53 @@ dout("flush_dirty_caps done\n"); } -void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode) +void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc) { - int i; + ceph_mdsc_iterate_sessions(mdsc, flush_dirty_session_caps, true); +} + +void __ceph_touch_fmode(struct ceph_inode_info *ci, + struct ceph_mds_client *mdsc, int fmode) +{ + unsigned long now = jiffies; + if (fmode & CEPH_FILE_MODE_RD) + ci->i_last_rd = now; + if (fmode & CEPH_FILE_MODE_WR) + ci->i_last_wr = now; + /* queue periodic check */ + if (fmode && + __ceph_is_any_real_caps(ci) && + list_empty(&ci->i_cap_delay_list)) + __cap_delay_requeue(mdsc, ci); +} + +void ceph_get_fmode(struct ceph_inode_info *ci, int fmode, int count) +{ + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->vfs_inode.i_sb); int bits = (fmode << 1) | 1; + bool already_opened = false; + int i; + + if (count == 1) + atomic64_inc(&mdsc->metric.opened_files); + + spin_lock(&ci->i_ceph_lock); for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { + /* + * If any of the mode ref is larger than 0, + * that means it has been already opened by + * others. Just skip checking the PIN ref. + */ + if (i && ci->i_nr_by_mode[i]) + already_opened = true; + if (bits & (1 << i)) - ci->i_nr_by_mode[i]++; + ci->i_nr_by_mode[i] += count; } + + if (!already_opened) + percpu_counter_inc(&mdsc->metric.opened_inodes); + spin_unlock(&ci->i_ceph_lock); } /* @@ -4066,30 +4454,39 @@ * we may need to release capabilities to the MDS (or schedule * their delayed release). */ -void ceph_put_fmode(struct ceph_inode_info *ci, int fmode) +void ceph_put_fmode(struct ceph_inode_info *ci, int fmode, int count) { - int i, last = 0; + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->vfs_inode.i_sb); int bits = (fmode << 1) | 1; + bool is_closed = true; + int i; + + if (count == 1) + atomic64_dec(&mdsc->metric.opened_files); + spin_lock(&ci->i_ceph_lock); for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { if (bits & (1 << i)) { - BUG_ON(ci->i_nr_by_mode[i] == 0); - if (--ci->i_nr_by_mode[i] == 0) - last++; + BUG_ON(ci->i_nr_by_mode[i] < count); + ci->i_nr_by_mode[i] -= count; } - } - dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n", - &ci->vfs_inode, fmode, - ci->i_nr_by_mode[0], ci->i_nr_by_mode[1], - ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]); - spin_unlock(&ci->i_ceph_lock); - if (last && ci->i_vino.snap == CEPH_NOSNAP) - ceph_check_caps(ci, 0, NULL); + /* + * If any of the mode ref is not 0 after + * decreased, that means it is still opened + * by others. Just skip checking the PIN ref. + */ + if (i && ci->i_nr_by_mode[i]) + is_closed = false; + } + + if (is_closed) + percpu_counter_dec(&mdsc->metric.opened_inodes); + spin_unlock(&ci->i_ceph_lock); } /* - * For a soon-to-be unlinked file, drop the AUTH_RDCACHE caps. If it + * For a soon-to-be unlinked file, drop the LINK caps. If it * looks like the link count will hit 0, drop any other caps (other * than PIN) we don't specifically want (due to the file still being * open). @@ -4103,7 +4500,6 @@ if (inode->i_nlink == 1) { drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN); - ci->i_ceph_flags |= CEPH_I_NODELAY; if (__ceph_caps_dirty(ci)) { struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; @@ -4159,8 +4555,6 @@ if (force || (cap->issued & drop)) { if (cap->issued & drop) { int wanted = __ceph_caps_wanted(ci); - if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0) - wanted |= cap->mds_wanted; dout("encode_inode_release %p cap %p " "%s -> %s, wanted %s -> %s\n", inode, cap, ceph_cap_string(cap->issued), @@ -4171,6 +4565,9 @@ cap->issued &= ~drop; cap->implemented &= ~drop; cap->mds_wanted = wanted; + if (cap == ci->i_auth_cap && + !(wanted & CEPH_CAP_ANY_FILE_WR)) + ci->i_requested_max_size = 0; } else { dout("encode_inode_release %p cap %p %s" " (force)\n", inode, cap, -- Gitblit v1.6.2