hc
2024-01-03 2f7c68cb55ecb7331f2381deb497c27155f32faf
kernel/fs/ceph/mds_client.c
....@@ -9,6 +9,8 @@
99 #include <linux/debugfs.h>
1010 #include <linux/seq_file.h>
1111 #include <linux/ratelimit.h>
12
+#include <linux/bits.h>
13
+#include <linux/ktime.h>
1214
1315 #include "super.h"
1416 #include "mds_client.h"
....@@ -19,6 +21,8 @@
1921 #include <linux/ceph/pagelist.h>
2022 #include <linux/ceph/auth.h>
2123 #include <linux/ceph/debugfs.h>
24
+
25
+#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
2226
2327 /*
2428 * A cluster of MDS (metadata server) daemons is responsible for
....@@ -46,13 +50,17 @@
4650 */
4751
4852 struct ceph_reconnect_state {
49
- int nr_caps;
53
+ struct ceph_mds_session *session;
54
+ int nr_caps, nr_realms;
5055 struct ceph_pagelist *pagelist;
5156 unsigned msg_version;
57
+ bool allow_multi;
5258 };
5359
5460 static void __wake_requests(struct ceph_mds_client *mdsc,
5561 struct list_head *head);
62
+static void ceph_cap_release_work(struct work_struct *work);
63
+static void ceph_cap_reclaim_work(struct work_struct *work);
5664
5765 static const struct ceph_connection_operations mds_con_ops;
5866
....@@ -61,6 +69,29 @@
6169 * mds reply parsing
6270 */
6371
72
+static int parse_reply_info_quota(void **p, void *end,
73
+ struct ceph_mds_reply_info_in *info)
74
+{
75
+ u8 struct_v, struct_compat;
76
+ u32 struct_len;
77
+
78
+ ceph_decode_8_safe(p, end, struct_v, bad);
79
+ ceph_decode_8_safe(p, end, struct_compat, bad);
80
+ /* struct_v is expected to be >= 1. we only
81
+ * understand encoding with struct_compat == 1. */
82
+ if (!struct_v || struct_compat != 1)
83
+ goto bad;
84
+ ceph_decode_32_safe(p, end, struct_len, bad);
85
+ ceph_decode_need(p, end, struct_len, bad);
86
+ end = *p + struct_len;
87
+ ceph_decode_64_safe(p, end, info->max_bytes, bad);
88
+ ceph_decode_64_safe(p, end, info->max_files, bad);
89
+ *p = end;
90
+ return 0;
91
+bad:
92
+ return -EIO;
93
+}
94
+
6495 /*
6596 * parse individual inode info
6697 */
....@@ -68,8 +99,24 @@
6899 struct ceph_mds_reply_info_in *info,
69100 u64 features)
70101 {
71
- int err = -EIO;
102
+ int err = 0;
103
+ u8 struct_v = 0;
72104
105
+ if (features == (u64)-1) {
106
+ u32 struct_len;
107
+ u8 struct_compat;
108
+ ceph_decode_8_safe(p, end, struct_v, bad);
109
+ ceph_decode_8_safe(p, end, struct_compat, bad);
110
+ /* struct_v is expected to be >= 1. we only understand
111
+ * encoding with struct_compat == 1. */
112
+ if (!struct_v || struct_compat != 1)
113
+ goto bad;
114
+ ceph_decode_32_safe(p, end, struct_len, bad);
115
+ ceph_decode_need(p, end, struct_len, bad);
116
+ end = *p + struct_len;
117
+ }
118
+
119
+ ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
73120 info->in = *p;
74121 *p += sizeof(struct ceph_mds_reply_inode) +
75122 sizeof(*info->in->fragtree.splits) *
....@@ -80,60 +127,158 @@
80127 info->symlink = *p;
81128 *p += info->symlink_len;
82129
83
- if (features & CEPH_FEATURE_DIRLAYOUTHASH)
84
- ceph_decode_copy_safe(p, end, &info->dir_layout,
85
- sizeof(info->dir_layout), bad);
86
- else
87
- memset(&info->dir_layout, 0, sizeof(info->dir_layout));
88
-
130
+ ceph_decode_copy_safe(p, end, &info->dir_layout,
131
+ sizeof(info->dir_layout), bad);
89132 ceph_decode_32_safe(p, end, info->xattr_len, bad);
90133 ceph_decode_need(p, end, info->xattr_len, bad);
91134 info->xattr_data = *p;
92135 *p += info->xattr_len;
93136
94
- if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
137
+ if (features == (u64)-1) {
138
+ /* inline data */
95139 ceph_decode_64_safe(p, end, info->inline_version, bad);
96140 ceph_decode_32_safe(p, end, info->inline_len, bad);
97141 ceph_decode_need(p, end, info->inline_len, bad);
98142 info->inline_data = *p;
99143 *p += info->inline_len;
100
- } else
101
- info->inline_version = CEPH_INLINE_NONE;
102
-
103
- if (features & CEPH_FEATURE_MDS_QUOTA) {
104
- u8 struct_v, struct_compat;
105
- u32 struct_len;
106
-
107
- /*
108
- * both struct_v and struct_compat are expected to be >= 1
109
- */
110
- ceph_decode_8_safe(p, end, struct_v, bad);
111
- ceph_decode_8_safe(p, end, struct_compat, bad);
112
- if (!struct_v || !struct_compat)
113
- goto bad;
114
- ceph_decode_32_safe(p, end, struct_len, bad);
115
- ceph_decode_need(p, end, struct_len, bad);
116
- ceph_decode_64_safe(p, end, info->max_bytes, bad);
117
- ceph_decode_64_safe(p, end, info->max_files, bad);
118
- } else {
119
- info->max_bytes = 0;
120
- info->max_files = 0;
121
- }
122
-
123
- info->pool_ns_len = 0;
124
- info->pool_ns_data = NULL;
125
- if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
144
+ /* quota */
145
+ err = parse_reply_info_quota(p, end, info);
146
+ if (err < 0)
147
+ goto out_bad;
148
+ /* pool namespace */
126149 ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
127150 if (info->pool_ns_len > 0) {
128151 ceph_decode_need(p, end, info->pool_ns_len, bad);
129152 info->pool_ns_data = *p;
130153 *p += info->pool_ns_len;
131154 }
132
- }
133155
156
+ /* btime */
157
+ ceph_decode_need(p, end, sizeof(info->btime), bad);
158
+ ceph_decode_copy(p, &info->btime, sizeof(info->btime));
159
+
160
+ /* change attribute */
161
+ ceph_decode_64_safe(p, end, info->change_attr, bad);
162
+
163
+ /* dir pin */
164
+ if (struct_v >= 2) {
165
+ ceph_decode_32_safe(p, end, info->dir_pin, bad);
166
+ } else {
167
+ info->dir_pin = -ENODATA;
168
+ }
169
+
170
+ /* snapshot birth time, remains zero for v<=2 */
171
+ if (struct_v >= 3) {
172
+ ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
173
+ ceph_decode_copy(p, &info->snap_btime,
174
+ sizeof(info->snap_btime));
175
+ } else {
176
+ memset(&info->snap_btime, 0, sizeof(info->snap_btime));
177
+ }
178
+
179
+ *p = end;
180
+ } else {
181
+ if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
182
+ ceph_decode_64_safe(p, end, info->inline_version, bad);
183
+ ceph_decode_32_safe(p, end, info->inline_len, bad);
184
+ ceph_decode_need(p, end, info->inline_len, bad);
185
+ info->inline_data = *p;
186
+ *p += info->inline_len;
187
+ } else
188
+ info->inline_version = CEPH_INLINE_NONE;
189
+
190
+ if (features & CEPH_FEATURE_MDS_QUOTA) {
191
+ err = parse_reply_info_quota(p, end, info);
192
+ if (err < 0)
193
+ goto out_bad;
194
+ } else {
195
+ info->max_bytes = 0;
196
+ info->max_files = 0;
197
+ }
198
+
199
+ info->pool_ns_len = 0;
200
+ info->pool_ns_data = NULL;
201
+ if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
202
+ ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
203
+ if (info->pool_ns_len > 0) {
204
+ ceph_decode_need(p, end, info->pool_ns_len, bad);
205
+ info->pool_ns_data = *p;
206
+ *p += info->pool_ns_len;
207
+ }
208
+ }
209
+
210
+ if (features & CEPH_FEATURE_FS_BTIME) {
211
+ ceph_decode_need(p, end, sizeof(info->btime), bad);
212
+ ceph_decode_copy(p, &info->btime, sizeof(info->btime));
213
+ ceph_decode_64_safe(p, end, info->change_attr, bad);
214
+ }
215
+
216
+ info->dir_pin = -ENODATA;
217
+ /* info->snap_btime remains zero */
218
+ }
134219 return 0;
135220 bad:
221
+ err = -EIO;
222
+out_bad:
136223 return err;
224
+}
225
+
226
+static int parse_reply_info_dir(void **p, void *end,
227
+ struct ceph_mds_reply_dirfrag **dirfrag,
228
+ u64 features)
229
+{
230
+ if (features == (u64)-1) {
231
+ u8 struct_v, struct_compat;
232
+ u32 struct_len;
233
+ ceph_decode_8_safe(p, end, struct_v, bad);
234
+ ceph_decode_8_safe(p, end, struct_compat, bad);
235
+ /* struct_v is expected to be >= 1. we only understand
236
+ * encoding whose struct_compat == 1. */
237
+ if (!struct_v || struct_compat != 1)
238
+ goto bad;
239
+ ceph_decode_32_safe(p, end, struct_len, bad);
240
+ ceph_decode_need(p, end, struct_len, bad);
241
+ end = *p + struct_len;
242
+ }
243
+
244
+ ceph_decode_need(p, end, sizeof(**dirfrag), bad);
245
+ *dirfrag = *p;
246
+ *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
247
+ if (unlikely(*p > end))
248
+ goto bad;
249
+ if (features == (u64)-1)
250
+ *p = end;
251
+ return 0;
252
+bad:
253
+ return -EIO;
254
+}
255
+
256
+static int parse_reply_info_lease(void **p, void *end,
257
+ struct ceph_mds_reply_lease **lease,
258
+ u64 features)
259
+{
260
+ if (features == (u64)-1) {
261
+ u8 struct_v, struct_compat;
262
+ u32 struct_len;
263
+ ceph_decode_8_safe(p, end, struct_v, bad);
264
+ ceph_decode_8_safe(p, end, struct_compat, bad);
265
+ /* struct_v is expected to be >= 1. we only understand
266
+ * encoding whose struct_compat == 1. */
267
+ if (!struct_v || struct_compat != 1)
268
+ goto bad;
269
+ ceph_decode_32_safe(p, end, struct_len, bad);
270
+ ceph_decode_need(p, end, struct_len, bad);
271
+ end = *p + struct_len;
272
+ }
273
+
274
+ ceph_decode_need(p, end, sizeof(**lease), bad);
275
+ *lease = *p;
276
+ *p += sizeof(**lease);
277
+ if (features == (u64)-1)
278
+ *p = end;
279
+ return 0;
280
+bad:
281
+ return -EIO;
137282 }
138283
139284 /*
....@@ -151,20 +296,18 @@
151296 if (err < 0)
152297 goto out_bad;
153298
154
- if (unlikely(*p + sizeof(*info->dirfrag) > end))
155
- goto bad;
156
- info->dirfrag = *p;
157
- *p += sizeof(*info->dirfrag) +
158
- sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
159
- if (unlikely(*p > end))
160
- goto bad;
299
+ err = parse_reply_info_dir(p, end, &info->dirfrag, features);
300
+ if (err < 0)
301
+ goto out_bad;
161302
162303 ceph_decode_32_safe(p, end, info->dname_len, bad);
163304 ceph_decode_need(p, end, info->dname_len, bad);
164305 info->dname = *p;
165306 *p += info->dname_len;
166
- info->dlease = *p;
167
- *p += sizeof(*info->dlease);
307
+
308
+ err = parse_reply_info_lease(p, end, &info->dlease, features);
309
+ if (err < 0)
310
+ goto out_bad;
168311 }
169312
170313 if (info->head->is_target) {
....@@ -187,20 +330,16 @@
187330 /*
188331 * parse readdir results
189332 */
190
-static int parse_reply_info_dir(void **p, void *end,
333
+static int parse_reply_info_readdir(void **p, void *end,
191334 struct ceph_mds_reply_info_parsed *info,
192335 u64 features)
193336 {
194337 u32 num, i = 0;
195338 int err;
196339
197
- info->dir_dir = *p;
198
- if (*p + sizeof(*info->dir_dir) > end)
199
- goto bad;
200
- *p += sizeof(*info->dir_dir) +
201
- sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
202
- if (*p > end)
203
- goto bad;
340
+ err = parse_reply_info_dir(p, end, &info->dir_dir, features);
341
+ if (err < 0)
342
+ goto out_bad;
204343
205344 ceph_decode_need(p, end, sizeof(num) + 2, bad);
206345 num = ceph_decode_32(p);
....@@ -226,15 +365,16 @@
226365 while (num) {
227366 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
228367 /* dentry */
229
- ceph_decode_need(p, end, sizeof(u32)*2, bad);
230
- rde->name_len = ceph_decode_32(p);
368
+ ceph_decode_32_safe(p, end, rde->name_len, bad);
231369 ceph_decode_need(p, end, rde->name_len, bad);
232370 rde->name = *p;
233371 *p += rde->name_len;
234372 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
235
- rde->lease = *p;
236
- *p += sizeof(struct ceph_mds_reply_lease);
237373
374
+ /* dentry lease */
375
+ err = parse_reply_info_lease(p, end, &rde->lease, features);
376
+ if (err)
377
+ goto out_bad;
238378 /* inode */
239379 err = parse_reply_info_in(p, end, &rde->inode, features);
240380 if (err < 0)
....@@ -246,8 +386,8 @@
246386 }
247387
248388 done:
249
- if (*p != end)
250
- goto bad;
389
+ /* Skip over any unrecognized fields */
390
+ *p = end;
251391 return 0;
252392
253393 bad:
....@@ -268,36 +408,145 @@
268408 goto bad;
269409
270410 info->filelock_reply = *p;
271
- *p += sizeof(*info->filelock_reply);
272411
273
- if (unlikely(*p != end))
274
- goto bad;
412
+ /* Skip over any unrecognized fields */
413
+ *p = end;
275414 return 0;
276
-
277415 bad:
278416 return -EIO;
279417 }
418
+
419
+
420
+#if BITS_PER_LONG == 64
421
+
422
+#define DELEGATED_INO_AVAILABLE xa_mk_value(1)
423
+
424
+static int ceph_parse_deleg_inos(void **p, void *end,
425
+ struct ceph_mds_session *s)
426
+{
427
+ u32 sets;
428
+
429
+ ceph_decode_32_safe(p, end, sets, bad);
430
+ dout("got %u sets of delegated inodes\n", sets);
431
+ while (sets--) {
432
+ u64 start, len, ino;
433
+
434
+ ceph_decode_64_safe(p, end, start, bad);
435
+ ceph_decode_64_safe(p, end, len, bad);
436
+
437
+ /* Don't accept a delegation of system inodes */
438
+ if (start < CEPH_INO_SYSTEM_BASE) {
439
+ pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
440
+ start, len);
441
+ continue;
442
+ }
443
+ while (len--) {
444
+ int err = xa_insert(&s->s_delegated_inos, ino = start++,
445
+ DELEGATED_INO_AVAILABLE,
446
+ GFP_KERNEL);
447
+ if (!err) {
448
+ dout("added delegated inode 0x%llx\n",
449
+ start - 1);
450
+ } else if (err == -EBUSY) {
451
+ pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
452
+ start - 1);
453
+ } else {
454
+ return err;
455
+ }
456
+ }
457
+ }
458
+ return 0;
459
+bad:
460
+ return -EIO;
461
+}
462
+
463
+u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
464
+{
465
+ unsigned long ino;
466
+ void *val;
467
+
468
+ xa_for_each(&s->s_delegated_inos, ino, val) {
469
+ val = xa_erase(&s->s_delegated_inos, ino);
470
+ if (val == DELEGATED_INO_AVAILABLE)
471
+ return ino;
472
+ }
473
+ return 0;
474
+}
475
+
476
+int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
477
+{
478
+ return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
479
+ GFP_KERNEL);
480
+}
481
+#else /* BITS_PER_LONG == 64 */
482
+/*
483
+ * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
484
+ * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
485
+ * and bottom words?
486
+ */
487
+static int ceph_parse_deleg_inos(void **p, void *end,
488
+ struct ceph_mds_session *s)
489
+{
490
+ u32 sets;
491
+
492
+ ceph_decode_32_safe(p, end, sets, bad);
493
+ if (sets)
494
+ ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
495
+ return 0;
496
+bad:
497
+ return -EIO;
498
+}
499
+
500
+u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
501
+{
502
+ return 0;
503
+}
504
+
505
+int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
506
+{
507
+ return 0;
508
+}
509
+#endif /* BITS_PER_LONG == 64 */
280510
281511 /*
282512 * parse create results
283513 */
284514 static int parse_reply_info_create(void **p, void *end,
285515 struct ceph_mds_reply_info_parsed *info,
286
- u64 features)
516
+ u64 features, struct ceph_mds_session *s)
287517 {
288
- if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
518
+ int ret;
519
+
520
+ if (features == (u64)-1 ||
521
+ (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
289522 if (*p == end) {
523
+ /* Malformed reply? */
290524 info->has_create_ino = false;
291
- } else {
525
+ } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
526
+ u8 struct_v, struct_compat;
527
+ u32 len;
528
+
292529 info->has_create_ino = true;
293
- info->ino = ceph_decode_64(p);
530
+ ceph_decode_8_safe(p, end, struct_v, bad);
531
+ ceph_decode_8_safe(p, end, struct_compat, bad);
532
+ ceph_decode_32_safe(p, end, len, bad);
533
+ ceph_decode_64_safe(p, end, info->ino, bad);
534
+ ret = ceph_parse_deleg_inos(p, end, s);
535
+ if (ret)
536
+ return ret;
537
+ } else {
538
+ /* legacy */
539
+ ceph_decode_64_safe(p, end, info->ino, bad);
540
+ info->has_create_ino = true;
294541 }
542
+ } else {
543
+ if (*p != end)
544
+ goto bad;
295545 }
296546
297
- if (unlikely(*p != end))
298
- goto bad;
547
+ /* Skip over any unrecognized fields */
548
+ *p = end;
299549 return 0;
300
-
301550 bad:
302551 return -EIO;
303552 }
....@@ -307,16 +556,16 @@
307556 */
308557 static int parse_reply_info_extra(void **p, void *end,
309558 struct ceph_mds_reply_info_parsed *info,
310
- u64 features)
559
+ u64 features, struct ceph_mds_session *s)
311560 {
312561 u32 op = le32_to_cpu(info->head->op);
313562
314563 if (op == CEPH_MDS_OP_GETFILELOCK)
315564 return parse_reply_info_filelock(p, end, info, features);
316565 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
317
- return parse_reply_info_dir(p, end, info, features);
566
+ return parse_reply_info_readdir(p, end, info, features);
318567 else if (op == CEPH_MDS_OP_CREATE)
319
- return parse_reply_info_create(p, end, info, features);
568
+ return parse_reply_info_create(p, end, info, features, s);
320569 else
321570 return -EIO;
322571 }
....@@ -324,7 +573,7 @@
324573 /*
325574 * parse entire mds reply
326575 */
327
-static int parse_reply_info(struct ceph_msg *msg,
576
+static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
328577 struct ceph_mds_reply_info_parsed *info,
329578 u64 features)
330579 {
....@@ -349,7 +598,7 @@
349598 ceph_decode_32_safe(&p, end, len, bad);
350599 if (len > 0) {
351600 ceph_decode_need(&p, end, len, bad);
352
- err = parse_reply_info_extra(&p, p+len, info, features);
601
+ err = parse_reply_info_extra(&p, p+len, info, features, s);
353602 if (err < 0)
354603 goto out_bad;
355604 }
....@@ -390,6 +639,7 @@
390639 case CEPH_MDS_SESSION_OPEN: return "open";
391640 case CEPH_MDS_SESSION_HUNG: return "hung";
392641 case CEPH_MDS_SESSION_CLOSING: return "closing";
642
+ case CEPH_MDS_SESSION_CLOSED: return "closed";
393643 case CEPH_MDS_SESSION_RESTARTING: return "restarting";
394644 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
395645 case CEPH_MDS_SESSION_REJECTED: return "rejected";
....@@ -397,7 +647,7 @@
397647 }
398648 }
399649
400
-static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
650
+struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
401651 {
402652 if (refcount_inc_not_zero(&s->s_ref)) {
403653 dout("mdsc get_session %p %d -> %d\n", s,
....@@ -411,11 +661,16 @@
411661
412662 void ceph_put_mds_session(struct ceph_mds_session *s)
413663 {
664
+ if (IS_ERR_OR_NULL(s))
665
+ return;
666
+
414667 dout("mdsc put_session %p %d -> %d\n", s,
415668 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
416669 if (refcount_dec_and_test(&s->s_ref)) {
417670 if (s->s_auth.authorizer)
418671 ceph_auth_destroy_authorizer(s->s_auth.authorizer);
672
+ WARN_ON(mutex_is_locked(&s->s_mutex));
673
+ xa_destroy(&s->s_delegated_inos);
419674 kfree(s);
420675 }
421676 }
....@@ -426,15 +681,9 @@
426681 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
427682 int mds)
428683 {
429
- struct ceph_mds_session *session;
430
-
431684 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
432685 return NULL;
433
- session = mdsc->sessions[mds];
434
- dout("lookup_mds_session %p %d\n", session,
435
- refcount_read(&session->s_ref));
436
- get_session(session);
437
- return session;
686
+ return ceph_get_mds_session(mdsc->sessions[mds]);
438687 }
439688
440689 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
....@@ -463,7 +712,7 @@
463712 {
464713 struct ceph_mds_session *s;
465714
466
- if (mds >= mdsc->mdsmap->m_num_mds)
715
+ if (mds >= mdsc->mdsmap->possible_max_rank)
467716 return ERR_PTR(-EINVAL);
468717
469718 s = kzalloc(sizeof(*s), GFP_NOFS);
....@@ -498,7 +747,7 @@
498747 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
499748
500749 spin_lock_init(&s->s_gen_ttl_lock);
501
- s->s_cap_gen = 0;
750
+ s->s_cap_gen = 1;
502751 s->s_cap_ttl = jiffies - 1;
503752
504753 spin_lock_init(&s->s_cap_lock);
....@@ -506,14 +755,17 @@
506755 s->s_renew_seq = 0;
507756 INIT_LIST_HEAD(&s->s_caps);
508757 s->s_nr_caps = 0;
509
- s->s_trim_caps = 0;
510758 refcount_set(&s->s_ref, 1);
511759 INIT_LIST_HEAD(&s->s_waiting);
512760 INIT_LIST_HEAD(&s->s_unsafe);
761
+ xa_init(&s->s_delegated_inos);
513762 s->s_num_cap_releases = 0;
514763 s->s_cap_reconnect = 0;
515764 s->s_cap_iterator = NULL;
516765 INIT_LIST_HEAD(&s->s_cap_releases);
766
+ INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
767
+
768
+ INIT_LIST_HEAD(&s->s_cap_dirty);
517769 INIT_LIST_HEAD(&s->s_cap_flushing);
518770
519771 mdsc->sessions[mds] = s;
....@@ -557,11 +809,39 @@
557809 }
558810 }
559811
812
+void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
813
+ void (*cb)(struct ceph_mds_session *),
814
+ bool check_state)
815
+{
816
+ int mds;
817
+
818
+ mutex_lock(&mdsc->mutex);
819
+ for (mds = 0; mds < mdsc->max_sessions; ++mds) {
820
+ struct ceph_mds_session *s;
821
+
822
+ s = __ceph_lookup_mds_session(mdsc, mds);
823
+ if (!s)
824
+ continue;
825
+
826
+ if (check_state && !check_session_state(s)) {
827
+ ceph_put_mds_session(s);
828
+ continue;
829
+ }
830
+
831
+ mutex_unlock(&mdsc->mutex);
832
+ cb(s);
833
+ ceph_put_mds_session(s);
834
+ mutex_lock(&mdsc->mutex);
835
+ }
836
+ mutex_unlock(&mdsc->mutex);
837
+}
838
+
560839 void ceph_mdsc_release_request(struct kref *kref)
561840 {
562841 struct ceph_mds_request *req = container_of(kref,
563842 struct ceph_mds_request,
564843 r_kref);
844
+ ceph_mdsc_release_dir_caps_no_check(req);
565845 destroy_reply_info(&req->r_reply_info);
566846 if (req->r_request)
567847 ceph_msg_put(req->r_request);
....@@ -569,11 +849,14 @@
569849 ceph_msg_put(req->r_reply);
570850 if (req->r_inode) {
571851 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
572
- iput(req->r_inode);
852
+ /* avoid calling iput_final() in mds dispatch threads */
853
+ ceph_async_iput(req->r_inode);
573854 }
574
- if (req->r_parent)
855
+ if (req->r_parent) {
575856 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
576
- iput(req->r_target_inode);
857
+ ceph_async_iput(req->r_parent);
858
+ }
859
+ ceph_async_iput(req->r_target_inode);
577860 if (req->r_dentry)
578861 dput(req->r_dentry);
579862 if (req->r_old_dentry)
....@@ -587,7 +870,7 @@
587870 */
588871 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
589872 CEPH_CAP_PIN);
590
- iput(req->r_old_dentry_dir);
873
+ ceph_async_iput(req->r_old_dentry_dir);
591874 }
592875 kfree(req->r_path1);
593876 kfree(req->r_path2);
....@@ -595,7 +878,8 @@
595878 ceph_pagelist_release(req->r_pagelist);
596879 put_request_session(req);
597880 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
598
- kfree(req);
881
+ WARN_ON_ONCE(!list_empty(&req->r_wait));
882
+ kmem_cache_free(ceph_mds_request_cachep, req);
599883 }
600884
601885 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
....@@ -652,8 +936,13 @@
652936 mdsc->oldest_tid = req->r_tid;
653937
654938 if (dir) {
939
+ struct ceph_inode_info *ci = ceph_inode(dir);
940
+
655941 ihold(dir);
656942 req->r_unsafe_dir = dir;
943
+ spin_lock(&ci->i_unsafe_lock);
944
+ list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
945
+ spin_unlock(&ci->i_unsafe_lock);
657946 }
658947 }
659948
....@@ -681,8 +970,7 @@
681970
682971 erase_request(&mdsc->request_tree, req);
683972
684
- if (req->r_unsafe_dir &&
685
- test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
973
+ if (req->r_unsafe_dir) {
686974 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
687975 spin_lock(&ci->i_unsafe_lock);
688976 list_del_init(&req->r_unsafe_dir_item);
....@@ -697,7 +985,8 @@
697985 }
698986
699987 if (req->r_unsafe_dir) {
700
- iput(req->r_unsafe_dir);
988
+ /* avoid calling iput_final() in mds dispatch threads */
989
+ ceph_async_iput(req->r_unsafe_dir);
701990 req->r_unsafe_dir = NULL;
702991 }
703992
....@@ -737,7 +1026,8 @@
7371026 * Called under mdsc->mutex.
7381027 */
7391028 static int __choose_mds(struct ceph_mds_client *mdsc,
740
- struct ceph_mds_request *req)
1029
+ struct ceph_mds_request *req,
1030
+ bool *random)
7411031 {
7421032 struct inode *inode;
7431033 struct ceph_inode_info *ci;
....@@ -747,6 +1037,9 @@
7471037 u32 hash = req->r_direct_hash;
7481038 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
7491039
1040
+ if (random)
1041
+ *random = false;
1042
+
7501043 /*
7511044 * is there a specific mds we should try? ignore hint if we have
7521045 * no session and the mds is not up (active or recovering).
....@@ -754,7 +1047,7 @@
7541047 if (req->r_resend_mds >= 0 &&
7551048 (__have_session(mdsc, req->r_resend_mds) ||
7561049 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
757
- dout("choose_mds using resend_mds mds%d\n",
1050
+ dout("%s using resend_mds mds%d\n", __func__,
7581051 req->r_resend_mds);
7591052 return req->r_resend_mds;
7601053 }
....@@ -772,7 +1065,7 @@
7721065 rcu_read_lock();
7731066 inode = get_nonsnap_parent(req->r_dentry);
7741067 rcu_read_unlock();
775
- dout("__choose_mds using snapdir's parent %p\n", inode);
1068
+ dout("%s using snapdir's parent %p\n", __func__, inode);
7761069 }
7771070 } else if (req->r_dentry) {
7781071 /* ignore race with rename; old or new d_parent is okay */
....@@ -780,7 +1073,7 @@
7801073 struct inode *dir;
7811074
7821075 rcu_read_lock();
783
- parent = req->r_dentry->d_parent;
1076
+ parent = READ_ONCE(req->r_dentry->d_parent);
7841077 dir = req->r_parent ? : d_inode_rcu(parent);
7851078
7861079 if (!dir || dir->i_sb != mdsc->fsc->sb) {
....@@ -792,7 +1085,7 @@
7921085 /* direct snapped/virtual snapdir requests
7931086 * based on parent dir inode */
7941087 inode = get_nonsnap_parent(parent);
795
- dout("__choose_mds using nonsnap parent %p\n", inode);
1088
+ dout("%s using nonsnap parent %p\n", __func__, inode);
7961089 } else {
7971090 /* dentry target */
7981091 inode = d_inode(req->r_dentry);
....@@ -808,8 +1101,8 @@
8081101 rcu_read_unlock();
8091102 }
8101103
811
- dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
812
- (int)hash, mode);
1104
+ dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
1105
+ hash, mode);
8131106 if (!inode)
8141107 goto random;
8151108 ci = ceph_inode(inode);
....@@ -827,30 +1120,32 @@
8271120 get_random_bytes(&r, 1);
8281121 r %= frag.ndist;
8291122 mds = frag.dist[r];
830
- dout("choose_mds %p %llx.%llx "
831
- "frag %u mds%d (%d/%d)\n",
832
- inode, ceph_vinop(inode),
833
- frag.frag, mds,
834
- (int)r, frag.ndist);
1123
+ dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
1124
+ __func__, inode, ceph_vinop(inode),
1125
+ frag.frag, mds, (int)r, frag.ndist);
8351126 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
836
- CEPH_MDS_STATE_ACTIVE)
1127
+ CEPH_MDS_STATE_ACTIVE &&
1128
+ !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
8371129 goto out;
8381130 }
8391131
8401132 /* since this file/dir wasn't known to be
8411133 * replicated, then we want to look for the
8421134 * authoritative mds. */
843
- mode = USE_AUTH_MDS;
8441135 if (frag.mds >= 0) {
8451136 /* choose auth mds */
8461137 mds = frag.mds;
847
- dout("choose_mds %p %llx.%llx "
848
- "frag %u mds%d (auth)\n",
849
- inode, ceph_vinop(inode), frag.frag, mds);
1138
+ dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
1139
+ __func__, inode, ceph_vinop(inode),
1140
+ frag.frag, mds);
8501141 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
851
- CEPH_MDS_STATE_ACTIVE)
852
- goto out;
1142
+ CEPH_MDS_STATE_ACTIVE) {
1143
+ if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
1144
+ mds))
1145
+ goto out;
1146
+ }
8531147 }
1148
+ mode = USE_AUTH_MDS;
8541149 }
8551150 }
8561151
....@@ -862,21 +1157,26 @@
8621157 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
8631158 if (!cap) {
8641159 spin_unlock(&ci->i_ceph_lock);
865
- iput(inode);
1160
+ ceph_async_iput(inode);
8661161 goto random;
8671162 }
8681163 mds = cap->session->s_mds;
869
- dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
1164
+ dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
8701165 inode, ceph_vinop(inode), mds,
8711166 cap == ci->i_auth_cap ? "auth " : "", cap);
8721167 spin_unlock(&ci->i_ceph_lock);
8731168 out:
874
- iput(inode);
1169
+ /* avoid calling iput_final() while holding mdsc->mutex or
1170
+ * in mds dispatch threads */
1171
+ ceph_async_iput(inode);
8751172 return mds;
8761173
8771174 random:
1175
+ if (random)
1176
+ *random = true;
1177
+
8781178 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
879
- dout("choose_mds chose random mds%d\n", mds);
1179
+ dout("%s chose random mds%d\n", __func__, mds);
8801180 return mds;
8811181 }
8821182
....@@ -884,7 +1184,7 @@
8841184 /*
8851185 * session messages
8861186 */
887
-static struct ceph_msg *create_session_msg(u32 op, u64 seq)
1187
+struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
8881188 {
8891189 struct ceph_msg *msg;
8901190 struct ceph_mds_session_head *h;
....@@ -892,7 +1192,8 @@
8921192 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
8931193 false);
8941194 if (!msg) {
895
- pr_err("create_session_msg ENOMEM creating msg\n");
1195
+ pr_err("ENOMEM creating session %s msg\n",
1196
+ ceph_session_op_name(op));
8961197 return NULL;
8971198 }
8981199 h = msg->front.iov_base;
....@@ -902,25 +1203,77 @@
9021203 return msg;
9031204 }
9041205
905
-static void encode_supported_features(void **p, void *end)
1206
+static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1207
+#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
1208
+static int encode_supported_features(void **p, void *end)
9061209 {
907
- static const unsigned char bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
908
- static const size_t count = ARRAY_SIZE(bits);
1210
+ static const size_t count = ARRAY_SIZE(feature_bits);
9091211
9101212 if (count > 0) {
9111213 size_t i;
912
- size_t size = ((size_t)bits[count - 1] + 64) / 64 * 8;
1214
+ size_t size = FEATURE_BYTES(count);
1215
+ unsigned long bit;
9131216
914
- BUG_ON(*p + 4 + size > end);
1217
+ if (WARN_ON_ONCE(*p + 4 + size > end))
1218
+ return -ERANGE;
1219
+
1220
+ ceph_encode_32(p, size);
1221
+ memset(*p, 0, size);
1222
+ for (i = 0; i < count; i++) {
1223
+ bit = feature_bits[i];
1224
+ ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
1225
+ }
1226
+ *p += size;
1227
+ } else {
1228
+ if (WARN_ON_ONCE(*p + 4 > end))
1229
+ return -ERANGE;
1230
+
1231
+ ceph_encode_32(p, 0);
1232
+ }
1233
+
1234
+ return 0;
1235
+}
1236
+
1237
+static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
1238
+#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1239
+static int encode_metric_spec(void **p, void *end)
1240
+{
1241
+ static const size_t count = ARRAY_SIZE(metric_bits);
1242
+
1243
+ /* header */
1244
+ if (WARN_ON_ONCE(*p + 2 > end))
1245
+ return -ERANGE;
1246
+
1247
+ ceph_encode_8(p, 1); /* version */
1248
+ ceph_encode_8(p, 1); /* compat */
1249
+
1250
+ if (count > 0) {
1251
+ size_t i;
1252
+ size_t size = METRIC_BYTES(count);
1253
+
1254
+ if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1255
+ return -ERANGE;
1256
+
1257
+ /* metric spec info length */
1258
+ ceph_encode_32(p, 4 + size);
1259
+
1260
+ /* metric spec */
9151261 ceph_encode_32(p, size);
9161262 memset(*p, 0, size);
9171263 for (i = 0; i < count; i++)
918
- ((unsigned char*)(*p))[i / 8] |= 1 << (bits[i] % 8);
1264
+ ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
9191265 *p += size;
9201266 } else {
921
- BUG_ON(*p + 4 > end);
1267
+ if (WARN_ON_ONCE(*p + 4 + 4 > end))
1268
+ return -ERANGE;
1269
+
1270
+ /* metric spec info length */
1271
+ ceph_encode_32(p, 4);
1272
+ /* metric spec */
9221273 ceph_encode_32(p, 0);
9231274 }
1275
+
1276
+ return 0;
9241277 }
9251278
9261279 /*
....@@ -936,7 +1289,9 @@
9361289 int metadata_key_count = 0;
9371290 struct ceph_options *opt = mdsc->fsc->client->options;
9381291 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1292
+ size_t size, count;
9391293 void *p, *end;
1294
+ int ret;
9401295
9411296 const char* metadata[][2] = {
9421297 {"hostname", mdsc->nodename},
....@@ -953,15 +1308,27 @@
9531308 strlen(metadata[i][1]);
9541309 metadata_key_count++;
9551310 }
1311
+
9561312 /* supported feature */
957
- extra_bytes += 4 + 8;
1313
+ size = 0;
1314
+ count = ARRAY_SIZE(feature_bits);
1315
+ if (count > 0)
1316
+ size = FEATURE_BYTES(count);
1317
+ extra_bytes += 4 + size;
1318
+
1319
+ /* metric spec */
1320
+ size = 0;
1321
+ count = ARRAY_SIZE(metric_bits);
1322
+ if (count > 0)
1323
+ size = METRIC_BYTES(count);
1324
+ extra_bytes += 2 + 4 + 4 + size;
9581325
9591326 /* Allocate the message */
9601327 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
9611328 GFP_NOFS, false);
9621329 if (!msg) {
963
- pr_err("create_session_msg ENOMEM creating msg\n");
964
- return NULL;
1330
+ pr_err("ENOMEM creating session open msg\n");
1331
+ return ERR_PTR(-ENOMEM);
9651332 }
9661333 p = msg->front.iov_base;
9671334 end = p + msg->front.iov_len;
....@@ -974,9 +1341,9 @@
9741341 * Serialize client metadata into waiting buffer space, using
9751342 * the format that userspace expects for map<string, string>
9761343 *
977
- * ClientSession messages with metadata are v2
1344
+ * ClientSession messages with metadata are v4
9781345 */
979
- msg->hdr.version = cpu_to_le16(3);
1346
+ msg->hdr.version = cpu_to_le16(4);
9801347 msg->hdr.compat_version = cpu_to_le16(1);
9811348
9821349 /* The write pointer, following the session_head structure */
....@@ -998,7 +1365,20 @@
9981365 p += val_len;
9991366 }
10001367
1001
- encode_supported_features(&p, end);
1368
+ ret = encode_supported_features(&p, end);
1369
+ if (ret) {
1370
+ pr_err("encode_supported_features failed!\n");
1371
+ ceph_msg_put(msg);
1372
+ return ERR_PTR(ret);
1373
+ }
1374
+
1375
+ ret = encode_metric_spec(&p, end);
1376
+ if (ret) {
1377
+ pr_err("encode_metric_spec failed!\n");
1378
+ ceph_msg_put(msg);
1379
+ return ERR_PTR(ret);
1380
+ }
1381
+
10021382 msg->front.iov_len = p - msg->front.iov_base;
10031383 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
10041384
....@@ -1026,8 +1406,8 @@
10261406
10271407 /* send connect message */
10281408 msg = create_session_open_msg(mdsc, session->s_seq);
1029
- if (!msg)
1030
- return -ENOMEM;
1409
+ if (IS_ERR(msg))
1410
+ return PTR_ERR(msg);
10311411 ceph_con_send(&session->s_con, msg);
10321412 return 0;
10331413 }
....@@ -1041,6 +1421,7 @@
10411421 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
10421422 {
10431423 struct ceph_mds_session *session;
1424
+ int ret;
10441425
10451426 session = __ceph_lookup_mds_session(mdsc, target);
10461427 if (!session) {
....@@ -1049,8 +1430,11 @@
10491430 return session;
10501431 }
10511432 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1052
- session->s_state == CEPH_MDS_SESSION_CLOSING)
1053
- __open_session(mdsc, session);
1433
+ session->s_state == CEPH_MDS_SESSION_CLOSING) {
1434
+ ret = __open_session(mdsc, session);
1435
+ if (ret)
1436
+ return ERR_PTR(ret);
1437
+ }
10541438
10551439 return session;
10561440 }
....@@ -1076,7 +1460,7 @@
10761460 struct ceph_mds_session *ts;
10771461 int i, mds = session->s_mds;
10781462
1079
- if (mds >= mdsc->mdsmap->m_num_mds)
1463
+ if (mds >= mdsc->mdsmap->possible_max_rank)
10801464 return;
10811465
10821466 mi = &mdsc->mdsmap->m_info[mds];
....@@ -1085,8 +1469,7 @@
10851469
10861470 for (i = 0; i < mi->num_export_targets; i++) {
10871471 ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1088
- if (!IS_ERR(ts))
1089
- ceph_put_mds_session(ts);
1472
+ ceph_put_mds_session(ts);
10901473 }
10911474 }
10921475
....@@ -1137,6 +1520,10 @@
11371520 struct ceph_mds_request, r_unsafe_item);
11381521 pr_warn_ratelimited(" dropping unsafe request %llu\n",
11391522 req->r_tid);
1523
+ if (req->r_target_inode)
1524
+ mapping_set_error(req->r_target_inode->i_mapping, -EIO);
1525
+ if (req->r_unsafe_dir)
1526
+ mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
11401527 __unregister_request(mdsc, req);
11411528 }
11421529 /* zero r_attempts, so kick_requests() will re-send requests */
....@@ -1157,9 +1544,9 @@
11571544 *
11581545 * Caller must hold session s_mutex.
11591546 */
1160
-static int iterate_session_caps(struct ceph_mds_session *session,
1161
- int (*cb)(struct inode *, struct ceph_cap *,
1162
- void *), void *arg)
1547
+int ceph_iterate_session_caps(struct ceph_mds_session *session,
1548
+ int (*cb)(struct inode *, struct ceph_cap *,
1549
+ void *), void *arg)
11631550 {
11641551 struct list_head *p;
11651552 struct ceph_cap *cap;
....@@ -1181,7 +1568,9 @@
11811568 spin_unlock(&session->s_cap_lock);
11821569
11831570 if (last_inode) {
1184
- iput(last_inode);
1571
+ /* avoid calling iput_final() while holding
1572
+ * s_mutex or in mds dispatch threads */
1573
+ ceph_async_iput(last_inode);
11851574 last_inode = NULL;
11861575 }
11871576 if (old_cap) {
....@@ -1201,13 +1590,11 @@
12011590 cap->session = NULL;
12021591 list_del_init(&cap->session_caps);
12031592 session->s_nr_caps--;
1204
- if (cap->queue_release) {
1205
- list_add_tail(&cap->session_caps,
1206
- &session->s_cap_releases);
1207
- session->s_num_cap_releases++;
1208
- } else {
1593
+ atomic64_dec(&session->s_mdsc->metric.total_caps);
1594
+ if (cap->queue_release)
1595
+ __ceph_queue_cap_release(session, cap);
1596
+ else
12091597 old_cap = cap; /* put_cap it w/o locks held */
1210
- }
12111598 }
12121599 if (ret < 0)
12131600 goto out;
....@@ -1217,21 +1604,46 @@
12171604 session->s_cap_iterator = NULL;
12181605 spin_unlock(&session->s_cap_lock);
12191606
1220
- iput(last_inode);
1607
+ ceph_async_iput(last_inode);
12211608 if (old_cap)
12221609 ceph_put_cap(session->s_mdsc, old_cap);
12231610
12241611 return ret;
12251612 }
12261613
1614
/*
 * Drop every cap snap still linked on the inode's i_cap_snaps list.
 *
 * The caller must hold ci->i_ceph_lock (checked with lockdep below).
 * Waiters on ci->i_cap_wq and mdsc->cap_flushing_wq are woken once the
 * list has been emptied.
 *
 * Returns the number of capsnaps that were removed.  The caller visible
 * in this file (remove_session_caps_cb) uses that count to issue one
 * iput() per removed capsnap after it has dropped i_ceph_lock.
 */
+static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
1615
+{
1616
+ struct ceph_inode_info *ci = ceph_inode(inode);
1617
+ struct ceph_cap_snap *capsnap;
1618
+ int capsnap_release = 0;
1619
+
1620
+ lockdep_assert_held(&ci->i_ceph_lock);
1621
+
1622
+ dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode);
1623
+
	/*
	 * Always take the head of the list; the loop only terminates if
	 * __ceph_remove_capsnap() unlinks the entry from i_cap_snaps
	 * (presumably it does -- confirm against its definition).
	 */
1624
+ while (!list_empty(&ci->i_cap_snaps)) {
1625
+ capsnap = list_first_entry(&ci->i_cap_snaps,
1626
+ struct ceph_cap_snap, ci_item);
1627
+ __ceph_remove_capsnap(inode, capsnap, NULL, NULL);
1628
	/* release the snap context reference, then the capsnap itself */
+ ceph_put_snap_context(capsnap->context);
1629
+ ceph_put_cap_snap(capsnap);
1630
+ capsnap_release++;
1631
+ }
1632
+ wake_up_all(&ci->i_cap_wq);
1633
+ wake_up_all(&mdsc->cap_flushing_wq);
1634
+ return capsnap_release;
1635
+}
1636
+
12271637 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
12281638 void *arg)
12291639 {
12301640 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1641
+ struct ceph_mds_client *mdsc = fsc->mdsc;
12311642 struct ceph_inode_info *ci = ceph_inode(inode);
12321643 LIST_HEAD(to_remove);
1233
- bool drop = false;
1644
+ bool dirty_dropped = false;
12341645 bool invalidate = false;
1646
+ int capsnap_release = 0;
12351647
12361648 dout("removing cap %p, ci is %p, inode is %p\n",
12371649 cap, ci, &ci->vfs_inode);
....@@ -1239,13 +1651,13 @@
12391651 __ceph_remove_cap(cap, false);
12401652 if (!ci->i_auth_cap) {
12411653 struct ceph_cap_flush *cf;
1242
- struct ceph_mds_client *mdsc = fsc->mdsc;
12431654
1244
- ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
1245
-
1246
- if (ci->i_wrbuffer_ref > 0 &&
1247
- READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
1248
- invalidate = true;
1655
+ if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
1656
+ if (inode->i_data.nrpages > 0)
1657
+ invalidate = true;
1658
+ if (ci->i_wrbuffer_ref > 0)
1659
+ mapping_set_error(&inode->i_data, -EIO);
1660
+ }
12491661
12501662 while (!list_empty(&ci->i_cap_flush_list)) {
12511663 cf = list_first_entry(&ci->i_cap_flush_list,
....@@ -1256,7 +1668,7 @@
12561668 spin_lock(&mdsc->cap_dirty_lock);
12571669
12581670 list_for_each_entry(cf, &to_remove, i_list)
1259
- list_del(&cf->g_list);
1671
+ list_del_init(&cf->g_list);
12601672
12611673 if (!list_empty(&ci->i_dirty_item)) {
12621674 pr_warn_ratelimited(
....@@ -1265,7 +1677,7 @@
12651677 inode, ceph_ino(inode));
12661678 ci->i_dirty_caps = 0;
12671679 list_del_init(&ci->i_dirty_item);
1268
- drop = true;
1680
+ dirty_dropped = true;
12691681 }
12701682 if (!list_empty(&ci->i_flushing_item)) {
12711683 pr_warn_ratelimited(
....@@ -1275,9 +1687,21 @@
12751687 ci->i_flushing_caps = 0;
12761688 list_del_init(&ci->i_flushing_item);
12771689 mdsc->num_cap_flushing--;
1278
- drop = true;
1690
+ dirty_dropped = true;
12791691 }
12801692 spin_unlock(&mdsc->cap_dirty_lock);
1693
+
1694
+ if (dirty_dropped) {
1695
+ mapping_set_error(inode->i_mapping, -EIO);
1696
+
1697
+ if (ci->i_wrbuffer_ref_head == 0 &&
1698
+ ci->i_wr_ref == 0 &&
1699
+ ci->i_dirty_caps == 0 &&
1700
+ ci->i_flushing_caps == 0) {
1701
+ ceph_put_snap_context(ci->i_head_snapc);
1702
+ ci->i_head_snapc = NULL;
1703
+ }
1704
+ }
12811705
12821706 if (atomic_read(&ci->i_filelock_ref) > 0) {
12831707 /* make further file lock syscall return -EIO */
....@@ -1291,28 +1715,25 @@
12911715 ci->i_prealloc_cap_flush = NULL;
12921716 }
12931717
1294
- if (drop &&
1295
- ci->i_wrbuffer_ref_head == 0 &&
1296
- ci->i_wr_ref == 0 &&
1297
- ci->i_dirty_caps == 0 &&
1298
- ci->i_flushing_caps == 0) {
1299
- ceph_put_snap_context(ci->i_head_snapc);
1300
- ci->i_head_snapc = NULL;
1301
- }
1718
+ if (!list_empty(&ci->i_cap_snaps))
1719
+ capsnap_release = remove_capsnaps(mdsc, inode);
13021720 }
13031721 spin_unlock(&ci->i_ceph_lock);
13041722 while (!list_empty(&to_remove)) {
13051723 struct ceph_cap_flush *cf;
13061724 cf = list_first_entry(&to_remove,
13071725 struct ceph_cap_flush, i_list);
1308
- list_del(&cf->i_list);
1309
- ceph_free_cap_flush(cf);
1726
+ list_del_init(&cf->i_list);
1727
+ if (!cf->is_capsnap)
1728
+ ceph_free_cap_flush(cf);
13101729 }
13111730
13121731 wake_up_all(&ci->i_cap_wq);
13131732 if (invalidate)
13141733 ceph_queue_invalidate(inode);
1315
- if (drop)
1734
+ if (dirty_dropped)
1735
+ iput(inode);
1736
+ while (capsnap_release--)
13161737 iput(inode);
13171738 return 0;
13181739 }
....@@ -1327,7 +1748,7 @@
13271748 LIST_HEAD(dispose);
13281749
13291750 dout("remove_session_caps on %p\n", session);
1330
- iterate_session_caps(session, remove_session_caps_cb, fsc);
1751
+ ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
13311752
13321753 wake_up_all(&fsc->mdsc->cap_flushing_wq);
13331754
....@@ -1353,7 +1774,8 @@
13531774 spin_unlock(&session->s_cap_lock);
13541775
13551776 inode = ceph_find_inode(sb, vino);
1356
- iput(inode);
1777
+ /* avoid calling iput_final() while holding s_mutex */
1778
+ ceph_async_iput(inode);
13571779
13581780 spin_lock(&session->s_cap_lock);
13591781 }
....@@ -1368,6 +1790,12 @@
13681790 dispose_cap_releases(session->s_mdsc, &dispose);
13691791 }
13701792
1793
/*
 * Event codes handed to wake_up_session_caps() and forwarded (cast
 * through void *) to wake_up_session_cb():
 *
 *   RECONNECT - the callback zeroes i_wanted_max_size and
 *               i_requested_max_size on each inode.
 *   RENEWCAPS - the callback reduces a cap to CEPH_CAP_PIN when its
 *               cap_gen is older than the session's s_cap_gen.
 *   FORCE_RO  - no extra per-cap action in the visible callback; the
 *               waiters are simply woken.
 */
+enum {
1794
+ RECONNECT,
1795
+ RENEWCAPS,
1796
+ FORCE_RO,
1797
+};
1798
+
13711799 /*
13721800 * wake up any threads waiting on this session's caps. if the cap is
13731801 * old (didn't get renewed on the client reconnect), remove it now.
....@@ -1378,23 +1806,31 @@
13781806 void *arg)
13791807 {
13801808 struct ceph_inode_info *ci = ceph_inode(inode);
1809
+ unsigned long ev = (unsigned long)arg;
13811810
1382
- if (arg) {
1811
+ if (ev == RECONNECT) {
13831812 spin_lock(&ci->i_ceph_lock);
13841813 ci->i_wanted_max_size = 0;
13851814 ci->i_requested_max_size = 0;
13861815 spin_unlock(&ci->i_ceph_lock);
1816
+ } else if (ev == RENEWCAPS) {
1817
+ if (cap->cap_gen < cap->session->s_cap_gen) {
1818
+ /* mds did not re-issue stale cap */
1819
+ spin_lock(&ci->i_ceph_lock);
1820
+ cap->issued = cap->implemented = CEPH_CAP_PIN;
1821
+ spin_unlock(&ci->i_ceph_lock);
1822
+ }
1823
+ } else if (ev == FORCE_RO) {
13871824 }
13881825 wake_up_all(&ci->i_cap_wq);
13891826 return 0;
13901827 }
13911828
1392
/*
 * Run wake_up_session_cb over every cap of @session.
 *
 * In the post-change ('+') version the second parameter is an event
 * code @ev (one of the RECONNECT / RENEWCAPS / FORCE_RO values) rather
 * than a reconnect flag; it is passed to the callback through the
 * opaque void * argument, and the iterator is the renamed
 * ceph_iterate_session_caps().
 */
-static void wake_up_session_caps(struct ceph_mds_session *session,
1393
- int reconnect)
1829
+static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
13941830 {
13951831 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1396
- iterate_session_caps(session, wake_up_session_cb,
1397
- (void *)(unsigned long)reconnect);
1832
+ ceph_iterate_session_caps(session, wake_up_session_cb,
1833
+ (void *)(unsigned long)ev);
13981834 }
13991835
14001836 /*
....@@ -1425,8 +1861,8 @@
14251861
14261862 dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
14271863 ceph_mds_state_name(state));
1428
- msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1429
- ++session->s_renew_seq);
1864
+ msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1865
+ ++session->s_renew_seq);
14301866 if (!msg)
14311867 return -ENOMEM;
14321868 ceph_con_send(&session->s_con, msg);
....@@ -1440,7 +1876,7 @@
14401876
14411877 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
14421878 session->s_mds, ceph_session_state_name(session->s_state), seq);
1443
- msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1879
+ msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
14441880 if (!msg)
14451881 return -ENOMEM;
14461882 ceph_con_send(&session->s_con, msg);
....@@ -1479,21 +1915,21 @@
14791915 spin_unlock(&session->s_cap_lock);
14801916
14811917 if (wake)
1482
- wake_up_session_caps(session, 0);
1918
+ wake_up_session_caps(session, RENEWCAPS);
14831919 }
14841920
14851921 /*
14861922 * send a session close request
14871923 */
1488
-static int request_close_session(struct ceph_mds_client *mdsc,
1489
- struct ceph_mds_session *session)
1924
+static int request_close_session(struct ceph_mds_session *session)
14901925 {
14911926 struct ceph_msg *msg;
14921927
14931928 dout("request_close_session mds%d state %s seq %lld\n",
14941929 session->s_mds, ceph_session_state_name(session->s_state),
14951930 session->s_seq);
1496
- msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1931
+ msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
1932
+ session->s_seq);
14971933 if (!msg)
14981934 return -ENOMEM;
14991935 ceph_con_send(&session->s_con, msg);
....@@ -1509,7 +1945,7 @@
15091945 if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
15101946 return 0;
15111947 session->s_state = CEPH_MDS_SESSION_CLOSING;
1512
- return request_close_session(mdsc, session);
1948
+ return request_close_session(session);
15131949 }
15141950
15151951 static bool drop_negative_children(struct dentry *dentry)
....@@ -1547,11 +1983,11 @@
15471983 */
15481984 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
15491985 {
1550
- struct ceph_mds_session *session = arg;
1986
+ int *remaining = arg;
15511987 struct ceph_inode_info *ci = ceph_inode(inode);
15521988 int used, wanted, oissued, mine;
15531989
1554
- if (session->s_trim_caps <= 0)
1990
+ if (*remaining <= 0)
15551991 return -1;
15561992
15571993 spin_lock(&ci->i_ceph_lock);
....@@ -1577,7 +2013,8 @@
15772013 }
15782014 /* The inode has cached pages, but it's no longer used.
15792015 * we can safely drop it */
1580
- if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
2016
+ if (S_ISREG(inode->i_mode) &&
2017
+ wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
15812018 !(oissued & CEPH_CAP_FILE_CACHE)) {
15822019 used = 0;
15832020 oissued = 0;
....@@ -1588,7 +2025,7 @@
15882025 if (oissued) {
15892026 /* we aren't the only cap.. just remove us */
15902027 __ceph_remove_cap(cap, true);
1591
- session->s_trim_caps--;
2028
+ (*remaining)--;
15922029 } else {
15932030 struct dentry *dentry;
15942031 /* try dropping referring dentries */
....@@ -1600,7 +2037,7 @@
16002037 d_prune_aliases(inode);
16012038 count = atomic_read(&inode->i_count);
16022039 if (count == 1)
1603
- session->s_trim_caps--;
2040
+ (*remaining)--;
16042041 dout("trim_caps_cb %p cap %p pruned, count now %d\n",
16052042 inode, cap, count);
16062043 } else {
....@@ -1626,15 +2063,15 @@
16262063 dout("trim_caps mds%d start: %d / %d, trim %d\n",
16272064 session->s_mds, session->s_nr_caps, max_caps, trim_caps);
16282065 if (trim_caps > 0) {
1629
- session->s_trim_caps = trim_caps;
1630
- iterate_session_caps(session, trim_caps_cb, session);
2066
+ int remaining = trim_caps;
2067
+
2068
+ ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
16312069 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
16322070 session->s_mds, session->s_nr_caps, max_caps,
1633
- trim_caps - session->s_trim_caps);
1634
- session->s_trim_caps = 0;
2071
+ trim_caps - remaining);
16352072 }
16362073
1637
- ceph_send_cap_releases(mdsc, session);
2074
+ ceph_flush_cap_releases(mdsc, session);
16382075 return 0;
16392076 }
16402077
....@@ -1677,8 +2114,8 @@
16772114 /*
16782115 * called under s_mutex
16792116 */
1680
-void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1681
- struct ceph_mds_session *session)
2117
+static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2118
+ struct ceph_mds_session *session)
16822119 {
16832120 struct ceph_msg *msg = NULL;
16842121 struct ceph_mds_cap_release *head;
....@@ -1720,7 +2157,8 @@
17202157 num_cap_releases--;
17212158
17222159 head = msg->front.iov_base;
1723
- le32_add_cpu(&head->num, 1);
2160
+ put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2161
+ &head->num);
17242162 item = msg->front.iov_base + msg->front.iov_len;
17252163 item->ino = cpu_to_le64(cap->cap_ino);
17262164 item->cap_id = cpu_to_le64(cap->cap_id);
....@@ -1770,6 +2208,81 @@
17702208 spin_unlock(&session->s_cap_lock);
17712209 }
17722210
2211
/*
 * Work handler for a session's s_cap_release_work item.
 *
 * Recovers the owning session from @work, and -- under s_mutex -- sends
 * the pending cap releases, but only while the session is in the OPEN
 * or HUNG state.  The final ceph_put_mds_session() drops the session
 * reference that ceph_flush_cap_releases() took before queueing the work.
 */
+static void ceph_cap_release_work(struct work_struct *work)
2212
+{
2213
+ struct ceph_mds_session *session =
2214
+ container_of(work, struct ceph_mds_session, s_cap_release_work);
2215
+
2216
+ mutex_lock(&session->s_mutex);
2217
+ if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2218
+ session->s_state == CEPH_MDS_SESSION_HUNG)
2219
+ ceph_send_cap_releases(session->s_mdsc, session);
2220
+ mutex_unlock(&session->s_mutex);
2221
+ ceph_put_mds_session(session);
2222
+}
2223
+
2224
/*
 * Schedule asynchronous sending of @session's queued cap releases.
 *
 * Does nothing once mdsc->stopping is set.  Otherwise a session
 * reference is taken on behalf of the work item and the session's
 * s_cap_release_work is queued on mdsc->fsc->cap_wq.  The work handler
 * (ceph_cap_release_work) drops that reference when it finishes.
 */
+void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2225
+ struct ceph_mds_session *session)
2226
+{
2227
+ if (mdsc->stopping)
2228
+ return;
2229
+
2230
+ ceph_get_mds_session(session);
2231
+ if (queue_work(mdsc->fsc->cap_wq,
2232
+ &session->s_cap_release_work)) {
2233
+ dout("cap release work queued\n");
2234
+ } else {
	/*
	 * queue_work() returned false, i.e. the work item was already
	 * pending; give back the reference taken just above since no
	 * additional handler run will consume it.
	 */
2235
+ ceph_put_mds_session(session);
2236
+ dout("failed to queue cap release work\n");
2237
+ }
2238
+}
2239
+
2240
+/*
2241
+ * caller holds session->s_cap_lock
2242
+ */
2243
/*
 * Append @cap to @session's s_cap_releases list and bump the pending
 * counter.  Each time the counter reaches a multiple of
 * CEPH_CAPS_PER_RELEASE, a flush of the queued releases is scheduled
 * via ceph_flush_cap_releases().
 */
+void __ceph_queue_cap_release(struct ceph_mds_session *session,
2244
+ struct ceph_cap *cap)
2245
+{
2246
+ list_add_tail(&cap->session_caps, &session->s_cap_releases);
2247
+ session->s_num_cap_releases++;
2248
+
2249
+ if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2250
+ ceph_flush_cap_releases(session->s_mdsc, session);
2251
+}
2252
+
2253
/*
 * Work handler for mdsc->cap_reclaim_work.
 *
 * Recovers the mds client from @work and calls ceph_trim_dentries().
 * When that returns -EAGAIN the work is queued again so trimming can
 * continue in a later run.
 */
+static void ceph_cap_reclaim_work(struct work_struct *work)
2254
+{
2255
+ struct ceph_mds_client *mdsc =
2256
+ container_of(work, struct ceph_mds_client, cap_reclaim_work);
2257
+ int ret = ceph_trim_dentries(mdsc);
2258
+ if (ret == -EAGAIN)
2259
+ ceph_queue_cap_reclaim_work(mdsc);
2260
+}
2261
+
2262
/*
 * Queue mdsc->cap_reclaim_work on mdsc->fsc->cap_wq, unless the client
 * is stopping.  The queue_work() result is only used to choose which
 * debug message to emit.
 */
+void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2263
+{
2264
+ if (mdsc->stopping)
2265
+ return;
2266
+
2267
+ if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2268
+ dout("caps reclaim work queued\n");
2269
+ } else {
	/*
	 * NOTE(review): the message below says "release" although this
	 * path queues the *reclaim* work -- looks like a copy-paste from
	 * ceph_flush_cap_releases(); confirm before changing the text.
	 */
2270
+ dout("failed to queue caps release work\n");
2271
+ }
2272
+}
2273
+
2274
/*
 * Account @nr more reclaimable caps and kick the reclaim worker when
 * enough have accumulated.
 *
 * @nr == 0 is a no-op.  Otherwise @nr is atomically added to
 * mdsc->cap_reclaim_pending; when the new total modulo
 * CEPH_CAPS_PER_RELEASE is smaller than @nr the pending counter is
 * reset to zero and the reclaim work is queued.
 */
+void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2275
+{
2276
+ int val;
2277
+ if (!nr)
2278
+ return;
2279
+ val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
2280
	/*
	 * For a positive nr below CEPH_CAPS_PER_RELEASE this condition
	 * holds exactly when the addition crossed a multiple of
	 * CEPH_CAPS_PER_RELEASE (assumes callers pass such values --
	 * TODO confirm).
	 * NOTE(review): the reset is a separate atomic_set(), so an add
	 * racing in between would be discarded; presumably acceptable
	 * for a heuristic trigger -- verify.
	 */
+ if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2281
+ atomic_set(&mdsc->cap_reclaim_pending, 0);
2282
+ ceph_queue_cap_reclaim_work(mdsc);
2283
+ }
2284
+}
2285
+
17732286 /*
17742287 * requests
17752288 */
....@@ -1781,12 +2294,13 @@
17812294 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
17822295 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
17832296 size_t size = sizeof(struct ceph_mds_reply_dir_entry);
1784
- int order, num_entries;
2297
+ unsigned int num_entries;
2298
+ int order;
17852299
17862300 spin_lock(&ci->i_ceph_lock);
17872301 num_entries = ci->i_files + ci->i_subdirs;
17882302 spin_unlock(&ci->i_ceph_lock);
1789
- num_entries = max(num_entries, 1);
2303
+ num_entries = max(num_entries, 1U);
17902304 num_entries = min(num_entries, opt->max_readdir);
17912305
17922306 order = get_order(size * num_entries);
....@@ -1817,15 +2331,16 @@
18172331 struct ceph_mds_request *
18182332 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
18192333 {
1820
- struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1821
- struct timespec64 ts;
2334
+ struct ceph_mds_request *req;
18222335
2336
+ req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
18232337 if (!req)
18242338 return ERR_PTR(-ENOMEM);
18252339
18262340 mutex_init(&req->r_fill_mutex);
18272341 req->r_mdsc = mdsc;
18282342 req->r_started = jiffies;
2343
+ req->r_start_latency = ktime_get();
18292344 req->r_resend_mds = -1;
18302345 INIT_LIST_HEAD(&req->r_unsafe_dir_item);
18312346 INIT_LIST_HEAD(&req->r_unsafe_target_item);
....@@ -1837,8 +2352,7 @@
18372352 init_completion(&req->r_safe_completion);
18382353 INIT_LIST_HEAD(&req->r_unsafe_item);
18392354
1840
- ktime_get_coarse_real_ts64(&ts);
1841
- req->r_stamp = timespec64_trunc(ts, mdsc->fsc->sb->s_time_gran);
2355
+ ktime_get_coarse_real_ts64(&req->r_stamp);
18422356
18432357 req->r_op = op;
18442358 req->r_direct_mode = mode;
....@@ -1873,43 +2387,29 @@
18732387 * Encode hidden .snap dirs as a double /, i.e.
18742388 * foo/.snap/bar -> foo//bar
18752389 */
1876
-char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
2390
+char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
18772391 int stop_on_nosnap)
18782392 {
18792393 struct dentry *temp;
18802394 char *path;
1881
- int len, pos;
2395
+ int pos;
18822396 unsigned seq;
2397
+ u64 base;
18832398
18842399 if (!dentry)
18852400 return ERR_PTR(-EINVAL);
18862401
1887
-retry:
1888
- len = 0;
1889
- seq = read_seqbegin(&rename_lock);
1890
- rcu_read_lock();
1891
- for (temp = dentry; !IS_ROOT(temp);) {
1892
- struct inode *inode = d_inode(temp);
1893
- if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1894
- len++; /* slash only */
1895
- else if (stop_on_nosnap && inode &&
1896
- ceph_snap(inode) == CEPH_NOSNAP)
1897
- break;
1898
- else
1899
- len += 1 + temp->d_name.len;
1900
- temp = temp->d_parent;
1901
- }
1902
- rcu_read_unlock();
1903
- if (len)
1904
- len--; /* no leading '/' */
1905
-
1906
- path = kmalloc(len+1, GFP_NOFS);
2402
+ path = __getname();
19072403 if (!path)
19082404 return ERR_PTR(-ENOMEM);
1909
- pos = len;
1910
- path[pos] = 0; /* trailing null */
2405
+retry:
2406
+ pos = PATH_MAX - 1;
2407
+ path[pos] = '\0';
2408
+
2409
+ seq = read_seqbegin(&rename_lock);
19112410 rcu_read_lock();
1912
- for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
2411
+ temp = dentry;
2412
+ for (;;) {
19132413 struct inode *inode;
19142414
19152415 spin_lock(&temp->d_lock);
....@@ -1917,9 +2417,10 @@
19172417 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
19182418 dout("build_path path+%d: %p SNAPDIR\n",
19192419 pos, temp);
1920
- } else if (stop_on_nosnap && inode &&
2420
+ } else if (stop_on_nosnap && inode && dentry != temp &&
19212421 ceph_snap(inode) == CEPH_NOSNAP) {
19222422 spin_unlock(&temp->d_lock);
2423
+ pos++; /* get rid of any prepended '/' */
19232424 break;
19242425 } else {
19252426 pos -= temp->d_name.len;
....@@ -1927,83 +2428,58 @@
19272428 spin_unlock(&temp->d_lock);
19282429 break;
19292430 }
1930
- strncpy(path + pos, temp->d_name.name,
1931
- temp->d_name.len);
2431
+ memcpy(path + pos, temp->d_name.name, temp->d_name.len);
19322432 }
19332433 spin_unlock(&temp->d_lock);
1934
- if (pos)
1935
- path[--pos] = '/';
1936
- temp = temp->d_parent;
2434
+ temp = READ_ONCE(temp->d_parent);
2435
+
2436
+ /* Are we at the root? */
2437
+ if (IS_ROOT(temp))
2438
+ break;
2439
+
2440
+ /* Are we out of buffer? */
2441
+ if (--pos < 0)
2442
+ break;
2443
+
2444
+ path[pos] = '/';
19372445 }
2446
+ base = ceph_ino(d_inode(temp));
19382447 rcu_read_unlock();
1939
- if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1940
- pr_err("build_path did not end path lookup where "
1941
- "expected, namelen is %d, pos is %d\n", len, pos);
1942
- /* presumably this is only possible if racing with a
1943
- rename of one of the parent directories (we can not
1944
- lock the dentries above us to prevent this, but
1945
- retrying should be harmless) */
1946
- kfree(path);
2448
+
2449
+ if (read_seqretry(&rename_lock, seq))
2450
+ goto retry;
2451
+
2452
+ if (pos < 0) {
2453
+ /*
2454
+ * A rename didn't occur, but somehow we didn't end up where
2455
+ * we thought we would. Throw a warning and try again.
2456
+ */
2457
+ pr_warn("build_path did not end path lookup where "
2458
+ "expected, pos is %d\n", pos);
19472459 goto retry;
19482460 }
19492461
1950
- *base = ceph_ino(d_inode(temp));
1951
- *plen = len;
2462
+ *pbase = base;
2463
+ *plen = PATH_MAX - 1 - pos;
19522464 dout("build_path on %p %d built %llx '%.*s'\n",
1953
- dentry, d_count(dentry), *base, len, path);
1954
- return path;
1955
-}
1956
-
1957
-/* Duplicate the dentry->d_name.name safely */
1958
-static int clone_dentry_name(struct dentry *dentry, const char **ppath,
1959
- int *ppathlen)
1960
-{
1961
- u32 len;
1962
- char *name;
1963
-
1964
-retry:
1965
- len = READ_ONCE(dentry->d_name.len);
1966
- name = kmalloc(len + 1, GFP_NOFS);
1967
- if (!name)
1968
- return -ENOMEM;
1969
-
1970
- spin_lock(&dentry->d_lock);
1971
- if (dentry->d_name.len != len) {
1972
- spin_unlock(&dentry->d_lock);
1973
- kfree(name);
1974
- goto retry;
1975
- }
1976
- memcpy(name, dentry->d_name.name, len);
1977
- spin_unlock(&dentry->d_lock);
1978
-
1979
- name[len] = '\0';
1980
- *ppath = name;
1981
- *ppathlen = len;
1982
- return 0;
2465
+ dentry, d_count(dentry), base, *plen, path + pos);
2466
+ return path + pos;
19832467 }
19842468
19852469 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
19862470 const char **ppath, int *ppathlen, u64 *pino,
19872471 bool *pfreepath, bool parent_locked)
19882472 {
1989
- int ret;
19902473 char *path;
19912474
19922475 rcu_read_lock();
19932476 if (!dir)
19942477 dir = d_inode_rcu(dentry->d_parent);
1995
- if (dir && ceph_snap(dir) == CEPH_NOSNAP) {
2478
+ if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
19962479 *pino = ceph_ino(dir);
19972480 rcu_read_unlock();
1998
- if (parent_locked) {
1999
- *ppath = dentry->d_name.name;
2000
- *ppathlen = dentry->d_name.len;
2001
- } else {
2002
- ret = clone_dentry_name(dentry, ppath, ppathlen);
2003
- if (ret)
2004
- return ret;
2005
- *pfreepath = true;
2006
- }
2481
+ *ppath = dentry->d_name.name;
2482
+ *ppathlen = dentry->d_name.len;
20072483 return 0;
20082484 }
20092485 rcu_read_unlock();
....@@ -2115,11 +2591,11 @@
21152591 (!!req->r_inode_drop + !!req->r_dentry_drop +
21162592 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
21172593 if (req->r_dentry_drop)
2118
- len += req->r_dentry->d_name.len;
2594
+ len += pathlen1;
21192595 if (req->r_old_dentry_drop)
2120
- len += req->r_old_dentry->d_name.len;
2596
+ len += pathlen2;
21212597
2122
- msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
2598
+ msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
21232599 if (!msg) {
21242600 msg = ERR_PTR(-ENOMEM);
21252601 goto out_free2;
....@@ -2136,6 +2612,7 @@
21362612 head->op = cpu_to_le32(req->r_op);
21372613 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
21382614 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
2615
+ head->ino = cpu_to_le64(req->r_deleg_ino);
21392616 head->args = req->r_args;
21402617
21412618 ceph_encode_filepath(&p, end, ino1, path1);
....@@ -2149,7 +2626,8 @@
21492626 if (req->r_inode_drop)
21502627 releases += ceph_encode_inode_release(&p,
21512628 req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2152
- mds, req->r_inode_drop, req->r_inode_unless, 0);
2629
+ mds, req->r_inode_drop, req->r_inode_unless,
2630
+ req->r_op == CEPH_MDS_OP_READDIR);
21532631 if (req->r_dentry_drop)
21542632 releases += ceph_encode_dentry_release(&p, req->r_dentry,
21552633 req->r_parent, mds, req->r_dentry_drop,
....@@ -2178,13 +2656,17 @@
21782656 ceph_encode_copy(&p, &ts, sizeof(ts));
21792657 }
21802658
2181
- BUG_ON(p > end);
2659
+ if (WARN_ON_ONCE(p > end)) {
2660
+ ceph_msg_put(msg);
2661
+ msg = ERR_PTR(-ERANGE);
2662
+ goto out_free2;
2663
+ }
2664
+
21822665 msg->front.iov_len = p - msg->front.iov_base;
21832666 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
21842667
21852668 if (req->r_pagelist) {
21862669 struct ceph_pagelist *pagelist = req->r_pagelist;
2187
- refcount_inc(&pagelist->refcnt);
21882670 ceph_msg_data_add_pagelist(msg, pagelist);
21892671 msg->hdr.data_len = cpu_to_le32(pagelist->length);
21902672 } else {
....@@ -2195,10 +2677,10 @@
21952677
21962678 out_free2:
21972679 if (freepath2)
2198
- kfree((char *)path2);
2680
+ ceph_mdsc_free_path((char *)path2, pathlen2);
21992681 out_free1:
22002682 if (freepath1)
2201
- kfree((char *)path1);
2683
+ ceph_mdsc_free_path((char *)path1, pathlen1);
22022684 out:
22032685 return msg;
22042686 }
....@@ -2210,10 +2692,11 @@
/*
 * Finish @req.
 *
 * Post-change ('+') behaviour: the end-of-request timestamp is recorded
 * in r_end_latency, the optional r_callback is invoked, and
 * r_completion is then signalled unconditionally.  The removed ('-')
 * lines show that completion used to be signalled only when no
 * callback was set.
 */
22102692 static void complete_request(struct ceph_mds_client *mdsc,
22112693 struct ceph_mds_request *req)
22122694 {
2695
+ req->r_end_latency = ktime_get();
2696
+
22132697 if (req->r_callback)
22142698 req->r_callback(mdsc, req);
2215
- else
2216
- complete_all(&req->r_completion);
2699
+ complete_all(&req->r_completion);
22172700 }
22182701
22192702 /*
....@@ -2291,15 +2774,36 @@
22912774 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
22922775 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
22932776 flags |= CEPH_MDS_FLAG_REPLAY;
2777
+ if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
2778
+ flags |= CEPH_MDS_FLAG_ASYNC;
22942779 if (req->r_parent)
22952780 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
22962781 rhead->flags = cpu_to_le32(flags);
22972782 rhead->num_fwd = req->r_num_fwd;
22982783 rhead->num_retry = req->r_attempts - 1;
2299
- rhead->ino = 0;
23002784
23012785 dout(" r_parent = %p\n", req->r_parent);
23022786 return 0;
2787
+}
2788
+
2789
+/*
2790
+ * called under mdsc->mutex
2791
+ */
2792
/*
 * Prepare @req for the MDS behind @session and, if preparation
 * succeeds, hand the request message to the session's connection.
 *
 * @drop_cap_releases is passed straight through to
 * __prepare_send_request().
 *
 * Returns 0 on success or the error from __prepare_send_request(); on
 * error nothing is sent.
 */
+static int __send_request(struct ceph_mds_client *mdsc,
2793
+ struct ceph_mds_session *session,
2794
+ struct ceph_mds_request *req,
2795
+ bool drop_cap_releases)
2796
+{
2797
+ int err;
2798
+
2799
+ err = __prepare_send_request(mdsc, req, session->s_mds,
2800
+ drop_cap_releases);
2801
+ if (!err) {
	/* take a message reference before passing it to ceph_con_send() */
2802
+ ceph_msg_get(req->r_request);
2803
+ ceph_con_send(&session->s_con, req->r_request);
2804
+ }
2805
+
2806
+ return err;
23032807 }
23042808
23052809 /*
....@@ -2311,6 +2815,7 @@
23112815 struct ceph_mds_session *session = NULL;
23122816 int mds = -1;
23132817 int err = 0;
2818
+ bool random;
23142819
23152820 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
23162821 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
....@@ -2321,7 +2826,7 @@
23212826 if (req->r_timeout &&
23222827 time_after_eq(jiffies, req->r_started + req->r_timeout)) {
23232828 dout("do_request timed out\n");
2324
- err = -EIO;
2829
+ err = -ETIMEDOUT;
23252830 goto finish;
23262831 }
23272832 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
....@@ -2350,9 +2855,13 @@
23502855
23512856 put_request_session(req);
23522857
2353
- mds = __choose_mds(mdsc, req);
2858
+ mds = __choose_mds(mdsc, req, &random);
23542859 if (mds < 0 ||
23552860 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2861
+ if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2862
+ err = -EJUKEBOX;
2863
+ goto finish;
2864
+ }
23562865 dout("do_request no mds or not active, waiting for map\n");
23572866 list_add(&req->r_wait, &mdsc->waiting_for_map);
23582867 return;
....@@ -2367,7 +2876,7 @@
23672876 goto finish;
23682877 }
23692878 }
2370
- req->r_session = get_session(session);
2879
+ req->r_session = ceph_get_mds_session(session);
23712880
23722881 dout("do_request mds%d session %p state %s\n", mds, session,
23732882 ceph_session_state_name(session->s_state));
....@@ -2377,9 +2886,24 @@
23772886 err = -EACCES;
23782887 goto out_session;
23792888 }
2889
+ /*
2890
+ * We cannot queue async requests since the caps and delegated
2891
+ * inodes are bound to the session. Just return -EJUKEBOX and
2892
+ * let the caller retry a sync request in that case.
2893
+ */
2894
+ if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2895
+ err = -EJUKEBOX;
2896
+ goto out_session;
2897
+ }
23802898 if (session->s_state == CEPH_MDS_SESSION_NEW ||
2381
- session->s_state == CEPH_MDS_SESSION_CLOSING)
2382
- __open_session(mdsc, session);
2899
+ session->s_state == CEPH_MDS_SESSION_CLOSING) {
2900
+ err = __open_session(mdsc, session);
2901
+ if (err)
2902
+ goto out_session;
2903
+ /* retry the same mds later */
2904
+ if (random)
2905
+ req->r_resend_mds = mds;
2906
+ }
23832907 list_add(&req->r_wait, &session->s_waiting);
23842908 goto out_session;
23852909 }
....@@ -2390,11 +2914,7 @@
23902914 if (req->r_request_started == 0) /* note request start time */
23912915 req->r_request_started = jiffies;
23922916
2393
- err = __prepare_send_request(mdsc, req, mds, false);
2394
- if (!err) {
2395
- ceph_msg_get(req->r_request);
2396
- ceph_con_send(&session->s_con, req->r_request);
2397
- }
2917
+ err = __send_request(mdsc, session, req, false);
23982918
23992919 out_session:
24002920 ceph_put_mds_session(session);
....@@ -2454,49 +2974,61 @@
24542974 }
24552975 }
24562976
2457
-void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
2977
+int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
24582978 struct ceph_mds_request *req)
24592979 {
2460
- dout("submit_request on %p\n", req);
2461
- mutex_lock(&mdsc->mutex);
2462
- __register_request(mdsc, req, NULL);
2463
- __do_request(mdsc, req);
2464
- mutex_unlock(&mdsc->mutex);
2465
-}
2466
-
2467
-/*
2468
- * Synchrously perform an mds request. Take care of all of the
2469
- * session setup, forwarding, retry details.
2470
- */
2471
-int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2472
- struct inode *dir,
2473
- struct ceph_mds_request *req)
2474
-{
2475
- int err;
2476
-
2477
- dout("do_request on %p\n", req);
2980
+ int err = 0;
24782981
24792982 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
24802983 if (req->r_inode)
24812984 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2482
- if (req->r_parent)
2483
- ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
2985
+ if (req->r_parent) {
2986
+ struct ceph_inode_info *ci = ceph_inode(req->r_parent);
2987
+ int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
2988
+ CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
2989
+ spin_lock(&ci->i_ceph_lock);
2990
+ ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
2991
+ __ceph_touch_fmode(ci, mdsc, fmode);
2992
+ spin_unlock(&ci->i_ceph_lock);
2993
+ ihold(req->r_parent);
2994
+ }
24842995 if (req->r_old_dentry_dir)
24852996 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
24862997 CEPH_CAP_PIN);
24872998
2488
- /* issue */
2999
+ if (req->r_inode) {
3000
+ err = ceph_wait_on_async_create(req->r_inode);
3001
+ if (err) {
3002
+ dout("%s: wait for async create returned: %d\n",
3003
+ __func__, err);
3004
+ return err;
3005
+ }
3006
+ }
3007
+
3008
+ if (!err && req->r_old_inode) {
3009
+ err = ceph_wait_on_async_create(req->r_old_inode);
3010
+ if (err) {
3011
+ dout("%s: wait for async create returned: %d\n",
3012
+ __func__, err);
3013
+ return err;
3014
+ }
3015
+ }
3016
+
3017
+ dout("submit_request on %p for inode %p\n", req, dir);
24893018 mutex_lock(&mdsc->mutex);
24903019 __register_request(mdsc, req, dir);
24913020 __do_request(mdsc, req);
3021
+ err = req->r_err;
3022
+ mutex_unlock(&mdsc->mutex);
3023
+ return err;
3024
+}
24923025
2493
- if (req->r_err) {
2494
- err = req->r_err;
2495
- goto out;
2496
- }
3026
+static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3027
+ struct ceph_mds_request *req)
3028
+{
3029
+ int err;
24973030
24983031 /* wait */
2499
- mutex_unlock(&mdsc->mutex);
25003032 dout("do_request waiting\n");
25013033 if (!req->r_timeout && req->r_wait_for_completion) {
25023034 err = req->r_wait_for_completion(mdsc, req);
....@@ -2507,7 +3039,7 @@
25073039 if (timeleft > 0)
25083040 err = 0;
25093041 else if (!timeleft)
2510
- err = -EIO; /* timed out */
3042
+ err = -ETIMEDOUT; /* timed out */
25113043 else
25123044 err = timeleft; /* killed */
25133045 }
....@@ -2537,8 +3069,26 @@
25373069 err = req->r_err;
25383070 }
25393071
2540
-out:
25413072 mutex_unlock(&mdsc->mutex);
3073
+ return err;
3074
+}
3075
+
/*
 * Synchronously perform an mds request: submit it and, if the submission
 * succeeded, wait for it to finish.  Takes care of all of the session
 * setup, forwarding, retry details.
 *
 * Returns 0 on success or the error reported by the submit or wait step.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	int ret;

	dout("do_request on %p\n", req);

	/* issue; a failed submission is reported without waiting */
	ret = ceph_mdsc_submit_request(mdsc, dir, req);
	if (ret == 0)
		ret = ceph_mdsc_wait_request(mdsc, req);

	dout("do_request %p done, result %d\n", req, ret);
	return ret;
}
....@@ -2641,7 +3191,7 @@
26413191 mutex_unlock(&mdsc->mutex);
26423192 goto out;
26433193 } else {
2644
- int mds = __choose_mds(mdsc, req);
3194
+ int mds = __choose_mds(mdsc, req, NULL);
26453195 if (mds >= 0 && mds != req->r_session->s_mds) {
26463196 dout("but auth changed, so resending\n");
26473197 __do_request(mdsc, req);
....@@ -2657,6 +3207,10 @@
26573207 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
26583208 __unregister_request(mdsc, req);
26593209
3210
+ /* last request during umount? */
3211
+ if (mdsc->stopping && !__get_oldest_req(mdsc))
3212
+ complete_all(&mdsc->safe_umount_waiters);
3213
+
26603214 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
26613215 /*
26623216 * We already handled the unsafe response, now do the
....@@ -2667,28 +3221,20 @@
26673221 */
26683222 dout("got safe reply %llu, mds%d\n", tid, mds);
26693223
2670
- /* last unsafe request during umount? */
2671
- if (mdsc->stopping && !__get_oldest_req(mdsc))
2672
- complete_all(&mdsc->safe_umount_waiters);
26733224 mutex_unlock(&mdsc->mutex);
26743225 goto out;
26753226 }
26763227 } else {
26773228 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
26783229 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2679
- if (req->r_unsafe_dir) {
2680
- struct ceph_inode_info *ci =
2681
- ceph_inode(req->r_unsafe_dir);
2682
- spin_lock(&ci->i_unsafe_lock);
2683
- list_add_tail(&req->r_unsafe_dir_item,
2684
- &ci->i_unsafe_dirops);
2685
- spin_unlock(&ci->i_unsafe_lock);
2686
- }
26873230 }
26883231
26893232 dout("handle_reply tid %lld result %d\n", tid, result);
26903233 rinfo = &req->r_reply_info;
2691
- err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
3234
+ if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3235
+ err = parse_reply_info(session, msg, rinfo, (u64)-1);
3236
+ else
3237
+ err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
26923238 mutex_unlock(&mdsc->mutex);
26933239
26943240 mutex_lock(&session->s_mutex);
....@@ -2719,7 +3265,6 @@
27193265 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
27203266 req->r_op == CEPH_MDS_OP_LSSNAP))
27213267 ceph_readdir_prepopulate(req, req->r_session);
2722
- ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
27233268 }
27243269 current->journal_info = NULL;
27253270 mutex_unlock(&req->r_fill_mutex);
....@@ -2728,12 +3273,18 @@
27283273 if (realm)
27293274 ceph_put_snap_realm(mdsc, realm);
27303275
2731
- if (err == 0 && req->r_target_inode &&
2732
- test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2733
- struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
2734
- spin_lock(&ci->i_unsafe_lock);
2735
- list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
2736
- spin_unlock(&ci->i_unsafe_lock);
3276
+ if (err == 0) {
3277
+ if (req->r_target_inode &&
3278
+ test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3279
+ struct ceph_inode_info *ci =
3280
+ ceph_inode(req->r_target_inode);
3281
+ spin_lock(&ci->i_unsafe_lock);
3282
+ list_add_tail(&req->r_unsafe_target_item,
3283
+ &ci->i_unsafe_iops);
3284
+ spin_unlock(&ci->i_unsafe_lock);
3285
+ }
3286
+
3287
+ ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
27373288 }
27383289 out_err:
27393290 mutex_lock(&mdsc->mutex);
....@@ -2753,6 +3304,9 @@
27533304
27543305 /* kick calling process */
27553306 complete_request(mdsc, req);
3307
+
3308
+ ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency,
3309
+ req->r_end_latency, err);
27563310 out:
27573311 ceph_mdsc_put_request(req);
27583312 return;
....@@ -2812,6 +3366,34 @@
28123366 pr_err("mdsc_handle_forward decode error err=%d\n", err);
28133367 }
28143368
3369
+static int __decode_session_metadata(void **p, void *end,
3370
+ bool *blocklisted)
3371
+{
3372
+ /* map<string,string> */
3373
+ u32 n;
3374
+ bool err_str;
3375
+ ceph_decode_32_safe(p, end, n, bad);
3376
+ while (n-- > 0) {
3377
+ u32 len;
3378
+ ceph_decode_32_safe(p, end, len, bad);
3379
+ ceph_decode_need(p, end, len, bad);
3380
+ err_str = !strncmp(*p, "error_string", len);
3381
+ *p += len;
3382
+ ceph_decode_32_safe(p, end, len, bad);
3383
+ ceph_decode_need(p, end, len, bad);
3384
+ /*
3385
+ * Match "blocklisted (blacklisted)" from newer MDSes,
3386
+ * or "blacklisted" from older MDSes.
3387
+ */
3388
+ if (err_str && strnstr(*p, "blacklisted", len))
3389
+ *blocklisted = true;
3390
+ *p += len;
3391
+ }
3392
+ return 0;
3393
+bad:
3394
+ return -1;
3395
+}
3396
+
28153397 /*
28163398 * handle a mds session control message
28173399 */
....@@ -2819,21 +3401,40 @@
28193401 struct ceph_msg *msg)
28203402 {
28213403 struct ceph_mds_client *mdsc = session->s_mdsc;
2822
- u32 op;
2823
- u64 seq;
28243404 int mds = session->s_mds;
2825
- struct ceph_mds_session_head *h = msg->front.iov_base;
3405
+ int msg_version = le16_to_cpu(msg->hdr.version);
3406
+ void *p = msg->front.iov_base;
3407
+ void *end = p + msg->front.iov_len;
3408
+ struct ceph_mds_session_head *h;
3409
+ u32 op;
3410
+ u64 seq, features = 0;
28263411 int wake = 0;
3412
+ bool blocklisted = false;
28273413
28283414 /* decode */
2829
- if (msg->front.iov_len < sizeof(*h))
2830
- goto bad;
3415
+ ceph_decode_need(&p, end, sizeof(*h), bad);
3416
+ h = p;
3417
+ p += sizeof(*h);
3418
+
28313419 op = le32_to_cpu(h->op);
28323420 seq = le64_to_cpu(h->seq);
28333421
3422
+ if (msg_version >= 3) {
3423
+ u32 len;
3424
+ /* version >= 2, metadata */
3425
+ if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3426
+ goto bad;
3427
+ /* version >= 3, feature bits */
3428
+ ceph_decode_32_safe(&p, end, len, bad);
3429
+ if (len) {
3430
+ ceph_decode_64_safe(&p, end, features, bad);
3431
+ p += len - sizeof(features);
3432
+ }
3433
+ }
3434
+
28343435 mutex_lock(&mdsc->mutex);
28353436 if (op == CEPH_SESSION_CLOSE) {
2836
- get_session(session);
3437
+ ceph_get_mds_session(session);
28373438 __unregister_session(mdsc, session);
28383439 }
28393440 /* FIXME: this ttl calculation is generous */
....@@ -2856,7 +3457,10 @@
28563457 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
28573458 pr_info("mds%d reconnect success\n", session->s_mds);
28583459 session->s_state = CEPH_MDS_SESSION_OPEN;
3460
+ session->s_features = features;
28593461 renewed_caps(mdsc, session, 0);
3462
+ if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
3463
+ metric_schedule_delayed(&mdsc->metric);
28603464 wake = 1;
28613465 if (mdsc->stopping)
28623466 __close_session(mdsc, session);
....@@ -2870,6 +3474,7 @@
28703474 case CEPH_SESSION_CLOSE:
28713475 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
28723476 pr_info("mds%d reconnect denied\n", session->s_mds);
3477
+ session->s_state = CEPH_MDS_SESSION_CLOSED;
28733478 cleanup_session_requests(mdsc, session);
28743479 remove_session_caps(session);
28753480 wake = 2; /* for good measure */
....@@ -2891,6 +3496,12 @@
28913496 break;
28923497
28933498 case CEPH_SESSION_FLUSHMSG:
3499
+ /* flush cap releases */
3500
+ spin_lock(&session->s_cap_lock);
3501
+ if (session->s_num_cap_releases)
3502
+ ceph_flush_cap_releases(mdsc, session);
3503
+ spin_unlock(&session->s_cap_lock);
3504
+
28943505 send_flushmsg_ack(mdsc, session, seq);
28953506 break;
28963507
....@@ -2899,7 +3510,7 @@
28993510 spin_lock(&session->s_cap_lock);
29003511 session->s_readonly = true;
29013512 spin_unlock(&session->s_cap_lock);
2902
- wake_up_session_caps(session, 0);
3513
+ wake_up_session_caps(session, FORCE_RO);
29033514 break;
29043515
29053516 case CEPH_SESSION_REJECT:
....@@ -2908,6 +3519,8 @@
29083519 session->s_state = CEPH_MDS_SESSION_REJECTED;
29093520 cleanup_session_requests(mdsc, session);
29103521 remove_session_caps(session);
3522
+ if (blocklisted)
3523
+ mdsc->fsc->blocklisted = true;
29113524 wake = 2; /* for good measure */
29123525 break;
29133526
....@@ -2935,6 +3548,28 @@
29353548 return;
29363549 }
29373550
3551
+void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3552
+{
3553
+ int dcaps;
3554
+
3555
+ dcaps = xchg(&req->r_dir_caps, 0);
3556
+ if (dcaps) {
3557
+ dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3558
+ ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3559
+ }
3560
+}
3561
+
3562
+void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3563
+{
3564
+ int dcaps;
3565
+
3566
+ dcaps = xchg(&req->r_dir_caps, 0);
3567
+ if (dcaps) {
3568
+ dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3569
+ ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3570
+ dcaps);
3571
+ }
3572
+}
29383573
29393574 /*
29403575 * called under session->mutex.
....@@ -2944,18 +3579,12 @@
29443579 {
29453580 struct ceph_mds_request *req, *nreq;
29463581 struct rb_node *p;
2947
- int err;
29483582
29493583 dout("replay_unsafe_requests mds%d\n", session->s_mds);
29503584
29513585 mutex_lock(&mdsc->mutex);
2952
- list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2953
- err = __prepare_send_request(mdsc, req, session->s_mds, true);
2954
- if (!err) {
2955
- ceph_msg_get(req->r_request);
2956
- ceph_con_send(&session->s_con, req->r_request);
2957
- }
2958
- }
3586
+ list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3587
+ __send_request(mdsc, session, req, true);
29593588
29603589 /*
29613590 * also re-send old requests when MDS enters reconnect stage. So that MDS
....@@ -2969,23 +3598,131 @@
29693598 continue;
29703599 if (req->r_attempts == 0)
29713600 continue; /* only old requests */
2972
- if (req->r_session &&
2973
- req->r_session->s_mds == session->s_mds) {
2974
- err = __prepare_send_request(mdsc, req,
2975
- session->s_mds, true);
2976
- if (!err) {
2977
- ceph_msg_get(req->r_request);
2978
- ceph_con_send(&session->s_con, req->r_request);
2979
- }
2980
- }
3601
+ if (!req->r_session)
3602
+ continue;
3603
+ if (req->r_session->s_mds != session->s_mds)
3604
+ continue;
3605
+
3606
+ ceph_mdsc_release_dir_caps_no_check(req);
3607
+
3608
+ __send_request(mdsc, session, req, true);
29813609 }
29823610 mutex_unlock(&mdsc->mutex);
3611
+}
3612
+
3613
+static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3614
+{
3615
+ struct ceph_msg *reply;
3616
+ struct ceph_pagelist *_pagelist;
3617
+ struct page *page;
3618
+ __le32 *addr;
3619
+ int err = -ENOMEM;
3620
+
3621
+ if (!recon_state->allow_multi)
3622
+ return -ENOSPC;
3623
+
3624
+ /* can't handle message that contains both caps and realm */
3625
+ BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3626
+
3627
+ /* pre-allocate new pagelist */
3628
+ _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3629
+ if (!_pagelist)
3630
+ return -ENOMEM;
3631
+
3632
+ reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3633
+ if (!reply)
3634
+ goto fail_msg;
3635
+
3636
+ /* placeholder for nr_caps */
3637
+ err = ceph_pagelist_encode_32(_pagelist, 0);
3638
+ if (err < 0)
3639
+ goto fail;
3640
+
3641
+ if (recon_state->nr_caps) {
3642
+ /* currently encoding caps */
3643
+ err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3644
+ if (err)
3645
+ goto fail;
3646
+ } else {
3647
+ /* placeholder for nr_realms (currently encoding relams) */
3648
+ err = ceph_pagelist_encode_32(_pagelist, 0);
3649
+ if (err < 0)
3650
+ goto fail;
3651
+ }
3652
+
3653
+ err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3654
+ if (err)
3655
+ goto fail;
3656
+
3657
+ page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3658
+ addr = kmap_atomic(page);
3659
+ if (recon_state->nr_caps) {
3660
+ /* currently encoding caps */
3661
+ *addr = cpu_to_le32(recon_state->nr_caps);
3662
+ } else {
3663
+ /* currently encoding relams */
3664
+ *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3665
+ }
3666
+ kunmap_atomic(addr);
3667
+
3668
+ reply->hdr.version = cpu_to_le16(5);
3669
+ reply->hdr.compat_version = cpu_to_le16(4);
3670
+
3671
+ reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3672
+ ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3673
+
3674
+ ceph_con_send(&recon_state->session->s_con, reply);
3675
+ ceph_pagelist_release(recon_state->pagelist);
3676
+
3677
+ recon_state->pagelist = _pagelist;
3678
+ recon_state->nr_caps = 0;
3679
+ recon_state->nr_realms = 0;
3680
+ recon_state->msg_version = 5;
3681
+ return 0;
3682
+fail:
3683
+ ceph_msg_put(reply);
3684
+fail_msg:
3685
+ ceph_pagelist_release(_pagelist);
3686
+ return err;
3687
+}
3688
+
3689
+static struct dentry* d_find_primary(struct inode *inode)
3690
+{
3691
+ struct dentry *alias, *dn = NULL;
3692
+
3693
+ if (hlist_empty(&inode->i_dentry))
3694
+ return NULL;
3695
+
3696
+ spin_lock(&inode->i_lock);
3697
+ if (hlist_empty(&inode->i_dentry))
3698
+ goto out_unlock;
3699
+
3700
+ if (S_ISDIR(inode->i_mode)) {
3701
+ alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
3702
+ if (!IS_ROOT(alias))
3703
+ dn = dget(alias);
3704
+ goto out_unlock;
3705
+ }
3706
+
3707
+ hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
3708
+ spin_lock(&alias->d_lock);
3709
+ if (!d_unhashed(alias) &&
3710
+ (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
3711
+ dn = dget_dlock(alias);
3712
+ }
3713
+ spin_unlock(&alias->d_lock);
3714
+ if (dn)
3715
+ break;
3716
+ }
3717
+out_unlock:
3718
+ spin_unlock(&inode->i_lock);
3719
+ return dn;
29833720 }
29843721
29853722 /*
29863723 * Encode information about a cap for a reconnect with the MDS.
29873724 */
2988
-static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
3725
+static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
29893726 void *arg)
29903727 {
29913728 union {
....@@ -2995,29 +3732,28 @@
29953732 struct ceph_inode_info *ci = cap->ci;
29963733 struct ceph_reconnect_state *recon_state = arg;
29973734 struct ceph_pagelist *pagelist = recon_state->pagelist;
3735
+ struct dentry *dentry;
29983736 char *path;
2999
- int pathlen, err;
3737
+ int pathlen = 0, err;
30003738 u64 pathbase;
30013739 u64 snap_follows;
3002
- struct dentry *dentry;
30033740
30043741 dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
30053742 inode, ceph_vinop(inode), cap, cap->cap_id,
30063743 ceph_cap_string(cap->issued));
3007
- err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3008
- if (err)
3009
- return err;
30103744
3011
- dentry = d_find_alias(inode);
3745
+ dentry = d_find_primary(inode);
30123746 if (dentry) {
3013
- path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
3747
+ /* set pathbase to parent dir when msg_version >= 2 */
3748
+ path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
3749
+ recon_state->msg_version >= 2);
3750
+ dput(dentry);
30143751 if (IS_ERR(path)) {
30153752 err = PTR_ERR(path);
3016
- goto out_dput;
3753
+ goto out_err;
30173754 }
30183755 } else {
30193756 path = NULL;
3020
- pathlen = 0;
30213757 pathbase = 0;
30223758 }
30233759
....@@ -3026,6 +3762,15 @@
30263762 cap->issue_seq = 0; /* and issue_seq */
30273763 cap->mseq = 0; /* and migrate_seq */
30283764 cap->cap_gen = cap->session->s_cap_gen;
3765
+
3766
+ /* These are lost when the session goes away */
3767
+ if (S_ISDIR(inode->i_mode)) {
3768
+ if (cap->issued & CEPH_CAP_DIR_CREATE) {
3769
+ ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3770
+ memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3771
+ }
3772
+ cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
3773
+ }
30293774
30303775 if (recon_state->msg_version >= 2) {
30313776 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
....@@ -3059,7 +3804,7 @@
30593804 if (recon_state->msg_version >= 2) {
30603805 int num_fcntl_locks, num_flock_locks;
30613806 struct ceph_filelock *flocks = NULL;
3062
- size_t struct_len, total_len = 0;
3807
+ size_t struct_len, total_len = sizeof(u64);
30633808 u8 struct_v = 0;
30643809
30653810 encode_again:
....@@ -3075,7 +3820,7 @@
30753820 GFP_NOFS);
30763821 if (!flocks) {
30773822 err = -ENOMEM;
3078
- goto out_free;
3823
+ goto out_err;
30793824 }
30803825 err = ceph_encode_locks_to_buffer(inode, flocks,
30813826 num_fcntl_locks,
....@@ -3085,7 +3830,7 @@
30853830 flocks = NULL;
30863831 if (err == -ENOSPC)
30873832 goto encode_again;
3088
- goto out_free;
3833
+ goto out_err;
30893834 }
30903835 } else {
30913836 kfree(flocks);
....@@ -3094,7 +3839,7 @@
30943839
30953840 if (recon_state->msg_version >= 3) {
30963841 /* version, compat_version and struct_len */
3097
- total_len = 2 * sizeof(u8) + sizeof(u32);
3842
+ total_len += 2 * sizeof(u8) + sizeof(u32);
30983843 struct_v = 2;
30993844 }
31003845 /*
....@@ -3105,44 +3850,113 @@
31053850 sizeof(struct ceph_filelock);
31063851 rec.v2.flock_len = cpu_to_le32(struct_len);
31073852
3108
- struct_len += sizeof(rec.v2);
3109
- struct_len += sizeof(u32) + pathlen;
3853
+ struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
31103854
31113855 if (struct_v >= 2)
31123856 struct_len += sizeof(u64); /* snap_follows */
31133857
31143858 total_len += struct_len;
3115
- err = ceph_pagelist_reserve(pagelist, total_len);
31163859
3117
- if (!err) {
3118
- if (recon_state->msg_version >= 3) {
3119
- ceph_pagelist_encode_8(pagelist, struct_v);
3120
- ceph_pagelist_encode_8(pagelist, 1);
3121
- ceph_pagelist_encode_32(pagelist, struct_len);
3122
- }
3123
- ceph_pagelist_encode_string(pagelist, path, pathlen);
3124
- ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3125
- ceph_locks_to_pagelist(flocks, pagelist,
3126
- num_fcntl_locks,
3127
- num_flock_locks);
3128
- if (struct_v >= 2)
3129
- ceph_pagelist_encode_64(pagelist, snap_follows);
3860
+ if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3861
+ err = send_reconnect_partial(recon_state);
3862
+ if (err)
3863
+ goto out_freeflocks;
3864
+ pagelist = recon_state->pagelist;
31303865 }
3866
+
3867
+ err = ceph_pagelist_reserve(pagelist, total_len);
3868
+ if (err)
3869
+ goto out_freeflocks;
3870
+
3871
+ ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3872
+ if (recon_state->msg_version >= 3) {
3873
+ ceph_pagelist_encode_8(pagelist, struct_v);
3874
+ ceph_pagelist_encode_8(pagelist, 1);
3875
+ ceph_pagelist_encode_32(pagelist, struct_len);
3876
+ }
3877
+ ceph_pagelist_encode_string(pagelist, path, pathlen);
3878
+ ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3879
+ ceph_locks_to_pagelist(flocks, pagelist,
3880
+ num_fcntl_locks, num_flock_locks);
3881
+ if (struct_v >= 2)
3882
+ ceph_pagelist_encode_64(pagelist, snap_follows);
3883
+out_freeflocks:
31313884 kfree(flocks);
31323885 } else {
3133
- size_t size = sizeof(u32) + pathlen + sizeof(rec.v1);
3134
- err = ceph_pagelist_reserve(pagelist, size);
3135
- if (!err) {
3136
- ceph_pagelist_encode_string(pagelist, path, pathlen);
3137
- ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3138
- }
3886
+ err = ceph_pagelist_reserve(pagelist,
3887
+ sizeof(u64) + sizeof(u32) +
3888
+ pathlen + sizeof(rec.v1));
3889
+ if (err)
3890
+ goto out_err;
3891
+
3892
+ ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3893
+ ceph_pagelist_encode_string(pagelist, path, pathlen);
3894
+ ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
31393895 }
31403896
3141
- recon_state->nr_caps++;
3142
-out_free:
3143
- kfree(path);
3144
-out_dput:
3145
- dput(dentry);
3897
+out_err:
3898
+ ceph_mdsc_free_path(path, pathlen);
3899
+ if (!err)
3900
+ recon_state->nr_caps++;
3901
+ return err;
3902
+}
3903
+
3904
+static int encode_snap_realms(struct ceph_mds_client *mdsc,
3905
+ struct ceph_reconnect_state *recon_state)
3906
+{
3907
+ struct rb_node *p;
3908
+ struct ceph_pagelist *pagelist = recon_state->pagelist;
3909
+ int err = 0;
3910
+
3911
+ if (recon_state->msg_version >= 4) {
3912
+ err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3913
+ if (err < 0)
3914
+ goto fail;
3915
+ }
3916
+
3917
+ /*
3918
+ * snaprealms. we provide mds with the ino, seq (version), and
3919
+ * parent for all of our realms. If the mds has any newer info,
3920
+ * it will tell us.
3921
+ */
3922
+ for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3923
+ struct ceph_snap_realm *realm =
3924
+ rb_entry(p, struct ceph_snap_realm, node);
3925
+ struct ceph_mds_snaprealm_reconnect sr_rec;
3926
+
3927
+ if (recon_state->msg_version >= 4) {
3928
+ size_t need = sizeof(u8) * 2 + sizeof(u32) +
3929
+ sizeof(sr_rec);
3930
+
3931
+ if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3932
+ err = send_reconnect_partial(recon_state);
3933
+ if (err)
3934
+ goto fail;
3935
+ pagelist = recon_state->pagelist;
3936
+ }
3937
+
3938
+ err = ceph_pagelist_reserve(pagelist, need);
3939
+ if (err)
3940
+ goto fail;
3941
+
3942
+ ceph_pagelist_encode_8(pagelist, 1);
3943
+ ceph_pagelist_encode_8(pagelist, 1);
3944
+ ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3945
+ }
3946
+
3947
+ dout(" adding snap realm %llx seq %lld parent %llx\n",
3948
+ realm->ino, realm->seq, realm->parent_ino);
3949
+ sr_rec.ino = cpu_to_le64(realm->ino);
3950
+ sr_rec.seq = cpu_to_le64(realm->seq);
3951
+ sr_rec.parent = cpu_to_le64(realm->parent_ino);
3952
+
3953
+ err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3954
+ if (err)
3955
+ goto fail;
3956
+
3957
+ recon_state->nr_realms++;
3958
+ }
3959
+fail:
31463960 return err;
31473961 }
31483962
....@@ -3156,31 +3970,29 @@
31563970 * recovering MDS might have.
31573971 *
31583972 * This is a relatively heavyweight operation, but it's rare.
3159
- *
3160
- * called with mdsc->mutex held.
31613973 */
31623974 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
31633975 struct ceph_mds_session *session)
31643976 {
31653977 struct ceph_msg *reply;
3166
- struct rb_node *p;
31673978 int mds = session->s_mds;
31683979 int err = -ENOMEM;
3169
- int s_nr_caps;
3170
- struct ceph_pagelist *pagelist;
3171
- struct ceph_reconnect_state recon_state;
3980
+ struct ceph_reconnect_state recon_state = {
3981
+ .session = session,
3982
+ };
31723983 LIST_HEAD(dispose);
31733984
31743985 pr_info("mds%d reconnect start\n", mds);
31753986
3176
- pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
3177
- if (!pagelist)
3987
+ recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3988
+ if (!recon_state.pagelist)
31783989 goto fail_nopagelist;
3179
- ceph_pagelist_init(pagelist);
31803990
3181
- reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
3991
+ reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
31823992 if (!reply)
31833993 goto fail_nomsg;
3994
+
3995
+ xa_destroy(&session->s_delegated_inos);
31843996
31853997 mutex_lock(&session->s_mutex);
31863998 session->s_state = CEPH_MDS_SESSION_RECONNECTING;
....@@ -3219,65 +4031,90 @@
32194031 /* replay unsafe requests */
32204032 replay_unsafe_requests(mdsc, session);
32214033
4034
+ ceph_early_kick_flushing_caps(mdsc, session);
4035
+
32224036 down_read(&mdsc->snap_rwsem);
32234037
3224
- /* traverse this session's caps */
3225
- s_nr_caps = session->s_nr_caps;
3226
- err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
4038
+ /* placeholder for nr_caps */
4039
+ err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
32274040 if (err)
32284041 goto fail;
32294042
3230
- recon_state.nr_caps = 0;
3231
- recon_state.pagelist = pagelist;
3232
- if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
4043
+ if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
32334044 recon_state.msg_version = 3;
3234
- else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK)
4045
+ recon_state.allow_multi = true;
4046
+ } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4047
+ recon_state.msg_version = 3;
4048
+ } else {
32354049 recon_state.msg_version = 2;
3236
- else
3237
- recon_state.msg_version = 1;
3238
- err = iterate_session_caps(session, encode_caps_cb, &recon_state);
3239
- if (err < 0)
3240
- goto fail;
4050
+ }
4051
+ /* traverse this session's caps */
4052
+ err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
32414053
32424054 spin_lock(&session->s_cap_lock);
32434055 session->s_cap_reconnect = 0;
32444056 spin_unlock(&session->s_cap_lock);
32454057
3246
- /*
3247
- * snaprealms. we provide mds with the ino, seq (version), and
3248
- * parent for all of our realms. If the mds has any newer info,
3249
- * it will tell us.
3250
- */
3251
- for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3252
- struct ceph_snap_realm *realm =
3253
- rb_entry(p, struct ceph_snap_realm, node);
3254
- struct ceph_mds_snaprealm_reconnect sr_rec;
4058
+ if (err < 0)
4059
+ goto fail;
32554060
3256
- dout(" adding snap realm %llx seq %lld parent %llx\n",
3257
- realm->ino, realm->seq, realm->parent_ino);
3258
- sr_rec.ino = cpu_to_le64(realm->ino);
3259
- sr_rec.seq = cpu_to_le64(realm->seq);
3260
- sr_rec.parent = cpu_to_le64(realm->parent_ino);
3261
- err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3262
- if (err)
4061
+ /* check if all realms can be encoded into current message */
4062
+ if (mdsc->num_snap_realms) {
4063
+ size_t total_len =
4064
+ recon_state.pagelist->length +
4065
+ mdsc->num_snap_realms *
4066
+ sizeof(struct ceph_mds_snaprealm_reconnect);
4067
+ if (recon_state.msg_version >= 4) {
4068
+ /* number of realms */
4069
+ total_len += sizeof(u32);
4070
+ /* version, compat_version and struct_len */
4071
+ total_len += mdsc->num_snap_realms *
4072
+ (2 * sizeof(u8) + sizeof(u32));
4073
+ }
4074
+ if (total_len > RECONNECT_MAX_SIZE) {
4075
+ if (!recon_state.allow_multi) {
4076
+ err = -ENOSPC;
4077
+ goto fail;
4078
+ }
4079
+ if (recon_state.nr_caps) {
4080
+ err = send_reconnect_partial(&recon_state);
4081
+ if (err)
4082
+ goto fail;
4083
+ }
4084
+ recon_state.msg_version = 5;
4085
+ }
4086
+ }
4087
+
4088
+ err = encode_snap_realms(mdsc, &recon_state);
4089
+ if (err < 0)
4090
+ goto fail;
4091
+
4092
+ if (recon_state.msg_version >= 5) {
4093
+ err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4094
+ if (err < 0)
32634095 goto fail;
32644096 }
32654097
3266
- reply->hdr.version = cpu_to_le16(recon_state.msg_version);
3267
-
3268
- /* raced with cap release? */
3269
- if (s_nr_caps != recon_state.nr_caps) {
3270
- struct page *page = list_first_entry(&pagelist->head,
3271
- struct page, lru);
4098
+ if (recon_state.nr_caps || recon_state.nr_realms) {
4099
+ struct page *page =
4100
+ list_first_entry(&recon_state.pagelist->head,
4101
+ struct page, lru);
32724102 __le32 *addr = kmap_atomic(page);
3273
- *addr = cpu_to_le32(recon_state.nr_caps);
4103
+ if (recon_state.nr_caps) {
4104
+ WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4105
+ *addr = cpu_to_le32(recon_state.nr_caps);
4106
+ } else if (recon_state.msg_version >= 4) {
4107
+ *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4108
+ }
32744109 kunmap_atomic(addr);
32754110 }
32764111
3277
- reply->hdr.data_len = cpu_to_le32(pagelist->length);
3278
- ceph_msg_data_add_pagelist(reply, pagelist);
4112
+ reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4113
+ if (recon_state.msg_version >= 4)
4114
+ reply->hdr.compat_version = cpu_to_le16(4);
32794115
3280
- ceph_early_kick_flushing_caps(mdsc, session);
4116
+ reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4117
+ ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
32814118
32824119 ceph_con_send(&session->s_con, reply);
32834120
....@@ -3288,6 +4125,7 @@
32884125 mutex_unlock(&mdsc->mutex);
32894126
32904127 up_read(&mdsc->snap_rwsem);
4128
+ ceph_pagelist_release(recon_state.pagelist);
32914129 return;
32924130
32934131 fail:
....@@ -3295,7 +4133,7 @@
32954133 up_read(&mdsc->snap_rwsem);
32964134 mutex_unlock(&session->s_mutex);
32974135 fail_nomsg:
3298
- ceph_pagelist_release(pagelist);
4136
+ ceph_pagelist_release(recon_state.pagelist);
32994137 fail_nopagelist:
33004138 pr_err("error %d preparing reconnect for mds%d\n", err, mds);
33014139 return;
....@@ -3319,7 +4157,7 @@
33194157 dout("check_new_map new %u old %u\n",
33204158 newmap->m_epoch, oldmap->m_epoch);
33214159
3322
- for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
4160
+ for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
33234161 if (!mdsc->sessions[i])
33244162 continue;
33254163 s = mdsc->sessions[i];
....@@ -3333,42 +4171,35 @@
33334171 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
33344172 ceph_session_state_name(s->s_state));
33354173
3336
- if (i >= newmap->m_num_mds ||
3337
- memcmp(ceph_mdsmap_get_addr(oldmap, i),
4174
+ if (i >= newmap->possible_max_rank) {
4175
+ /* force close session for stopped mds */
4176
+ ceph_get_mds_session(s);
4177
+ __unregister_session(mdsc, s);
4178
+ __wake_requests(mdsc, &s->s_waiting);
4179
+ mutex_unlock(&mdsc->mutex);
4180
+
4181
+ mutex_lock(&s->s_mutex);
4182
+ cleanup_session_requests(mdsc, s);
4183
+ remove_session_caps(s);
4184
+ mutex_unlock(&s->s_mutex);
4185
+
4186
+ ceph_put_mds_session(s);
4187
+
4188
+ mutex_lock(&mdsc->mutex);
4189
+ kick_requests(mdsc, i);
4190
+ continue;
4191
+ }
4192
+
4193
+ if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
33384194 ceph_mdsmap_get_addr(newmap, i),
33394195 sizeof(struct ceph_entity_addr))) {
3340
- if (s->s_state == CEPH_MDS_SESSION_OPENING) {
3341
- /* the session never opened, just close it
3342
- * out now */
3343
- get_session(s);
3344
- __unregister_session(mdsc, s);
3345
- __wake_requests(mdsc, &s->s_waiting);
3346
- ceph_put_mds_session(s);
3347
- } else if (i >= newmap->m_num_mds) {
3348
- /* force close session for stopped mds */
3349
- get_session(s);
3350
- __unregister_session(mdsc, s);
3351
- __wake_requests(mdsc, &s->s_waiting);
3352
- kick_requests(mdsc, i);
3353
- mutex_unlock(&mdsc->mutex);
3354
-
3355
- mutex_lock(&s->s_mutex);
3356
- cleanup_session_requests(mdsc, s);
3357
- remove_session_caps(s);
3358
- mutex_unlock(&s->s_mutex);
3359
-
3360
- ceph_put_mds_session(s);
3361
-
3362
- mutex_lock(&mdsc->mutex);
3363
- } else {
3364
- /* just close it */
3365
- mutex_unlock(&mdsc->mutex);
3366
- mutex_lock(&s->s_mutex);
3367
- mutex_lock(&mdsc->mutex);
3368
- ceph_con_close(&s->s_con);
3369
- mutex_unlock(&s->s_mutex);
3370
- s->s_state = CEPH_MDS_SESSION_RESTARTING;
3371
- }
4196
+ /* just close it */
4197
+ mutex_unlock(&mdsc->mutex);
4198
+ mutex_lock(&s->s_mutex);
4199
+ mutex_lock(&mdsc->mutex);
4200
+ ceph_con_close(&s->s_con);
4201
+ mutex_unlock(&s->s_mutex);
4202
+ s->s_state = CEPH_MDS_SESSION_RESTARTING;
33724203 } else if (oldstate == newstate) {
33734204 continue; /* nothing new with this mds */
33744205 }
....@@ -3392,12 +4223,16 @@
33924223 oldstate != CEPH_MDS_STATE_STARTING)
33934224 pr_info("mds%d recovery completed\n", s->s_mds);
33944225 kick_requests(mdsc, i);
4226
+ mutex_unlock(&mdsc->mutex);
4227
+ mutex_lock(&s->s_mutex);
4228
+ mutex_lock(&mdsc->mutex);
33954229 ceph_kick_flushing_caps(mdsc, s);
3396
- wake_up_session_caps(s, 1);
4230
+ mutex_unlock(&s->s_mutex);
4231
+ wake_up_session_caps(s, RECONNECT);
33974232 }
33984233 }
33994234
3400
- for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
4235
+ for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
34014236 s = mdsc->sessions[i];
34024237 if (!s)
34034238 continue;
....@@ -3465,7 +4300,7 @@
34654300 dname.len, dname.name);
34664301
34674302 mutex_lock(&session->s_mutex);
3468
- session->s_seq++;
4303
+ inc_session_sequence(session);
34694304
34704305 if (!inode) {
34714306 dout("handle_lease no inode %llx\n", vino.ino);
....@@ -3526,8 +4361,9 @@
35264361 ceph_con_send(&session->s_con, msg);
35274362
35284363 out:
3529
- iput(inode);
35304364 mutex_unlock(&session->s_mutex);
4365
+ /* avoid calling iput_final() in mds dispatch threads */
4366
+ ceph_async_iput(inode);
35314367 return;
35324368
35334369 bad:
....@@ -3536,31 +4372,33 @@
35364372 }
35374373
35384374 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3539
- struct inode *inode,
35404375 struct dentry *dentry, char action,
35414376 u32 seq)
35424377 {
35434378 struct ceph_msg *msg;
35444379 struct ceph_mds_lease *lease;
3545
- int len = sizeof(*lease) + sizeof(u32);
3546
- int dnamelen = 0;
4380
+ struct inode *dir;
4381
+ int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
35474382
3548
- dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
3549
- inode, dentry, ceph_lease_op_name(action), session->s_mds);
3550
- dnamelen = dentry->d_name.len;
3551
- len += dnamelen;
4383
+ dout("lease_send_msg identry %p %s to mds%d\n",
4384
+ dentry, ceph_lease_op_name(action), session->s_mds);
35524385
35534386 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
35544387 if (!msg)
35554388 return;
35564389 lease = msg->front.iov_base;
35574390 lease->action = action;
3558
- lease->ino = cpu_to_le64(ceph_vino(inode).ino);
3559
- lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
35604391 lease->seq = cpu_to_le32(seq);
3561
- put_unaligned_le32(dnamelen, lease + 1);
3562
- memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
35634392
4393
+ spin_lock(&dentry->d_lock);
4394
+ dir = d_inode(dentry->d_parent);
4395
+ lease->ino = cpu_to_le64(ceph_ino(dir));
4396
+ lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4397
+
4398
+ put_unaligned_le32(dentry->d_name.len, lease + 1);
4399
+ memcpy((void *)(lease + 1) + 4,
4400
+ dentry->d_name.name, dentry->d_name.len);
4401
+ spin_unlock(&dentry->d_lock);
35644402 /*
35654403 * if this is a preemptive lease RELEASE, no need to
35664404 * flush request stream, since the actual request will
....@@ -3572,50 +4410,108 @@
35724410 }
35734411
35744412 /*
3575
- * lock unlock sessions, to wait ongoing session activities
4413
+ * lock unlock the session, to wait ongoing session activities
35764414 */
3577
-static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
4415
+static void lock_unlock_session(struct ceph_mds_session *s)
35784416 {
3579
- int i;
3580
-
3581
- mutex_lock(&mdsc->mutex);
3582
- for (i = 0; i < mdsc->max_sessions; i++) {
3583
- struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3584
- if (!s)
3585
- continue;
3586
- mutex_unlock(&mdsc->mutex);
3587
- mutex_lock(&s->s_mutex);
3588
- mutex_unlock(&s->s_mutex);
3589
- ceph_put_mds_session(s);
3590
- mutex_lock(&mdsc->mutex);
3591
- }
3592
- mutex_unlock(&mdsc->mutex);
4417
+ mutex_lock(&s->s_mutex);
4418
+ mutex_unlock(&s->s_mutex);
35934419 }
35944420
4421
+static void maybe_recover_session(struct ceph_mds_client *mdsc)
4422
+{
4423
+ struct ceph_fs_client *fsc = mdsc->fsc;
35954424
4425
+ if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4426
+ return;
4427
+
4428
+ if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4429
+ return;
4430
+
4431
+ if (!READ_ONCE(fsc->blocklisted))
4432
+ return;
4433
+
4434
+ if (fsc->last_auto_reconnect &&
4435
+ time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
4436
+ return;
4437
+
4438
+ pr_info("auto reconnect after blocklisted\n");
4439
+ fsc->last_auto_reconnect = jiffies;
4440
+ ceph_force_reconnect(fsc->sb);
4441
+}
4442
+
4443
+bool check_session_state(struct ceph_mds_session *s)
4444
+{
4445
+ switch (s->s_state) {
4446
+ case CEPH_MDS_SESSION_OPEN:
4447
+ if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4448
+ s->s_state = CEPH_MDS_SESSION_HUNG;
4449
+ pr_info("mds%d hung\n", s->s_mds);
4450
+ }
4451
+ break;
4452
+ case CEPH_MDS_SESSION_CLOSING:
4453
+ /* Should never reach this when we're unmounting */
4454
+ WARN_ON_ONCE(s->s_ttl);
4455
+ fallthrough;
4456
+ case CEPH_MDS_SESSION_NEW:
4457
+ case CEPH_MDS_SESSION_RESTARTING:
4458
+ case CEPH_MDS_SESSION_CLOSED:
4459
+ case CEPH_MDS_SESSION_REJECTED:
4460
+ return false;
4461
+ }
4462
+
4463
+ return true;
4464
+}
35964465
35974466 /*
3598
- * delayed work -- periodically trim expired leases, renew caps with mds
4467
+ * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
4468
+ * then we need to retransmit that request.
35994469 */
3600
-static void schedule_delayed(struct ceph_mds_client *mdsc)
4470
+void inc_session_sequence(struct ceph_mds_session *s)
36014471 {
3602
- int delay = 5;
3603
- unsigned hz = round_jiffies_relative(HZ * delay);
3604
- schedule_delayed_work(&mdsc->delayed_work, hz);
4472
+ lockdep_assert_held(&s->s_mutex);
4473
+
4474
+ s->s_seq++;
4475
+
4476
+ if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4477
+ int ret;
4478
+
4479
+ dout("resending session close request for mds%d\n", s->s_mds);
4480
+ ret = request_close_session(s);
4481
+ if (ret < 0)
4482
+ pr_err("unable to close session to mds%d: %d\n",
4483
+ s->s_mds, ret);
4484
+ }
4485
+}
4486
+
4487
+/*
4488
+ * delayed work -- periodically trim expired leases, renew caps with mds. If
4489
+ * the @delay parameter is set to 0 or if it's more than 5 secs, the default
4490
+ * workqueue delay value of 5 secs will be used.
4491
+ */
4492
+static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
4493
+{
4494
+ unsigned long max_delay = HZ * 5;
4495
+
4496
+ /* 5 secs default delay */
4497
+ if (!delay || (delay > max_delay))
4498
+ delay = max_delay;
4499
+ schedule_delayed_work(&mdsc->delayed_work,
4500
+ round_jiffies_relative(delay));
36054501 }
36064502
36074503 static void delayed_work(struct work_struct *work)
36084504 {
3609
- int i;
36104505 struct ceph_mds_client *mdsc =
36114506 container_of(work, struct ceph_mds_client, delayed_work.work);
4507
+ unsigned long delay;
36124508 int renew_interval;
36134509 int renew_caps;
4510
+ int i;
36144511
36154512 dout("mdsc delayed_work\n");
3616
- ceph_check_delayed_caps(mdsc);
36174513
3618
- if (mdsc->stopping)
4514
+ if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
36194515 return;
36204516
36214517 mutex_lock(&mdsc->mutex);
....@@ -3629,23 +4525,8 @@
36294525 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
36304526 if (!s)
36314527 continue;
3632
- if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
3633
- dout("resending session close request for mds%d\n",
3634
- s->s_mds);
3635
- request_close_session(mdsc, s);
3636
- ceph_put_mds_session(s);
3637
- continue;
3638
- }
3639
- if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
3640
- if (s->s_state == CEPH_MDS_SESSION_OPEN) {
3641
- s->s_state = CEPH_MDS_SESSION_HUNG;
3642
- pr_info("mds%d hung\n", s->s_mds);
3643
- }
3644
- }
3645
- if (s->s_state == CEPH_MDS_SESSION_NEW ||
3646
- s->s_state == CEPH_MDS_SESSION_RESTARTING ||
3647
- s->s_state == CEPH_MDS_SESSION_REJECTED) {
3648
- /* this mds is failed or recovering, just wait */
4528
+
4529
+ if (!check_session_state(s)) {
36494530 ceph_put_mds_session(s);
36504531 continue;
36514532 }
....@@ -3666,13 +4547,22 @@
36664547 }
36674548 mutex_unlock(&mdsc->mutex);
36684549
3669
- schedule_delayed(mdsc);
4550
+ delay = ceph_check_delayed_caps(mdsc);
4551
+
4552
+ ceph_queue_cap_reclaim_work(mdsc);
4553
+
4554
+ ceph_trim_snapid_map(mdsc);
4555
+
4556
+ maybe_recover_session(mdsc);
4557
+
4558
+ schedule_delayed(mdsc, delay);
36704559 }
36714560
36724561 int ceph_mdsc_init(struct ceph_fs_client *fsc)
36734562
36744563 {
36754564 struct ceph_mds_client *mdsc;
4565
+ int err;
36764566
36774567 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
36784568 if (!mdsc)
....@@ -3681,8 +4571,8 @@
36814571 mutex_init(&mdsc->mutex);
36824572 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
36834573 if (!mdsc->mdsmap) {
3684
- kfree(mdsc);
3685
- return -ENOMEM;
4574
+ err = -ENOMEM;
4575
+ goto err_mdsc;
36864576 }
36874577
36884578 init_completion(&mdsc->safe_umount_waiters);
....@@ -3693,10 +4583,13 @@
36934583 mdsc->max_sessions = 0;
36944584 mdsc->stopping = 0;
36954585 atomic64_set(&mdsc->quotarealms_count, 0);
4586
+ mdsc->quotarealms_inodes = RB_ROOT;
4587
+ mutex_init(&mdsc->quotarealms_inodes_mutex);
36964588 mdsc->last_snap_seq = 0;
36974589 init_rwsem(&mdsc->snap_rwsem);
36984590 mdsc->snap_realms = RB_ROOT;
36994591 INIT_LIST_HEAD(&mdsc->snap_empty);
4592
+ mdsc->num_snap_realms = 0;
37004593 spin_lock_init(&mdsc->snap_empty_lock);
37014594 mdsc->last_tid = 0;
37024595 mdsc->oldest_tid = 0;
....@@ -3704,21 +4597,32 @@
37044597 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
37054598 mdsc->last_renew_caps = jiffies;
37064599 INIT_LIST_HEAD(&mdsc->cap_delay_list);
4600
+ INIT_LIST_HEAD(&mdsc->cap_wait_list);
37074601 spin_lock_init(&mdsc->cap_delay_lock);
37084602 INIT_LIST_HEAD(&mdsc->snap_flush_list);
37094603 spin_lock_init(&mdsc->snap_flush_lock);
37104604 mdsc->last_cap_flush_tid = 1;
37114605 INIT_LIST_HEAD(&mdsc->cap_flush_list);
3712
- INIT_LIST_HEAD(&mdsc->cap_dirty);
37134606 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
37144607 mdsc->num_cap_flushing = 0;
37154608 spin_lock_init(&mdsc->cap_dirty_lock);
37164609 init_waitqueue_head(&mdsc->cap_flushing_wq);
3717
- spin_lock_init(&mdsc->dentry_lru_lock);
3718
- INIT_LIST_HEAD(&mdsc->dentry_lru);
4610
+ INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4611
+ atomic_set(&mdsc->cap_reclaim_pending, 0);
4612
+ err = ceph_metric_init(&mdsc->metric);
4613
+ if (err)
4614
+ goto err_mdsmap;
4615
+
4616
+ spin_lock_init(&mdsc->dentry_list_lock);
4617
+ INIT_LIST_HEAD(&mdsc->dentry_leases);
4618
+ INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
37194619
37204620 ceph_caps_init(mdsc);
3721
- ceph_adjust_min_caps(mdsc, fsc->min_caps);
4621
+ ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4622
+
4623
+ spin_lock_init(&mdsc->snapid_map_lock);
4624
+ mdsc->snapid_map_tree = RB_ROOT;
4625
+ INIT_LIST_HEAD(&mdsc->snapid_map_lru);
37224626
37234627 init_rwsem(&mdsc->pool_perm_rwsem);
37244628 mdsc->pool_perm_tree = RB_ROOT;
....@@ -3728,6 +4632,12 @@
37284632
37294633 fsc->mdsc = mdsc;
37304634 return 0;
4635
+
4636
+err_mdsmap:
4637
+ kfree(mdsc->mdsmap);
4638
+err_mdsc:
4639
+ kfree(mdsc);
4640
+ return err;
37314641 }
37324642
37334643 /*
....@@ -3752,11 +4662,36 @@
37524662 while ((req = __get_oldest_req(mdsc))) {
37534663 dout("wait_requests timed out on tid %llu\n",
37544664 req->r_tid);
4665
+ list_del_init(&req->r_wait);
37554666 __unregister_request(mdsc, req);
37564667 }
37574668 }
37584669 mutex_unlock(&mdsc->mutex);
37594670 dout("wait_requests done\n");
4671
+}
4672
+
4673
+void send_flush_mdlog(struct ceph_mds_session *s)
4674
+{
4675
+ struct ceph_msg *msg;
4676
+
4677
+ /*
4678
+ * Pre-luminous MDS crashes when it sees an unknown session request
4679
+ */
4680
+ if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
4681
+ return;
4682
+
4683
+ mutex_lock(&s->s_mutex);
4684
+ dout("request mdlog flush to mds%d (%s)s seq %lld\n", s->s_mds,
4685
+ ceph_session_state_name(s->s_state), s->s_seq);
4686
+ msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
4687
+ s->s_seq);
4688
+ if (!msg) {
4689
+ pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
4690
+ s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
4691
+ } else {
4692
+ ceph_con_send(&s->s_con, msg);
4693
+ }
4694
+ mutex_unlock(&s->s_mutex);
37604695 }
37614696
37624697 /*
....@@ -3766,9 +4701,10 @@
37664701 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
37674702 {
37684703 dout("pre_umount\n");
3769
- mdsc->stopping = 1;
4704
+ mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;
37704705
3771
- lock_unlock_sessions(mdsc);
4706
+ ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
4707
+ ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
37724708 ceph_flush_dirty_caps(mdsc);
37734709 wait_requests(mdsc);
37744710
....@@ -3777,6 +4713,8 @@
37774713 * their inode/dcache refs
37784714 */
37794715 ceph_msgr_flush();
4716
+
4717
+ ceph_cleanup_quotarealms_inodes(mdsc);
37804718 }
37814719
37824720 /*
....@@ -3902,7 +4840,7 @@
39024840 mutex_lock(&mdsc->mutex);
39034841 for (i = 0; i < mdsc->max_sessions; i++) {
39044842 if (mdsc->sessions[i]) {
3905
- session = get_session(mdsc->sessions[i]);
4843
+ session = ceph_get_mds_session(mdsc->sessions[i]);
39064844 __unregister_session(mdsc, session);
39074845 mutex_unlock(&mdsc->mutex);
39084846 mutex_lock(&session->s_mutex);
....@@ -3915,8 +4853,10 @@
39154853 WARN_ON(!list_empty(&mdsc->cap_delay_list));
39164854 mutex_unlock(&mdsc->mutex);
39174855
4856
+ ceph_cleanup_snapid_map(mdsc);
39184857 ceph_cleanup_empty_realms(mdsc);
39194858
4859
+ cancel_work_sync(&mdsc->cap_reclaim_work);
39204860 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
39214861
39224862 dout("stopped\n");
....@@ -3934,7 +4874,12 @@
39344874 session = __ceph_lookup_mds_session(mdsc, mds);
39354875 if (!session)
39364876 continue;
4877
+
4878
+ if (session->s_state == CEPH_MDS_SESSION_REJECTED)
4879
+ __unregister_session(mdsc, session);
4880
+ __wake_requests(mdsc, &session->s_waiting);
39374881 mutex_unlock(&mdsc->mutex);
4882
+
39384883 mutex_lock(&session->s_mutex);
39394884 __close_session(mdsc, session);
39404885 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
....@@ -3943,6 +4888,7 @@
39434888 }
39444889 mutex_unlock(&session->s_mutex);
39454890 ceph_put_mds_session(session);
4891
+
39464892 mutex_lock(&mdsc->mutex);
39474893 kick_requests(mdsc, mds);
39484894 }
....@@ -3982,6 +4928,8 @@
39824928 ceph_msgr_flush();
39834929
39844930 ceph_mdsc_stop(mdsc);
4931
+
4932
+ ceph_metric_destroy(&mdsc->metric);
39854933
39864934 fsc->mdsc = NULL;
39874935 kfree(mdsc);
....@@ -4117,7 +5065,7 @@
41175065 mdsc->mdsmap->m_epoch);
41185066
41195067 mutex_unlock(&mdsc->mutex);
4120
- schedule_delayed(mdsc);
5068
+ schedule_delayed(mdsc, 0);
41215069 return;
41225070
41235071 bad_unlock:
....@@ -4131,11 +5079,8 @@
41315079 {
41325080 struct ceph_mds_session *s = con->private;
41335081
4134
- if (get_session(s)) {
4135
- dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
5082
+ if (ceph_get_mds_session(s))
41365083 return con;
4137
- }
4138
- dout("mdsc con_get %p FAIL\n", s);
41395084 return NULL;
41405085 }
41415086
....@@ -4143,7 +5088,6 @@
41435088 {
41445089 struct ceph_mds_session *s = con->private;
41455090
4146
- dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
41475091 ceph_put_mds_session(s);
41485092 }
41495093