2024-02-20 102a0743326a03cd1a1202ceda21e175b7d3575c
kernel/fs/ceph/snap.c
@@ -3,11 +3,13 @@
 
 #include <linux/sort.h>
 #include <linux/slab.h>
-
+#include <linux/iversion.h>
 #include "super.h"
 #include "mds_client.h"
-
 #include <linux/ceph/decode.h>
+
+/* unused map expires after 5 minutes */
+#define CEPH_SNAPID_MAP_TIMEOUT (5 * 60 * HZ)
 
 /*
  * Snapshots in ceph are driven in large part by cooperation from the
@@ -58,24 +60,26 @@
 /*
  * increase ref count for the realm
  *
- * caller must hold snap_rwsem for write.
+ * caller must hold snap_rwsem.
  */
 void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
 			 struct ceph_snap_realm *realm)
 {
-	dout("get_realm %p %d -> %d\n", realm,
-	     atomic_read(&realm->nref), atomic_read(&realm->nref)+1);
+	lockdep_assert_held(&mdsc->snap_rwsem);
+
 	/*
-	 * since we _only_ increment realm refs or empty the empty
-	 * list with snap_rwsem held, adjusting the empty list here is
-	 * safe. we do need to protect against concurrent empty list
-	 * additions, however.
+	 * The 0->1 and 1->0 transitions must take the snap_empty_lock
+	 * atomically with the refcount change. Go ahead and bump the
+	 * nref here, unless it's 0, in which case we take the spinlock
+	 * and then do the increment and remove it from the list.
 	 */
-	if (atomic_inc_return(&realm->nref) == 1) {
-		spin_lock(&mdsc->snap_empty_lock);
+	if (atomic_inc_not_zero(&realm->nref))
+		return;
+
+	spin_lock(&mdsc->snap_empty_lock);
+	if (atomic_inc_return(&realm->nref) == 1)
 		list_del_init(&realm->empty_item);
-		spin_unlock(&mdsc->snap_empty_lock);
-	}
+	spin_unlock(&mdsc->snap_empty_lock);
 }
 
 static void __insert_snap_realm(struct rb_root *root,
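
Note on the pattern above: atomic_inc_not_zero() is the lock-free fast path, and the snap_empty_lock slow path makes the 0->1 transition atomic with the removal from the empty list. Below is a minimal userspace C sketch of the same idea, assuming a pthread mutex in place of the spinlock; struct obj, obj_get() and the on_empty_list flag are illustrative, not from the patch.

#include <stdatomic.h>
#include <pthread.h>
#include <stdbool.h>

struct obj {
	atomic_int nref;
	pthread_mutex_t empty_lock;	/* stands in for mdsc->snap_empty_lock */
	bool on_empty_list;		/* stands in for the empty-list linkage */
};

/* Fast path: bump the count lock-free while it is already nonzero.
 * Slow path: take the lock so the 0->1 transition and the removal
 * from the "empty" list happen atomically w.r.t. the put side. */
static void obj_get(struct obj *o)
{
	int v = atomic_load(&o->nref);

	while (v != 0) {		/* atomic_inc_not_zero() analogue */
		if (atomic_compare_exchange_weak(&o->nref, &v, v + 1))
			return;
	}

	pthread_mutex_lock(&o->empty_lock);
	if (atomic_fetch_add(&o->nref, 1) == 0)	/* 0 -> 1 transition */
		o->on_empty_list = false;	/* list_del_init() analogue */
	pthread_mutex_unlock(&o->empty_lock);
}
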
@@ -111,6 +115,8 @@
 {
 	struct ceph_snap_realm *realm;
 
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
 	realm = kzalloc(sizeof(*realm), GFP_NOFS);
 	if (!realm)
 		return ERR_PTR(-ENOMEM);
@@ -124,6 +130,8 @@
 	INIT_LIST_HEAD(&realm->inodes_with_caps);
 	spin_lock_init(&realm->inodes_with_caps_lock);
 	__insert_snap_realm(&mdsc->snap_realms, realm);
+	mdsc->num_snap_realms++;
+
 	dout("create_snap_realm %llx %p\n", realm->ino, realm);
 	return realm;
 }
@@ -131,13 +139,15 @@
 /*
  * lookup the realm rooted at @ino.
  *
- * caller must hold snap_rwsem for write.
+ * caller must hold snap_rwsem.
  */
 static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
 						   u64 ino)
 {
 	struct rb_node *n = mdsc->snap_realms.rb_node;
 	struct ceph_snap_realm *r;
+
+	lockdep_assert_held(&mdsc->snap_rwsem);
 
 	while (n) {
 		r = rb_entry(n, struct ceph_snap_realm, node);
@@ -172,9 +182,12 @@
 static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
 				 struct ceph_snap_realm *realm)
 {
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
 	dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);
 
 	rb_erase(&realm->node, &mdsc->snap_realms);
+	mdsc->num_snap_realms--;
 
 	if (realm->parent) {
 		list_del_init(&realm->child_item);
@@ -193,28 +206,30 @@
 static void __put_snap_realm(struct ceph_mds_client *mdsc,
 			     struct ceph_snap_realm *realm)
 {
-	dout("__put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
-	     atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
+	/*
+	 * We do not require the snap_empty_lock here, as any caller that
+	 * increments the value must hold the snap_rwsem.
+	 */
 	if (atomic_dec_and_test(&realm->nref))
 		__destroy_snap_realm(mdsc, realm);
 }
 
 /*
- * caller needn't hold any locks
+ * See comments in ceph_get_snap_realm. Caller needn't hold any locks.
  */
 void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
 			 struct ceph_snap_realm *realm)
 {
-	dout("put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
-	     atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
-	if (!atomic_dec_and_test(&realm->nref))
+	if (!atomic_dec_and_lock(&realm->nref, &mdsc->snap_empty_lock))
 		return;
 
 	if (down_write_trylock(&mdsc->snap_rwsem)) {
+		spin_unlock(&mdsc->snap_empty_lock);
 		__destroy_snap_realm(mdsc, realm);
 		up_write(&mdsc->snap_rwsem);
 	} else {
-		spin_lock(&mdsc->snap_empty_lock);
 		list_add(&realm->empty_item, &mdsc->snap_empty);
 		spin_unlock(&mdsc->snap_empty_lock);
 	}
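
The put side pairs with the get-path sketch above: atomic_dec_and_lock() decrements lock-free unless the count would hit zero, in which case it returns with snap_empty_lock held, so the 1->0 transition and the disposal decision (destroy now, or park on the empty list) stay atomic with respect to ceph_get_snap_realm(). A hedged userspace analogue, reusing the illustrative struct obj from the earlier sketch:

/* atomic_dec_and_lock() analogue: returns true with empty_lock held
 * iff the count dropped to zero; otherwise decrements lock-free. */
static bool obj_put_would_free(struct obj *o)
{
	int v = atomic_load(&o->nref);

	while (v > 1) {			/* fast path: not the last reference */
		if (atomic_compare_exchange_weak(&o->nref, &v, v - 1))
			return false;
	}

	pthread_mutex_lock(&o->empty_lock);
	if (atomic_fetch_sub(&o->nref, 1) == 1)
		return true;		/* 1 -> 0: caller disposes or parks on list */
	pthread_mutex_unlock(&o->empty_lock);
	return false;
}

On a true return the caller, mirroring ceph_put_snap_realm(), either frees the object or parks it on the empty list before dropping the lock.
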
@@ -230,6 +245,8 @@
 static void __cleanup_empty_realms(struct ceph_mds_client *mdsc)
 {
 	struct ceph_snap_realm *realm;
+
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
 
 	spin_lock(&mdsc->snap_empty_lock);
 	while (!list_empty(&mdsc->snap_empty)) {
@@ -263,6 +280,8 @@
 				    u64 parentino)
 {
 	struct ceph_snap_realm *parent;
+
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
 
 	if (realm->parent_ino == parentino)
 		return 0;
@@ -468,6 +487,9 @@
 		pr_err("ENOMEM allocating ceph_cap_snap on %p\n", inode);
 		return;
 	}
+	capsnap->cap_flush.is_capsnap = true;
+	INIT_LIST_HEAD(&capsnap->cap_flush.i_list);
+	INIT_LIST_HEAD(&capsnap->cap_flush.g_list);
 
 	spin_lock(&ci->i_ceph_lock);
 	used = __ceph_caps_used(ci);
@@ -597,13 +619,15 @@
 			    struct ceph_cap_snap *capsnap)
 {
 	struct inode *inode = &ci->vfs_inode;
-	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
+	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
 
 	BUG_ON(capsnap->writing);
 	capsnap->size = inode->i_size;
 	capsnap->mtime = inode->i_mtime;
 	capsnap->atime = inode->i_atime;
 	capsnap->ctime = inode->i_ctime;
+	capsnap->btime = ci->i_btime;
+	capsnap->change_attr = inode_peek_iversion_raw(inode);
 	capsnap->time_warp_seq = ci->i_time_warp_seq;
 	capsnap->truncate_size = ci->i_truncate_size;
 	capsnap->truncate_seq = ci->i_truncate_seq;
@@ -623,8 +647,10 @@
 	     capsnap->size);
 
 	spin_lock(&mdsc->snap_flush_lock);
-	if (list_empty(&ci->i_snap_flush_item))
+	if (list_empty(&ci->i_snap_flush_item)) {
+		ihold(inode);
 		list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list);
+	}
 	spin_unlock(&mdsc->snap_flush_lock);
 	return 1;  /* caller may want to ceph_flush_snaps */
 }
@@ -646,13 +672,15 @@
 		if (!inode)
 			continue;
 		spin_unlock(&realm->inodes_with_caps_lock);
-		iput(lastinode);
+		/* avoid calling iput_final() while holding
+		 * mdsc->snap_rwsem or in mds dispatch threads */
+		ceph_async_iput(lastinode);
 		lastinode = inode;
 		ceph_queue_cap_snap(ci);
 		spin_lock(&realm->inodes_with_caps_lock);
 	}
 	spin_unlock(&realm->inodes_with_caps_lock);
-	iput(lastinode);
+	ceph_async_iput(lastinode);
 
 	dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
 }
@@ -671,14 +699,19 @@
 	struct ceph_mds_snap_realm *ri;    /* encoded */
 	__le64 *snaps;                     /* encoded */
 	__le64 *prior_parent_snaps;        /* encoded */
-	struct ceph_snap_realm *realm = NULL;
+	struct ceph_snap_realm *realm;
 	struct ceph_snap_realm *first_realm = NULL;
-	int invalidate = 0;
+	struct ceph_snap_realm *realm_to_rebuild = NULL;
+	int rebuild_snapcs;
 	int err = -ENOMEM;
 	LIST_HEAD(dirty_realms);
 
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
 	dout("update_snap_trace deletion=%d\n", deletion);
 more:
+	realm = NULL;
+	rebuild_snapcs = 0;
 	ceph_decode_need(&p, e, sizeof(*ri), bad);
 	ri = p;
 	p += sizeof(*ri);
@@ -702,7 +735,7 @@
 	err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
 	if (err < 0)
 		goto fail;
-	invalidate += err;
+	rebuild_snapcs += err;
 
 	if (le64_to_cpu(ri->seq) > realm->seq) {
 		dout("update_snap_trace updating %llx %p %lld -> %lld\n",
@@ -727,22 +760,30 @@
 		if (realm->seq > mdsc->last_snap_seq)
 			mdsc->last_snap_seq = realm->seq;
 
-		invalidate = 1;
+		rebuild_snapcs = 1;
 	} else if (!realm->cached_context) {
 		dout("update_snap_trace %llx %p seq %lld new\n",
 		     realm->ino, realm, realm->seq);
-		invalidate = 1;
+		rebuild_snapcs = 1;
 	} else {
 		dout("update_snap_trace %llx %p seq %lld unchanged\n",
 		     realm->ino, realm, realm->seq);
 	}
 
-	dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
-	     realm, invalidate, p, e);
+	dout("done with %llx %p, rebuild_snapcs=%d, %p %p\n", realm->ino,
+	     realm, rebuild_snapcs, p, e);
 
-	/* invalidate when we reach the _end_ (root) of the trace */
-	if (invalidate && p >= e)
-		rebuild_snap_realms(realm, &dirty_realms);
+	/*
+	 * This will always track the topmost parent realm from which
+	 * we need to rebuild the snapshot contexts _downward_ in the
+	 * hierarchy.
+	 */
+	if (rebuild_snapcs)
+		realm_to_rebuild = realm;
+
+	/* rebuild_snapcs when we reach the _end_ (root) of the trace */
+	if (realm_to_rebuild && p >= e)
+		rebuild_snap_realms(realm_to_rebuild, &dirty_realms);
 
 	if (!first_realm)
 		first_realm = realm;
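
Why tracking only the last flagged realm works: the snap trace is encoded child-first and ends at the root, so the final realm flagged with rebuild_snapcs is the topmost ancestor whose context changed, and rebuilding from there covers every descendant. rebuild_snap_realms() itself is outside this hunk; the sketch below only illustrates the assumed downward walk, with hypothetical type, field, and helper names.

/* Hypothetical sketch of a downward rebuild; none of these names
 * come from the patch. */
struct realm_node {
	struct realm_node *children;	/* assumed first-child pointer */
	struct realm_node *sibling;	/* assumed next-sibling pointer */
};

static void rebuild_one_snap_context(struct realm_node *r)
{
	(void)r;	/* stub: recompute r's context from its parent */
}

static void rebuild_downward(struct realm_node *r)
{
	struct realm_node *c;

	rebuild_one_snap_context(r);
	for (c = r->children; c; c = c->sibling)
		rebuild_downward(c);	/* descendants see the new parent context */
}
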
@@ -804,7 +845,9 @@
 		ihold(inode);
 		spin_unlock(&mdsc->snap_flush_lock);
 		ceph_flush_snaps(ci, &session);
-		iput(inode);
+		/* avoid calling iput_final() while holding
+		 * session->s_mutex or in mds dispatch threads */
+		ceph_async_iput(inode);
 		spin_lock(&mdsc->snap_flush_lock);
 	}
 	spin_unlock(&mdsc->snap_flush_lock);
@@ -862,7 +905,7 @@
 	     ceph_snap_op_name(op), split, trace_len);
 
 	mutex_lock(&session->s_mutex);
-	session->s_seq++;
+	inc_session_sequence(session);
 	mutex_unlock(&session->s_mutex);
 
 	down_write(&mdsc->snap_rwsem);
@@ -948,12 +991,14 @@
 			ceph_get_snap_realm(mdsc, realm);
 			ceph_put_snap_realm(mdsc, oldrealm);
 
-			iput(inode);
+			/* avoid calling iput_final() while holding
+			 * mdsc->snap_rwsem or in mds dispatch threads */
+			ceph_async_iput(inode);
 			continue;
 
 skip_inode:
 			spin_unlock(&ci->i_ceph_lock);
-			iput(inode);
+			ceph_async_iput(inode);
 		}
 
 		/* we may have taken some of the old realm's children. */
@@ -965,6 +1010,19 @@
 				continue;
 			adjust_snap_realm_parent(mdsc, child, realm->ino);
 		}
+	} else {
+		/*
+		 * In the non-split case both 'num_split_inos' and
+		 * 'num_split_realms' should be 0, making this a no-op.
+		 * However the MDS happens to populate the 'split_realms'
+		 * list in one of the UPDATE op cases by mistake.
+		 *
+		 * Skip both lists just in case to ensure that 'p' is
+		 * positioned at the start of realm info, as expected by
+		 * ceph_update_snap_trace().
+		 */
+		p += sizeof(u64) * num_split_inos;
+		p += sizeof(u64) * num_split_realms;
 	}
 
 	/*
@@ -993,3 +1051,155 @@
 	up_write(&mdsc->snap_rwsem);
 	return;
 }
+
+struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc,
+					    u64 snap)
+{
+	struct ceph_snapid_map *sm, *exist;
+	struct rb_node **p, *parent;
+	int ret;
+
+	exist = NULL;
+	spin_lock(&mdsc->snapid_map_lock);
+	p = &mdsc->snapid_map_tree.rb_node;
+	while (*p) {
+		exist = rb_entry(*p, struct ceph_snapid_map, node);
+		if (snap > exist->snap) {
+			p = &(*p)->rb_left;
+		} else if (snap < exist->snap) {
+			p = &(*p)->rb_right;
+		} else {
+			if (atomic_inc_return(&exist->ref) == 1)
+				list_del_init(&exist->lru);
+			break;
+		}
+		exist = NULL;
+	}
+	spin_unlock(&mdsc->snapid_map_lock);
+	if (exist) {
+		dout("found snapid map %llx -> %x\n", exist->snap, exist->dev);
+		return exist;
+	}
+
+	sm = kmalloc(sizeof(*sm), GFP_NOFS);
+	if (!sm)
+		return NULL;
+
+	ret = get_anon_bdev(&sm->dev);
+	if (ret < 0) {
+		kfree(sm);
+		return NULL;
+	}
+
+	INIT_LIST_HEAD(&sm->lru);
+	atomic_set(&sm->ref, 1);
+	sm->snap = snap;
+
+	exist = NULL;
+	parent = NULL;
+	p = &mdsc->snapid_map_tree.rb_node;
+	spin_lock(&mdsc->snapid_map_lock);
+	while (*p) {
+		parent = *p;
+		exist = rb_entry(*p, struct ceph_snapid_map, node);
+		if (snap > exist->snap)
+			p = &(*p)->rb_left;
+		else if (snap < exist->snap)
+			p = &(*p)->rb_right;
+		else
+			break;
+		exist = NULL;
+	}
+	if (exist) {
+		if (atomic_inc_return(&exist->ref) == 1)
+			list_del_init(&exist->lru);
+	} else {
+		rb_link_node(&sm->node, parent, p);
+		rb_insert_color(&sm->node, &mdsc->snapid_map_tree);
+	}
+	spin_unlock(&mdsc->snapid_map_lock);
+	if (exist) {
+		free_anon_bdev(sm->dev);
+		kfree(sm);
+		dout("found snapid map %llx -> %x\n", exist->snap, exist->dev);
+		return exist;
+	}
+
+	dout("create snapid map %llx -> %x\n", sm->snap, sm->dev);
+	return sm;
+}
+
+void ceph_put_snapid_map(struct ceph_mds_client* mdsc,
+			 struct ceph_snapid_map *sm)
+{
+	if (!sm)
+		return;
+	if (atomic_dec_and_lock(&sm->ref, &mdsc->snapid_map_lock)) {
+		if (!RB_EMPTY_NODE(&sm->node)) {
+			sm->last_used = jiffies;
+			list_add_tail(&sm->lru, &mdsc->snapid_map_lru);
+			spin_unlock(&mdsc->snapid_map_lock);
+		} else {
+			/* already cleaned up by
+			 * ceph_cleanup_snapid_map() */
+			spin_unlock(&mdsc->snapid_map_lock);
+			kfree(sm);
+		}
+	}
+}
+
+void ceph_trim_snapid_map(struct ceph_mds_client *mdsc)
+{
+	struct ceph_snapid_map *sm;
+	unsigned long now;
+	LIST_HEAD(to_free);
+
+	spin_lock(&mdsc->snapid_map_lock);
+	now = jiffies;
+
+	while (!list_empty(&mdsc->snapid_map_lru)) {
+		sm = list_first_entry(&mdsc->snapid_map_lru,
+				      struct ceph_snapid_map, lru);
+		if (time_after(sm->last_used + CEPH_SNAPID_MAP_TIMEOUT, now))
+			break;
+
+		rb_erase(&sm->node, &mdsc->snapid_map_tree);
+		list_move(&sm->lru, &to_free);
+	}
+	spin_unlock(&mdsc->snapid_map_lock);
+
+	while (!list_empty(&to_free)) {
+		sm = list_first_entry(&to_free, struct ceph_snapid_map, lru);
+		list_del(&sm->lru);
+		dout("trim snapid map %llx -> %x\n", sm->snap, sm->dev);
+		free_anon_bdev(sm->dev);
+		kfree(sm);
+	}
+}
+
+void ceph_cleanup_snapid_map(struct ceph_mds_client *mdsc)
+{
+	struct ceph_snapid_map *sm;
+	struct rb_node *p;
+	LIST_HEAD(to_free);
+
+	spin_lock(&mdsc->snapid_map_lock);
+	while ((p = rb_first(&mdsc->snapid_map_tree))) {
+		sm = rb_entry(p, struct ceph_snapid_map, node);
+		rb_erase(p, &mdsc->snapid_map_tree);
+		RB_CLEAR_NODE(p);
+		list_move(&sm->lru, &to_free);
+	}
+	spin_unlock(&mdsc->snapid_map_lock);
+
+	while (!list_empty(&to_free)) {
+		sm = list_first_entry(&to_free, struct ceph_snapid_map, lru);
+		list_del(&sm->lru);
+		free_anon_bdev(sm->dev);
+		if (WARN_ON_ONCE(atomic_read(&sm->ref))) {
+			pr_err("snapid map %llx -> %x still in use\n",
+			       sm->snap, sm->dev);
+		}
+		kfree(sm);
+	}
+}
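
For context, a hedged sketch of how the new snapid-map API is meant to be consumed; the ceph_* names come from the patch, while the caller and do_something_with() are purely illustrative:

/* Illustrative caller only: map a snap id to an anonymous device
 * number for the duration of an operation. */
static int example_snapid_use(struct ceph_mds_client *mdsc, u64 snap)
{
	struct ceph_snapid_map *sm;

	sm = ceph_get_snapid_map(mdsc, snap);	/* takes a reference */
	if (!sm)
		return -ENOMEM;		/* allocation or anon-bdev failure */

	do_something_with(sm->dev);	/* hypothetical consumer of sm->dev */

	ceph_put_snapid_map(mdsc, sm);	/* last put parks the entry on the LRU */
	return 0;
}

Entries that sit unused on the LRU for CEPH_SNAPID_MAP_TIMEOUT (five minutes) are freed by ceph_trim_snapid_map(); ceph_cleanup_snapid_map() tears the whole tree down at unmount, warning if any entry still holds a reference.
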