hc
2024-05-16 8d2a02b24d66aa359e83eebc1ed3c0f85367a1cb
kernel/fs/ceph/addr.c
....@@ -10,10 +10,13 @@
1010 #include <linux/pagevec.h>
1111 #include <linux/task_io_accounting_ops.h>
1212 #include <linux/signal.h>
13
+#include <linux/iversion.h>
14
+#include <linux/ktime.h>
1315
1416 #include "super.h"
1517 #include "mds_client.h"
1618 #include "cache.h"
19
+#include "metric.h"
1720 #include <linux/ceph/osd_client.h>
1821 #include <linux/ceph/striper.h>
1922
....@@ -150,8 +153,6 @@
150153 if (!PagePrivate(page))
151154 return;
152155
153
- ClearPageChecked(page);
154
-
155156 dout("%p invalidatepage %p idx %lu full dirty page\n",
156157 inode, page, page->index);
157158
....@@ -173,15 +174,15 @@
173174 return !PagePrivate(page);
174175 }
175176
176
-/*
177
- * read a single page, without unlocking it.
178
- */
177
+/* read a single page, without unlocking it. */
179178 static int ceph_do_readpage(struct file *filp, struct page *page)
180179 {
181180 struct inode *inode = file_inode(filp);
182181 struct ceph_inode_info *ci = ceph_inode(inode);
183
- struct ceph_osd_client *osdc =
184
- &ceph_inode_to_client(inode)->client->osdc;
182
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
183
+ struct ceph_osd_client *osdc = &fsc->client->osdc;
184
+ struct ceph_osd_request *req;
185
+ struct ceph_vino vino = ceph_vino(inode);
185186 int err = 0;
186187 u64 off = page_offset(page);
187188 u64 len = PAGE_SIZE;
....@@ -208,17 +209,33 @@
208209 if (err == 0)
209210 return -EINPROGRESS;
210211
211
- dout("readpage inode %p file %p page %p index %lu\n",
212
- inode, filp, page, page->index);
213
- err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
214
- off, &len,
215
- ci->i_truncate_seq, ci->i_truncate_size,
216
- &page, 1, 0);
212
+ dout("readpage ino %llx.%llx file %p off %llu len %llu page %p index %lu\n",
213
+ vino.ino, vino.snap, filp, off, len, page, page->index);
214
+ req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, 0, 1,
215
+ CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL,
216
+ ci->i_truncate_seq, ci->i_truncate_size,
217
+ false);
218
+ if (IS_ERR(req))
219
+ return PTR_ERR(req);
220
+
221
+ osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
222
+
223
+ err = ceph_osdc_start_request(osdc, req, false);
224
+ if (!err)
225
+ err = ceph_osdc_wait_request(osdc, req);
226
+
227
+ ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
228
+ req->r_end_latency, err);
229
+
230
+ ceph_osdc_put_request(req);
231
+ dout("readpage result %d\n", err);
232
+
217233 if (err == -ENOENT)
218234 err = 0;
219235 if (err < 0) {
220
- SetPageError(page);
221236 ceph_fscache_readpage_cancel(inode, page);
237
+ if (err == -EBLOCKLISTED)
238
+ fsc->blocklisted = true;
222239 goto out;
223240 }
224241 if (err < PAGE_SIZE)
....@@ -250,6 +267,7 @@
250267 static void finish_read(struct ceph_osd_request *req)
251268 {
252269 struct inode *inode = req->r_inode;
270
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
253271 struct ceph_osd_data *osd_data;
254272 int rc = req->r_result <= 0 ? req->r_result : 0;
255273 int bytes = req->r_result >= 0 ? req->r_result : 0;
....@@ -257,6 +275,8 @@
257275 int i;
258276
259277 dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
278
+ if (rc == -EBLOCKLISTED)
279
+ ceph_inode_to_client(inode)->blocklisted = true;
260280
261281 /* unlock all pages, zeroing any data we didn't read */
262282 osd_data = osd_req_op_extent_osd_data(req, 0);
....@@ -285,6 +305,10 @@
285305 put_page(page);
286306 bytes -= PAGE_SIZE;
287307 }
308
+
309
+ ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
310
+ req->r_end_latency, rc);
311
+
288312 kfree(osd_data->pages);
289313 }
290314
....@@ -298,7 +322,7 @@
298322 struct ceph_osd_client *osdc =
299323 &ceph_inode_to_client(inode)->client->osdc;
300324 struct ceph_inode_info *ci = ceph_inode(inode);
301
- struct page *page = list_entry(page_list->prev, struct page, lru);
325
+ struct page *page = lru_to_page(page_list);
302326 struct ceph_vino vino;
303327 struct ceph_osd_request *req;
304328 u64 off;
....@@ -314,7 +338,8 @@
314338 /* caller of readpages does not hold buffer and read caps
315339 * (fadvise, madvise and readahead cases) */
316340 int want = CEPH_CAP_FILE_CACHE;
317
- ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got);
341
+ ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want,
342
+ true, &got);
318343 if (ret < 0) {
319344 dout("start_read %p, error getting cap\n", inode);
320345 } else if (!(got & want)) {
....@@ -325,8 +350,7 @@
325350 if (got)
326351 ceph_put_cap_refs(ci, got);
327352 while (!list_empty(page_list)) {
328
- page = list_entry(page_list->prev,
329
- struct page, lru);
353
+ page = lru_to_page(page_list);
330354 list_del(&page->lru);
331355 put_page(page);
332356 }
....@@ -561,24 +585,23 @@
561585 /*
562586 * Write a single page, but leave the page locked.
563587 *
564
- * If we get a write error, set the page error bit, but still adjust the
588
+ * If we get a write error, mark the mapping for error, but still adjust the
565589 * dirty page accounting (i.e., page is no longer dirty).
566590 */
567591 static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
568592 {
569
- struct inode *inode;
570
- struct ceph_inode_info *ci;
571
- struct ceph_fs_client *fsc;
593
+ struct inode *inode = page->mapping->host;
594
+ struct ceph_inode_info *ci = ceph_inode(inode);
595
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
572596 struct ceph_snap_context *snapc, *oldest;
573597 loff_t page_off = page_offset(page);
574
- int err, len = PAGE_SIZE;
598
+ int err;
599
+ loff_t len = PAGE_SIZE;
575600 struct ceph_writeback_ctl ceph_wbc;
601
+ struct ceph_osd_client *osdc = &fsc->client->osdc;
602
+ struct ceph_osd_request *req;
576603
577604 dout("writepage %p idx %lu\n", page, page->index);
578
-
579
- inode = page->mapping->host;
580
- ci = ceph_inode(inode);
581
- fsc = ceph_inode_to_client(inode);
582605
583606 /* verify this is a writeable snap context */
584607 snapc = page_snap_context(page);
....@@ -608,7 +631,7 @@
608631 if (ceph_wbc.i_size < page_off + len)
609632 len = ceph_wbc.i_size - page_off;
610633
611
- dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n",
634
+ dout("writepage %p page %p index %lu on %llu~%llu snapc %p seq %lld\n",
612635 inode, page, page->index, page_off, len, snapc, snapc->seq);
613636
614637 if (atomic_long_inc_return(&fsc->writeback_count) >
....@@ -616,11 +639,33 @@
616639 set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
617640
618641 set_page_writeback(page);
619
- err = ceph_osdc_writepages(&fsc->client->osdc, ceph_vino(inode),
620
- &ci->i_layout, snapc, page_off, len,
621
- ceph_wbc.truncate_seq,
622
- ceph_wbc.truncate_size,
623
- &inode->i_mtime, &page, 1);
642
+ req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), page_off, &len, 0, 1,
643
+ CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, snapc,
644
+ ceph_wbc.truncate_seq, ceph_wbc.truncate_size,
645
+ true);
646
+ if (IS_ERR(req)) {
647
+ redirty_page_for_writepage(wbc, page);
648
+ end_page_writeback(page);
649
+ return PTR_ERR(req);
650
+ }
651
+
652
+ /* it may be a short write due to an object boundary */
653
+ WARN_ON_ONCE(len > PAGE_SIZE);
654
+ osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
655
+ dout("writepage %llu~%llu (%llu bytes)\n", page_off, len, len);
656
+
657
+ req->r_mtime = inode->i_mtime;
658
+ err = ceph_osdc_start_request(osdc, req, true);
659
+ if (!err)
660
+ err = ceph_osdc_wait_request(osdc, req);
661
+
662
+ ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
663
+ req->r_end_latency, err);
664
+
665
+ ceph_osdc_put_request(req);
666
+ if (err == 0)
667
+ err = len;
668
+
624669 if (err < 0) {
625670 struct writeback_control tmp_wbc;
626671 if (!wbc)
....@@ -632,9 +677,10 @@
632677 end_page_writeback(page);
633678 return err;
634679 }
680
+ if (err == -EBLOCKLISTED)
681
+ fsc->blocklisted = true;
635682 dout("writepage setting page/mapping error %d %p\n",
636683 err, page);
637
- SetPageError(page);
638684 mapping_set_error(&inode->i_data, err);
639685 wbc->pages_skipped++;
640686 } else {
....@@ -672,23 +718,6 @@
672718 }
673719
674720 /*
675
- * lame release_pages helper. release_pages() isn't exported to
676
- * modules.
677
- */
678
-static void ceph_release_pages(struct page **pages, int num)
679
-{
680
- struct pagevec pvec;
681
- int i;
682
-
683
- pagevec_init(&pvec);
684
- for (i = 0; i < num; i++) {
685
- if (pagevec_add(&pvec, pages[i]) == 0)
686
- pagevec_release(&pvec);
687
- }
688
- pagevec_release(&pvec);
689
-}
690
-
691
-/*
692721 * async writeback completion handler.
693722 *
694723 * If we get an error, set the mapping error bit, but not the individual
....@@ -712,9 +741,14 @@
712741 if (rc < 0) {
713742 mapping_set_error(mapping, rc);
714743 ceph_set_error_write(ci);
744
+ if (rc == -EBLOCKLISTED)
745
+ fsc->blocklisted = true;
715746 } else {
716747 ceph_clear_error_write(ci);
717748 }
749
+
750
+ ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
751
+ req->r_end_latency, rc);
718752
719753 /*
720754 * We lost the cache cap, need to truncate the page before
....@@ -761,15 +795,14 @@
761795 dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
762796 inode, osd_data->length, rc >= 0 ? num_pages : 0);
763797
764
- ceph_release_pages(osd_data->pages, num_pages);
798
+ release_pages(osd_data->pages, num_pages);
765799 }
766800
767801 ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
768802
769803 osd_data = osd_req_op_extent_osd_data(req, 0);
770804 if (osd_data->pages_from_pool)
771
- mempool_free(osd_data->pages,
772
- ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
805
+ mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
773806 else
774807 kfree(osd_data->pages);
775808 ceph_osdc_put_request(req);
....@@ -861,17 +894,16 @@
861894 int num_ops = 0, op_idx;
862895 unsigned i, pvec_pages, max_pages, locked_pages = 0;
863896 struct page **pages = NULL, **data_pages;
864
- mempool_t *pool = NULL; /* Becomes non-null if mempool used */
865897 struct page *page;
866898 pgoff_t strip_unit_end = 0;
867899 u64 offset = 0, len = 0;
900
+ bool from_pool = false;
868901
869902 max_pages = wsize >> PAGE_SHIFT;
870903
871904 get_more_pages:
872
- pvec_pages = pagevec_lookup_range_nr_tag(&pvec, mapping, &index,
873
- end, PAGECACHE_TAG_DIRTY,
874
- max_pages - locked_pages);
905
+ pvec_pages = pagevec_lookup_range_tag(&pvec, mapping, &index,
906
+ end, PAGECACHE_TAG_DIRTY);
875907 dout("pagevec_lookup_range_tag got %d\n", pvec_pages);
876908 if (!pvec_pages && !locked_pages)
877909 break;
....@@ -963,16 +995,16 @@
963995 sizeof(*pages),
964996 GFP_NOFS);
965997 if (!pages) {
966
- pool = fsc->wb_pagevec_pool;
967
- pages = mempool_alloc(pool, GFP_NOFS);
998
+ from_pool = true;
999
+ pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
9681000 BUG_ON(!pages);
9691001 }
9701002
9711003 len = 0;
9721004 } else if (page->index !=
9731005 (offset + len) >> PAGE_SHIFT) {
974
- if (num_ops >= (pool ? CEPH_OSD_SLAB_OPS :
975
- CEPH_OSD_MAX_OPS)) {
1006
+ if (num_ops >= (from_pool ? CEPH_OSD_SLAB_OPS :
1007
+ CEPH_OSD_MAX_OPS)) {
9761008 redirty_page_for_writepage(wbc, page);
9771009 unlock_page(page);
9781010 break;
....@@ -1067,7 +1099,7 @@
10671099 offset, len);
10681100 osd_req_op_extent_osd_data_pages(req, op_idx,
10691101 data_pages, len, 0,
1070
- !!pool, false);
1102
+ from_pool, false);
10711103 osd_req_op_extent_update(req, op_idx, len);
10721104
10731105 len = 0;
....@@ -1094,12 +1126,12 @@
10941126 dout("writepages got pages at %llu~%llu\n", offset, len);
10951127
10961128 osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
1097
- 0, !!pool, false);
1129
+ 0, from_pool, false);
10981130 osd_req_op_extent_update(req, op_idx, len);
10991131
11001132 BUG_ON(op_idx + 1 != req->r_num_ops);
11011133
1102
- pool = NULL;
1134
+ from_pool = false;
11031135 if (i < locked_pages) {
11041136 BUG_ON(num_ops <= req->r_num_ops);
11051137 num_ops -= req->r_num_ops;
....@@ -1110,8 +1142,8 @@
11101142 pages = kmalloc_array(locked_pages, sizeof(*pages),
11111143 GFP_NOFS);
11121144 if (!pages) {
1113
- pool = fsc->wb_pagevec_pool;
1114
- pages = mempool_alloc(pool, GFP_NOFS);
1145
+ from_pool = true;
1146
+ pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
11151147 BUG_ON(!pages);
11161148 }
11171149 memcpy(pages, data_pages + i,
....@@ -1206,110 +1238,99 @@
12061238 return ret;
12071239 }
12081240
1209
-/*
1210
- * We are only allowed to write into/dirty the page if the page is
1211
- * clean, or already dirty within the same snap context.
1241
+/**
1242
+ * ceph_find_incompatible - find an incompatible context and return it
1243
+ * @page: page being dirtied
12121244 *
1213
- * called with page locked.
1214
- * return success with page locked,
1215
- * or any failure (incl -EAGAIN) with page unlocked.
1245
+ * We are only allowed to write into/dirty a page if the page is
1246
+ * clean, or already dirty within the same snap context. Returns a
1247
+ * conflicting context if there is one, NULL if there isn't, or a
1248
+ * negative error code on other errors.
1249
+ *
1250
+ * Must be called with page lock held.
12161251 */
1217
-static int ceph_update_writeable_page(struct file *file,
1218
- loff_t pos, unsigned len,
1219
- struct page *page)
1252
+static struct ceph_snap_context *
1253
+ceph_find_incompatible(struct page *page)
12201254 {
1221
- struct inode *inode = file_inode(file);
1255
+ struct inode *inode = page->mapping->host;
12221256 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
12231257 struct ceph_inode_info *ci = ceph_inode(inode);
1224
- loff_t page_off = pos & PAGE_MASK;
1225
- int pos_in_page = pos & ~PAGE_MASK;
1226
- int end_in_page = pos_in_page + len;
1227
- loff_t i_size;
1228
- int r;
1229
- struct ceph_snap_context *snapc, *oldest;
12301258
12311259 if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
12321260 dout(" page %p forced umount\n", page);
1233
- unlock_page(page);
1234
- return -EIO;
1261
+ return ERR_PTR(-EIO);
12351262 }
12361263
1237
-retry_locked:
1238
- /* writepages currently holds page lock, but if we change that later, */
1239
- wait_on_page_writeback(page);
1264
+ for (;;) {
1265
+ struct ceph_snap_context *snapc, *oldest;
12401266
1241
- snapc = page_snap_context(page);
1242
- if (snapc && snapc != ci->i_head_snapc) {
1267
+ wait_on_page_writeback(page);
1268
+
1269
+ snapc = page_snap_context(page);
1270
+ if (!snapc || snapc == ci->i_head_snapc)
1271
+ break;
1272
+
12431273 /*
12441274 * this page is already dirty in another (older) snap
12451275 * context! is it writeable now?
12461276 */
12471277 oldest = get_oldest_context(inode, NULL, NULL);
12481278 if (snapc->seq > oldest->seq) {
1279
+ /* not writeable -- return it for the caller to deal with */
12491280 ceph_put_snap_context(oldest);
1250
- dout(" page %p snapc %p not current or oldest\n",
1251
- page, snapc);
1252
- /*
1253
- * queue for writeback, and wait for snapc to
1254
- * be writeable or written
1255
- */
1256
- snapc = ceph_get_snap_context(snapc);
1257
- unlock_page(page);
1258
- ceph_queue_writeback(inode);
1259
- r = wait_event_killable(ci->i_cap_wq,
1260
- context_is_writeable_or_written(inode, snapc));
1261
- ceph_put_snap_context(snapc);
1262
- if (r == -ERESTARTSYS)
1263
- return r;
1264
- return -EAGAIN;
1281
+ dout(" page %p snapc %p not current or oldest\n", page, snapc);
1282
+ return ceph_get_snap_context(snapc);
12651283 }
12661284 ceph_put_snap_context(oldest);
12671285
12681286 /* yay, writeable, do it now (without dropping page lock) */
1269
- dout(" page %p snapc %p not current, but oldest\n",
1270
- page, snapc);
1271
- if (!clear_page_dirty_for_io(page))
1272
- goto retry_locked;
1273
- r = writepage_nounlock(page, NULL);
1274
- if (r < 0)
1275
- goto fail_unlock;
1276
- goto retry_locked;
1287
+ dout(" page %p snapc %p not current, but oldest\n", page, snapc);
1288
+ if (clear_page_dirty_for_io(page)) {
1289
+ int r = writepage_nounlock(page, NULL);
1290
+ if (r < 0)
1291
+ return ERR_PTR(r);
1292
+ }
12771293 }
1294
+ return NULL;
1295
+}
12781296
1279
- if (PageUptodate(page)) {
1280
- dout(" page %p already uptodate\n", page);
1281
- return 0;
1282
- }
1297
+/**
1298
+ * skip_page_read - prep a page for writing without reading first
1299
+ * @page: page being prepared
1300
+ * @pos: starting position for the write
1301
+ * @len: length of write
1302
+ *
1303
+ * In some cases, write_begin doesn't need to read at all:
1304
+ * - full page write
1305
+ * - file is currently zero-length
1306
+ * - write that lies in a page that is completely beyond EOF
1307
+ * - write that covers the page from start to EOF or beyond it
1308
+ *
1309
+ * If any of these criteria are met, then zero out the unwritten parts
1310
+ * of the page and return true. Otherwise, return false.
1311
+ */
1312
+static bool skip_page_read(struct page *page, loff_t pos, size_t len)
1313
+{
1314
+ struct inode *inode = page->mapping->host;
1315
+ loff_t i_size = i_size_read(inode);
1316
+ size_t offset = offset_in_page(pos);
12831317
1284
- /* full page? */
1285
- if (pos_in_page == 0 && len == PAGE_SIZE)
1286
- return 0;
1318
+ /* Full page write */
1319
+ if (offset == 0 && len >= PAGE_SIZE)
1320
+ return true;
12871321
1288
- /* past end of file? */
1289
- i_size = i_size_read(inode);
1322
+ /* pos beyond last page in the file */
1323
+ if (pos - offset >= i_size)
1324
+ goto zero_out;
12901325
1291
- if (page_off >= i_size ||
1292
- (pos_in_page == 0 && (pos+len) >= i_size &&
1293
- end_in_page - pos_in_page != PAGE_SIZE)) {
1294
- dout(" zeroing %p 0 - %d and %d - %d\n",
1295
- page, pos_in_page, end_in_page, (int)PAGE_SIZE);
1296
- zero_user_segments(page,
1297
- 0, pos_in_page,
1298
- end_in_page, PAGE_SIZE);
1299
- return 0;
1300
- }
1326
+ /* write that covers the whole page from start to EOF or beyond it */
1327
+ if (offset == 0 && (pos + len) >= i_size)
1328
+ goto zero_out;
13011329
1302
- /* we need to read it. */
1303
- r = ceph_do_readpage(file, page);
1304
- if (r < 0) {
1305
- if (r == -EINPROGRESS)
1306
- return -EAGAIN;
1307
- goto fail_unlock;
1308
- }
1309
- goto retry_locked;
1310
-fail_unlock:
1311
- unlock_page(page);
1312
- return r;
1330
+ return false;
1331
+zero_out:
1332
+ zero_user_segments(page, 0, offset, offset + len, PAGE_SIZE);
1333
+ return true;
13131334 }
13141335
13151336 /*
....@@ -1321,26 +1342,67 @@
13211342 struct page **pagep, void **fsdata)
13221343 {
13231344 struct inode *inode = file_inode(file);
1324
- struct page *page;
1345
+ struct ceph_inode_info *ci = ceph_inode(inode);
1346
+ struct ceph_snap_context *snapc;
1347
+ struct page *page = NULL;
13251348 pgoff_t index = pos >> PAGE_SHIFT;
1326
- int r;
1349
+ int r = 0;
13271350
1328
- do {
1329
- /* get a page */
1351
+ dout("write_begin file %p inode %p page %p %d~%d\n", file, inode, page, (int)pos, (int)len);
1352
+
1353
+ for (;;) {
13301354 page = grab_cache_page_write_begin(mapping, index, 0);
1331
- if (!page)
1332
- return -ENOMEM;
1355
+ if (!page) {
1356
+ r = -ENOMEM;
1357
+ break;
1358
+ }
13331359
1334
- dout("write_begin file %p inode %p page %p %d~%d\n", file,
1335
- inode, page, (int)pos, (int)len);
1336
-
1337
- r = ceph_update_writeable_page(file, pos, len, page);
1338
- if (r < 0)
1360
+ snapc = ceph_find_incompatible(page);
1361
+ if (snapc) {
1362
+ if (IS_ERR(snapc)) {
1363
+ r = PTR_ERR(snapc);
1364
+ break;
1365
+ }
1366
+ unlock_page(page);
13391367 put_page(page);
1340
- else
1341
- *pagep = page;
1342
- } while (r == -EAGAIN);
1368
+ page = NULL;
1369
+ ceph_queue_writeback(inode);
1370
+ r = wait_event_killable(ci->i_cap_wq,
1371
+ context_is_writeable_or_written(inode, snapc));
1372
+ ceph_put_snap_context(snapc);
1373
+ if (r != 0)
1374
+ break;
1375
+ continue;
1376
+ }
13431377
1378
+ if (PageUptodate(page)) {
1379
+ dout(" page %p already uptodate\n", page);
1380
+ break;
1381
+ }
1382
+
1383
+ /* No need to read in some cases */
1384
+ if (skip_page_read(page, pos, len))
1385
+ break;
1386
+
1387
+ /*
1388
+ * We need to read it. If we get back -EINPROGRESS, then the page was
1389
+ * handed off to fscache and it will be unlocked when the read completes.
1390
+ * Refind the page in that case so we can reacquire the page lock. Otherwise
1391
+ * we got a hard error or the read was completed synchronously.
1392
+ */
1393
+ r = ceph_do_readpage(file, page);
1394
+ if (r != -EINPROGRESS)
1395
+ break;
1396
+ }
1397
+
1398
+ if (r < 0) {
1399
+ if (page) {
1400
+ unlock_page(page);
1401
+ put_page(page);
1402
+ }
1403
+ } else {
1404
+ *pagep = page;
1405
+ }
13441406 return r;
13451407 }
13461408
....@@ -1444,7 +1506,8 @@
14441506 want = CEPH_CAP_FILE_CACHE;
14451507
14461508 got = 0;
1447
- err = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
1509
+ err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1,
1510
+ &got, &pinned_page);
14481511 if (err < 0)
14491512 goto out_restore;
14501513
....@@ -1488,10 +1551,7 @@
14881551 if (err < 0 || off >= i_size_read(inode)) {
14891552 unlock_page(page);
14901553 put_page(page);
1491
- if (err == -ENOMEM)
1492
- ret = VM_FAULT_OOM;
1493
- else
1494
- ret = VM_FAULT_SIGBUS;
1554
+ ret = vmf_error(err);
14951555 goto out_inline;
14961556 }
14971557 if (err < PAGE_SIZE)
....@@ -1535,6 +1595,7 @@
15351595 if (!prealloc_cf)
15361596 return VM_FAULT_OOM;
15371597
1598
+ sb_start_pagefault(inode->i_sb);
15381599 ceph_block_sigs(&oldset);
15391600
15401601 if (ci->i_inline_version != CEPH_INLINE_NONE) {
....@@ -1563,7 +1624,7 @@
15631624 want = CEPH_CAP_FILE_BUFFER;
15641625
15651626 got = 0;
1566
- err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
1627
+ err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len,
15671628 &got, NULL);
15681629 if (err < 0)
15691630 goto out_free;
....@@ -1573,23 +1634,39 @@
15731634
15741635 /* Update time before taking page lock */
15751636 file_update_time(vma->vm_file);
1637
+ inode_inc_iversion_raw(inode);
15761638
15771639 do {
1640
+ struct ceph_snap_context *snapc;
1641
+
15781642 lock_page(page);
15791643
1580
- if ((off > size) || (page->mapping != inode->i_mapping)) {
1644
+ if (page_mkwrite_check_truncate(page, inode) < 0) {
15811645 unlock_page(page);
15821646 ret = VM_FAULT_NOPAGE;
15831647 break;
15841648 }
15851649
1586
- err = ceph_update_writeable_page(vma->vm_file, off, len, page);
1587
- if (err >= 0) {
1650
+ snapc = ceph_find_incompatible(page);
1651
+ if (!snapc) {
15881652 /* success. we'll keep the page locked. */
15891653 set_page_dirty(page);
15901654 ret = VM_FAULT_LOCKED;
1655
+ break;
15911656 }
1592
- } while (err == -EAGAIN);
1657
+
1658
+ unlock_page(page);
1659
+
1660
+ if (IS_ERR(snapc)) {
1661
+ ret = VM_FAULT_SIGBUS;
1662
+ break;
1663
+ }
1664
+
1665
+ ceph_queue_writeback(inode);
1666
+ err = wait_event_killable(ci->i_cap_wq,
1667
+ context_is_writeable_or_written(inode, snapc));
1668
+ ceph_put_snap_context(snapc);
1669
+ } while (err == 0);
15931670
15941671 if (ret == VM_FAULT_LOCKED ||
15951672 ci->i_inline_version != CEPH_INLINE_NONE) {
....@@ -1608,6 +1685,7 @@
16081685 ceph_put_cap_refs(ci, got);
16091686 out_free:
16101687 ceph_restore_sigs(&oldset);
1688
+ sb_end_pagefault(inode->i_sb);
16111689 ceph_free_cap_flush(prealloc_cf);
16121690 if (err < 0)
16131691 ret = vmf_error(err);
....@@ -1773,6 +1851,10 @@
17731851 err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
17741852 if (!err)
17751853 err = ceph_osdc_wait_request(&fsc->client->osdc, req);
1854
+
1855
+ ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
1856
+ req->r_end_latency, err);
1857
+
17761858 out_put:
17771859 ceph_osdc_put_request(req);
17781860 if (err == -ECANCELED)
....@@ -1940,12 +2022,17 @@
19402022
19412023 if (err >= 0 || err == -ENOENT)
19422024 have |= POOL_READ;
1943
- else if (err != -EPERM)
2025
+ else if (err != -EPERM) {
2026
+ if (err == -EBLOCKLISTED)
2027
+ fsc->blocklisted = true;
19442028 goto out_unlock;
2029
+ }
19452030
19462031 if (err2 == 0 || err2 == -EEXIST)
19472032 have |= POOL_WRITE;
19482033 else if (err2 != -EPERM) {
2034
+ if (err2 == -EBLOCKLISTED)
2035
+ fsc->blocklisted = true;
19492036 err = err2;
19502037 goto out_unlock;
19512038 }
....@@ -1983,10 +2070,11 @@
19832070 return err;
19842071 }
19852072
1986
-int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
2073
+int ceph_pool_perm_check(struct inode *inode, int need)
19872074 {
1988
- s64 pool;
2075
+ struct ceph_inode_info *ci = ceph_inode(inode);
19892076 struct ceph_string *pool_ns;
2077
+ s64 pool;
19902078 int ret, flags;
19912079
19922080 if (ci->i_vino.snap != CEPH_NOSNAP) {
....@@ -1998,7 +2086,7 @@
19982086 return 0;
19992087 }
20002088
2001
- if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
2089
+ if (ceph_test_mount_opt(ceph_inode_to_client(inode),
20022090 NOPOOLPERM))
20032091 return 0;
20042092