2024-12-19 9370bb92b2d16684ee45cf24e879c93c509162da
kernel/drivers/block/rbd.c
@@ -34,7 +34,7 @@
 #include <linux/ceph/cls_lock_client.h>
 #include <linux/ceph/striper.h>
 #include <linux/ceph/decode.h>
-#include <linux/parser.h>
+#include <linux/fs_parser.h>
 #include <linux/bsearch.h>
 
 #include <linux/kernel.h>
@@ -115,12 +115,18 @@
 #define RBD_FEATURE_LAYERING        (1ULL<<0)
 #define RBD_FEATURE_STRIPINGV2      (1ULL<<1)
 #define RBD_FEATURE_EXCLUSIVE_LOCK  (1ULL<<2)
+#define RBD_FEATURE_OBJECT_MAP      (1ULL<<3)
+#define RBD_FEATURE_FAST_DIFF       (1ULL<<4)
+#define RBD_FEATURE_DEEP_FLATTEN    (1ULL<<5)
 #define RBD_FEATURE_DATA_POOL       (1ULL<<7)
 #define RBD_FEATURE_OPERATIONS      (1ULL<<8)
 
 #define RBD_FEATURES_ALL    (RBD_FEATURE_LAYERING |       \
                              RBD_FEATURE_STRIPINGV2 |     \
                              RBD_FEATURE_EXCLUSIVE_LOCK | \
+                             RBD_FEATURE_OBJECT_MAP |     \
+                             RBD_FEATURE_FAST_DIFF |      \
+                             RBD_FEATURE_DEEP_FLATTEN |   \
                              RBD_FEATURE_DATA_POOL |      \
                              RBD_FEATURE_OPERATIONS)
 
@@ -201,6 +207,11 @@
 	struct list_head	node;
 };
 
+struct pending_result {
+	int			result;		/* first nonzero result */
+	int			num_pending;
+};
+
 struct rbd_img_request;
 
 enum obj_request_type {
@@ -214,34 +225,69 @@
 	OBJ_OP_READ = 1,
 	OBJ_OP_WRITE,
 	OBJ_OP_DISCARD,
+	OBJ_OP_ZEROOUT,
+};
+
+#define RBD_OBJ_FLAG_DELETION			(1U << 0)
+#define RBD_OBJ_FLAG_COPYUP_ENABLED		(1U << 1)
+#define RBD_OBJ_FLAG_COPYUP_ZEROS		(1U << 2)
+#define RBD_OBJ_FLAG_MAY_EXIST			(1U << 3)
+#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT	(1U << 4)
+
+enum rbd_obj_read_state {
+	RBD_OBJ_READ_START = 1,
+	RBD_OBJ_READ_OBJECT,
+	RBD_OBJ_READ_PARENT,
 };
 
 /*
  * Writes go through the following state machine to deal with
  * layering:
  *
- *                       need copyup
- * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
- *        |     ^                              |
- *        v     \------------------------------/
- *      done
- *        ^
- *        |
- * RBD_OBJ_WRITE_FLAT
+ *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
+ *            .                 |                                    .
+ *            .                 v                                    .
+ *            .  RBD_OBJ_WRITE_READ_FROM_PARENT. . .                 .
+ *            .                 |                    .               .
+ *            .                 v                    v (deep-copyup  .
+ * (image     .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
+ * flattened) v                 |                    .               .
+ *            .                 v                    .               .
+ *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .    (copyup    .
+ *                              |                        not needed) v
+ *                              v                                    .
+ *                            done . . . . . . . . . . . . . . . . . .
+ *                              ^
+ *                              |
+ *                     RBD_OBJ_WRITE_FLAT
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
- * there is a parent or not.
+ * assert_exists guard is needed or not (in some cases it's not needed
+ * even if there is a parent).
 */
 enum rbd_obj_write_state {
-	RBD_OBJ_WRITE_FLAT = 1,
-	RBD_OBJ_WRITE_GUARD,
+	RBD_OBJ_WRITE_START = 1,
+	RBD_OBJ_WRITE_PRE_OBJECT_MAP,
+	RBD_OBJ_WRITE_OBJECT,
+	__RBD_OBJ_WRITE_COPYUP,
 	RBD_OBJ_WRITE_COPYUP,
+	RBD_OBJ_WRITE_POST_OBJECT_MAP,
+};
+
+enum rbd_obj_copyup_state {
+	RBD_OBJ_COPYUP_START = 1,
+	RBD_OBJ_COPYUP_READ_PARENT,
+	__RBD_OBJ_COPYUP_OBJECT_MAPS,
+	RBD_OBJ_COPYUP_OBJECT_MAPS,
+	__RBD_OBJ_COPYUP_WRITE_OBJECT,
+	RBD_OBJ_COPYUP_WRITE_OBJECT,
 };
 
 struct rbd_obj_request {
 	struct ceph_object_extent ex;
+	unsigned int		flags;	/* RBD_OBJ_FLAG_* */
 	union {
-		bool			tried_parent;	/* for reads */
+		enum rbd_obj_read_state	 read_state;	/* for reads */
 		enum rbd_obj_write_state write_state;	/* for writes */
 	};
 
@@ -257,14 +303,15 @@
 			u32			bvec_idx;
 		};
 	};
+
+	enum rbd_obj_copyup_state copyup_state;
 	struct bio_vec		*copyup_bvecs;
 	u32			copyup_bvec_count;
 
-	struct ceph_osd_request	*osd_req;
+	struct list_head	osd_reqs;	/* w/ r_private_item */
 
-	u64			xferred;	/* bytes transferred */
-	int			result;
-
+	struct mutex		state_mutex;
+	struct pending_result	pending;
 	struct kref		kref;
 };
 
@@ -273,28 +320,32 @@
 	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
 };
 
+enum rbd_img_state {
+	RBD_IMG_START = 1,
+	RBD_IMG_EXCLUSIVE_LOCK,
+	__RBD_IMG_OBJECT_REQUESTS,
+	RBD_IMG_OBJECT_REQUESTS,
+};
+
 struct rbd_img_request {
 	struct rbd_device	*rbd_dev;
 	enum obj_operation_type	op_type;
 	enum obj_request_type	data_type;
 	unsigned long		flags;
+	enum rbd_img_state	state;
 	union {
 		u64			snap_id;	/* for reads */
 		struct ceph_snap_context *snapc;	/* for writes */
 	};
-	union {
-		struct request		*rq;		/* block request */
-		struct rbd_obj_request	*obj_request;	/* obj req initiator */
-	};
-	spinlock_t		completion_lock;
-	u64			xferred;/* aggregate bytes transferred */
-	int			result;	/* first nonzero obj_request result */
+	struct rbd_obj_request	*obj_request;	/* obj req initiator */
 
+	struct list_head	lock_item;
 	struct list_head	object_extents;	/* obj_req.ex structs */
-	u32			obj_request_count;
-	u32			pending_count;
 
-	struct kref		kref;
+	struct mutex		state_mutex;
+	struct pending_result	pending;
+	struct work_struct	work;
+	int			work_result;
 };
 
 #define for_each_obj_request(ireq, oreq) \
@@ -322,7 +373,6 @@
 
 struct rbd_mapping {
 	u64	size;
-	u64	features;
 };
 
 /*
@@ -367,7 +417,17 @@
 	struct work_struct	released_lock_work;
 	struct delayed_work	lock_dwork;
 	struct work_struct	unlock_work;
-	wait_queue_head_t	lock_waitq;
+	spinlock_t		lock_lists_lock;
+	struct list_head	acquiring_list;
+	struct list_head	running_list;
+	struct completion	acquire_wait;
+	int			acquire_err;
+	struct completion	releasing_wait;
+
+	spinlock_t		object_map_lock;
+	u8			*object_map;
+	u64			object_map_size;	/* in objects */
+	u64			object_map_flags;
 
 	struct workqueue_struct	*task_wq;
 
@@ -395,12 +455,11 @@
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
- * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
 enum rbd_dev_flags {
-	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
+	RBD_DEV_FLAG_EXISTS,	/* rbd_dev_device_setup() ran */
 	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
-	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
+	RBD_DEV_FLAG_READONLY,	/* -o ro or snapshot */
 };
 
 static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */
@@ -421,6 +480,10 @@
 
 static struct workqueue_struct *rbd_wq;
 
+static struct ceph_snap_context rbd_empty_snapc = {
+	.nref = REFCOUNT_INIT(1),
+};
+
 /*
 * single-major requires >= 0.75 version of userspace rbd utility.
 */
@@ -428,14 +491,13 @@
 module_param(single_major, bool, 0444);
 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
 
-static ssize_t rbd_add(struct bus_type *bus, const char *buf,
-		       size_t count);
-static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
-			  size_t count);
-static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
-				    size_t count);
-static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
-				       size_t count);
+static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
+static ssize_t remove_store(struct bus_type *bus, const char *buf,
+			    size_t count);
+static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
+				      size_t count);
+static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
+					 size_t count);
 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
 
 static int rbd_dev_id_to_minor(int dev_id)
@@ -448,8 +510,20 @@
 	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 }
 
+static bool rbd_is_ro(struct rbd_device *rbd_dev)
+{
+	return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
+}
+
+static bool rbd_is_snap(struct rbd_device *rbd_dev)
+{
+	return rbd_dev->spec->snap_id != CEPH_NOSNAP;
+}
+
 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 {
+	lockdep_assert_held(&rbd_dev->lock_rwsem);
+
 	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
 	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
 }
@@ -464,16 +538,16 @@
 	return is_lock_owner;
 }
 
-static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
+static ssize_t supported_features_show(struct bus_type *bus, char *buf)
 {
 	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
 }
 
-static BUS_ATTR(add, 0200, NULL, rbd_add);
-static BUS_ATTR(remove, 0200, NULL, rbd_remove);
-static BUS_ATTR(add_single_major, 0200, NULL, rbd_add_single_major);
-static BUS_ATTR(remove_single_major, 0200, NULL, rbd_remove_single_major);
-static BUS_ATTR(supported_features, 0444, rbd_supported_features_show, NULL);
+static BUS_ATTR_WO(add);
+static BUS_ATTR_WO(remove);
+static BUS_ATTR_WO(add_single_major);
+static BUS_ATTR_WO(remove_single_major);
+static BUS_ATTR_RO(supported_features);
 
 static struct attribute *rbd_bus_attrs[] = {
 	&bus_attr_add.attr,
@@ -558,15 +632,32 @@
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
-static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
-static int rbd_dev_header_info(struct rbd_device *rbd_dev);
-static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev,
+				     struct rbd_image_header *header);
 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 					u64 snap_id);
 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
 				 u8 *order, u64 *snap_size);
-static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
-				     u64 *snap_features);
+static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
+
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
+static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
+
+/*
+ * Return true if nothing else is pending.
+ */
+static bool pending_result_dec(struct pending_result *pending, int *result)
+{
+	rbd_assert(pending->num_pending > 0);
+
+	if (*result && !pending->result)
+		pending->result = *result;
+	if (--pending->num_pending)
+		return false;
+
+	*result = pending->result;
+	return true;
+}
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
@@ -607,9 +698,16 @@
 	if (get_user(ro, (int __user *)arg))
 		return -EFAULT;
 
-	/* Snapshots can't be marked read-write */
-	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
-		return -EROFS;
+	/*
+	 * Both images mapped read-only and snapshots can't be marked
+	 * read-write.
+	 */
+	if (!ro) {
+		if (rbd_is_ro(rbd_dev))
+			return -EROFS;
+
+		rbd_assert(!rbd_is_snap(rbd_dev));
+	}
 
 	/* Let blkdev_roset() handle it */
 	return -ENOTTY;
@@ -733,121 +831,74 @@
 */
 enum {
 	Opt_queue_depth,
+	Opt_alloc_size,
 	Opt_lock_timeout,
-	Opt_last_int,
 	/* int args above */
 	Opt_pool_ns,
-	Opt_last_string,
+	Opt_compression_hint,
 	/* string args above */
 	Opt_read_only,
 	Opt_read_write,
 	Opt_lock_on_read,
 	Opt_exclusive,
 	Opt_notrim,
-	Opt_err
 };
 
-static match_table_t rbd_opts_tokens = {
-	{Opt_queue_depth, "queue_depth=%d"},
-	{Opt_lock_timeout, "lock_timeout=%d"},
-	/* int args above */
-	{Opt_pool_ns, "_pool_ns=%s"},
-	/* string args above */
-	{Opt_read_only, "read_only"},
-	{Opt_read_only, "ro"},		/* Alternate spelling */
-	{Opt_read_write, "read_write"},
-	{Opt_read_write, "rw"},		/* Alternate spelling */
-	{Opt_lock_on_read, "lock_on_read"},
-	{Opt_exclusive, "exclusive"},
-	{Opt_notrim, "notrim"},
-	{Opt_err, NULL}
+enum {
+	Opt_compression_hint_none,
+	Opt_compression_hint_compressible,
+	Opt_compression_hint_incompressible,
+};
+
+static const struct constant_table rbd_param_compression_hint[] = {
+	{"none",		Opt_compression_hint_none},
+	{"compressible",	Opt_compression_hint_compressible},
+	{"incompressible",	Opt_compression_hint_incompressible},
+	{}
+};
+
+static const struct fs_parameter_spec rbd_parameters[] = {
+	fsparam_u32	("alloc_size",		Opt_alloc_size),
+	fsparam_enum	("compression_hint",	Opt_compression_hint,
+			 rbd_param_compression_hint),
+	fsparam_flag	("exclusive",		Opt_exclusive),
+	fsparam_flag	("lock_on_read",	Opt_lock_on_read),
+	fsparam_u32	("lock_timeout",	Opt_lock_timeout),
+	fsparam_flag	("notrim",		Opt_notrim),
+	fsparam_string	("_pool_ns",		Opt_pool_ns),
+	fsparam_u32	("queue_depth",		Opt_queue_depth),
+	fsparam_flag	("read_only",		Opt_read_only),
+	fsparam_flag	("read_write",		Opt_read_write),
+	fsparam_flag	("ro",			Opt_read_only),
+	fsparam_flag	("rw",			Opt_read_write),
+	{}
 };
 
 struct rbd_options {
 	int		queue_depth;
+	int		alloc_size;
 	unsigned long	lock_timeout;
 	bool		read_only;
 	bool		lock_on_read;
 	bool		exclusive;
 	bool		trim;
+
+	u32		alloc_hint_flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
 };
 
 #define RBD_QUEUE_DEPTH_DEFAULT		BLKDEV_MAX_RQ
+#define RBD_ALLOC_SIZE_DEFAULT		(64 * 1024)
 #define RBD_LOCK_TIMEOUT_DEFAULT	0  /* no timeout */
 #define RBD_READ_ONLY_DEFAULT		false
 #define RBD_LOCK_ON_READ_DEFAULT	false
 #define RBD_EXCLUSIVE_DEFAULT		false
 #define RBD_TRIM_DEFAULT		true
 
-struct parse_rbd_opts_ctx {
+struct rbd_parse_opts_ctx {
 	struct rbd_spec		*spec;
+	struct ceph_options	*copts;
 	struct rbd_options	*opts;
 };
786
-
787
-static int parse_rbd_opts_token(char *c, void *private)
788
-{
789
- struct parse_rbd_opts_ctx *pctx = private;
790
- substring_t argstr[MAX_OPT_ARGS];
791
- int token, intval, ret;
792
-
793
- token = match_token(c, rbd_opts_tokens, argstr);
794
- if (token < Opt_last_int) {
795
- ret = match_int(&argstr[0], &intval);
796
- if (ret < 0) {
797
- pr_err("bad option arg (not int) at '%s'\n", c);
798
- return ret;
799
- }
800
- dout("got int token %d val %d\n", token, intval);
801
- } else if (token > Opt_last_int && token < Opt_last_string) {
802
- dout("got string token %d val %s\n", token, argstr[0].from);
803
- } else {
804
- dout("got token %d\n", token);
805
- }
806
-
807
- switch (token) {
808
- case Opt_queue_depth:
809
- if (intval < 1) {
810
- pr_err("queue_depth out of range\n");
811
- return -EINVAL;
812
- }
813
- pctx->opts->queue_depth = intval;
814
- break;
815
- case Opt_lock_timeout:
816
- /* 0 is "wait forever" (i.e. infinite timeout) */
817
- if (intval < 0 || intval > INT_MAX / 1000) {
818
- pr_err("lock_timeout out of range\n");
819
- return -EINVAL;
820
- }
821
- pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
822
- break;
823
- case Opt_pool_ns:
824
- kfree(pctx->spec->pool_ns);
825
- pctx->spec->pool_ns = match_strdup(argstr);
826
- if (!pctx->spec->pool_ns)
827
- return -ENOMEM;
828
- break;
829
- case Opt_read_only:
830
- pctx->opts->read_only = true;
831
- break;
832
- case Opt_read_write:
833
- pctx->opts->read_only = false;
834
- break;
835
- case Opt_lock_on_read:
836
- pctx->opts->lock_on_read = true;
837
- break;
838
- case Opt_exclusive:
839
- pctx->opts->exclusive = true;
840
- break;
841
- case Opt_notrim:
842
- pctx->opts->trim = false;
843
- break;
844
- default:
845
- /* libceph prints "bad option" msg */
846
- return -EINVAL;
847
- }
848
-
849
- return 0;
850
-}
851902
852903 static char* obj_op_name(enum obj_operation_type op_type)
853904 {
....@@ -858,6 +909,8 @@
858909 return "write";
859910 case OBJ_OP_DISCARD:
860911 return "discard";
912
+ case OBJ_OP_ZEROOUT:
913
+ return "zeroout";
861914 default:
862915 return "???";
863916 }
....@@ -891,23 +944,6 @@
891944 kref_put(&rbdc->kref, rbd_client_release);
892945 }
893946
894
-static int wait_for_latest_osdmap(struct ceph_client *client)
895
-{
896
- u64 newest_epoch;
897
- int ret;
898
-
899
- ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch);
900
- if (ret)
901
- return ret;
902
-
903
- if (client->osdc.osdmap->epoch >= newest_epoch)
904
- return 0;
905
-
906
- ceph_osdc_maybe_request_map(&client->osdc);
907
- return ceph_monc_wait_osdmap(&client->monc, newest_epoch,
908
- client->options->mount_timeout);
909
-}
910
-
911947 /*
912948 * Get a ceph client with specific addr and configuration, if one does
913949 * not exist create it. Either way, ceph_opts is consumed by this
....@@ -918,7 +954,7 @@
918954 struct rbd_client *rbdc;
919955 int ret;
920956
921
- mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
957
+ mutex_lock(&client_mutex);
922958 rbdc = rbd_client_find(ceph_opts);
923959 if (rbdc) {
924960 ceph_destroy_options(ceph_opts);
....@@ -927,7 +963,8 @@
927963 * Using an existing client. Make sure ->pg_pools is up to
928964 * date before we look up the pool id in do_rbd_add().
929965 */
930
- ret = wait_for_latest_osdmap(rbdc->client);
966
+ ret = ceph_wait_for_latest_osdmap(rbdc->client,
967
+ rbdc->client->options->mount_timeout);
931968 if (ret) {
932969 rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
933970 rbd_put_client(rbdc);
....@@ -1009,15 +1046,24 @@
10091046 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
10101047 }
10111048
1049
+static void rbd_image_header_cleanup(struct rbd_image_header *header)
1050
+{
1051
+ kfree(header->object_prefix);
1052
+ ceph_put_snap_context(header->snapc);
1053
+ kfree(header->snap_sizes);
1054
+ kfree(header->snap_names);
1055
+
1056
+ memset(header, 0, sizeof(*header));
1057
+}
1058
+
10121059 /*
10131060 * Fill an rbd image header with information from the given format 1
10141061 * on-disk header.
10151062 */
1016
-static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1017
- struct rbd_image_header_ondisk *ondisk)
1063
+static int rbd_header_from_disk(struct rbd_image_header *header,
1064
+ struct rbd_image_header_ondisk *ondisk,
1065
+ bool first_time)
10181066 {
1019
- struct rbd_image_header *header = &rbd_dev->header;
1020
- bool first_time = header->object_prefix == NULL;
10211067 struct ceph_snap_context *snapc;
10221068 char *object_prefix = NULL;
10231069 char *snap_names = NULL;
....@@ -1084,11 +1130,6 @@
10841130 if (first_time) {
10851131 header->object_prefix = object_prefix;
10861132 header->obj_order = ondisk->options.order;
1087
- rbd_init_layout(rbd_dev);
1088
- } else {
1089
- ceph_put_snap_context(header->snapc);
1090
- kfree(header->snap_names);
1091
- kfree(header->snap_sizes);
10921133 }
10931134
10941135 /* The remaining fields always get updated (when we refresh) */
....@@ -1213,51 +1254,23 @@
12131254 return 0;
12141255 }
12151256
1216
-static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1217
- u64 *snap_features)
1218
-{
1219
- rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1220
- if (snap_id == CEPH_NOSNAP) {
1221
- *snap_features = rbd_dev->header.features;
1222
- } else if (rbd_dev->image_format == 1) {
1223
- *snap_features = 0; /* No features for format 1 */
1224
- } else {
1225
- u64 features = 0;
1226
- int ret;
1227
-
1228
- ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1229
- if (ret)
1230
- return ret;
1231
-
1232
- *snap_features = features;
1233
- }
1234
- return 0;
1235
-}
1236
-
12371257 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
12381258 {
12391259 u64 snap_id = rbd_dev->spec->snap_id;
12401260 u64 size = 0;
1241
- u64 features = 0;
12421261 int ret;
12431262
12441263 ret = rbd_snap_size(rbd_dev, snap_id, &size);
12451264 if (ret)
12461265 return ret;
1247
- ret = rbd_snap_features(rbd_dev, snap_id, &features);
1248
- if (ret)
1249
- return ret;
12501266
12511267 rbd_dev->mapping.size = size;
1252
- rbd_dev->mapping.features = features;
1253
-
12541268 return 0;
12551269 }
12561270
12571271 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
12581272 {
12591273 rbd_dev->mapping.size = 0;
1260
- rbd_dev->mapping.features = 0;
12611274 }
12621275
12631276 static void zero_bvec(struct bio_vec *bv)
....@@ -1300,6 +1313,8 @@
13001313 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
13011314 u32 bytes)
13021315 {
1316
+ dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1317
+
13031318 switch (obj_req->img_request->data_type) {
13041319 case OBJ_REQUEST_BIO:
13051320 zero_bios(&obj_req->bio_pos, off, bytes);
....@@ -1309,7 +1324,7 @@
13091324 zero_bvecs(&obj_req->bvec_pos, off, bytes);
13101325 break;
13111326 default:
1312
- rbd_assert(0);
1327
+ BUG();
13131328 }
13141329 }
13151330
....@@ -1322,22 +1337,6 @@
13221337 kref_put(&obj_request->kref, rbd_obj_request_destroy);
13231338 }
13241339
1325
-static void rbd_img_request_get(struct rbd_img_request *img_request)
1326
-{
1327
- dout("%s: img %p (was %d)\n", __func__, img_request,
1328
- kref_read(&img_request->kref));
1329
- kref_get(&img_request->kref);
1330
-}
1331
-
1332
-static void rbd_img_request_destroy(struct kref *kref);
1333
-static void rbd_img_request_put(struct rbd_img_request *img_request)
1334
-{
1335
- rbd_assert(img_request != NULL);
1336
- dout("%s: img %p (was %d)\n", __func__, img_request,
1337
- kref_read(&img_request->kref));
1338
- kref_put(&img_request->kref, rbd_img_request_destroy);
1339
-}
1340
-
13411340 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
13421341 struct rbd_obj_request *obj_request)
13431342 {
....@@ -1345,8 +1344,6 @@
13451344
13461345 /* Image request now owns object's original reference */
13471346 obj_request->img_request = img_request;
1348
- img_request->obj_request_count++;
1349
- img_request->pending_count++;
13501347 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
13511348 }
13521349
....@@ -1355,19 +1352,17 @@
13551352 {
13561353 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
13571354 list_del(&obj_request->ex.oe_item);
1358
- rbd_assert(img_request->obj_request_count > 0);
1359
- img_request->obj_request_count--;
13601355 rbd_assert(obj_request->img_request == img_request);
13611356 rbd_obj_request_put(obj_request);
13621357 }
13631358
1364
-static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1359
+static void rbd_osd_submit(struct ceph_osd_request *osd_req)
13651360 {
1366
- struct ceph_osd_request *osd_req = obj_request->osd_req;
1361
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
13671362
1368
- dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1369
- obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
1370
- obj_request->ex.oe_len, osd_req);
1363
+ dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1364
+ __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1365
+ obj_req->ex.oe_off, obj_req->ex.oe_len);
13711366 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
13721367 }
13731368
....@@ -1379,18 +1374,10 @@
13791374 static void img_request_layered_set(struct rbd_img_request *img_request)
13801375 {
13811376 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1382
- smp_mb();
1383
-}
1384
-
1385
-static void img_request_layered_clear(struct rbd_img_request *img_request)
1386
-{
1387
- clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1388
- smp_mb();
13891377 }
13901378
13911379 static bool img_request_layered_test(struct rbd_img_request *img_request)
13921380 {
1393
- smp_mb();
13941381 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
13951382 }
13961383
....@@ -1410,6 +1397,35 @@
14101397 rbd_dev->layout.object_size;
14111398 }
14121399
1400
+/*
1401
+ * Must be called after rbd_obj_calc_img_extents().
1402
+ */
1403
+static void rbd_obj_set_copyup_enabled(struct rbd_obj_request *obj_req)
1404
+{
1405
+ rbd_assert(obj_req->img_request->snapc);
1406
+
1407
+ if (obj_req->img_request->op_type == OBJ_OP_DISCARD) {
1408
+ dout("%s %p objno %llu discard\n", __func__, obj_req,
1409
+ obj_req->ex.oe_objno);
1410
+ return;
1411
+ }
1412
+
1413
+ if (!obj_req->num_img_extents) {
1414
+ dout("%s %p objno %llu not overlapping\n", __func__, obj_req,
1415
+ obj_req->ex.oe_objno);
1416
+ return;
1417
+ }
1418
+
1419
+ if (rbd_obj_is_entire(obj_req) &&
1420
+ !obj_req->img_request->snapc->num_snaps) {
1421
+ dout("%s %p objno %llu entire\n", __func__, obj_req,
1422
+ obj_req->ex.oe_objno);
1423
+ return;
1424
+ }
1425
+
1426
+ obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
1427
+}
1428
+
14131429 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
14141430 {
14151431 return ceph_file_extents_bytes(obj_req->img_extents,
....@@ -1423,47 +1439,47 @@
14231439 return false;
14241440 case OBJ_OP_WRITE:
14251441 case OBJ_OP_DISCARD:
1442
+ case OBJ_OP_ZEROOUT:
14261443 return true;
14271444 default:
14281445 BUG();
14291446 }
14301447 }
14311448
1432
-static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);
1433
-
14341449 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
14351450 {
14361451 struct rbd_obj_request *obj_req = osd_req->r_priv;
1452
+ int result;
14371453
14381454 dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
14391455 osd_req->r_result, obj_req);
1440
- rbd_assert(osd_req == obj_req->osd_req);
14411456
1442
- obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
1443
- if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
1444
- obj_req->xferred = osd_req->r_result;
1457
+ /*
1458
+ * Writes aren't allowed to return a data payload. In some
1459
+ * guarded write cases (e.g. stat + zero on an empty object)
1460
+ * a stat response makes it through, but we don't care.
1461
+ */
1462
+ if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1463
+ result = 0;
14451464 else
1446
- /*
1447
- * Writes aren't allowed to return a data payload. In some
1448
- * guarded write cases (e.g. stat + zero on an empty object)
1449
- * a stat response makes it through, but we don't care.
1450
- */
1451
- obj_req->xferred = 0;
1465
+ result = osd_req->r_result;
14521466
1453
- rbd_obj_handle_request(obj_req);
1467
+ rbd_obj_handle_request(obj_req, result);
14541468 }
14551469
1456
-static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1470
+static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
14571471 {
1458
- struct ceph_osd_request *osd_req = obj_request->osd_req;
1472
+ struct rbd_obj_request *obj_request = osd_req->r_priv;
1473
+ struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1474
+ struct ceph_options *opt = rbd_dev->rbd_client->client->options;
14591475
1460
- osd_req->r_flags = CEPH_OSD_FLAG_READ;
1476
+ osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
14611477 osd_req->r_snapid = obj_request->img_request->snap_id;
14621478 }
14631479
1464
-static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1480
+static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
14651481 {
1466
- struct ceph_osd_request *osd_req = obj_request->osd_req;
1482
+ struct rbd_obj_request *obj_request = osd_req->r_priv;
14671483
14681484 osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
14691485 ktime_get_real_ts64(&osd_req->r_mtime);
....@@ -1471,21 +1487,21 @@
14711487 }
14721488
14731489 static struct ceph_osd_request *
1474
-rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
1490
+__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1491
+ struct ceph_snap_context *snapc, int num_ops)
14751492 {
1476
- struct rbd_img_request *img_req = obj_req->img_request;
1477
- struct rbd_device *rbd_dev = img_req->rbd_dev;
1493
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
14781494 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
14791495 struct ceph_osd_request *req;
14801496 const char *name_format = rbd_dev->image_format == 1 ?
14811497 RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1498
+ int ret;
14821499
1483
- req = ceph_osdc_alloc_request(osdc,
1484
- (rbd_img_is_write(img_req) ? img_req->snapc : NULL),
1485
- num_ops, false, GFP_NOIO);
1500
+ req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
14861501 if (!req)
1487
- return NULL;
1502
+ return ERR_PTR(-ENOMEM);
14881503
1504
+ list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
14891505 req->r_callback = rbd_osd_req_callback;
14901506 req->r_priv = obj_req;
14911507
....@@ -1496,23 +1512,21 @@
14961512 ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
14971513 req->r_base_oloc.pool = rbd_dev->layout.pool_id;
14981514
1499
- if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1500
- rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
1501
- goto err_req;
1502
-
1503
- if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1504
- goto err_req;
1515
+ ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1516
+ rbd_dev->header.object_prefix,
1517
+ obj_req->ex.oe_objno);
1518
+ if (ret)
1519
+ return ERR_PTR(ret);
15051520
15061521 return req;
1507
-
1508
-err_req:
1509
- ceph_osdc_put_request(req);
1510
- return NULL;
15111522 }
15121523
1513
-static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1524
+static struct ceph_osd_request *
1525
+rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
15141526 {
1515
- ceph_osdc_put_request(osd_req);
1527
+ rbd_assert(obj_req->img_request->snapc);
1528
+ return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1529
+ num_ops);
15161530 }
15171531
15181532 static struct rbd_obj_request *rbd_obj_request_create(void)
....@@ -1524,6 +1538,8 @@
15241538 return NULL;
15251539
15261540 ceph_object_extent_init(&obj_request->ex);
1541
+ INIT_LIST_HEAD(&obj_request->osd_reqs);
1542
+ mutex_init(&obj_request->state_mutex);
15271543 kref_init(&obj_request->kref);
15281544
15291545 dout("%s %p\n", __func__, obj_request);
....@@ -1533,14 +1549,19 @@
15331549 static void rbd_obj_request_destroy(struct kref *kref)
15341550 {
15351551 struct rbd_obj_request *obj_request;
1552
+ struct ceph_osd_request *osd_req;
15361553 u32 i;
15371554
15381555 obj_request = container_of(kref, struct rbd_obj_request, kref);
15391556
15401557 dout("%s: obj %p\n", __func__, obj_request);
15411558
1542
- if (obj_request->osd_req)
1543
- rbd_osd_req_destroy(obj_request->osd_req);
1559
+ while (!list_empty(&obj_request->osd_reqs)) {
1560
+ osd_req = list_first_entry(&obj_request->osd_reqs,
1561
+ struct ceph_osd_request, r_private_item);
1562
+ list_del_init(&osd_req->r_private_item);
1563
+ ceph_osdc_put_request(osd_req);
1564
+ }
15441565
15451566 switch (obj_request->img_request->data_type) {
15461567 case OBJ_REQUEST_NODATA:
....@@ -1551,7 +1572,7 @@
15511572 kfree(obj_request->bvec_pos.bvecs);
15521573 break;
15531574 default:
1554
- rbd_assert(0);
1575
+ BUG();
15551576 }
15561577
15571578 kfree(obj_request->img_extents);
....@@ -1617,10 +1638,8 @@
16171638 if (!rbd_dev->parent_spec)
16181639 return false;
16191640
1620
- down_read(&rbd_dev->header_rwsem);
16211641 if (rbd_dev->parent_overlap)
16221642 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1623
- up_read(&rbd_dev->header_rwsem);
16241643
16251644 if (counter < 0)
16261645 rbd_warn(rbd_dev, "parent reference overflow");
....@@ -1628,64 +1647,528 @@
16281647 return counter > 0;
16291648 }
16301649
1631
-/*
1632
- * Caller is responsible for filling in the list of object requests
1633
- * that comprises the image request, and the Linux request pointer
1634
- * (if there is one).
1635
- */
1636
-static struct rbd_img_request *rbd_img_request_create(
1637
- struct rbd_device *rbd_dev,
1638
- enum obj_operation_type op_type,
1639
- struct ceph_snap_context *snapc)
1650
+static void rbd_img_request_init(struct rbd_img_request *img_request,
1651
+ struct rbd_device *rbd_dev,
1652
+ enum obj_operation_type op_type)
16401653 {
1641
- struct rbd_img_request *img_request;
1642
-
1643
- img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
1644
- if (!img_request)
1645
- return NULL;
1654
+ memset(img_request, 0, sizeof(*img_request));
16461655
16471656 img_request->rbd_dev = rbd_dev;
16481657 img_request->op_type = op_type;
1649
- if (!rbd_img_is_write(img_request))
1650
- img_request->snap_id = rbd_dev->spec->snap_id;
1651
- else
1652
- img_request->snapc = snapc;
16531658
1654
- if (rbd_dev_parent_get(rbd_dev))
1655
- img_request_layered_set(img_request);
1656
-
1657
- spin_lock_init(&img_request->completion_lock);
1659
+ INIT_LIST_HEAD(&img_request->lock_item);
16581660 INIT_LIST_HEAD(&img_request->object_extents);
1659
- kref_init(&img_request->kref);
1660
-
1661
- dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
1662
- obj_op_name(op_type), img_request);
1663
- return img_request;
1661
+ mutex_init(&img_request->state_mutex);
16641662 }
16651663
1666
-static void rbd_img_request_destroy(struct kref *kref)
1664
+/*
1665
+ * Only snap_id is captured here, for reads. For writes, snapshot
1666
+ * context is captured in rbd_img_object_requests() after exclusive
1667
+ * lock is ensured to be held.
1668
+ */
1669
+static void rbd_img_capture_header(struct rbd_img_request *img_req)
16671670 {
1668
- struct rbd_img_request *img_request;
1671
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
1672
+
1673
+ lockdep_assert_held(&rbd_dev->header_rwsem);
1674
+
1675
+ if (!rbd_img_is_write(img_req))
1676
+ img_req->snap_id = rbd_dev->spec->snap_id;
1677
+
1678
+ if (rbd_dev_parent_get(rbd_dev))
1679
+ img_request_layered_set(img_req);
1680
+}
1681
+
1682
+static void rbd_img_request_destroy(struct rbd_img_request *img_request)
1683
+{
16691684 struct rbd_obj_request *obj_request;
16701685 struct rbd_obj_request *next_obj_request;
16711686
1672
- img_request = container_of(kref, struct rbd_img_request, kref);
1673
-
16741687 dout("%s: img %p\n", __func__, img_request);
16751688
1689
+ WARN_ON(!list_empty(&img_request->lock_item));
16761690 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
16771691 rbd_img_obj_request_del(img_request, obj_request);
1678
- rbd_assert(img_request->obj_request_count == 0);
16791692
1680
- if (img_request_layered_test(img_request)) {
1681
- img_request_layered_clear(img_request);
1693
+ if (img_request_layered_test(img_request))
16821694 rbd_dev_parent_put(img_request->rbd_dev);
1683
- }
16841695
16851696 if (rbd_img_is_write(img_request))
16861697 ceph_put_snap_context(img_request->snapc);
16871698
1688
- kmem_cache_free(rbd_img_request_cache, img_request);
1699
+ if (test_bit(IMG_REQ_CHILD, &img_request->flags))
1700
+ kmem_cache_free(rbd_img_request_cache, img_request);
1701
+}
1702
+
1703
+#define BITS_PER_OBJ 2
1704
+#define OBJS_PER_BYTE (BITS_PER_BYTE / BITS_PER_OBJ)
1705
+#define OBJ_MASK ((1 << BITS_PER_OBJ) - 1)
1706
+
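+/*
+ * The object map stores 2 bits of state per object, packed high bits
+ * first within each byte: objno 0 lands in bits 7-6 of byte 0, objno 3
+ * in bits 1-0, and objno 4 in bits 7-6 of byte 1, and so on.
+ */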
1707
+static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1708
+ u64 *index, u8 *shift)
1709
+{
1710
+ u32 off;
1711
+
1712
+ rbd_assert(objno < rbd_dev->object_map_size);
1713
+ *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1714
+ *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1715
+}
1716
+
1717
+static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1718
+{
1719
+ u64 index;
1720
+ u8 shift;
1721
+
1722
+ lockdep_assert_held(&rbd_dev->object_map_lock);
1723
+ __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1724
+ return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1725
+}
1726
+
1727
+static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1728
+{
1729
+ u64 index;
1730
+ u8 shift;
1731
+ u8 *p;
1732
+
1733
+ lockdep_assert_held(&rbd_dev->object_map_lock);
1734
+ rbd_assert(!(val & ~OBJ_MASK));
1735
+
1736
+ __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1737
+ p = &rbd_dev->object_map[index];
1738
+ *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1739
+}
1740
+
1741
+static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1742
+{
1743
+ u8 state;
1744
+
1745
+ spin_lock(&rbd_dev->object_map_lock);
1746
+ state = __rbd_object_map_get(rbd_dev, objno);
1747
+ spin_unlock(&rbd_dev->object_map_lock);
1748
+ return state;
1749
+}
1750
+
1751
+static bool use_object_map(struct rbd_device *rbd_dev)
1752
+{
1753
+ /*
1754
+ * An image mapped read-only can't use the object map -- it isn't
1755
+ * loaded because the header lock isn't acquired. Someone else can
1756
+ * write to the image and update the object map behind our back.
1757
+ *
1758
+ * A snapshot can't be written to, so using the object map is always
1759
+ * safe.
1760
+ */
1761
+ if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1762
+ return false;
1763
+
1764
+ return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1765
+ !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1766
+}
1767
+
1768
+static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1769
+{
1770
+ u8 state;
1771
+
1772
+ /* fall back to default logic if object map is disabled or invalid */
1773
+ if (!use_object_map(rbd_dev))
1774
+ return true;
1775
+
1776
+ state = rbd_object_map_get(rbd_dev, objno);
1777
+ return state != OBJECT_NONEXISTENT;
1778
+}
1779
+
1780
+static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1781
+ struct ceph_object_id *oid)
1782
+{
1783
+ if (snap_id == CEPH_NOSNAP)
1784
+ ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1785
+ rbd_dev->spec->image_id);
1786
+ else
1787
+ ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1788
+ rbd_dev->spec->image_id, snap_id);
1789
+}
1790
+
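+/*
+ * Take the exclusive lock on the HEAD object map object, breaking the
+ * current holder's lock if the initial attempt returns -EBUSY.
+ * -EEXIST means the lock is already owned by us.
+ */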
1791
+static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1792
+{
1793
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1794
+ CEPH_DEFINE_OID_ONSTACK(oid);
1795
+ u8 lock_type;
1796
+ char *lock_tag;
1797
+ struct ceph_locker *lockers;
1798
+ u32 num_lockers;
1799
+ bool broke_lock = false;
1800
+ int ret;
1801
+
1802
+ rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1803
+
1804
+again:
1805
+ ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1806
+ CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1807
+ if (ret != -EBUSY || broke_lock) {
1808
+ if (ret == -EEXIST)
1809
+ ret = 0; /* already locked by myself */
1810
+ if (ret)
1811
+ rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1812
+ return ret;
1813
+ }
1814
+
1815
+ ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1816
+ RBD_LOCK_NAME, &lock_type, &lock_tag,
1817
+ &lockers, &num_lockers);
1818
+ if (ret) {
1819
+ if (ret == -ENOENT)
1820
+ goto again;
1821
+
1822
+ rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1823
+ return ret;
1824
+ }
1825
+
1826
+ kfree(lock_tag);
1827
+ if (num_lockers == 0)
1828
+ goto again;
1829
+
1830
+ rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1831
+ ENTITY_NAME(lockers[0].id.name));
1832
+
1833
+ ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1834
+ RBD_LOCK_NAME, lockers[0].id.cookie,
1835
+ &lockers[0].id.name);
1836
+ ceph_free_lockers(lockers, num_lockers);
1837
+ if (ret) {
1838
+ if (ret == -ENOENT)
1839
+ goto again;
1840
+
1841
+ rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1842
+ return ret;
1843
+ }
1844
+
1845
+ broke_lock = true;
1846
+ goto again;
1847
+}
1848
+
1849
+static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1850
+{
1851
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1852
+ CEPH_DEFINE_OID_ONSTACK(oid);
1853
+ int ret;
1854
+
1855
+ rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1856
+
1857
+ ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1858
+ "");
1859
+ if (ret && ret != -ENOENT)
1860
+ rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1861
+}
1862
+
1863
+static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1864
+{
1865
+ u8 struct_v;
1866
+ u32 struct_len;
1867
+ u32 header_len;
1868
+ void *header_end;
1869
+ int ret;
1870
+
1871
+ ceph_decode_32_safe(p, end, header_len, e_inval);
1872
+ header_end = *p + header_len;
1873
+
1874
+ ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1875
+ &struct_len);
1876
+ if (ret)
1877
+ return ret;
1878
+
1879
+ ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1880
+
1881
+ *p = header_end;
1882
+ return 0;
1883
+
1884
+e_inval:
1885
+ return -EINVAL;
1886
+}
1887
+
1888
+static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1889
+{
1890
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1891
+ CEPH_DEFINE_OID_ONSTACK(oid);
1892
+ struct page **pages;
1893
+ void *p, *end;
1894
+ size_t reply_len;
1895
+ u64 num_objects;
1896
+ u64 object_map_bytes;
1897
+ u64 object_map_size;
1898
+ int num_pages;
1899
+ int ret;
1900
+
1901
+ rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1902
+
1903
+ num_objects = ceph_get_num_objects(&rbd_dev->layout,
1904
+ rbd_dev->mapping.size);
1905
+ object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1906
+ BITS_PER_BYTE);
1907
+ num_pages = calc_pages_for(0, object_map_bytes) + 1;
1908
+ pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1909
+ if (IS_ERR(pages))
1910
+ return PTR_ERR(pages);
1911
+
1912
+ reply_len = num_pages * PAGE_SIZE;
1913
+ rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1914
+ ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1915
+ "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1916
+ NULL, 0, pages, &reply_len);
1917
+ if (ret)
1918
+ goto out;
1919
+
1920
+ p = page_address(pages[0]);
1921
+ end = p + min(reply_len, (size_t)PAGE_SIZE);
1922
+ ret = decode_object_map_header(&p, end, &object_map_size);
1923
+ if (ret)
1924
+ goto out;
1925
+
1926
+ if (object_map_size != num_objects) {
1927
+ rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
1928
+ object_map_size, num_objects);
1929
+ ret = -EINVAL;
1930
+ goto out;
1931
+ }
1932
+
1933
+ if (offset_in_page(p) + object_map_bytes > reply_len) {
1934
+ ret = -EINVAL;
1935
+ goto out;
1936
+ }
1937
+
1938
+ rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
1939
+ if (!rbd_dev->object_map) {
1940
+ ret = -ENOMEM;
1941
+ goto out;
1942
+ }
1943
+
1944
+ rbd_dev->object_map_size = object_map_size;
1945
+ ceph_copy_from_page_vector(pages, rbd_dev->object_map,
1946
+ offset_in_page(p), object_map_bytes);
1947
+
1948
+out:
1949
+ ceph_release_page_vector(pages, num_pages);
1950
+ return ret;
1951
+}
1952
+
1953
+static void rbd_object_map_free(struct rbd_device *rbd_dev)
1954
+{
1955
+ kvfree(rbd_dev->object_map);
1956
+ rbd_dev->object_map = NULL;
1957
+ rbd_dev->object_map_size = 0;
1958
+}
1959
+
1960
+static int rbd_object_map_load(struct rbd_device *rbd_dev)
1961
+{
1962
+ int ret;
1963
+
1964
+ ret = __rbd_object_map_load(rbd_dev);
1965
+ if (ret)
1966
+ return ret;
1967
+
1968
+ ret = rbd_dev_v2_get_flags(rbd_dev);
1969
+ if (ret) {
1970
+ rbd_object_map_free(rbd_dev);
1971
+ return ret;
1972
+ }
1973
+
1974
+ if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
1975
+ rbd_warn(rbd_dev, "object map is invalid");
1976
+
1977
+ return 0;
1978
+}
1979
+
1980
+static int rbd_object_map_open(struct rbd_device *rbd_dev)
1981
+{
1982
+ int ret;
1983
+
1984
+ ret = rbd_object_map_lock(rbd_dev);
1985
+ if (ret)
1986
+ return ret;
1987
+
1988
+ ret = rbd_object_map_load(rbd_dev);
1989
+ if (ret) {
1990
+ rbd_object_map_unlock(rbd_dev);
1991
+ return ret;
1992
+ }
1993
+
1994
+ return 0;
1995
+}
1996
+
1997
+static void rbd_object_map_close(struct rbd_device *rbd_dev)
1998
+{
1999
+ rbd_object_map_free(rbd_dev);
2000
+ rbd_object_map_unlock(rbd_dev);
2001
+}
2002
+
2003
+/*
2004
+ * This function needs snap_id (or more precisely just something to
2005
+ * distinguish between HEAD and snapshot object maps), new_state and
2006
+ * current_state that were passed to rbd_object_map_update().
2007
+ *
2008
+ * To avoid allocating and stashing a context we piggyback on the OSD
2009
+ * request. A HEAD update has two ops (assert_locked). For new_state
2010
+ * and current_state we decode our own object_map_update op, encoded in
2011
+ * rbd_cls_object_map_update().
2012
+ */
2013
+static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
2014
+ struct ceph_osd_request *osd_req)
2015
+{
2016
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2017
+ struct ceph_osd_data *osd_data;
2018
+ u64 objno;
2019
+ u8 state, new_state, current_state;
2020
+ bool has_current_state;
2021
+ void *p;
2022
+
2023
+ if (osd_req->r_result)
2024
+ return osd_req->r_result;
2025
+
2026
+ /*
2027
+ * Nothing to do for a snapshot object map.
2028
+ */
2029
+ if (osd_req->r_num_ops == 1)
2030
+ return 0;
2031
+
2032
+ /*
2033
+ * Update in-memory HEAD object map.
2034
+ */
2035
+ rbd_assert(osd_req->r_num_ops == 2);
2036
+ osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
2037
+ rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
2038
+
2039
+ p = page_address(osd_data->pages[0]);
2040
+ objno = ceph_decode_64(&p);
2041
+ rbd_assert(objno == obj_req->ex.oe_objno);
2042
+ rbd_assert(ceph_decode_64(&p) == objno + 1);
2043
+ new_state = ceph_decode_8(&p);
2044
+ has_current_state = ceph_decode_8(&p);
2045
+ if (has_current_state)
2046
+ current_state = ceph_decode_8(&p);
2047
+
2048
+ spin_lock(&rbd_dev->object_map_lock);
2049
+ state = __rbd_object_map_get(rbd_dev, objno);
2050
+ if (!has_current_state || current_state == state ||
2051
+ (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
2052
+ __rbd_object_map_set(rbd_dev, objno, new_state);
2053
+ spin_unlock(&rbd_dev->object_map_lock);
2054
+
2055
+ return 0;
2056
+}
2057
+
2058
+static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
2059
+{
2060
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
2061
+ int result;
2062
+
2063
+ dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
2064
+ osd_req->r_result, obj_req);
2065
+
2066
+ result = rbd_object_map_update_finish(obj_req, osd_req);
2067
+ rbd_obj_handle_request(obj_req, result);
2068
+}
2069
+
2070
+static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
2071
+{
2072
+ u8 state = rbd_object_map_get(rbd_dev, objno);
2073
+
2074
+ if (state == new_state ||
2075
+ (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2076
+ (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2077
+ return false;
2078
+
2079
+ return true;
2080
+}
2081
+
2082
+static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2083
+ int which, u64 objno, u8 new_state,
2084
+ const u8 *current_state)
2085
+{
2086
+ struct page **pages;
2087
+ void *p, *start;
2088
+ int ret;
2089
+
2090
+ ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2091
+ if (ret)
2092
+ return ret;
2093
+
2094
+ pages = ceph_alloc_page_vector(1, GFP_NOIO);
2095
+ if (IS_ERR(pages))
2096
+ return PTR_ERR(pages);
2097
+
2098
+ p = start = page_address(pages[0]);
2099
+ ceph_encode_64(&p, objno);
2100
+ ceph_encode_64(&p, objno + 1);
2101
+ ceph_encode_8(&p, new_state);
2102
+ if (current_state) {
2103
+ ceph_encode_8(&p, 1);
2104
+ ceph_encode_8(&p, *current_state);
2105
+ } else {
2106
+ ceph_encode_8(&p, 0);
2107
+ }
2108
+
2109
+ osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2110
+ false, true);
2111
+ return 0;
2112
+}
2113
+
2114
+/*
2115
+ * Return:
2116
+ * 0 - object map update sent
2117
+ * 1 - object map update isn't needed
2118
+ * <0 - error
2119
+ */
2120
+static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2121
+ u8 new_state, const u8 *current_state)
2122
+{
2123
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2124
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2125
+ struct ceph_osd_request *req;
2126
+ int num_ops = 1;
2127
+ int which = 0;
2128
+ int ret;
2129
+
2130
+ if (snap_id == CEPH_NOSNAP) {
2131
+ if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2132
+ return 1;
2133
+
2134
+ num_ops++; /* assert_locked */
2135
+ }
2136
+
2137
+ req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2138
+ if (!req)
2139
+ return -ENOMEM;
2140
+
2141
+ list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2142
+ req->r_callback = rbd_object_map_callback;
2143
+ req->r_priv = obj_req;
2144
+
2145
+ rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2146
+ ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2147
+ req->r_flags = CEPH_OSD_FLAG_WRITE;
2148
+ ktime_get_real_ts64(&req->r_mtime);
2149
+
2150
+ if (snap_id == CEPH_NOSNAP) {
2151
+ /*
2152
+ * Protect against possible race conditions during lock
2153
+ * ownership transitions.
2154
+ */
2155
+ ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2156
+ CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2157
+ if (ret)
2158
+ return ret;
2159
+ }
2160
+
2161
+ ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2162
+ new_state, current_state);
2163
+ if (ret)
2164
+ return ret;
2165
+
2166
+ ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2167
+ if (ret)
2168
+ return ret;
2169
+
2170
+ ceph_osdc_start_request(osdc, req, false);
2171
+ return 0;
16892172 }
16902173
16912174 static void prune_extents(struct ceph_file_extent *img_extents,
....@@ -1735,11 +2218,13 @@
17352218 return 0;
17362219 }
17372220
1738
-static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
2221
+static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
17392222 {
2223
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
2224
+
17402225 switch (obj_req->img_request->data_type) {
17412226 case OBJ_REQUEST_BIO:
1742
- osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
2227
+ osd_req_op_extent_osd_data_bio(osd_req, which,
17432228 &obj_req->bio_pos,
17442229 obj_req->ex.oe_len);
17452230 break;
....@@ -1748,30 +2233,15 @@
17482233 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
17492234 obj_req->ex.oe_len);
17502235 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
1751
- osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
2236
+ osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
17522237 &obj_req->bvec_pos);
17532238 break;
17542239 default:
1755
- rbd_assert(0);
2240
+ BUG();
17562241 }
17572242 }
17582243
1759
-static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
1760
-{
1761
- obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
1762
- if (!obj_req->osd_req)
1763
- return -ENOMEM;
1764
-
1765
- osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
1766
- obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
1767
- rbd_osd_req_setup_data(obj_req, 0);
1768
-
1769
- rbd_osd_req_format_read(obj_req);
1770
- return 0;
1771
-}
1772
-
1773
-static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
1774
- unsigned int which)
2244
+static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
17752245 {
17762246 struct page **pages;
17772247
....@@ -1787,39 +2257,61 @@
17872257 if (IS_ERR(pages))
17882258 return PTR_ERR(pages);
17892259
1790
- osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
1791
- osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
2260
+ osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2261
+ osd_req_op_raw_data_in_pages(osd_req, which, pages,
17922262 8 + sizeof(struct ceph_timespec),
17932263 0, false, true);
17942264 return 0;
17952265 }
17962266
1797
-static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
1798
- unsigned int which)
2267
+static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2268
+ u32 bytes)
17992269 {
2270
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
2271
+ int ret;
2272
+
2273
+ ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2274
+ if (ret)
2275
+ return ret;
2276
+
2277
+ osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2278
+ obj_req->copyup_bvec_count, bytes);
2279
+ return 0;
2280
+}
2281
+
2282
+static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2283
+{
2284
+ obj_req->read_state = RBD_OBJ_READ_START;
2285
+ return 0;
2286
+}
2287
+
2288
+static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2289
+ int which)
2290
+{
2291
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
18002292 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
18012293 u16 opcode;
18022294
1803
- osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
1804
- rbd_dev->layout.object_size,
1805
- rbd_dev->layout.object_size);
2295
+ if (!use_object_map(rbd_dev) ||
2296
+ !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2297
+ osd_req_op_alloc_hint_init(osd_req, which++,
2298
+ rbd_dev->layout.object_size,
2299
+ rbd_dev->layout.object_size,
2300
+ rbd_dev->opts->alloc_hint_flags);
2301
+ }
18062302
18072303 if (rbd_obj_is_entire(obj_req))
18082304 opcode = CEPH_OSD_OP_WRITEFULL;
18092305 else
18102306 opcode = CEPH_OSD_OP_WRITE;
18112307
1812
- osd_req_op_extent_init(obj_req->osd_req, which, opcode,
2308
+ osd_req_op_extent_init(osd_req, which, opcode,
18132309 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
1814
- rbd_osd_req_setup_data(obj_req, which++);
1815
-
1816
- rbd_assert(which == obj_req->osd_req->r_num_ops);
1817
- rbd_osd_req_format_write(obj_req);
2310
+ rbd_osd_setup_data(osd_req, which);
18182311 }
18192312
1820
-static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
2313
+static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
18212314 {
1822
- unsigned int num_osd_ops, which = 0;
18232315 int ret;
18242316
18252317 /* reverse map the entire object onto the parent */
....@@ -1827,61 +2319,104 @@
18272319 if (ret)
18282320 return ret;
18292321
1830
- if (obj_req->num_img_extents) {
1831
- obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1832
- num_osd_ops = 3; /* stat + setallochint + write/writefull */
1833
- } else {
1834
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1835
- num_osd_ops = 2; /* setallochint + write/writefull */
1836
- }
1837
-
1838
- obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1839
- if (!obj_req->osd_req)
1840
- return -ENOMEM;
1841
-
1842
- if (obj_req->num_img_extents) {
1843
- ret = __rbd_obj_setup_stat(obj_req, which++);
1844
- if (ret)
1845
- return ret;
1846
- }
1847
-
1848
- __rbd_obj_setup_write(obj_req, which);
2322
+ obj_req->write_state = RBD_OBJ_WRITE_START;
18492323 return 0;
18502324 }
18512325
1852
-static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
1853
- unsigned int which)
2326
+static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
18542327 {
2328
+ return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2329
+ CEPH_OSD_OP_ZERO;
2330
+}
2331
+
2332
+static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2333
+ int which)
2334
+{
2335
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
2336
+
2337
+ if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2338
+ rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2339
+ osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2340
+ } else {
2341
+ osd_req_op_extent_init(osd_req, which,
2342
+ truncate_or_zero_opcode(obj_req),
2343
+ obj_req->ex.oe_off, obj_req->ex.oe_len,
2344
+ 0, 0);
2345
+ }
2346
+}
2347
+
2348
+static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2349
+{
2350
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2351
+ u64 off, next_off;
2352
+ int ret;
2353
+
2354
+ /*
2355
+ * Align the range to alloc_size boundary and punt on discards
2356
+ * that are too small to free up any space.
2357
+ *
2358
+ * alloc_size == object_size && is_tail() is a special case for
2359
+ * filestore with filestore_punch_hole = false, needed to allow
2360
+ * truncate (in addition to delete).
2361
+ */
2362
+ if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2363
+ !rbd_obj_is_tail(obj_req)) {
2364
+ off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2365
+ next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2366
+ rbd_dev->opts->alloc_size);
2367
+ if (off >= next_off)
2368
+ return 1;
2369
+
2370
+ dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2371
+ obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2372
+ off, next_off - off);
2373
+ obj_req->ex.oe_off = off;
2374
+ obj_req->ex.oe_len = next_off - off;
2375
+ }
2376
+
2377
+ /* reverse map the entire object onto the parent */
2378
+ ret = rbd_obj_calc_img_extents(obj_req, true);
2379
+ if (ret)
2380
+ return ret;
2381
+
2382
+ obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2383
+ if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2384
+ obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2385
+
2386
+ obj_req->write_state = RBD_OBJ_WRITE_START;
2387
+ return 0;
2388
+}
2389
+
2390
+static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2391
+ int which)
2392
+{
2393
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
18552394 u16 opcode;
18562395
18572396 if (rbd_obj_is_entire(obj_req)) {
18582397 if (obj_req->num_img_extents) {
1859
- osd_req_op_init(obj_req->osd_req, which++,
1860
- CEPH_OSD_OP_CREATE, 0);
2398
+ if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2399
+ osd_req_op_init(osd_req, which++,
2400
+ CEPH_OSD_OP_CREATE, 0);
18612401 opcode = CEPH_OSD_OP_TRUNCATE;
18622402 } else {
1863
- osd_req_op_init(obj_req->osd_req, which++,
2403
+ rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2404
+ osd_req_op_init(osd_req, which++,
18642405 CEPH_OSD_OP_DELETE, 0);
18652406 opcode = 0;
18662407 }
1867
- } else if (rbd_obj_is_tail(obj_req)) {
1868
- opcode = CEPH_OSD_OP_TRUNCATE;
18692408 } else {
1870
- opcode = CEPH_OSD_OP_ZERO;
2409
+ opcode = truncate_or_zero_opcode(obj_req);
18712410 }
18722411
18732412 if (opcode)
1874
- osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
2413
+ osd_req_op_extent_init(osd_req, which, opcode,
18752414 obj_req->ex.oe_off, obj_req->ex.oe_len,
18762415 0, 0);
1877
-
1878
- rbd_assert(which == obj_req->osd_req->r_num_ops);
1879
- rbd_osd_req_format_write(obj_req);
18802416 }
18812417
1882
-static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
2418
+static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
18832419 {
1884
- unsigned int num_osd_ops, which = 0;
18852420 int ret;
18862421
18872422 /* reverse map the entire object onto the parent */
....@@ -1889,64 +2424,96 @@
18892424 if (ret)
18902425 return ret;
18912426
1892
- if (rbd_obj_is_entire(obj_req)) {
1893
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1894
- if (obj_req->num_img_extents)
1895
- num_osd_ops = 2; /* create + truncate */
1896
- else
1897
- num_osd_ops = 1; /* delete */
1898
- } else {
1899
- if (obj_req->num_img_extents) {
1900
- obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1901
- num_osd_ops = 2; /* stat + truncate/zero */
1902
- } else {
1903
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1904
- num_osd_ops = 1; /* truncate/zero */
1905
- }
2427
+ if (!obj_req->num_img_extents) {
2428
+ obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2429
+ if (rbd_obj_is_entire(obj_req))
2430
+ obj_req->flags |= RBD_OBJ_FLAG_DELETION;
19062431 }
19072432
1908
- obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1909
- if (!obj_req->osd_req)
1910
- return -ENOMEM;
1911
-
1912
- if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
1913
- ret = __rbd_obj_setup_stat(obj_req, which++);
1914
- if (ret)
1915
- return ret;
1916
- }
1917
-
1918
- __rbd_obj_setup_discard(obj_req, which);
2433
+ obj_req->write_state = RBD_OBJ_WRITE_START;
19192434 return 0;
19202435 }
19212436
2437
+static int count_write_ops(struct rbd_obj_request *obj_req)
2438
+{
2439
+ struct rbd_img_request *img_req = obj_req->img_request;
2440
+
2441
+ switch (img_req->op_type) {
2442
+ case OBJ_OP_WRITE:
2443
+ if (!use_object_map(img_req->rbd_dev) ||
2444
+ !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2445
+ return 2; /* setallochint + write/writefull */
2446
+
2447
+ return 1; /* write/writefull */
2448
+ case OBJ_OP_DISCARD:
2449
+ return 1; /* delete/truncate/zero */
2450
+ case OBJ_OP_ZEROOUT:
2451
+ if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2452
+ !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2453
+ return 2; /* create + truncate */
2454
+
2455
+ return 1; /* delete/truncate/zero */
2456
+ default:
2457
+ BUG();
2458
+ }
2459
+}
2460
+
2461
+static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2462
+ int which)
2463
+{
2464
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
2465
+
2466
+ switch (obj_req->img_request->op_type) {
2467
+ case OBJ_OP_WRITE:
2468
+ __rbd_osd_setup_write_ops(osd_req, which);
2469
+ break;
2470
+ case OBJ_OP_DISCARD:
2471
+ __rbd_osd_setup_discard_ops(osd_req, which);
2472
+ break;
2473
+ case OBJ_OP_ZEROOUT:
2474
+ __rbd_osd_setup_zeroout_ops(osd_req, which);
2475
+ break;
2476
+ default:
2477
+ BUG();
2478
+ }
2479
+}
2480
+
19222481 /*
1923
- * For each object request in @img_req, allocate an OSD request, add
1924
- * individual OSD ops and prepare them for submission. The number of
1925
- * OSD ops depends on op_type and the overlap point (if any).
2482
+ * Prune the list of object requests (adjust offset and/or length, drop
2483
+ * redundant requests). Prepare object request state machines and image
2484
+ * request state machine for execution.
19262485 */
19272486 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
19282487 {
1929
- struct rbd_obj_request *obj_req;
2488
+ struct rbd_obj_request *obj_req, *next_obj_req;
19302489 int ret;
19312490
1932
- for_each_obj_request(img_req, obj_req) {
2491
+ for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
19332492 switch (img_req->op_type) {
19342493 case OBJ_OP_READ:
1935
- ret = rbd_obj_setup_read(obj_req);
2494
+ ret = rbd_obj_init_read(obj_req);
19362495 break;
19372496 case OBJ_OP_WRITE:
1938
- ret = rbd_obj_setup_write(obj_req);
2497
+ ret = rbd_obj_init_write(obj_req);
19392498 break;
19402499 case OBJ_OP_DISCARD:
1941
- ret = rbd_obj_setup_discard(obj_req);
2500
+ ret = rbd_obj_init_discard(obj_req);
2501
+ break;
2502
+ case OBJ_OP_ZEROOUT:
2503
+ ret = rbd_obj_init_zeroout(obj_req);
19422504 break;
19432505 default:
1944
- rbd_assert(0);
2506
+ BUG();
19452507 }
1946
- if (ret)
2508
+ if (ret < 0)
19472509 return ret;
2510
+ if (ret > 0) {
2511
+ rbd_img_obj_request_del(img_req, obj_req);
2512
+ continue;
2513
+ }
19482514 }
19492515
2516
+ img_req->state = RBD_IMG_START;
19502517 return 0;
19512518 }
19522519
....@@ -2235,32 +2802,78 @@
22352802 &it);
22362803 }
22372804
2238
-static void rbd_img_request_submit(struct rbd_img_request *img_request)
2805
+static void rbd_img_handle_request_work(struct work_struct *work)
22392806 {
2240
- struct rbd_obj_request *obj_request;
2807
+ struct rbd_img_request *img_req =
2808
+ container_of(work, struct rbd_img_request, work);
22412809
2242
- dout("%s: img %p\n", __func__, img_request);
2810
+ rbd_img_handle_request(img_req, img_req->work_result);
2811
+}
22432812
2244
- rbd_img_request_get(img_request);
2245
- for_each_obj_request(img_request, obj_request)
2246
- rbd_obj_request_submit(obj_request);
2813
+static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2814
+{
2815
+ INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2816
+ img_req->work_result = result;
2817
+ queue_work(rbd_wq, &img_req->work);
2818
+}
22472819
2248
- rbd_img_request_put(img_request);
2820
+static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2821
+{
2822
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2823
+
2824
+ if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2825
+ obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2826
+ return true;
2827
+ }
2828
+
2829
+ dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2830
+ obj_req->ex.oe_objno);
2831
+ return false;
2832
+}
2833
+
2834
+static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2835
+{
2836
+ struct ceph_osd_request *osd_req;
2837
+ int ret;
2838
+
2839
+ osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2840
+ if (IS_ERR(osd_req))
2841
+ return PTR_ERR(osd_req);
2842
+
2843
+ osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2844
+ obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2845
+ rbd_osd_setup_data(osd_req, 0);
2846
+ rbd_osd_format_read(osd_req);
2847
+
2848
+ ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2849
+ if (ret)
2850
+ return ret;
2851
+
2852
+ rbd_osd_submit(osd_req);
2853
+ return 0;
22492854 }
22502855
22512856 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
22522857 {
22532858 struct rbd_img_request *img_req = obj_req->img_request;
2859
+ struct rbd_device *parent = img_req->rbd_dev->parent;
22542860 struct rbd_img_request *child_img_req;
22552861 int ret;
22562862
2257
- child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2258
- OBJ_OP_READ, NULL);
2863
+ child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
22592864 if (!child_img_req)
22602865 return -ENOMEM;
22612866
2867
+ rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
22622868 __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
22632869 child_img_req->obj_request = obj_req;
2870
+
2871
+ down_read(&parent->header_rwsem);
2872
+ rbd_img_capture_header(child_img_req);
2873
+ up_read(&parent->header_rwsem);
2874
+
2875
+ dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2876
+ obj_req);
22642877
22652878 if (!rbd_img_is_write(img_req)) {
22662879 switch (img_req->data_type) {
....@@ -2278,7 +2891,7 @@
22782891 &obj_req->bvec_pos);
22792892 break;
22802893 default:
2281
- rbd_assert(0);
2894
+ BUG();
22822895 }
22832896 } else {
22842897 ret = rbd_img_fill_from_bvecs(child_img_req,
....@@ -2287,55 +2900,159 @@
22872900 obj_req->copyup_bvecs);
22882901 }
22892902 if (ret) {
2290
- rbd_img_request_put(child_img_req);
2903
+ rbd_img_request_destroy(child_img_req);
22912904 return ret;
22922905 }
22932906
2294
- rbd_img_request_submit(child_img_req);
2907
+ /* avoid parent chain recursion */
2908
+ rbd_img_schedule(child_img_req, 0);
22952909 return 0;
22962910 }
22972911
2298
-static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
2912
+static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
22992913 {
23002914 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
23012915 int ret;
23022916
2303
- if (obj_req->result == -ENOENT &&
2304
- rbd_dev->parent_overlap && !obj_req->tried_parent) {
2305
- /* reverse map this object extent onto the parent */
2306
- ret = rbd_obj_calc_img_extents(obj_req, false);
2917
+again:
2918
+ switch (obj_req->read_state) {
2919
+ case RBD_OBJ_READ_START:
2920
+ rbd_assert(!*result);
2921
+
2922
+ if (!rbd_obj_may_exist(obj_req)) {
2923
+ *result = -ENOENT;
2924
+ obj_req->read_state = RBD_OBJ_READ_OBJECT;
2925
+ goto again;
2926
+ }
2927
+
2928
+ ret = rbd_obj_read_object(obj_req);
23072929 if (ret) {
2308
- obj_req->result = ret;
2930
+ *result = ret;
23092931 return true;
23102932 }
2311
-
2312
- if (obj_req->num_img_extents) {
2313
- obj_req->tried_parent = true;
2314
- ret = rbd_obj_read_from_parent(obj_req);
2933
+ obj_req->read_state = RBD_OBJ_READ_OBJECT;
2934
+ return false;
2935
+ case RBD_OBJ_READ_OBJECT:
2936
+ if (*result == -ENOENT && rbd_dev->parent_overlap) {
2937
+ /* reverse map this object extent onto the parent */
2938
+ ret = rbd_obj_calc_img_extents(obj_req, false);
23152939 if (ret) {
2316
- obj_req->result = ret;
2940
+ *result = ret;
23172941 return true;
23182942 }
2319
- return false;
2943
+ if (obj_req->num_img_extents) {
2944
+ ret = rbd_obj_read_from_parent(obj_req);
2945
+ if (ret) {
2946
+ *result = ret;
2947
+ return true;
2948
+ }
2949
+ obj_req->read_state = RBD_OBJ_READ_PARENT;
2950
+ return false;
2951
+ }
23202952 }
2953
+
2954
+ /*
2955
+ * -ENOENT means a hole in the image -- zero-fill the entire
2956
+ * length of the request. A short read also implies zero-fill
2957
+ * to the end of the request.
2958
+ */
2959
+ if (*result == -ENOENT) {
2960
+ rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
2961
+ *result = 0;
2962
+ } else if (*result >= 0) {
2963
+ if (*result < obj_req->ex.oe_len)
2964
+ rbd_obj_zero_range(obj_req, *result,
2965
+ obj_req->ex.oe_len - *result);
2966
+ else
2967
+ rbd_assert(*result == obj_req->ex.oe_len);
2968
+ *result = 0;
2969
+ }
2970
+ return true;
2971
+ case RBD_OBJ_READ_PARENT:
2972
+ /*
2973
+ * The parent image is read only up to the overlap -- zero-fill
2974
+ * from the overlap to the end of the request.
2975
+ */
2976
+ if (!*result) {
2977
+ u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
2978
+
2979
+ if (obj_overlap < obj_req->ex.oe_len)
2980
+ rbd_obj_zero_range(obj_req, obj_overlap,
2981
+ obj_req->ex.oe_len - obj_overlap);
2982
+ }
2983
+ return true;
2984
+ default:
2985
+ BUG();
2986
+ }
2987
+}
2988
+
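The zero-fill rules in the READ_OBJECT and READ_PARENT arms above boil down to "zero whatever the OSD (or the parent) did not supply". A standalone sketch, not part of the patch, with a made-up helper name:

#include <errno.h>
#include <stdint.h>
#include <stdio.h>

/* bytes to zero at the tail of a request of req_len, given the read result */
static uint32_t read_zero_tail(int result, uint32_t req_len)
{
	if (result == -ENOENT)
		return req_len;				/* hole: zero the whole request */
	if (result >= 0 && (uint32_t)result < req_len)
		return req_len - (uint32_t)result;	/* short read: zero the tail */
	return 0;
}

int main(void)
{
	printf("%u\n", read_zero_tail(-ENOENT, 16384));	/* 16384 */
	printf("%u\n", read_zero_tail(6144, 16384));	/* 10240 */
	printf("%u\n", read_zero_tail(16384, 16384));	/* 0 */
	return 0;
}

The same computation applies to the parent case, with the object overlap (rbd_obj_img_extents_bytes()) standing in for the read result.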
2989
+static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
2990
+{
2991
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2992
+
2993
+ if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
2994
+ obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2995
+
2996
+ if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
2997
+ (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
2998
+ dout("%s %p noop for nonexistent\n", __func__, obj_req);
2999
+ return true;
23213000 }
23223001
2323
- /*
2324
- * -ENOENT means a hole in the image -- zero-fill the entire
2325
- * length of the request. A short read also implies zero-fill
2326
- * to the end of the request. In both cases we update xferred
2327
- * count to indicate the whole request was satisfied.
2328
- */
2329
- if (obj_req->result == -ENOENT ||
2330
- (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
2331
- rbd_assert(!obj_req->xferred || !obj_req->result);
2332
- rbd_obj_zero_range(obj_req, obj_req->xferred,
2333
- obj_req->ex.oe_len - obj_req->xferred);
2334
- obj_req->result = 0;
2335
- obj_req->xferred = obj_req->ex.oe_len;
3002
+ return false;
3003
+}
3004
+
3005
+/*
3006
+ * Return:
3007
+ * 0 - object map update sent
3008
+ * 1 - object map update isn't needed
3009
+ * <0 - error
3010
+ */
3011
+static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
3012
+{
3013
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3014
+ u8 new_state;
3015
+
3016
+ if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3017
+ return 1;
3018
+
3019
+ if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3020
+ new_state = OBJECT_PENDING;
3021
+ else
3022
+ new_state = OBJECT_EXISTS;
3023
+
3024
+ return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
3025
+}
3026
+
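The "0 = update sent, 1 = update not needed, <0 = error" convention documented above is consumed by the write state machine further down, where a positive return lets the caller fall straight through to the next state instead of waiting for a completion. A standalone sketch of that caller pattern, not part of the patch (maybe_send_update() and consume() are made up for illustration):

#include <errno.h>
#include <stdio.h>

static int maybe_send_update(int needed, int fail)
{
	if (fail)
		return -EIO;	/* <0: error */
	if (!needed)
		return 1;	/* 1: nothing to send */
	return 0;		/* 0: request sent, completion arrives later */
}

/* returns 1 if the step finished synchronously, 0 if it must wait */
static int consume(int needed, int fail, int *result)
{
	int ret = maybe_send_update(needed, fail);

	if (ret < 0) {
		*result = ret;
		return 1;	/* finish with an error */
	}
	if (ret > 0)
		return 1;	/* advance to the next state immediately */
	return 0;		/* wait for the async completion */
}

int main(void)
{
	int result = 0;

	printf("%d\n", consume(1, 0, &result));			/* 0: waits */
	printf("%d\n", consume(0, 0, &result));			/* 1: skips the wait */
	printf("%d %d\n", consume(1, 1, &result), result);	/* 1 -5 */
	return 0;
}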
3027
+static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
3028
+{
3029
+ struct ceph_osd_request *osd_req;
3030
+ int num_ops = count_write_ops(obj_req);
3031
+ int which = 0;
3032
+ int ret;
3033
+
3034
+ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
3035
+ num_ops++; /* stat */
3036
+
3037
+ osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3038
+ if (IS_ERR(osd_req))
3039
+ return PTR_ERR(osd_req);
3040
+
3041
+ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3042
+ ret = rbd_osd_setup_stat(osd_req, which++);
3043
+ if (ret)
3044
+ return ret;
23363045 }
23373046
2338
- return true;
3047
+ rbd_osd_setup_write_ops(osd_req, which);
3048
+ rbd_osd_format_write(osd_req);
3049
+
3050
+ ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3051
+ if (ret)
3052
+ return ret;
3053
+
3054
+ rbd_osd_submit(osd_req);
3055
+ return 0;
23393056 }
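For orientation, combining count_write_ops() with the extra stat op added in rbd_obj_write_object() above gives the following tallies (illustrative only, not part of the patch):

    plain write, object map unused or object assumed absent:   setallochint + write/writefull = 2 ops
    plain write, object map says the object may exist:         write/writefull = 1 op
    either write case with copyup enabled:                      +1 stat op
    discard:                                                    delete or truncate/zero = 1 op
    zeroout of an entire object with parent data, copyup off:  create + truncate = 2 ops
    any other zeroout:                                          delete or truncate/zero = 1 op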
23403057
23413058 /*
....@@ -2356,56 +3073,66 @@
23563073 return true;
23573074 }
23583075
2359
-static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
3076
+#define MODS_ONLY U32_MAX
3077
+
3078
+static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3079
+ u32 bytes)
23603080 {
2361
- unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
3081
+ struct ceph_osd_request *osd_req;
23623082 int ret;
23633083
23643084 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
2365
- rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
2366
- rbd_osd_req_destroy(obj_req->osd_req);
3085
+ rbd_assert(bytes > 0 && bytes != MODS_ONLY);
23673086
2368
- /*
2369
- * Create a copyup request with the same number of OSD ops as
2370
- * the original request. The original request was stat + op(s),
2371
- * the new copyup request will be copyup + the same op(s).
2372
- */
2373
- obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
2374
- if (!obj_req->osd_req)
2375
- return -ENOMEM;
3087
+ osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3088
+ if (IS_ERR(osd_req))
3089
+ return PTR_ERR(osd_req);
23763090
2377
- ret = osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
2378
- "copyup");
3091
+ ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
23793092 if (ret)
23803093 return ret;
23813094
2382
- /*
2383
- * Only send non-zero copyup data to save some I/O and network
2384
- * bandwidth -- zero copyup data is equivalent to the object not
2385
- * existing.
2386
- */
2387
- if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
2388
- dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
2389
- bytes = 0;
2390
- }
2391
- osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
2392
- obj_req->copyup_bvecs,
2393
- obj_req->copyup_bvec_count,
2394
- bytes);
3095
+ rbd_osd_format_write(osd_req);
23953096
2396
- switch (obj_req->img_request->op_type) {
2397
- case OBJ_OP_WRITE:
2398
- __rbd_obj_setup_write(obj_req, 1);
2399
- break;
2400
- case OBJ_OP_DISCARD:
2401
- rbd_assert(!rbd_obj_is_entire(obj_req));
2402
- __rbd_obj_setup_discard(obj_req, 1);
2403
- break;
2404
- default:
2405
- rbd_assert(0);
3097
+ ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3098
+ if (ret)
3099
+ return ret;
3100
+
3101
+ rbd_osd_submit(osd_req);
3102
+ return 0;
3103
+}
3104
+
3105
+static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3106
+ u32 bytes)
3107
+{
3108
+ struct ceph_osd_request *osd_req;
3109
+ int num_ops = count_write_ops(obj_req);
3110
+ int which = 0;
3111
+ int ret;
3112
+
3113
+ dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3114
+
3115
+ if (bytes != MODS_ONLY)
3116
+ num_ops++; /* copyup */
3117
+
3118
+ osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3119
+ if (IS_ERR(osd_req))
3120
+ return PTR_ERR(osd_req);
3121
+
3122
+ if (bytes != MODS_ONLY) {
3123
+ ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3124
+ if (ret)
3125
+ return ret;
24063126 }
24073127
2408
- rbd_obj_request_submit(obj_req);
3128
+ rbd_osd_setup_write_ops(osd_req, which);
3129
+ rbd_osd_format_write(osd_req);
3130
+
3131
+ ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3132
+ if (ret)
3133
+ return ret;
3134
+
3135
+ rbd_osd_submit(osd_req);
24093136 return 0;
24103137 }
24113138
....@@ -2437,7 +3164,12 @@
24373164 return 0;
24383165 }
24393166
2440
-static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
3167
+/*
3168
+ * The target object doesn't exist. Read the data for the entire
3169
+ * target object up to the overlap point (if any) from the parent,
3170
+ * so we can use it for a copyup.
3171
+ */
3172
+static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
24413173 {
24423174 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
24433175 int ret;
....@@ -2448,174 +3180,503 @@
24483180 if (!obj_req->num_img_extents) {
24493181 /*
24503182 * The overlap has become 0 (most likely because the
2451
- * image has been flattened). Use rbd_obj_issue_copyup()
2452
- * to re-submit the original write request -- the copyup
2453
- * operation itself will be a no-op, since someone must
2454
- * have populated the child object while we weren't
2455
- * looking. Move to WRITE_FLAT state as we'll be done
2456
- * with the operation once the null copyup completes.
3183
+ * image has been flattened). Re-submit the original write
3184
+ * request -- pass MODS_ONLY since the copyup isn't needed
3185
+ * anymore.
24573186 */
2458
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2459
- return rbd_obj_issue_copyup(obj_req, 0);
3187
+ return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
24603188 }
24613189
24623190 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
24633191 if (ret)
24643192 return ret;
24653193
2466
- obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
24673194 return rbd_obj_read_from_parent(obj_req);
24683195 }
24693196
2470
-static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
3197
+static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
24713198 {
3199
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3200
+ struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3201
+ u8 new_state;
3202
+ u32 i;
3203
+ int ret;
3204
+
3205
+ rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3206
+
3207
+ if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3208
+ return;
3209
+
3210
+ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3211
+ return;
3212
+
3213
+ for (i = 0; i < snapc->num_snaps; i++) {
3214
+ if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3215
+ i + 1 < snapc->num_snaps)
3216
+ new_state = OBJECT_EXISTS_CLEAN;
3217
+ else
3218
+ new_state = OBJECT_EXISTS;
3219
+
3220
+ ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3221
+ new_state, NULL);
3222
+ if (ret < 0) {
3223
+ obj_req->pending.result = ret;
3224
+ return;
3225
+ }
3226
+
3227
+ rbd_assert(!ret);
3228
+ obj_req->pending.num_pending++;
3229
+ }
3230
+}
3231
+
3232
+static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3233
+{
3234
+ u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3235
+ int ret;
3236
+
3237
+ rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3238
+
3239
+ /*
3240
+ * Only send non-zero copyup data to save some I/O and network
3241
+ * bandwidth -- zero copyup data is equivalent to the object not
3242
+ * existing.
3243
+ */
3244
+ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3245
+ bytes = 0;
3246
+
3247
+ if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3248
+ /*
3249
+ * Send a copyup request with an empty snapshot context to
3250
+ * deep-copyup the object through all existing snapshots.
3251
+ * A second request with the current snapshot context will be
3252
+ * sent for the actual modification.
3253
+ */
3254
+ ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3255
+ if (ret) {
3256
+ obj_req->pending.result = ret;
3257
+ return;
3258
+ }
3259
+
3260
+ obj_req->pending.num_pending++;
3261
+ bytes = MODS_ONLY;
3262
+ }
3263
+
3264
+ ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3265
+ if (ret) {
3266
+ obj_req->pending.result = ret;
3267
+ return;
3268
+ }
3269
+
3270
+ obj_req->pending.num_pending++;
3271
+}
3272
+
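The deep-copyup sequencing in rbd_obj_copyup_write_object() is easier to see in isolation: with existing snapshots and non-zero copyup data, one request carries the data under an empty snapshot context and a second carries only the modification ops. A standalone sketch, not part of the patch (submit_copyup_writes() is made up; only the branching mirrors the code above):

#include <stdint.h>
#include <stdio.h>

#define MODS_ONLY UINT32_MAX

static void submit_copyup_writes(uint32_t num_snaps, uint32_t bytes)
{
	if (num_snaps && bytes > 0) {
		printf("  empty snapc: copyup of %u bytes\n", (unsigned)bytes);
		bytes = MODS_ONLY;	/* second request: mods only */
	}

	if (bytes == MODS_ONLY)
		printf("  current snapc: modification ops only\n");
	else
		printf("  current snapc: copyup of %u bytes + modification ops\n",
		       (unsigned)bytes);
}

int main(void)
{
	printf("no snapshots:\n");
	submit_copyup_writes(0, 4096);
	printf("two snapshots:\n");
	submit_copyup_writes(2, 4096);
	printf("two snapshots, all-zero copyup data:\n");
	submit_copyup_writes(2, 0);
	return 0;
}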
3273
+static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3274
+{
3275
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
24723276 int ret;
24733277
24743278 again:
2475
- switch (obj_req->write_state) {
2476
- case RBD_OBJ_WRITE_GUARD:
2477
- rbd_assert(!obj_req->xferred);
2478
- if (obj_req->result == -ENOENT) {
2479
- /*
2480
- * The target object doesn't exist. Read the data for
2481
- * the entire target object up to the overlap point (if
2482
- * any) from the parent, so we can use it for a copyup.
2483
- */
2484
- ret = rbd_obj_handle_write_guard(obj_req);
2485
- if (ret) {
2486
- obj_req->result = ret;
2487
- return true;
2488
- }
2489
- return false;
2490
- }
2491
- /* fall through */
2492
- case RBD_OBJ_WRITE_FLAT:
2493
- if (!obj_req->result)
2494
- /*
2495
- * There is no such thing as a successful short
2496
- * write -- indicate the whole request was satisfied.
2497
- */
2498
- obj_req->xferred = obj_req->ex.oe_len;
2499
- return true;
2500
- case RBD_OBJ_WRITE_COPYUP:
2501
- obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2502
- if (obj_req->result)
2503
- goto again;
3279
+ switch (obj_req->copyup_state) {
3280
+ case RBD_OBJ_COPYUP_START:
3281
+ rbd_assert(!*result);
25043282
2505
- rbd_assert(obj_req->xferred);
2506
- ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
3283
+ ret = rbd_obj_copyup_read_parent(obj_req);
25073284 if (ret) {
2508
- obj_req->result = ret;
2509
- obj_req->xferred = 0;
3285
+ *result = ret;
25103286 return true;
25113287 }
3288
+ if (obj_req->num_img_extents)
3289
+ obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3290
+ else
3291
+ obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
25123292 return false;
3293
+ case RBD_OBJ_COPYUP_READ_PARENT:
3294
+ if (*result)
3295
+ return true;
3296
+
3297
+ if (is_zero_bvecs(obj_req->copyup_bvecs,
3298
+ rbd_obj_img_extents_bytes(obj_req))) {
3299
+ dout("%s %p detected zeros\n", __func__, obj_req);
3300
+ obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3301
+ }
3302
+
3303
+ rbd_obj_copyup_object_maps(obj_req);
3304
+ if (!obj_req->pending.num_pending) {
3305
+ *result = obj_req->pending.result;
3306
+ obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3307
+ goto again;
3308
+ }
3309
+ obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3310
+ return false;
3311
+ case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3312
+ if (!pending_result_dec(&obj_req->pending, result))
3313
+ return false;
3314
+ fallthrough;
3315
+ case RBD_OBJ_COPYUP_OBJECT_MAPS:
3316
+ if (*result) {
3317
+ rbd_warn(rbd_dev, "snap object map update failed: %d",
3318
+ *result);
3319
+ return true;
3320
+ }
3321
+
3322
+ rbd_obj_copyup_write_object(obj_req);
3323
+ if (!obj_req->pending.num_pending) {
3324
+ *result = obj_req->pending.result;
3325
+ obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3326
+ goto again;
3327
+ }
3328
+ obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3329
+ return false;
3330
+ case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3331
+ if (!pending_result_dec(&obj_req->pending, result))
3332
+ return false;
3333
+ fallthrough;
3334
+ case RBD_OBJ_COPYUP_WRITE_OBJECT:
3335
+ return true;
25133336 default:
25143337 BUG();
25153338 }
25163339 }
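The __RBD_OBJ_COPYUP_OBJECT_MAPS and __RBD_OBJ_COPYUP_WRITE_OBJECT arms above lean on pending_result_dec(), which is not shown in this hunk. A minimal sketch of the presumed behaviour, matching how obj_req->pending is used here (first nonzero result wins, completion is reported once the count drops to zero):

#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

struct pending_result {
	int result;		/* first nonzero result */
	int num_pending;
};

static bool pending_result_dec(struct pending_result *pending, int *result)
{
	assert(pending->num_pending > 0);

	if (*result && !pending->result)
		pending->result = *result;
	if (--pending->num_pending)
		return false;		/* still waiting for other sub-requests */

	*result = pending->result;
	return true;			/* all sub-requests completed */
}

int main(void)
{
	struct pending_result p = { .result = 0, .num_pending = 2 };
	int r;

	r = -5;						/* first sub-request fails */
	printf("%d\n", pending_result_dec(&p, &r));	/* 0: still pending */
	r = 0;						/* second succeeds */
	printf("%d %d\n", pending_result_dec(&p, &r), r);	/* 1 -5 */
	return 0;
}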
25173340
25183341 /*
2519
- * Returns true if @obj_req is completed, or false otherwise.
3342
+ * Return:
3343
+ * 0 - object map update sent
3344
+ * 1 - object map update isn't needed
3345
+ * <0 - error
25203346 */
2521
-static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
3347
+static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
25223348 {
2523
- switch (obj_req->img_request->op_type) {
2524
- case OBJ_OP_READ:
2525
- return rbd_obj_handle_read(obj_req);
2526
- case OBJ_OP_WRITE:
2527
- return rbd_obj_handle_write(obj_req);
2528
- case OBJ_OP_DISCARD:
2529
- if (rbd_obj_handle_write(obj_req)) {
2530
- /*
2531
- * Hide -ENOENT from delete/truncate/zero -- discarding
2532
- * a non-existent object is not a problem.
2533
- */
2534
- if (obj_req->result == -ENOENT) {
2535
- obj_req->result = 0;
2536
- obj_req->xferred = obj_req->ex.oe_len;
2537
- }
3349
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3350
+ u8 current_state = OBJECT_PENDING;
3351
+
3352
+ if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3353
+ return 1;
3354
+
3355
+ if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3356
+ return 1;
3357
+
3358
+ return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3359
+ &current_state);
3360
+}
3361
+
3362
+static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3363
+{
3364
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3365
+ int ret;
3366
+
3367
+again:
3368
+ switch (obj_req->write_state) {
3369
+ case RBD_OBJ_WRITE_START:
3370
+ rbd_assert(!*result);
3371
+
3372
+ rbd_obj_set_copyup_enabled(obj_req);
3373
+ if (rbd_obj_write_is_noop(obj_req))
3374
+ return true;
3375
+
3376
+ ret = rbd_obj_write_pre_object_map(obj_req);
3377
+ if (ret < 0) {
3378
+ *result = ret;
25383379 return true;
25393380 }
3381
+ obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3382
+ if (ret > 0)
3383
+ goto again;
25403384 return false;
3385
+ case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3386
+ if (*result) {
3387
+ rbd_warn(rbd_dev, "pre object map update failed: %d",
3388
+ *result);
3389
+ return true;
3390
+ }
3391
+ ret = rbd_obj_write_object(obj_req);
3392
+ if (ret) {
3393
+ *result = ret;
3394
+ return true;
3395
+ }
3396
+ obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3397
+ return false;
3398
+ case RBD_OBJ_WRITE_OBJECT:
3399
+ if (*result == -ENOENT) {
3400
+ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3401
+ *result = 0;
3402
+ obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3403
+ obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3404
+ goto again;
3405
+ }
3406
+ /*
3407
+ * On a non-existent object:
3408
+ * delete - -ENOENT, truncate/zero - 0
3409
+ */
3410
+ if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3411
+ *result = 0;
3412
+ }
3413
+ if (*result)
3414
+ return true;
3415
+
3416
+ obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3417
+ goto again;
3418
+ case __RBD_OBJ_WRITE_COPYUP:
3419
+ if (!rbd_obj_advance_copyup(obj_req, result))
3420
+ return false;
3421
+ fallthrough;
3422
+ case RBD_OBJ_WRITE_COPYUP:
3423
+ if (*result) {
3424
+ rbd_warn(rbd_dev, "copyup failed: %d", *result);
3425
+ return true;
3426
+ }
3427
+ ret = rbd_obj_write_post_object_map(obj_req);
3428
+ if (ret < 0) {
3429
+ *result = ret;
3430
+ return true;
3431
+ }
3432
+ obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3433
+ if (ret > 0)
3434
+ goto again;
3435
+ return false;
3436
+ case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3437
+ if (*result)
3438
+ rbd_warn(rbd_dev, "post object map update failed: %d",
3439
+ *result);
3440
+ return true;
25413441 default:
25423442 BUG();
25433443 }
25443444 }
25453445
2546
-static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
3446
+/*
3447
+ * Return true if @obj_req is completed.
3448
+ */
3449
+static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3450
+ int *result)
25473451 {
25483452 struct rbd_img_request *img_req = obj_req->img_request;
3453
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3454
+ bool done;
25493455
2550
- rbd_assert((!obj_req->result &&
2551
- obj_req->xferred == obj_req->ex.oe_len) ||
2552
- (obj_req->result < 0 && !obj_req->xferred));
2553
- if (!obj_req->result) {
2554
- img_req->xferred += obj_req->xferred;
2555
- return;
2556
- }
3456
+ mutex_lock(&obj_req->state_mutex);
3457
+ if (!rbd_img_is_write(img_req))
3458
+ done = rbd_obj_advance_read(obj_req, result);
3459
+ else
3460
+ done = rbd_obj_advance_write(obj_req, result);
3461
+ mutex_unlock(&obj_req->state_mutex);
25573462
2558
- rbd_warn(img_req->rbd_dev,
2559
- "%s at objno %llu %llu~%llu result %d xferred %llu",
2560
- obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
2561
- obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
2562
- obj_req->xferred);
2563
- if (!img_req->result) {
2564
- img_req->result = obj_req->result;
2565
- img_req->xferred = 0;
3463
+ if (done && *result) {
3464
+ rbd_assert(*result < 0);
3465
+ rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3466
+ obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3467
+ obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
25663468 }
3469
+ return done;
25673470 }
25683471
2569
-static void rbd_img_end_child_request(struct rbd_img_request *img_req)
3472
+/*
3473
+ * This is open-coded in rbd_img_handle_request() to avoid parent chain
3474
+ * recursion.
3475
+ */
3476
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
25703477 {
2571
- struct rbd_obj_request *obj_req = img_req->obj_request;
2572
-
2573
- rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
2574
- rbd_assert((!img_req->result &&
2575
- img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
2576
- (img_req->result < 0 && !img_req->xferred));
2577
-
2578
- obj_req->result = img_req->result;
2579
- obj_req->xferred = img_req->xferred;
2580
- rbd_img_request_put(img_req);
3478
+ if (__rbd_obj_handle_request(obj_req, &result))
3479
+ rbd_img_handle_request(obj_req->img_request, result);
25813480 }
25823481
2583
-static void rbd_img_end_request(struct rbd_img_request *img_req)
3482
+static bool need_exclusive_lock(struct rbd_img_request *img_req)
25843483 {
3484
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3485
+
3486
+ if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3487
+ return false;
3488
+
3489
+ if (rbd_is_ro(rbd_dev))
3490
+ return false;
3491
+
25853492 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
2586
- rbd_assert((!img_req->result &&
2587
- img_req->xferred == blk_rq_bytes(img_req->rq)) ||
2588
- (img_req->result < 0 && !img_req->xferred));
3493
+ if (rbd_dev->opts->lock_on_read ||
3494
+ (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3495
+ return true;
25893496
2590
- blk_mq_end_request(img_req->rq,
2591
- errno_to_blk_status(img_req->result));
2592
- rbd_img_request_put(img_req);
3497
+ return rbd_img_is_write(img_req);
25933498 }
25943499
2595
-static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
3500
+static bool rbd_lock_add_request(struct rbd_img_request *img_req)
25963501 {
2597
- struct rbd_img_request *img_req;
3502
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3503
+ bool locked;
3504
+
3505
+ lockdep_assert_held(&rbd_dev->lock_rwsem);
3506
+ locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3507
+ spin_lock(&rbd_dev->lock_lists_lock);
3508
+ rbd_assert(list_empty(&img_req->lock_item));
3509
+ if (!locked)
3510
+ list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3511
+ else
3512
+ list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3513
+ spin_unlock(&rbd_dev->lock_lists_lock);
3514
+ return locked;
3515
+}
3516
+
3517
+static void rbd_lock_del_request(struct rbd_img_request *img_req)
3518
+{
3519
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3520
+ bool need_wakeup;
3521
+
3522
+ lockdep_assert_held(&rbd_dev->lock_rwsem);
3523
+ spin_lock(&rbd_dev->lock_lists_lock);
3524
+ rbd_assert(!list_empty(&img_req->lock_item));
3525
+ list_del_init(&img_req->lock_item);
3526
+ need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3527
+ list_empty(&rbd_dev->running_list));
3528
+ spin_unlock(&rbd_dev->lock_lists_lock);
3529
+ if (need_wakeup)
3530
+ complete(&rbd_dev->releasing_wait);
3531
+}
3532
+
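rbd_lock_del_request() is one half of the quiesce handshake: once the lock state moves to RELEASING, the request that empties the running list wakes the releaser waiting in rbd_quiesce_lock() further down. A standalone sketch of just the counting, not part of the patch (locking and the completion primitive are elided; all names here are made up):

#include <stdbool.h>
#include <stdio.h>

struct lock_state {
	bool releasing;		/* RBD_LOCK_STATE_RELEASING */
	int num_running;	/* length of running_list */
	bool releaser_woken;	/* complete(&releasing_wait) happened */
};

static void del_request(struct lock_state *s)
{
	s->num_running--;
	if (s->releasing && s->num_running == 0)
		s->releaser_woken = true;
}

int main(void)
{
	struct lock_state s = { .releasing = false, .num_running = 3 };

	del_request(&s);	/* 2 left, lock still LOCKED */
	s.releasing = true;	/* rbd_quiesce_lock() flips the state */
	del_request(&s);	/* 1 left */
	del_request(&s);	/* last running request wakes the releaser */
	printf("releaser woken: %d\n", s.releaser_woken);
	return 0;
}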
3533
+static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3534
+{
3535
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3536
+
3537
+ if (!need_exclusive_lock(img_req))
3538
+ return 1;
3539
+
3540
+ if (rbd_lock_add_request(img_req))
3541
+ return 1;
3542
+
3543
+ if (rbd_dev->opts->exclusive) {
3544
+ WARN_ON(1); /* lock got released? */
3545
+ return -EROFS;
3546
+ }
3547
+
3548
+ /*
3549
+ * Note the use of mod_delayed_work() in rbd_acquire_lock()
3550
+ * and cancel_delayed_work() in wake_lock_waiters().
3551
+ */
3552
+ dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3553
+ queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3554
+ return 0;
3555
+}
3556
+
3557
+static void rbd_img_object_requests(struct rbd_img_request *img_req)
3558
+{
3559
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3560
+ struct rbd_obj_request *obj_req;
3561
+
3562
+ rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3563
+ rbd_assert(!need_exclusive_lock(img_req) ||
3564
+ __rbd_is_lock_owner(rbd_dev));
3565
+
3566
+ if (rbd_img_is_write(img_req)) {
3567
+ rbd_assert(!img_req->snapc);
3568
+ down_read(&rbd_dev->header_rwsem);
3569
+ img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
3570
+ up_read(&rbd_dev->header_rwsem);
3571
+ }
3572
+
3573
+ for_each_obj_request(img_req, obj_req) {
3574
+ int result = 0;
3575
+
3576
+ if (__rbd_obj_handle_request(obj_req, &result)) {
3577
+ if (result) {
3578
+ img_req->pending.result = result;
3579
+ return;
3580
+ }
3581
+ } else {
3582
+ img_req->pending.num_pending++;
3583
+ }
3584
+ }
3585
+}
3586
+
3587
+static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3588
+{
3589
+ int ret;
25983590
25993591 again:
2600
- if (!__rbd_obj_handle_request(obj_req))
2601
- return;
3592
+ switch (img_req->state) {
3593
+ case RBD_IMG_START:
3594
+ rbd_assert(!*result);
26023595
2603
- img_req = obj_req->img_request;
2604
- spin_lock(&img_req->completion_lock);
2605
- rbd_obj_end_request(obj_req);
2606
- rbd_assert(img_req->pending_count);
2607
- if (--img_req->pending_count) {
2608
- spin_unlock(&img_req->completion_lock);
2609
- return;
3596
+ ret = rbd_img_exclusive_lock(img_req);
3597
+ if (ret < 0) {
3598
+ *result = ret;
3599
+ return true;
3600
+ }
3601
+ img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3602
+ if (ret > 0)
3603
+ goto again;
3604
+ return false;
3605
+ case RBD_IMG_EXCLUSIVE_LOCK:
3606
+ if (*result)
3607
+ return true;
3608
+
3609
+ rbd_img_object_requests(img_req);
3610
+ if (!img_req->pending.num_pending) {
3611
+ *result = img_req->pending.result;
3612
+ img_req->state = RBD_IMG_OBJECT_REQUESTS;
3613
+ goto again;
3614
+ }
3615
+ img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3616
+ return false;
3617
+ case __RBD_IMG_OBJECT_REQUESTS:
3618
+ if (!pending_result_dec(&img_req->pending, result))
3619
+ return false;
3620
+ fallthrough;
3621
+ case RBD_IMG_OBJECT_REQUESTS:
3622
+ return true;
3623
+ default:
3624
+ BUG();
3625
+ }
3626
+}
3627
+
3628
+/*
3629
+ * Return true if @img_req is completed.
3630
+ */
3631
+static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3632
+ int *result)
3633
+{
3634
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3635
+ bool done;
3636
+
3637
+ if (need_exclusive_lock(img_req)) {
3638
+ down_read(&rbd_dev->lock_rwsem);
3639
+ mutex_lock(&img_req->state_mutex);
3640
+ done = rbd_img_advance(img_req, result);
3641
+ if (done)
3642
+ rbd_lock_del_request(img_req);
3643
+ mutex_unlock(&img_req->state_mutex);
3644
+ up_read(&rbd_dev->lock_rwsem);
3645
+ } else {
3646
+ mutex_lock(&img_req->state_mutex);
3647
+ done = rbd_img_advance(img_req, result);
3648
+ mutex_unlock(&img_req->state_mutex);
26103649 }
26113650
2612
- spin_unlock(&img_req->completion_lock);
3651
+ if (done && *result) {
3652
+ rbd_assert(*result < 0);
3653
+ rbd_warn(rbd_dev, "%s%s result %d",
3654
+ test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3655
+ obj_op_name(img_req->op_type), *result);
3656
+ }
3657
+ return done;
3658
+}
3659
+
3660
+static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3661
+{
3662
+again:
3663
+ if (!__rbd_img_handle_request(img_req, &result))
3664
+ return;
3665
+
26133666 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
2614
- obj_req = img_req->obj_request;
2615
- rbd_img_end_child_request(img_req);
2616
- goto again;
3667
+ struct rbd_obj_request *obj_req = img_req->obj_request;
3668
+
3669
+ rbd_img_request_destroy(img_req);
3670
+ if (__rbd_obj_handle_request(obj_req, &result)) {
3671
+ img_req = obj_req->img_request;
3672
+ goto again;
3673
+ }
3674
+ } else {
3675
+ struct request *rq = blk_mq_rq_from_pdu(img_req);
3676
+
3677
+ rbd_img_request_destroy(img_req);
3678
+ blk_mq_end_request(rq, errno_to_blk_status(result));
26173679 }
2618
- rbd_img_end_request(img_req);
26193680 }
26203681
26213682 static const struct rbd_client_id rbd_empty_cid;
....@@ -2660,6 +3721,7 @@
26603721 {
26613722 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
26623723
3724
+ rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
26633725 strcpy(rbd_dev->lock_cookie, cookie);
26643726 rbd_set_owner_cid(rbd_dev, &cid);
26653727 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
....@@ -2681,10 +3743,9 @@
26813743 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
26823744 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
26833745 RBD_LOCK_TAG, "", 0);
2684
- if (ret)
3746
+ if (ret && ret != -EEXIST)
26853747 return ret;
26863748
2687
- rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
26883749 __rbd_lock(rbd_dev, cookie);
26893750 return 0;
26903751 }
....@@ -2703,7 +3764,7 @@
27033764 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
27043765 RBD_LOCK_NAME, rbd_dev->lock_cookie);
27053766 if (ret && ret != -ENOENT)
2706
- rbd_warn(rbd_dev, "failed to unlock: %d", ret);
3767
+ rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
27073768
27083769 /* treat errors as the image is unlocked */
27093770 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
....@@ -2739,11 +3800,7 @@
27393800 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
27403801 enum rbd_notify_op notify_op)
27413802 {
2742
- struct page **reply_pages;
2743
- size_t reply_len;
2744
-
2745
- __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
2746
- ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3803
+ __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
27473804 }
27483805
27493806 static void rbd_notify_acquired_lock(struct work_struct *work)
....@@ -2830,21 +3887,56 @@
28303887 goto out;
28313888 }
28323889
2833
-static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3890
+/*
3891
+ * Either image request state machine(s) or rbd_add_acquire_lock()
3892
+ * (i.e. "rbd map").
3893
+ */
3894
+static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
28343895 {
2835
- dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3896
+ struct rbd_img_request *img_req;
3897
+
3898
+ dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3899
+ lockdep_assert_held_write(&rbd_dev->lock_rwsem);
28363900
28373901 cancel_delayed_work(&rbd_dev->lock_dwork);
2838
- if (wake_all)
2839
- wake_up_all(&rbd_dev->lock_waitq);
2840
- else
2841
- wake_up(&rbd_dev->lock_waitq);
3902
+ if (!completion_done(&rbd_dev->acquire_wait)) {
3903
+ rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3904
+ list_empty(&rbd_dev->running_list));
3905
+ rbd_dev->acquire_err = result;
3906
+ complete_all(&rbd_dev->acquire_wait);
3907
+ return;
3908
+ }
3909
+
3910
+ list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3911
+ mutex_lock(&img_req->state_mutex);
3912
+ rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3913
+ rbd_img_schedule(img_req, result);
3914
+ mutex_unlock(&img_req->state_mutex);
3915
+ }
3916
+
3917
+ list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
28423918 }
28433919
2844
-static int get_lock_owner_info(struct rbd_device *rbd_dev,
2845
- struct ceph_locker **lockers, u32 *num_lockers)
3920
+static bool locker_equal(const struct ceph_locker *lhs,
3921
+ const struct ceph_locker *rhs)
3922
+{
3923
+ return lhs->id.name.type == rhs->id.name.type &&
3924
+ lhs->id.name.num == rhs->id.name.num &&
3925
+ !strcmp(lhs->id.cookie, rhs->id.cookie) &&
3926
+ ceph_addr_equal_no_type(&lhs->info.addr, &rhs->info.addr);
3927
+}
3928
+
3929
+static void free_locker(struct ceph_locker *locker)
3930
+{
3931
+ if (locker)
3932
+ ceph_free_lockers(locker, 1);
3933
+}
3934
+
3935
+static struct ceph_locker *get_lock_owner_info(struct rbd_device *rbd_dev)
28463936 {
28473937 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3938
+ struct ceph_locker *lockers;
3939
+ u32 num_lockers;
28483940 u8 lock_type;
28493941 char *lock_tag;
28503942 int ret;
....@@ -2853,39 +3945,45 @@
28533945
28543946 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
28553947 &rbd_dev->header_oloc, RBD_LOCK_NAME,
2856
- &lock_type, &lock_tag, lockers, num_lockers);
2857
- if (ret)
2858
- return ret;
3948
+ &lock_type, &lock_tag, &lockers, &num_lockers);
3949
+ if (ret) {
3950
+ rbd_warn(rbd_dev, "failed to get header lockers: %d", ret);
3951
+ return ERR_PTR(ret);
3952
+ }
28593953
2860
- if (*num_lockers == 0) {
3954
+ if (num_lockers == 0) {
28613955 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3956
+ lockers = NULL;
28623957 goto out;
28633958 }
28643959
28653960 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
28663961 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
28673962 lock_tag);
2868
- ret = -EBUSY;
2869
- goto out;
3963
+ goto err_busy;
28703964 }
28713965
28723966 if (lock_type == CEPH_CLS_LOCK_SHARED) {
28733967 rbd_warn(rbd_dev, "shared lock type detected");
2874
- ret = -EBUSY;
2875
- goto out;
3968
+ goto err_busy;
28763969 }
28773970
2878
- if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3971
+ WARN_ON(num_lockers != 1);
3972
+ if (strncmp(lockers[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
28793973 strlen(RBD_LOCK_COOKIE_PREFIX))) {
28803974 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
2881
- (*lockers)[0].id.cookie);
2882
- ret = -EBUSY;
2883
- goto out;
3975
+ lockers[0].id.cookie);
3976
+ goto err_busy;
28843977 }
28853978
28863979 out:
28873980 kfree(lock_tag);
2888
- return ret;
3981
+ return lockers;
3982
+
3983
+err_busy:
3984
+ kfree(lock_tag);
3985
+ ceph_free_lockers(lockers, num_lockers);
3986
+ return ERR_PTR(-EBUSY);
28893987 }
28903988
28913989 static int find_watcher(struct rbd_device *rbd_dev,
....@@ -2901,13 +3999,19 @@
29013999 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
29024000 &rbd_dev->header_oloc, &watchers,
29034001 &num_watchers);
2904
- if (ret)
4002
+ if (ret) {
4003
+ rbd_warn(rbd_dev, "failed to get watchers: %d", ret);
29054004 return ret;
4005
+ }
29064006
29074007 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
29084008 for (i = 0; i < num_watchers; i++) {
2909
- if (!memcmp(&watchers[i].addr, &locker->info.addr,
2910
- sizeof(locker->info.addr)) &&
4009
+ /*
4010
+ * Ignore addr->type while comparing. This mimics
4011
+ * entity_addr_t::get_legacy_str() + strcmp().
4012
+ */
4013
+ if (ceph_addr_equal_no_type(&watchers[i].addr,
4014
+ &locker->info.addr) &&
29114015 watchers[i].cookie == cookie) {
29124016 struct rbd_client_id cid = {
29134017 .gid = le64_to_cpu(watchers[i].name.num),
....@@ -2935,104 +4039,160 @@
29354039 static int rbd_try_lock(struct rbd_device *rbd_dev)
29364040 {
29374041 struct ceph_client *client = rbd_dev->rbd_client->client;
2938
- struct ceph_locker *lockers;
2939
- u32 num_lockers;
4042
+ struct ceph_locker *locker, *refreshed_locker;
29404043 int ret;
29414044
29424045 for (;;) {
4046
+ locker = refreshed_locker = NULL;
4047
+
29434048 ret = rbd_lock(rbd_dev);
2944
- if (ret != -EBUSY)
2945
- return ret;
2946
-
2947
- /* determine if the current lock holder is still alive */
2948
- ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
2949
- if (ret)
2950
- return ret;
2951
-
2952
- if (num_lockers == 0)
2953
- goto again;
2954
-
2955
- ret = find_watcher(rbd_dev, lockers);
2956
- if (ret) {
2957
- if (ret > 0)
2958
- ret = 0; /* have to request lock */
4049
+ if (!ret)
4050
+ goto out;
4051
+ if (ret != -EBUSY) {
4052
+ rbd_warn(rbd_dev, "failed to lock header: %d", ret);
29594053 goto out;
29604054 }
29614055
2962
- rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
2963
- ENTITY_NAME(lockers[0].id.name));
4056
+ /* determine if the current lock holder is still alive */
4057
+ locker = get_lock_owner_info(rbd_dev);
4058
+ if (IS_ERR(locker)) {
4059
+ ret = PTR_ERR(locker);
4060
+ locker = NULL;
4061
+ goto out;
4062
+ }
4063
+ if (!locker)
4064
+ goto again;
29644065
2965
- ret = ceph_monc_blacklist_add(&client->monc,
2966
- &lockers[0].info.addr);
4066
+ ret = find_watcher(rbd_dev, locker);
4067
+ if (ret)
4068
+ goto out; /* request lock or error */
4069
+
4070
+ refreshed_locker = get_lock_owner_info(rbd_dev);
4071
+ if (IS_ERR(refreshed_locker)) {
4072
+ ret = PTR_ERR(refreshed_locker);
4073
+ refreshed_locker = NULL;
4074
+ goto out;
4075
+ }
4076
+ if (!refreshed_locker ||
4077
+ !locker_equal(locker, refreshed_locker))
4078
+ goto again;
4079
+
4080
+ rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
4081
+ ENTITY_NAME(locker->id.name));
4082
+
4083
+ ret = ceph_monc_blocklist_add(&client->monc,
4084
+ &locker->info.addr);
29674085 if (ret) {
2968
- rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
2969
- ENTITY_NAME(lockers[0].id.name), ret);
4086
+ rbd_warn(rbd_dev, "failed to blocklist %s%llu: %d",
4087
+ ENTITY_NAME(locker->id.name), ret);
29704088 goto out;
29714089 }
29724090
29734091 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
29744092 &rbd_dev->header_oloc, RBD_LOCK_NAME,
2975
- lockers[0].id.cookie,
2976
- &lockers[0].id.name);
2977
- if (ret && ret != -ENOENT)
4093
+ locker->id.cookie, &locker->id.name);
4094
+ if (ret && ret != -ENOENT) {
4095
+ rbd_warn(rbd_dev, "failed to break header lock: %d",
4096
+ ret);
29784097 goto out;
4098
+ }
29794099
29804100 again:
2981
- ceph_free_lockers(lockers, num_lockers);
4101
+ free_locker(refreshed_locker);
4102
+ free_locker(locker);
29824103 }
29834104
29844105 out:
2985
- ceph_free_lockers(lockers, num_lockers);
4106
+ free_locker(refreshed_locker);
4107
+ free_locker(locker);
29864108 return ret;
29874109 }
29884110
2989
-/*
2990
- * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
2991
- */
2992
-static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
2993
- int *pret)
4111
+static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
29944112 {
2995
- enum rbd_lock_state lock_state;
4113
+ int ret;
4114
+
4115
+ ret = rbd_dev_refresh(rbd_dev);
4116
+ if (ret)
4117
+ return ret;
4118
+
4119
+ if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4120
+ ret = rbd_object_map_open(rbd_dev);
4121
+ if (ret)
4122
+ return ret;
4123
+ }
4124
+
4125
+ return 0;
4126
+}
4127
+
4128
+/*
4129
+ * Return:
4130
+ * 0 - lock acquired
4131
+ * 1 - caller should call rbd_request_lock()
4132
+ * <0 - error
4133
+ */
4134
+static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4135
+{
4136
+ int ret;
29964137
29974138 down_read(&rbd_dev->lock_rwsem);
29984139 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
29994140 rbd_dev->lock_state);
30004141 if (__rbd_is_lock_owner(rbd_dev)) {
3001
- lock_state = rbd_dev->lock_state;
30024142 up_read(&rbd_dev->lock_rwsem);
3003
- return lock_state;
4143
+ return 0;
30044144 }
30054145
30064146 up_read(&rbd_dev->lock_rwsem);
30074147 down_write(&rbd_dev->lock_rwsem);
30084148 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
30094149 rbd_dev->lock_state);
3010
- if (!__rbd_is_lock_owner(rbd_dev)) {
3011
- *pret = rbd_try_lock(rbd_dev);
3012
- if (*pret)
3013
- rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
4150
+ if (__rbd_is_lock_owner(rbd_dev)) {
4151
+ up_write(&rbd_dev->lock_rwsem);
4152
+ return 0;
30144153 }
30154154
3016
- lock_state = rbd_dev->lock_state;
4155
+ ret = rbd_try_lock(rbd_dev);
4156
+ if (ret < 0) {
4157
+ rbd_warn(rbd_dev, "failed to acquire lock: %d", ret);
4158
+ goto out;
4159
+ }
4160
+ if (ret > 0) {
4161
+ up_write(&rbd_dev->lock_rwsem);
4162
+ return ret;
4163
+ }
4164
+
4165
+ rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4166
+ rbd_assert(list_empty(&rbd_dev->running_list));
4167
+
4168
+ ret = rbd_post_acquire_action(rbd_dev);
4169
+ if (ret) {
4170
+ rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4171
+ /*
4172
+ * Can't stay in RBD_LOCK_STATE_LOCKED because
4173
+ * rbd_lock_add_request() would let the request through,
4174
+ * assuming that e.g. object map is locked and loaded.
4175
+ */
4176
+ rbd_unlock(rbd_dev);
4177
+ }
4178
+
4179
+out:
4180
+ wake_lock_waiters(rbd_dev, ret);
30174181 up_write(&rbd_dev->lock_rwsem);
3018
- return lock_state;
4182
+ return ret;
30194183 }
30204184
30214185 static void rbd_acquire_lock(struct work_struct *work)
30224186 {
30234187 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
30244188 struct rbd_device, lock_dwork);
3025
- enum rbd_lock_state lock_state;
3026
- int ret = 0;
4189
+ int ret;
30274190
30284191 dout("%s rbd_dev %p\n", __func__, rbd_dev);
30294192 again:
3030
- lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3031
- if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3032
- if (lock_state == RBD_LOCK_STATE_LOCKED)
3033
- wake_requests(rbd_dev, true);
3034
- dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3035
- rbd_dev, lock_state, ret);
4193
+ ret = rbd_try_acquire_lock(rbd_dev);
4194
+ if (ret <= 0) {
4195
+ dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
30364196 return;
30374197 }
30384198
....@@ -3041,16 +4201,9 @@
30414201 goto again; /* treat this as a dead client */
30424202 } else if (ret == -EROFS) {
30434203 rbd_warn(rbd_dev, "peer will not release lock");
3044
- /*
3045
- * If this is rbd_add_acquire_lock(), we want to fail
3046
- * immediately -- reuse BLACKLISTED flag. Otherwise we
3047
- * want to block.
3048
- */
3049
- if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3050
- set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3051
- /* wake "rbd map --exclusive" process */
3052
- wake_requests(rbd_dev, false);
3053
- }
4204
+ down_write(&rbd_dev->lock_rwsem);
4205
+ wake_lock_waiters(rbd_dev, ret);
4206
+ up_write(&rbd_dev->lock_rwsem);
30544207 } else if (ret < 0) {
30554208 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
30564209 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
....@@ -3060,50 +4213,72 @@
30604213 * lock owner acked, but resend if we don't see them
30614214 * release the lock
30624215 */
3063
- dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
4216
+ dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
30644217 rbd_dev);
30654218 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
30664219 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
30674220 }
30684221 }
30694222
3070
-/*
3071
- * lock_rwsem must be held for write
3072
- */
3073
-static bool rbd_release_lock(struct rbd_device *rbd_dev)
4223
+static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
30744224 {
3075
- dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3076
- rbd_dev->lock_state);
4225
+ dout("%s rbd_dev %p\n", __func__, rbd_dev);
4226
+ lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4227
+
30774228 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
30784229 return false;
30794230
3080
- rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3081
- downgrade_write(&rbd_dev->lock_rwsem);
30824231 /*
30834232 * Ensure that all in-flight IO is flushed.
3084
- *
3085
- * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3086
- * may be shared with other devices.
30874233 */
3088
- ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3089
- up_read(&rbd_dev->lock_rwsem);
4234
+ rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4235
+ rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4236
+ if (list_empty(&rbd_dev->running_list))
4237
+ return true;
4238
+
4239
+ up_write(&rbd_dev->lock_rwsem);
4240
+ wait_for_completion(&rbd_dev->releasing_wait);
30904241
30914242 down_write(&rbd_dev->lock_rwsem);
3092
- dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3093
- rbd_dev->lock_state);
30944243 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
30954244 return false;
30964245
4246
+ rbd_assert(list_empty(&rbd_dev->running_list));
4247
+ return true;
4248
+}
4249
+
4250
+static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4251
+{
4252
+ if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4253
+ rbd_object_map_close(rbd_dev);
4254
+}
4255
+
4256
+static void __rbd_release_lock(struct rbd_device *rbd_dev)
4257
+{
4258
+ rbd_assert(list_empty(&rbd_dev->running_list));
4259
+
4260
+ rbd_pre_release_action(rbd_dev);
30974261 rbd_unlock(rbd_dev);
4262
+}
4263
+
4264
+/*
4265
+ * lock_rwsem must be held for write
4266
+ */
4267
+static void rbd_release_lock(struct rbd_device *rbd_dev)
4268
+{
4269
+ if (!rbd_quiesce_lock(rbd_dev))
4270
+ return;
4271
+
4272
+ __rbd_release_lock(rbd_dev);
4273
+
30984274 /*
30994275 * Give others a chance to grab the lock - we would re-acquire
3100
- * almost immediately if we got new IO during ceph_osdc_sync()
3101
- * otherwise. We need to ack our own notifications, so this
3102
- * lock_dwork will be requeued from rbd_wait_state_locked()
3103
- * after wake_requests() in rbd_handle_released_lock().
4276
+ * almost immediately if we got new IO while draining the running
4277
+ * list otherwise. We need to ack our own notifications, so this
4278
+ * lock_dwork will be requeued from rbd_handle_released_lock() by
4279
+ * way of maybe_kick_acquire().
31044280 */
31054281 cancel_delayed_work(&rbd_dev->lock_dwork);
3106
- return true;
31074282 }
31084283
31094284 static void rbd_release_lock_work(struct work_struct *work)
....@@ -3114,6 +4289,23 @@
31144289 down_write(&rbd_dev->lock_rwsem);
31154290 rbd_release_lock(rbd_dev);
31164291 up_write(&rbd_dev->lock_rwsem);
4292
+}
4293
+
4294
+static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4295
+{
4296
+ bool have_requests;
4297
+
4298
+ dout("%s rbd_dev %p\n", __func__, rbd_dev);
4299
+ if (__rbd_is_lock_owner(rbd_dev))
4300
+ return;
4301
+
4302
+ spin_lock(&rbd_dev->lock_lists_lock);
4303
+ have_requests = !list_empty(&rbd_dev->acquiring_list);
4304
+ spin_unlock(&rbd_dev->lock_lists_lock);
4305
+ if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4306
+ dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4307
+ mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4308
+ }
31174309 }
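/*
 * maybe_kick_acquire() is invoked from both lock notification handlers
 * below, after owner_cid has been updated: if this client is not the
 * owner and either has requests parked on acquiring_list or a lock_dwork
 * already scheduled, the acquisition attempt is re-run immediately
 * (mod_delayed_work() with a 0 delay) rather than waiting for the
 * originally scheduled delay to expire.
 */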
31184310
31194311 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
....@@ -3131,22 +4323,17 @@
31314323 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
31324324 down_write(&rbd_dev->lock_rwsem);
31334325 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3134
- /*
3135
- * we already know that the remote client is
3136
- * the owner
3137
- */
3138
- up_write(&rbd_dev->lock_rwsem);
3139
- return;
4326
+ dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n",
4327
+ __func__, rbd_dev, cid.gid, cid.handle);
4328
+ } else {
4329
+ rbd_set_owner_cid(rbd_dev, &cid);
31404330 }
3141
-
3142
- rbd_set_owner_cid(rbd_dev, &cid);
31434331 downgrade_write(&rbd_dev->lock_rwsem);
31444332 } else {
31454333 down_read(&rbd_dev->lock_rwsem);
31464334 }
31474335
3148
- if (!__rbd_is_lock_owner(rbd_dev))
3149
- wake_requests(rbd_dev, false);
4336
+ maybe_kick_acquire(rbd_dev);
31504337 up_read(&rbd_dev->lock_rwsem);
31514338 }
31524339
....@@ -3165,21 +4352,18 @@
31654352 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
31664353 down_write(&rbd_dev->lock_rwsem);
31674354 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3168
- dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
4355
+ dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n",
31694356 __func__, rbd_dev, cid.gid, cid.handle,
31704357 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3171
- up_write(&rbd_dev->lock_rwsem);
3172
- return;
4358
+ } else {
4359
+ rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
31734360 }
3174
-
3175
- rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
31764361 downgrade_write(&rbd_dev->lock_rwsem);
31774362 } else {
31784363 down_read(&rbd_dev->lock_rwsem);
31794364 }
31804365
3181
- if (!__rbd_is_lock_owner(rbd_dev))
3182
- wake_requests(rbd_dev, false);
4366
+ maybe_kick_acquire(rbd_dev);
31834367 up_read(&rbd_dev->lock_rwsem);
31844368 }
31854369
....@@ -3433,7 +4617,6 @@
34334617 */
34344618 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
34354619 {
3436
- WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
34374620 cancel_tasks_sync(rbd_dev);
34384621
34394622 mutex_lock(&rbd_dev->watch_mutex);
....@@ -3455,7 +4638,8 @@
34554638 char cookie[32];
34564639 int ret;
34574640
3458
- WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
4641
+ if (!rbd_quiesce_lock(rbd_dev))
4642
+ return;
34594643
34604644 format_lock_cookie(rbd_dev, cookie);
34614645 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
....@@ -3471,11 +4655,11 @@
34714655 * Lock cookie cannot be updated on older OSDs, so do
34724656 * a manual release and queue an acquire.
34734657 */
3474
- if (rbd_release_lock(rbd_dev))
3475
- queue_delayed_work(rbd_dev->task_wq,
3476
- &rbd_dev->lock_dwork, 0);
4658
+ __rbd_release_lock(rbd_dev);
4659
+ queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
34774660 } else {
34784661 __rbd_lock(rbd_dev, cookie);
4662
+ wake_lock_waiters(rbd_dev, 0);
34794663 }
34804664 }
34814665
....@@ -3496,15 +4680,18 @@
34964680 ret = __rbd_register_watch(rbd_dev);
34974681 if (ret) {
34984682 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3499
- if (ret == -EBLACKLISTED || ret == -ENOENT) {
3500
- set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3501
- wake_requests(rbd_dev, true);
3502
- } else {
4683
+ if (ret != -EBLOCKLISTED && ret != -ENOENT) {
35034684 queue_delayed_work(rbd_dev->task_wq,
35044685 &rbd_dev->watch_dwork,
35054686 RBD_RETRY_DELAY);
4687
+ mutex_unlock(&rbd_dev->watch_mutex);
4688
+ return;
35064689 }
4690
+
35074691 mutex_unlock(&rbd_dev->watch_mutex);
4692
+ down_write(&rbd_dev->lock_rwsem);
4693
+ wake_lock_waiters(rbd_dev, ret);
4694
+ up_write(&rbd_dev->lock_rwsem);
35084695 return;
35094696 }
35104697
....@@ -3567,7 +4754,7 @@
35674754
35684755 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
35694756 CEPH_OSD_FLAG_READ, req_page, outbound_size,
3570
- reply_page, &inbound_size);
4757
+ &reply_page, &inbound_size);
35714758 if (!ret) {
35724759 memcpy(inbound, page_address(reply_page), inbound_size);
35734760 ret = inbound_size;
....@@ -3579,71 +4766,74 @@
35794766 return ret;
35804767 }
35814768
3582
-/*
3583
- * lock_rwsem must be held for read
3584
- */
3585
-static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire)
3586
-{
3587
- DEFINE_WAIT(wait);
3588
- unsigned long timeout;
3589
- int ret = 0;
3590
-
3591
- if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
3592
- return -EBLACKLISTED;
3593
-
3594
- if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3595
- return 0;
3596
-
3597
- if (!may_acquire) {
3598
- rbd_warn(rbd_dev, "exclusive lock required");
3599
- return -EROFS;
3600
- }
3601
-
3602
- do {
3603
- /*
3604
- * Note the use of mod_delayed_work() in rbd_acquire_lock()
3605
- * and cancel_delayed_work() in wake_requests().
3606
- */
3607
- dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3608
- queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3609
- prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
3610
- TASK_UNINTERRUPTIBLE);
3611
- up_read(&rbd_dev->lock_rwsem);
3612
- timeout = schedule_timeout(ceph_timeout_jiffies(
3613
- rbd_dev->opts->lock_timeout));
3614
- down_read(&rbd_dev->lock_rwsem);
3615
- if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
3616
- ret = -EBLACKLISTED;
3617
- break;
3618
- }
3619
- if (!timeout) {
3620
- rbd_warn(rbd_dev, "timed out waiting for lock");
3621
- ret = -ETIMEDOUT;
3622
- break;
3623
- }
3624
- } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3625
-
3626
- finish_wait(&rbd_dev->lock_waitq, &wait);
3627
- return ret;
3628
-}
3629
-
36304769 static void rbd_queue_workfn(struct work_struct *work)
36314770 {
3632
- struct request *rq = blk_mq_rq_from_pdu(work);
3633
- struct rbd_device *rbd_dev = rq->q->queuedata;
3634
- struct rbd_img_request *img_request;
3635
- struct ceph_snap_context *snapc = NULL;
4771
+ struct rbd_img_request *img_request =
4772
+ container_of(work, struct rbd_img_request, work);
4773
+ struct rbd_device *rbd_dev = img_request->rbd_dev;
4774
+ enum obj_operation_type op_type = img_request->op_type;
4775
+ struct request *rq = blk_mq_rq_from_pdu(img_request);
36364776 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
36374777 u64 length = blk_rq_bytes(rq);
3638
- enum obj_operation_type op_type;
36394778 u64 mapping_size;
3640
- bool must_be_locked;
36414779 int result;
36424780
3643
- switch (req_op(rq)) {
4781
+ /* Ignore/skip any zero-length requests */
4782
+ if (!length) {
4783
+ dout("%s: zero-length request\n", __func__);
4784
+ result = 0;
4785
+ goto err_img_request;
4786
+ }
4787
+
4788
+ blk_mq_start_request(rq);
4789
+
4790
+ down_read(&rbd_dev->header_rwsem);
4791
+ mapping_size = rbd_dev->mapping.size;
4792
+ rbd_img_capture_header(img_request);
4793
+ up_read(&rbd_dev->header_rwsem);
4794
+
4795
+ if (offset + length > mapping_size) {
4796
+ rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4797
+ length, mapping_size);
4798
+ result = -EIO;
4799
+ goto err_img_request;
4800
+ }
4801
+
4802
+ dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4803
+ img_request, obj_op_name(op_type), offset, length);
4804
+
4805
+ if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4806
+ result = rbd_img_fill_nodata(img_request, offset, length);
4807
+ else
4808
+ result = rbd_img_fill_from_bio(img_request, offset, length,
4809
+ rq->bio);
4810
+ if (result)
4811
+ goto err_img_request;
4812
+
4813
+ rbd_img_handle_request(img_request, 0);
4814
+ return;
4815
+
4816
+err_img_request:
4817
+ rbd_img_request_destroy(img_request);
4818
+ if (result)
4819
+ rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4820
+ obj_op_name(op_type), length, offset, result);
4821
+ blk_mq_end_request(rq, errno_to_blk_status(result));
4822
+}
4823
+
4824
+static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4825
+ const struct blk_mq_queue_data *bd)
4826
+{
4827
+ struct rbd_device *rbd_dev = hctx->queue->queuedata;
4828
+ struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
4829
+ enum obj_operation_type op_type;
4830
+
4831
+ switch (req_op(bd->rq)) {
36444832 case REQ_OP_DISCARD:
3645
- case REQ_OP_WRITE_ZEROES:
36464833 op_type = OBJ_OP_DISCARD;
4834
+ break;
4835
+ case REQ_OP_WRITE_ZEROES:
4836
+ op_type = OBJ_OP_ZEROOUT;
36474837 break;
36484838 case REQ_OP_WRITE:
36494839 op_type = OBJ_OP_WRITE;
....@@ -3652,112 +4842,23 @@
36524842 op_type = OBJ_OP_READ;
36534843 break;
36544844 default:
3655
- dout("%s: non-fs request type %d\n", __func__, req_op(rq));
3656
- result = -EIO;
3657
- goto err;
4845
+ rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq));
4846
+ return BLK_STS_IOERR;
36584847 }
36594848
3660
- /* Ignore/skip any zero-length requests */
4849
+ rbd_img_request_init(img_req, rbd_dev, op_type);
36614850
3662
- if (!length) {
3663
- dout("%s: zero-length request\n", __func__);
3664
- result = 0;
3665
- goto err_rq;
4851
+ if (rbd_img_is_write(img_req)) {
4852
+ if (rbd_is_ro(rbd_dev)) {
4853
+ rbd_warn(rbd_dev, "%s on read-only mapping",
4854
+ obj_op_name(img_req->op_type));
4855
+ return BLK_STS_IOERR;
4856
+ }
4857
+ rbd_assert(!rbd_is_snap(rbd_dev));
36664858 }
36674859
3668
- rbd_assert(op_type == OBJ_OP_READ ||
3669
- rbd_dev->spec->snap_id == CEPH_NOSNAP);
3670
-
3671
- /*
3672
- * Quit early if the mapped snapshot no longer exists. It's
3673
- * still possible the snapshot will have disappeared by the
3674
- * time our request arrives at the osd, but there's no sense in
3675
- * sending it if we already know.
3676
- */
3677
- if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3678
- dout("request for non-existent snapshot");
3679
- rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3680
- result = -ENXIO;
3681
- goto err_rq;
3682
- }
3683
-
3684
- if (offset && length > U64_MAX - offset + 1) {
3685
- rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3686
- length);
3687
- result = -EINVAL;
3688
- goto err_rq; /* Shouldn't happen */
3689
- }
3690
-
3691
- blk_mq_start_request(rq);
3692
-
3693
- down_read(&rbd_dev->header_rwsem);
3694
- mapping_size = rbd_dev->mapping.size;
3695
- if (op_type != OBJ_OP_READ) {
3696
- snapc = rbd_dev->header.snapc;
3697
- ceph_get_snap_context(snapc);
3698
- }
3699
- up_read(&rbd_dev->header_rwsem);
3700
-
3701
- if (offset + length > mapping_size) {
3702
- rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
3703
- length, mapping_size);
3704
- result = -EIO;
3705
- goto err_rq;
3706
- }
3707
-
3708
- must_be_locked =
3709
- (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
3710
- (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
3711
- if (must_be_locked) {
3712
- down_read(&rbd_dev->lock_rwsem);
3713
- result = rbd_wait_state_locked(rbd_dev,
3714
- !rbd_dev->opts->exclusive);
3715
- if (result)
3716
- goto err_unlock;
3717
- }
3718
-
3719
- img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
3720
- if (!img_request) {
3721
- result = -ENOMEM;
3722
- goto err_unlock;
3723
- }
3724
- img_request->rq = rq;
3725
- snapc = NULL; /* img_request consumes a ref */
3726
-
3727
- if (op_type == OBJ_OP_DISCARD)
3728
- result = rbd_img_fill_nodata(img_request, offset, length);
3729
- else
3730
- result = rbd_img_fill_from_bio(img_request, offset, length,
3731
- rq->bio);
3732
- if (result)
3733
- goto err_img_request;
3734
-
3735
- rbd_img_request_submit(img_request);
3736
- if (must_be_locked)
3737
- up_read(&rbd_dev->lock_rwsem);
3738
- return;
3739
-
3740
-err_img_request:
3741
- rbd_img_request_put(img_request);
3742
-err_unlock:
3743
- if (must_be_locked)
3744
- up_read(&rbd_dev->lock_rwsem);
3745
-err_rq:
3746
- if (result)
3747
- rbd_warn(rbd_dev, "%s %llx at %llx result %d",
3748
- obj_op_name(op_type), length, offset, result);
3749
- ceph_put_snap_context(snapc);
3750
-err:
3751
- blk_mq_end_request(rq, errno_to_blk_status(result));
3752
-}
3753
-
3754
-static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
3755
- const struct blk_mq_queue_data *bd)
3756
-{
3757
- struct request *rq = bd->rq;
3758
- struct work_struct *work = blk_mq_rq_to_pdu(rq);
3759
-
3760
- queue_work(rbd_wq, work);
4860
+ INIT_WORK(&img_req->work, rbd_queue_workfn);
4861
+ queue_work(rbd_wq, &img_req->work);
37614862 return BLK_STS_OK;
37624863 }
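/*
 * Note: the block request and its rbd_img_request now share a single
 * allocation.  The tag set's cmd_size (set up in rbd_init_disk()) is
 * sizeof(struct rbd_img_request), so rbd_queue_rq() obtains the embedded
 * img_request with blk_mq_rq_to_pdu(bd->rq), and rbd_queue_workfn() goes
 * the other way with blk_mq_rq_from_pdu(img_request).  No separate
 * allocation or init_request callback is needed anymore.
 */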
37634864
....@@ -3789,10 +4890,6 @@
37894890 ceph_oloc_copy(&req->r_base_oloc, oloc);
37904891 req->r_flags = CEPH_OSD_FLAG_READ;
37914892
3792
- ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
3793
- if (ret)
3794
- goto out_req;
3795
-
37964893 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
37974894 if (IS_ERR(pages)) {
37984895 ret = PTR_ERR(pages);
....@@ -3802,6 +4899,10 @@
38024899 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
38034900 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
38044901 true);
4902
+
4903
+ ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4904
+ if (ret)
4905
+ goto out_req;
38054906
38064907 ceph_osdc_start_request(osdc, req, false);
38074908 ret = ceph_osdc_wait_request(osdc, req);
....@@ -3818,7 +4919,9 @@
38184919 * return, the rbd_dev->header field will contain up-to-date
38194920 * information about the image.
38204921 */
3821
-static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4922
+static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev,
4923
+ struct rbd_image_header *header,
4924
+ bool first_time)
38224925 {
38234926 struct rbd_image_header_ondisk *ondisk = NULL;
38244927 u32 snap_count = 0;
....@@ -3866,30 +4969,11 @@
38664969 snap_count = le32_to_cpu(ondisk->snap_count);
38674970 } while (snap_count != want_count);
38684971
3869
- ret = rbd_header_from_disk(rbd_dev, ondisk);
4972
+ ret = rbd_header_from_disk(header, ondisk, first_time);
38704973 out:
38714974 kfree(ondisk);
38724975
38734976 return ret;
3874
-}
3875
-
3876
-/*
3877
- * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3878
- * has disappeared from the (just updated) snapshot context.
3879
- */
3880
-static void rbd_exists_validate(struct rbd_device *rbd_dev)
3881
-{
3882
- u64 snap_id;
3883
-
3884
- if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3885
- return;
3886
-
3887
- snap_id = rbd_dev->spec->snap_id;
3888
- if (snap_id == CEPH_NOSNAP)
3889
- return;
3890
-
3891
- if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3892
- clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
38934977 }
38944978
38954979 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
....@@ -3906,59 +4990,12 @@
39064990 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
39074991 dout("setting size to %llu sectors", (unsigned long long)size);
39084992 set_capacity(rbd_dev->disk, size);
3909
- revalidate_disk(rbd_dev->disk);
4993
+ revalidate_disk_size(rbd_dev->disk, true);
39104994 }
3911
-}
3912
-
3913
-static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3914
-{
3915
- u64 mapping_size;
3916
- int ret;
3917
-
3918
- down_write(&rbd_dev->header_rwsem);
3919
- mapping_size = rbd_dev->mapping.size;
3920
-
3921
- ret = rbd_dev_header_info(rbd_dev);
3922
- if (ret)
3923
- goto out;
3924
-
3925
- /*
3926
- * If there is a parent, see if it has disappeared due to the
3927
- * mapped image getting flattened.
3928
- */
3929
- if (rbd_dev->parent) {
3930
- ret = rbd_dev_v2_parent_info(rbd_dev);
3931
- if (ret)
3932
- goto out;
3933
- }
3934
-
3935
- if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
3936
- rbd_dev->mapping.size = rbd_dev->header.image_size;
3937
- } else {
3938
- /* validate mapped snapshot's EXISTS flag */
3939
- rbd_exists_validate(rbd_dev);
3940
- }
3941
-
3942
-out:
3943
- up_write(&rbd_dev->header_rwsem);
3944
- if (!ret && mapping_size != rbd_dev->mapping.size)
3945
- rbd_dev_update_size(rbd_dev);
3946
-
3947
- return ret;
3948
-}
3949
-
3950
-static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
3951
- unsigned int hctx_idx, unsigned int numa_node)
3952
-{
3953
- struct work_struct *work = blk_mq_rq_to_pdu(rq);
3954
-
3955
- INIT_WORK(work, rbd_queue_workfn);
3956
- return 0;
39574995 }
39584996
39594997 static const struct blk_mq_ops rbd_mq_ops = {
39604998 .queue_rq = rbd_queue_rq,
3961
- .init_request = rbd_init_request,
39624999 };
39635000
39645001 static int rbd_init_disk(struct rbd_device *rbd_dev)
....@@ -3989,9 +5026,9 @@
39895026 rbd_dev->tag_set.ops = &rbd_mq_ops;
39905027 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
39915028 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
3992
- rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
3993
- rbd_dev->tag_set.nr_hw_queues = 1;
3994
- rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
5029
+ rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
5030
+ rbd_dev->tag_set.nr_hw_queues = num_present_cpus();
5031
+ rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request);
39955032
39965033 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
39975034 if (err)
....@@ -4010,18 +5047,18 @@
40105047 q->limits.max_sectors = queue_max_hw_sectors(q);
40115048 blk_queue_max_segments(q, USHRT_MAX);
40125049 blk_queue_max_segment_size(q, UINT_MAX);
4013
- blk_queue_io_min(q, objset_bytes);
4014
- blk_queue_io_opt(q, objset_bytes);
5050
+ blk_queue_io_min(q, rbd_dev->opts->alloc_size);
5051
+ blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
40155052
40165053 if (rbd_dev->opts->trim) {
40175054 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
4018
- q->limits.discard_granularity = objset_bytes;
5055
+ q->limits.discard_granularity = rbd_dev->opts->alloc_size;
40195056 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
40205057 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
40215058 }
40225059
40235060 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4024
- q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
5061
+ blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, q);
40255062
40265063 /*
40275064 * disk_release() expects a queue ref from add_disk() and will
....@@ -4059,17 +5096,12 @@
40595096 (unsigned long long)rbd_dev->mapping.size);
40605097 }
40615098
4062
-/*
4063
- * Note this shows the features for whatever's mapped, which is not
4064
- * necessarily the base image.
4065
- */
40665099 static ssize_t rbd_features_show(struct device *dev,
40675100 struct device_attribute *attr, char *buf)
40685101 {
40695102 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
40705103
4071
- return sprintf(buf, "0x%016llx\n",
4072
- (unsigned long long)rbd_dev->mapping.features);
5104
+ return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
40735105 }
40745106
40755107 static ssize_t rbd_major_show(struct device *dev,
....@@ -4381,8 +5413,7 @@
43815413 module_put(THIS_MODULE);
43825414 }
43835415
4384
-static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4385
- struct rbd_spec *spec)
5416
+static struct rbd_device *__rbd_dev_create(struct rbd_spec *spec)
43865417 {
43875418 struct rbd_device *rbd_dev;
43885419
....@@ -4414,15 +5445,18 @@
44145445 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
44155446 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
44165447 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4417
- init_waitqueue_head(&rbd_dev->lock_waitq);
5448
+ spin_lock_init(&rbd_dev->lock_lists_lock);
5449
+ INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5450
+ INIT_LIST_HEAD(&rbd_dev->running_list);
5451
+ init_completion(&rbd_dev->acquire_wait);
5452
+ init_completion(&rbd_dev->releasing_wait);
5453
+
5454
+ spin_lock_init(&rbd_dev->object_map_lock);
44185455
44195456 rbd_dev->dev.bus = &rbd_bus_type;
44205457 rbd_dev->dev.type = &rbd_device_type;
44215458 rbd_dev->dev.parent = &rbd_root_dev;
44225459 device_initialize(&rbd_dev->dev);
4423
-
4424
- rbd_dev->rbd_client = rbdc;
4425
- rbd_dev->spec = spec;
44265460
44275461 return rbd_dev;
44285462 }
....@@ -4436,11 +5470,9 @@
44365470 {
44375471 struct rbd_device *rbd_dev;
44385472
4439
- rbd_dev = __rbd_dev_create(rbdc, spec);
5473
+ rbd_dev = __rbd_dev_create(spec);
44405474 if (!rbd_dev)
44415475 return NULL;
4442
-
4443
- rbd_dev->opts = opts;
44445476
44455477 /* get an id and fill in device name */
44465478 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
....@@ -4457,6 +5489,10 @@
44575489
44585490 /* we have a ref from do_rbd_add() */
44595491 __module_get(THIS_MODULE);
5492
+
5493
+ rbd_dev->rbd_client = rbdc;
5494
+ rbd_dev->spec = spec;
5495
+ rbd_dev->opts = opts;
44605496
44615497 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
44625498 return rbd_dev;
....@@ -4512,41 +5548,39 @@
45125548 return 0;
45135549 }
45145550
4515
-static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5551
+static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev,
5552
+ char **pobject_prefix)
45165553 {
4517
- return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4518
- &rbd_dev->header.obj_order,
4519
- &rbd_dev->header.image_size);
4520
-}
4521
-
4522
-static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4523
-{
5554
+ size_t size;
45245555 void *reply_buf;
5556
+ char *object_prefix;
45255557 int ret;
45265558 void *p;
45275559
4528
- reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
5560
+ /* Response will be an encoded string, which includes a length */
5561
+ size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5562
+ reply_buf = kzalloc(size, GFP_KERNEL);
45295563 if (!reply_buf)
45305564 return -ENOMEM;
45315565
45325566 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
45335567 &rbd_dev->header_oloc, "get_object_prefix",
4534
- NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
5568
+ NULL, 0, reply_buf, size);
45355569 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
45365570 if (ret < 0)
45375571 goto out;
45385572
45395573 p = reply_buf;
4540
- rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4541
- p + ret, NULL, GFP_NOIO);
5574
+ object_prefix = ceph_extract_encoded_string(&p, p + ret, NULL,
5575
+ GFP_NOIO);
5576
+ if (IS_ERR(object_prefix)) {
5577
+ ret = PTR_ERR(object_prefix);
5578
+ goto out;
5579
+ }
45425580 ret = 0;
45435581
4544
- if (IS_ERR(rbd_dev->header.object_prefix)) {
4545
- ret = PTR_ERR(rbd_dev->header.object_prefix);
4546
- rbd_dev->header.object_prefix = NULL;
4547
- } else {
4548
- dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4549
- }
5582
+ *pobject_prefix = object_prefix;
5583
+ dout(" object_prefix = %s\n", object_prefix);
45505584 out:
45515585 kfree(reply_buf);
45525586
....@@ -4554,9 +5588,12 @@
45545588 }
45555589
45565590 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4557
- u64 *snap_features)
5591
+ bool read_only, u64 *snap_features)
45585592 {
4559
- __le64 snapid = cpu_to_le64(snap_id);
5593
+ struct {
5594
+ __le64 snap_id;
5595
+ u8 read_only;
5596
+ } features_in;
45605597 struct {
45615598 __le64 features;
45625599 __le64 incompat;
....@@ -4564,9 +5601,12 @@
45645601 u64 unsup;
45655602 int ret;
45665603
5604
+ features_in.snap_id = cpu_to_le64(snap_id);
5605
+ features_in.read_only = read_only;
5606
+
45675607 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
45685608 &rbd_dev->header_oloc, "get_features",
4569
- &snapid, sizeof(snapid),
5609
+ &features_in, sizeof(features_in),
45705610 &features_buf, sizeof(features_buf));
45715611 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
45725612 if (ret < 0)
....@@ -4591,10 +5631,30 @@
45915631 return 0;
45925632 }
45935633
4594
-static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5634
+/*
5635
+ * These are generic image flags, but since they are used only for
5636
+ * object map, store them in rbd_dev->object_map_flags.
5637
+ *
5638
+ * For the same reason, this function is called only on object map
5639
+ * (re)load and not on header refresh.
5640
+ */
5641
+static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
45955642 {
4596
- return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4597
- &rbd_dev->header.features);
5643
+ __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5644
+ __le64 flags;
5645
+ int ret;
5646
+
5647
+ ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5648
+ &rbd_dev->header_oloc, "get_flags",
5649
+ &snapid, sizeof(snapid),
5650
+ &flags, sizeof(flags));
5651
+ if (ret < 0)
5652
+ return ret;
5653
+ if (ret < sizeof(flags))
5654
+ return -EBADMSG;
5655
+
5656
+ rbd_dev->object_map_flags = le64_to_cpu(flags);
5657
+ return 0;
45985658 }
45995659
46005660 struct parent_image_info {
....@@ -4606,6 +5666,14 @@
46065666 bool has_overlap;
46075667 u64 overlap;
46085668 };
5669
+
5670
+static void rbd_parent_info_cleanup(struct parent_image_info *pii)
5671
+{
5672
+ kfree(pii->pool_ns);
5673
+ kfree(pii->image_id);
5674
+
5675
+ memset(pii, 0, sizeof(*pii));
5676
+}
46095677
46105678 /*
46115679 * The caller is responsible for @pii.
....@@ -4654,7 +5722,7 @@
46545722
46555723 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
46565724 "rbd", "parent_get", CEPH_OSD_FLAG_READ,
4657
- req_page, sizeof(u64), reply_page, &reply_len);
5725
+ req_page, sizeof(u64), &reply_page, &reply_len);
46585726 if (ret)
46595727 return ret == -EOPNOTSUPP ? 1 : ret;
46605728
....@@ -4666,7 +5734,7 @@
46665734
46675735 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
46685736 "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
4669
- req_page, sizeof(u64), reply_page, &reply_len);
5737
+ req_page, sizeof(u64), &reply_page, &reply_len);
46705738 if (ret)
46715739 return ret;
46725740
....@@ -4676,6 +5744,9 @@
46765744 if (pii->has_overlap)
46775745 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
46785746
5747
+ dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5748
+ __func__, pii->pool_id, pii->pool_ns, pii->image_id, pii->snap_id,
5749
+ pii->has_overlap, pii->overlap);
46795750 return 0;
46805751
46815752 e_inval:
....@@ -4697,7 +5768,7 @@
46975768
46985769 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
46995770 "rbd", "get_parent", CEPH_OSD_FLAG_READ,
4700
- req_page, sizeof(u64), reply_page, &reply_len);
5771
+ req_page, sizeof(u64), &reply_page, &reply_len);
47015772 if (ret)
47025773 return ret;
47035774
....@@ -4714,14 +5785,17 @@
47145785 pii->has_overlap = true;
47155786 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
47165787
5788
+ dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5789
+ __func__, pii->pool_id, pii->pool_ns, pii->image_id, pii->snap_id,
5790
+ pii->has_overlap, pii->overlap);
47175791 return 0;
47185792
47195793 e_inval:
47205794 return -EINVAL;
47215795 }
47225796
4723
-static int get_parent_info(struct rbd_device *rbd_dev,
4724
- struct parent_image_info *pii)
5797
+static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev,
5798
+ struct parent_image_info *pii)
47255799 {
47265800 struct page *req_page, *reply_page;
47275801 void *p;
....@@ -4749,7 +5823,7 @@
47495823 return ret;
47505824 }
47515825
4752
-static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5826
+static int rbd_dev_setup_parent(struct rbd_device *rbd_dev)
47535827 {
47545828 struct rbd_spec *parent_spec;
47555829 struct parent_image_info pii = { 0 };
....@@ -4759,37 +5833,12 @@
47595833 if (!parent_spec)
47605834 return -ENOMEM;
47615835
4762
- ret = get_parent_info(rbd_dev, &pii);
5836
+ ret = rbd_dev_v2_parent_info(rbd_dev, &pii);
47635837 if (ret)
47645838 goto out_err;
47655839
4766
- dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
4767
- __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
4768
- pii.has_overlap, pii.overlap);
4769
-
4770
- if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
4771
- /*
4772
- * Either the parent never existed, or we have
4773
- * record of it but the image got flattened so it no
4774
- * longer has a parent. When the parent of a
4775
- * layered image disappears we immediately set the
4776
- * overlap to 0. The effect of this is that all new
4777
- * requests will be treated as if the image had no
4778
- * parent.
4779
- *
4780
- * If !pii.has_overlap, the parent image spec is not
4781
- * applicable. It's there to avoid duplication in each
4782
- * snapshot record.
4783
- */
4784
- if (rbd_dev->parent_overlap) {
4785
- rbd_dev->parent_overlap = 0;
4786
- rbd_dev_parent_put(rbd_dev);
4787
- pr_info("%s: clone image has been flattened\n",
4788
- rbd_dev->disk->disk_name);
4789
- }
4790
-
5840
+ if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap)
47915841 goto out; /* No parent? No problem. */
4792
- }
47935842
47945843 /* The ceph file layout needs to fit pool id in 32 bits */
47955844
....@@ -4801,58 +5850,46 @@
48015850 }
48025851
48035852 /*
4804
- * The parent won't change (except when the clone is
4805
- * flattened, already handled that). So we only need to
4806
- * record the parent spec we have not already done so.
5853
+ * The parent won't change except when the clone is flattened,
5854
+ * so we only need to record the parent image spec once.
48075855 */
4808
- if (!rbd_dev->parent_spec) {
4809
- parent_spec->pool_id = pii.pool_id;
4810
- if (pii.pool_ns && *pii.pool_ns) {
4811
- parent_spec->pool_ns = pii.pool_ns;
4812
- pii.pool_ns = NULL;
4813
- }
4814
- parent_spec->image_id = pii.image_id;
4815
- pii.image_id = NULL;
4816
- parent_spec->snap_id = pii.snap_id;
4817
-
4818
- rbd_dev->parent_spec = parent_spec;
4819
- parent_spec = NULL; /* rbd_dev now owns this */
5856
+ parent_spec->pool_id = pii.pool_id;
5857
+ if (pii.pool_ns && *pii.pool_ns) {
5858
+ parent_spec->pool_ns = pii.pool_ns;
5859
+ pii.pool_ns = NULL;
48205860 }
5861
+ parent_spec->image_id = pii.image_id;
5862
+ pii.image_id = NULL;
5863
+ parent_spec->snap_id = pii.snap_id;
5864
+
5865
+ rbd_assert(!rbd_dev->parent_spec);
5866
+ rbd_dev->parent_spec = parent_spec;
5867
+ parent_spec = NULL; /* rbd_dev now owns this */
48215868
48225869 /*
4823
- * We always update the parent overlap. If it's zero we issue
4824
- * a warning, as we will proceed as if there was no parent.
5870
+ * Record the parent overlap. If it's zero, issue a warning as
5871
+ * we will proceed as if there is no parent.
48255872 */
4826
- if (!pii.overlap) {
4827
- if (parent_spec) {
4828
- /* refresh, careful to warn just once */
4829
- if (rbd_dev->parent_overlap)
4830
- rbd_warn(rbd_dev,
4831
- "clone now standalone (overlap became 0)");
4832
- } else {
4833
- /* initial probe */
4834
- rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
4835
- }
4836
- }
5873
+ if (!pii.overlap)
5874
+ rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
48375875 rbd_dev->parent_overlap = pii.overlap;
48385876
48395877 out:
48405878 ret = 0;
48415879 out_err:
4842
- kfree(pii.pool_ns);
4843
- kfree(pii.image_id);
5880
+ rbd_parent_info_cleanup(&pii);
48445881 rbd_spec_put(parent_spec);
48455882 return ret;
48465883 }
48475884
4848
-static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5885
+static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev,
5886
+ u64 *stripe_unit, u64 *stripe_count)
48495887 {
48505888 struct {
48515889 __le64 stripe_unit;
48525890 __le64 stripe_count;
48535891 } __attribute__ ((packed)) striping_info_buf = { 0 };
48545892 size_t size = sizeof (striping_info_buf);
4855
- void *p;
48565893 int ret;
48575894
48585895 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
....@@ -4864,27 +5901,33 @@
48645901 if (ret < size)
48655902 return -ERANGE;
48665903
4867
- p = &striping_info_buf;
4868
- rbd_dev->header.stripe_unit = ceph_decode_64(&p);
4869
- rbd_dev->header.stripe_count = ceph_decode_64(&p);
5904
+ *stripe_unit = le64_to_cpu(striping_info_buf.stripe_unit);
5905
+ *stripe_count = le64_to_cpu(striping_info_buf.stripe_count);
5906
+ dout(" stripe_unit = %llu stripe_count = %llu\n", *stripe_unit,
5907
+ *stripe_count);
5908
+
48705909 return 0;
48715910 }
48725911
4873
-static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5912
+static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev, s64 *data_pool_id)
48745913 {
4875
- __le64 data_pool_id;
5914
+ __le64 data_pool_buf;
48765915 int ret;
48775916
48785917 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
48795918 &rbd_dev->header_oloc, "get_data_pool",
4880
- NULL, 0, &data_pool_id, sizeof(data_pool_id));
5919
+ NULL, 0, &data_pool_buf,
5920
+ sizeof(data_pool_buf));
5921
+ dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
48815922 if (ret < 0)
48825923 return ret;
4883
- if (ret < sizeof(data_pool_id))
5924
+ if (ret < sizeof(data_pool_buf))
48845925 return -EBADMSG;
48855926
4886
- rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
4887
- WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5927
+ *data_pool_id = le64_to_cpu(data_pool_buf);
5928
+ dout(" data_pool_id = %lld\n", *data_pool_id);
5929
+ WARN_ON(*data_pool_id == CEPH_NOPOOL);
5930
+
48885931 return 0;
48895932 }
48905933
....@@ -5076,7 +6119,8 @@
50766119 return ret;
50776120 }
50786121
5079
-static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6122
+static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev,
6123
+ struct ceph_snap_context **psnapc)
50806124 {
50816125 size_t size;
50826126 int ret;
....@@ -5137,9 +6181,7 @@
51376181 for (i = 0; i < snap_count; i++)
51386182 snapc->snaps[i] = ceph_decode_64(&p);
51396183
5140
- ceph_put_snap_context(rbd_dev->header.snapc);
5141
- rbd_dev->header.snapc = snapc;
5142
-
6184
+ *psnapc = snapc;
51436185 dout(" snap context seq = %llu, snap_count = %u\n",
51446186 (unsigned long long)seq, (unsigned int)snap_count);
51456187 out:
....@@ -5188,38 +6230,42 @@
51886230 return snap_name;
51896231 }
51906232
5191
-static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6233
+static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev,
6234
+ struct rbd_image_header *header,
6235
+ bool first_time)
51926236 {
5193
- bool first_time = rbd_dev->header.object_prefix == NULL;
51946237 int ret;
51956238
5196
- ret = rbd_dev_v2_image_size(rbd_dev);
6239
+ ret = _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
6240
+ first_time ? &header->obj_order : NULL,
6241
+ &header->image_size);
51976242 if (ret)
51986243 return ret;
51996244
52006245 if (first_time) {
5201
- ret = rbd_dev_v2_header_onetime(rbd_dev);
6246
+ ret = rbd_dev_v2_header_onetime(rbd_dev, header);
52026247 if (ret)
52036248 return ret;
52046249 }
52056250
5206
- ret = rbd_dev_v2_snap_context(rbd_dev);
5207
- if (ret && first_time) {
5208
- kfree(rbd_dev->header.object_prefix);
5209
- rbd_dev->header.object_prefix = NULL;
5210
- }
6251
+ ret = rbd_dev_v2_snap_context(rbd_dev, &header->snapc);
6252
+ if (ret)
6253
+ return ret;
52116254
5212
- return ret;
6255
+ return 0;
52136256 }
52146257
5215
-static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6258
+static int rbd_dev_header_info(struct rbd_device *rbd_dev,
6259
+ struct rbd_image_header *header,
6260
+ bool first_time)
52166261 {
52176262 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6263
+ rbd_assert(!header->object_prefix && !header->snapc);
52186264
52196265 if (rbd_dev->image_format == 1)
5220
- return rbd_dev_v1_header_info(rbd_dev);
6266
+ return rbd_dev_v1_header_info(rbd_dev, header, first_time);
52216267
5222
- return rbd_dev_v2_header_info(rbd_dev);
6268
+ return rbd_dev_v2_header_info(rbd_dev, header, first_time);
52236269 }
52246270
52256271 /*
....@@ -5275,6 +6321,141 @@
52756321 return dup;
52766322 }
52776323
6324
+static int rbd_parse_param(struct fs_parameter *param,
6325
+ struct rbd_parse_opts_ctx *pctx)
6326
+{
6327
+ struct rbd_options *opt = pctx->opts;
6328
+ struct fs_parse_result result;
6329
+ struct p_log log = {.prefix = "rbd"};
6330
+ int token, ret;
6331
+
6332
+ ret = ceph_parse_param(param, pctx->copts, NULL);
6333
+ if (ret != -ENOPARAM)
6334
+ return ret;
6335
+
6336
+ token = __fs_parse(&log, rbd_parameters, param, &result);
6337
+ dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
6338
+ if (token < 0) {
6339
+ if (token == -ENOPARAM)
6340
+ return inval_plog(&log, "Unknown parameter '%s'",
6341
+ param->key);
6342
+ return token;
6343
+ }
6344
+
6345
+ switch (token) {
6346
+ case Opt_queue_depth:
6347
+ if (result.uint_32 < 1)
6348
+ goto out_of_range;
6349
+ opt->queue_depth = result.uint_32;
6350
+ break;
6351
+ case Opt_alloc_size:
6352
+ if (result.uint_32 < SECTOR_SIZE)
6353
+ goto out_of_range;
6354
+ if (!is_power_of_2(result.uint_32))
6355
+ return inval_plog(&log, "alloc_size must be a power of 2");
6356
+ opt->alloc_size = result.uint_32;
6357
+ break;
6358
+ case Opt_lock_timeout:
6359
+ /* 0 is "wait forever" (i.e. infinite timeout) */
6360
+ if (result.uint_32 > INT_MAX / 1000)
6361
+ goto out_of_range;
6362
+ opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000);
6363
+ break;
6364
+ case Opt_pool_ns:
6365
+ kfree(pctx->spec->pool_ns);
6366
+ pctx->spec->pool_ns = param->string;
6367
+ param->string = NULL;
6368
+ break;
6369
+ case Opt_compression_hint:
6370
+ switch (result.uint_32) {
6371
+ case Opt_compression_hint_none:
6372
+ opt->alloc_hint_flags &=
6373
+ ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
6374
+ CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
6375
+ break;
6376
+ case Opt_compression_hint_compressible:
6377
+ opt->alloc_hint_flags |=
6378
+ CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6379
+ opt->alloc_hint_flags &=
6380
+ ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6381
+ break;
6382
+ case Opt_compression_hint_incompressible:
6383
+ opt->alloc_hint_flags |=
6384
+ CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6385
+ opt->alloc_hint_flags &=
6386
+ ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6387
+ break;
6388
+ default:
6389
+ BUG();
6390
+ }
6391
+ break;
6392
+ case Opt_read_only:
6393
+ opt->read_only = true;
6394
+ break;
6395
+ case Opt_read_write:
6396
+ opt->read_only = false;
6397
+ break;
6398
+ case Opt_lock_on_read:
6399
+ opt->lock_on_read = true;
6400
+ break;
6401
+ case Opt_exclusive:
6402
+ opt->exclusive = true;
6403
+ break;
6404
+ case Opt_notrim:
6405
+ opt->trim = false;
6406
+ break;
6407
+ default:
6408
+ BUG();
6409
+ }
6410
+
6411
+ return 0;
6412
+
6413
+out_of_range:
6414
+ return inval_plog(&log, "%s out of range", param->key);
6415
+}
6416
+
6417
+/*
6418
+ * This duplicates most of generic_parse_monolithic(), untying it from
6419
+ * fs_context and skipping standard superblock and security options.
6420
+ */
6421
+static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx)
6422
+{
6423
+ char *key;
6424
+ int ret = 0;
6425
+
6426
+ dout("%s '%s'\n", __func__, options);
6427
+ while ((key = strsep(&options, ",")) != NULL) {
6428
+ if (*key) {
6429
+ struct fs_parameter param = {
6430
+ .key = key,
6431
+ .type = fs_value_is_flag,
6432
+ };
6433
+ char *value = strchr(key, '=');
6434
+ size_t v_len = 0;
6435
+
6436
+ if (value) {
6437
+ if (value == key)
6438
+ continue;
6439
+ *value++ = 0;
6440
+ v_len = strlen(value);
6441
+ param.string = kmemdup_nul(value, v_len,
6442
+ GFP_KERNEL);
6443
+ if (!param.string)
6444
+ return -ENOMEM;
6445
+ param.type = fs_value_is_string;
6446
+ }
6447
+ param.size = v_len;
6448
+
6449
+ ret = rbd_parse_param(&param, pctx);
6450
+ kfree(param.string);
6451
+ if (ret)
6452
+ break;
6453
+ }
6454
+ }
6455
+
6456
+ return ret;
6457
+}
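/*
 * Example (illustrative values): for options "alloc_size=65536,read_only"
 * the loop above issues two rbd_parse_param() calls, roughly
 * { .key = "alloc_size", .string = "65536", .size = 5,
 *   .type = fs_value_is_string } and
 * { .key = "read_only", .size = 0, .type = fs_value_is_flag }.
 * Keys that libceph itself recognizes (e.g. "name" or "secret") are
 * consumed by ceph_parse_param() before rbd_parameters is consulted.
 */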
6458
+
52786459 /*
52796460 * Parse the options provided for an "rbd add" (i.e., rbd image
52806461 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
....@@ -5326,8 +6507,7 @@
53266507 const char *mon_addrs;
53276508 char *snap_name;
53286509 size_t mon_addrs_size;
5329
- struct parse_rbd_opts_ctx pctx = { 0 };
5330
- struct ceph_options *copts;
6510
+ struct rbd_parse_opts_ctx pctx = { 0 };
53316511 int ret;
53326512
53336513 /* The first four tokens are required */
....@@ -5338,7 +6518,7 @@
53386518 return -EINVAL;
53396519 }
53406520 mon_addrs = buf;
5341
- mon_addrs_size = len + 1;
6521
+ mon_addrs_size = len;
53426522 buf += len;
53436523
53446524 ret = -EINVAL;
....@@ -5388,6 +6568,10 @@
53886568 *(snap_name + len) = '\0';
53896569 pctx.spec->snap_name = snap_name;
53906570
6571
+ pctx.copts = ceph_alloc_options();
6572
+ if (!pctx.copts)
6573
+ goto out_mem;
6574
+
53916575 /* Initialize all rbd options to the defaults */
53926576
53936577 pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
....@@ -5396,32 +6580,33 @@
53966580
53976581 pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
53986582 pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6583
+ pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
53996584 pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
54006585 pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
54016586 pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
54026587 pctx.opts->trim = RBD_TRIM_DEFAULT;
54036588
5404
- copts = ceph_parse_options(options, mon_addrs,
5405
- mon_addrs + mon_addrs_size - 1,
5406
- parse_rbd_opts_token, &pctx);
5407
- if (IS_ERR(copts)) {
5408
- ret = PTR_ERR(copts);
6589
+ ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL);
6590
+ if (ret)
54096591 goto out_err;
5410
- }
5411
- kfree(options);
54126592
5413
- *ceph_opts = copts;
6593
+ ret = rbd_parse_options(options, &pctx);
6594
+ if (ret)
6595
+ goto out_err;
6596
+
6597
+ *ceph_opts = pctx.copts;
54146598 *opts = pctx.opts;
54156599 *rbd_spec = pctx.spec;
5416
-
6600
+ kfree(options);
54176601 return 0;
6602
+
54186603 out_mem:
54196604 ret = -ENOMEM;
54206605 out_err:
54216606 kfree(pctx.opts);
6607
+ ceph_destroy_options(pctx.copts);
54226608 rbd_spec_put(pctx.spec);
54236609 kfree(options);
5424
-
54256610 return ret;
54266611 }
54276612
....@@ -5429,28 +6614,51 @@
54296614 {
54306615 down_write(&rbd_dev->lock_rwsem);
54316616 if (__rbd_is_lock_owner(rbd_dev))
5432
- rbd_unlock(rbd_dev);
6617
+ __rbd_release_lock(rbd_dev);
54336618 up_write(&rbd_dev->lock_rwsem);
54346619 }
54356620
6621
+/*
6622
+ * If the wait is interrupted, an error is returned even if the lock
6623
+ * was successfully acquired. rbd_dev_image_unlock() will release it
6624
+ * if needed.
6625
+ */
54366626 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
54376627 {
5438
- int ret;
6628
+ long ret;
54396629
54406630 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6631
+ if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6632
+ return 0;
6633
+
54416634 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
54426635 return -EINVAL;
54436636 }
54446637
5445
- /* FIXME: "rbd map --exclusive" should be in interruptible */
5446
- down_read(&rbd_dev->lock_rwsem);
5447
- ret = rbd_wait_state_locked(rbd_dev, true);
5448
- up_read(&rbd_dev->lock_rwsem);
5449
- if (ret) {
5450
- rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5451
- return -EROFS;
5452
- }
6638
+ if (rbd_is_ro(rbd_dev))
6639
+ return 0;
54536640
6641
+ rbd_assert(!rbd_is_lock_owner(rbd_dev));
6642
+ queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6643
+ ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6644
+ ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6645
+ if (ret > 0) {
6646
+ ret = rbd_dev->acquire_err;
6647
+ } else {
6648
+ cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6649
+ if (!ret)
6650
+ ret = -ETIMEDOUT;
6651
+
6652
+ rbd_warn(rbd_dev, "failed to acquire lock: %ld", ret);
6653
+ }
6654
+ if (ret)
6655
+ return ret;
6656
+
6657
+ /*
6658
+ * The lock may have been released by now, unless automatic lock
6659
+ * transitions are disabled.
6660
+ */
6661
+ rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
54546662 return 0;
54556663 }
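/*
 * Wait semantics above: wait_for_completion_killable_timeout() returns
 * a positive value when acquire_wait was completed (the outcome is then
 * taken from rbd_dev->acquire_err), 0 on timeout (mapped to -ETIMEDOUT)
 * and -ERESTARTSYS if the mapping process was killed.
 * ceph_timeout_jiffies() turns a lock_timeout of 0 (the default) into
 * MAX_SCHEDULE_TIMEOUT, i.e. "wait forever", matching the comment in
 * rbd_parse_param().
 */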
54566664
....@@ -5500,7 +6708,6 @@
55006708 dout("rbd id object name is %s\n", oid.name);
55016709
55026710 /* Response will be an encoded string, which includes a length */
5503
-
55046711 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
55056712 response = kzalloc(size, GFP_NOIO);
55066713 if (!response) {
....@@ -5512,7 +6719,7 @@
55126719
55136720 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
55146721 "get_id", NULL, 0,
5515
- response, RBD_IMAGE_ID_LEN_MAX);
6722
+ response, size);
55166723 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
55176724 if (ret == -ENOENT) {
55186725 image_id = kstrdup("", GFP_KERNEL);
....@@ -5545,58 +6752,49 @@
55456752 */
55466753 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
55476754 {
5548
- struct rbd_image_header *header;
5549
-
55506755 rbd_dev_parent_put(rbd_dev);
6756
+ rbd_object_map_free(rbd_dev);
6757
+ rbd_dev_mapping_clear(rbd_dev);
55516758
55526759 /* Free dynamic fields from the header, then zero it out */
55536760
5554
- header = &rbd_dev->header;
5555
- ceph_put_snap_context(header->snapc);
5556
- kfree(header->snap_sizes);
5557
- kfree(header->snap_names);
5558
- kfree(header->object_prefix);
5559
- memset(header, 0, sizeof (*header));
6761
+ rbd_image_header_cleanup(&rbd_dev->header);
55606762 }
55616763
5562
-static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6764
+static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev,
6765
+ struct rbd_image_header *header)
55636766 {
55646767 int ret;
55656768
5566
- ret = rbd_dev_v2_object_prefix(rbd_dev);
6769
+ ret = rbd_dev_v2_object_prefix(rbd_dev, &header->object_prefix);
55676770 if (ret)
5568
- goto out_err;
6771
+ return ret;
55696772
55706773 /*
55716774	 * Get and check the features for the image. Currently the
55726775 * features are assumed to never change.
55736776 */
5574
- ret = rbd_dev_v2_features(rbd_dev);
6777
+ ret = _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
6778
+ rbd_is_ro(rbd_dev), &header->features);
55756779 if (ret)
5576
- goto out_err;
6780
+ return ret;
55776781
55786782 /* If the image supports fancy striping, get its parameters */
55796783
5580
- if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5581
- ret = rbd_dev_v2_striping_info(rbd_dev);
5582
- if (ret < 0)
5583
- goto out_err;
5584
- }
5585
-
5586
- if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5587
- ret = rbd_dev_v2_data_pool(rbd_dev);
6784
+ if (header->features & RBD_FEATURE_STRIPINGV2) {
6785
+ ret = rbd_dev_v2_striping_info(rbd_dev, &header->stripe_unit,
6786
+ &header->stripe_count);
55886787 if (ret)
5589
- goto out_err;
6788
+ return ret;
55906789 }
55916790
5592
- rbd_init_layout(rbd_dev);
5593
- return 0;
6791
+ if (header->features & RBD_FEATURE_DATA_POOL) {
6792
+ ret = rbd_dev_v2_data_pool(rbd_dev, &header->data_pool_id);
6793
+ if (ret)
6794
+ return ret;
6795
+ }
55946796
5595
-out_err:
5596
- rbd_dev->header.features = 0;
5597
- kfree(rbd_dev->header.object_prefix);
5598
- rbd_dev->header.object_prefix = NULL;
5599
- return ret;
6797
+ return 0;
56006798 }
56016799
56026800 /*
....@@ -5618,7 +6816,7 @@
56186816 goto out_err;
56196817 }
56206818
5621
- parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6819
+ parent = __rbd_dev_create(rbd_dev->parent_spec);
56226820 if (!parent) {
56236821 ret = -ENOMEM;
56246822 goto out_err;
....@@ -5628,8 +6826,10 @@
56286826 * Images related by parent/child relationships always share
56296827 * rbd_client and spec/parent_spec, so bump their refcounts.
56306828 */
5631
- __rbd_get_client(rbd_dev->rbd_client);
5632
- rbd_spec_get(rbd_dev->parent_spec);
6829
+ parent->rbd_client = __rbd_get_client(rbd_dev->rbd_client);
6830
+ parent->spec = rbd_spec_get(rbd_dev->parent_spec);
6831
+
6832
+ __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
56336833
56346834 ret = rbd_dev_image_probe(parent, depth);
56356835 if (ret < 0)
....@@ -5648,7 +6848,6 @@
56486848 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
56496849 {
56506850 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5651
- rbd_dev_mapping_clear(rbd_dev);
56526851 rbd_free_disk(rbd_dev);
56536852 if (!single_major)
56546853 unregister_blkdev(rbd_dev->major, rbd_dev->name);
....@@ -5682,23 +6881,17 @@
56826881 if (ret)
56836882 goto err_out_blkdev;
56846883
5685
- ret = rbd_dev_mapping_set(rbd_dev);
5686
- if (ret)
5687
- goto err_out_disk;
5688
-
56896884 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5690
- set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
6885
+ set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
56916886
56926887 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
56936888 if (ret)
5694
- goto err_out_mapping;
6889
+ goto err_out_disk;
56956890
56966891 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
56976892 up_write(&rbd_dev->header_rwsem);
56986893 return 0;
56996894
5700
-err_out_mapping:
5701
- rbd_dev_mapping_clear(rbd_dev);
57026895 err_out_disk:
57036896 rbd_free_disk(rbd_dev);
57046897 err_out_blkdev:
....@@ -5727,9 +6920,27 @@
57276920 return ret;
57286921 }
57296922
6923
+static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
6924
+{
6925
+ if (!is_snap) {
6926
+ pr_info("image %s/%s%s%s does not exist\n",
6927
+ rbd_dev->spec->pool_name,
6928
+ rbd_dev->spec->pool_ns ?: "",
6929
+ rbd_dev->spec->pool_ns ? "/" : "",
6930
+ rbd_dev->spec->image_name);
6931
+ } else {
6932
+ pr_info("snap %s/%s%s%s@%s does not exist\n",
6933
+ rbd_dev->spec->pool_name,
6934
+ rbd_dev->spec->pool_ns ?: "",
6935
+ rbd_dev->spec->pool_ns ? "/" : "",
6936
+ rbd_dev->spec->image_name,
6937
+ rbd_dev->spec->snap_name);
6938
+ }
6939
+}
6940
+
57306941 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
57316942 {
5732
- if (rbd_dev->opts)
6943
+ if (!rbd_is_ro(rbd_dev))
57336944 rbd_unregister_watch(rbd_dev);
57346945
57356946 rbd_dev_unprobe(rbd_dev);
....@@ -5749,6 +6960,7 @@
57496960 */
57506961 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
57516962 {
6963
+ bool need_watch = !rbd_is_ro(rbd_dev);
57526964 int ret;
57536965
57546966 /*
....@@ -5765,15 +6977,11 @@
57656977 if (ret)
57666978 goto err_out_format;
57676979
5768
- if (!depth) {
6980
+ if (need_watch) {
57696981 ret = rbd_register_watch(rbd_dev);
57706982 if (ret) {
57716983 if (ret == -ENOENT)
5772
- pr_info("image %s/%s%s%s does not exist\n",
5773
- rbd_dev->spec->pool_name,
5774
- rbd_dev->spec->pool_ns ?: "",
5775
- rbd_dev->spec->pool_ns ? "/" : "",
5776
- rbd_dev->spec->image_name);
6984
+ rbd_print_dne(rbd_dev, false);
57776985 goto err_out_format;
57786986 }
57796987 }
....@@ -5781,9 +6989,14 @@
57816989 if (!depth)
57826990 down_write(&rbd_dev->header_rwsem);
57836991
5784
- ret = rbd_dev_header_info(rbd_dev);
5785
- if (ret)
6992
+ ret = rbd_dev_header_info(rbd_dev, &rbd_dev->header, true);
6993
+ if (ret) {
6994
+ if (ret == -ENOENT && !need_watch)
6995
+ rbd_print_dne(rbd_dev, false);
57866996 goto err_out_probe;
6997
+ }
6998
+
6999
+ rbd_init_layout(rbd_dev);
57877000
57887001 /*
57897002 * If this image is the one being mapped, we have pool name and
....@@ -5797,27 +7010,25 @@
57977010 ret = rbd_spec_fill_names(rbd_dev);
57987011 if (ret) {
57997012 if (ret == -ENOENT)
5800
- pr_info("snap %s/%s%s%s@%s does not exist\n",
5801
- rbd_dev->spec->pool_name,
5802
- rbd_dev->spec->pool_ns ?: "",
5803
- rbd_dev->spec->pool_ns ? "/" : "",
5804
- rbd_dev->spec->image_name,
5805
- rbd_dev->spec->snap_name);
7013
+ rbd_print_dne(rbd_dev, true);
58067014 goto err_out_probe;
58077015 }
58087016
5809
- if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
5810
- ret = rbd_dev_v2_parent_info(rbd_dev);
7017
+ ret = rbd_dev_mapping_set(rbd_dev);
7018
+ if (ret)
7019
+ goto err_out_probe;
7020
+
7021
+ if (rbd_is_snap(rbd_dev) &&
7022
+ (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
7023
+ ret = rbd_object_map_load(rbd_dev);
58117024 if (ret)
58127025 goto err_out_probe;
7026
+ }
58137027
5814
- /*
5815
- * Need to warn users if this image is the one being
5816
- * mapped and has a parent.
5817
- */
5818
- if (!depth && rbd_dev->parent_spec)
5819
- rbd_warn(rbd_dev,
5820
- "WARNING: kernel layering is EXPERIMENTAL!");
7028
+ if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
7029
+ ret = rbd_dev_setup_parent(rbd_dev);
7030
+ if (ret)
7031
+ goto err_out_probe;
58217032 }
58227033
58237034 ret = rbd_dev_probe_parent(rbd_dev, depth);
....@@ -5831,13 +7042,114 @@
58317042 err_out_probe:
58327043 if (!depth)
58337044 up_write(&rbd_dev->header_rwsem);
5834
- if (!depth)
7045
+ if (need_watch)
58357046 rbd_unregister_watch(rbd_dev);
58367047 rbd_dev_unprobe(rbd_dev);
58377048 err_out_format:
58387049 rbd_dev->image_format = 0;
58397050 kfree(rbd_dev->spec->image_id);
58407051 rbd_dev->spec->image_id = NULL;
7052
+ return ret;
7053
+}
7054
+
7055
+static void rbd_dev_update_header(struct rbd_device *rbd_dev,
7056
+ struct rbd_image_header *header)
7057
+{
7058
+ rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
7059
+ rbd_assert(rbd_dev->header.object_prefix); /* !first_time */
7060
+
7061
+ if (rbd_dev->header.image_size != header->image_size) {
7062
+ rbd_dev->header.image_size = header->image_size;
7063
+
7064
+ if (!rbd_is_snap(rbd_dev)) {
7065
+ rbd_dev->mapping.size = header->image_size;
7066
+ rbd_dev_update_size(rbd_dev);
7067
+ }
7068
+ }
7069
+
7070
+ ceph_put_snap_context(rbd_dev->header.snapc);
7071
+ rbd_dev->header.snapc = header->snapc;
7072
+ header->snapc = NULL;
7073
+
7074
+ if (rbd_dev->image_format == 1) {
7075
+ kfree(rbd_dev->header.snap_names);
7076
+ rbd_dev->header.snap_names = header->snap_names;
7077
+ header->snap_names = NULL;
7078
+
7079
+ kfree(rbd_dev->header.snap_sizes);
7080
+ rbd_dev->header.snap_sizes = header->snap_sizes;
7081
+ header->snap_sizes = NULL;
7082
+ }
7083
+}
7084
+
7085
+static void rbd_dev_update_parent(struct rbd_device *rbd_dev,
7086
+ struct parent_image_info *pii)
7087
+{
7088
+ if (pii->pool_id == CEPH_NOPOOL || !pii->has_overlap) {
7089
+ /*
7090
+ * Either the parent never existed, or we have
7091
+ * record of it but the image got flattened so it no
7092
+ * longer has a parent. When the parent of a
7093
+ * layered image disappears we immediately set the
7094
+ * overlap to 0. The effect of this is that all new
7095
+ * requests will be treated as if the image had no
7096
+ * parent.
7097
+ *
7098
+ * If !pii.has_overlap, the parent image spec is not
7099
+ * applicable. It's there to avoid duplication in each
7100
+ * snapshot record.
7101
+ */
7102
+ if (rbd_dev->parent_overlap) {
7103
+ rbd_dev->parent_overlap = 0;
7104
+ rbd_dev_parent_put(rbd_dev);
7105
+ pr_info("%s: clone has been flattened\n",
7106
+ rbd_dev->disk->disk_name);
7107
+ }
7108
+ } else {
7109
+ rbd_assert(rbd_dev->parent_spec);
7110
+
7111
+ /*
7112
+ * Update the parent overlap. If it became zero, issue
7113
+ * a warning as we will proceed as if there is no parent.
7114
+ */
7115
+ if (!pii->overlap && rbd_dev->parent_overlap)
7116
+ rbd_warn(rbd_dev,
7117
+ "clone has become standalone (overlap 0)");
7118
+ rbd_dev->parent_overlap = pii->overlap;
7119
+ }
7120
+}
7121
+
7122
+static int rbd_dev_refresh(struct rbd_device *rbd_dev)
7123
+{
7124
+ struct rbd_image_header header = { 0 };
7125
+ struct parent_image_info pii = { 0 };
7126
+ int ret;
7127
+
7128
+ dout("%s rbd_dev %p\n", __func__, rbd_dev);
7129
+
7130
+ ret = rbd_dev_header_info(rbd_dev, &header, false);
7131
+ if (ret)
7132
+ goto out;
7133
+
7134
+ /*
7135
+ * If there is a parent, see if it has disappeared due to the
7136
+ * mapped image getting flattened.
7137
+ */
7138
+ if (rbd_dev->parent) {
7139
+ ret = rbd_dev_v2_parent_info(rbd_dev, &pii);
7140
+ if (ret)
7141
+ goto out;
7142
+ }
7143
+
7144
+ down_write(&rbd_dev->header_rwsem);
7145
+ rbd_dev_update_header(rbd_dev, &header);
7146
+ if (rbd_dev->parent)
7147
+ rbd_dev_update_parent(rbd_dev, &pii);
7148
+ up_write(&rbd_dev->header_rwsem);
7149
+
7150
+out:
7151
+ rbd_parent_info_cleanup(&pii);
7152
+ rbd_image_header_cleanup(&header);
58417153 return ret;
58427154 }
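/*
 * Refresh now works in two phases: the synchronous OSD requests
 * (rbd_dev_header_info() and, for images with a parent,
 * rbd_dev_v2_parent_info()) fill the local "header" and "pii"
 * structures with no locks held, and only the final swap into
 * rbd_dev->header and parent_overlap happens under header_rwsem,
 * via rbd_dev_update_header() and rbd_dev_update_parent() above.
 */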
58437155
....@@ -5887,6 +7199,11 @@
58877199 spec = NULL; /* rbd_dev now owns this */
58887200 rbd_opts = NULL; /* rbd_dev now owns this */
58897201
7202
+ /* if we are mapping a snapshot it will be a read-only mapping */
7203
+ if (rbd_dev->opts->read_only ||
7204
+ strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7205
+ __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7206
+
58907207 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
58917208 if (!rbd_dev->config_info) {
58927209 rc = -ENOMEM;
....@@ -5897,19 +7214,19 @@
58977214 if (rc < 0)
58987215 goto err_out_rbd_dev;
58997216
5900
- /* If we are mapping a snapshot it must be marked read-only */
5901
- if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5902
- rbd_dev->opts->read_only = true;
7217
+ if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7218
+ rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7219
+ rbd_dev->layout.object_size);
7220
+ rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7221
+ }
59037222
59047223 rc = rbd_dev_device_setup(rbd_dev);
59057224 if (rc)
59067225 goto err_out_image_probe;
59077226
5908
- if (rbd_dev->opts->exclusive) {
5909
- rc = rbd_add_acquire_lock(rbd_dev);
5910
- if (rc)
5911
- goto err_out_device_setup;
5912
- }
7227
+ rc = rbd_add_acquire_lock(rbd_dev);
7228
+ if (rc)
7229
+ goto err_out_image_lock;
59137230
59147231 /* Everything's ready. Announce the disk to the world. */
59157232
....@@ -5917,7 +7234,7 @@
59177234 if (rc)
59187235 goto err_out_image_lock;
59197236
5920
- add_disk(rbd_dev->disk);
7237
+ device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL);
59217238 /* see rbd_init_disk() */
59227239 blk_put_queue(rbd_dev->disk->queue);
59237240
....@@ -5935,7 +7252,6 @@
59357252
59367253 err_out_image_lock:
59377254 rbd_dev_image_unlock(rbd_dev);
5938
-err_out_device_setup:
59397255 rbd_dev_device_release(rbd_dev);
59407256 err_out_image_probe:
59417257 rbd_dev_image_release(rbd_dev);
....@@ -5949,9 +7265,7 @@
59497265 goto out;
59507266 }
59517267
5952
-static ssize_t rbd_add(struct bus_type *bus,
5953
- const char *buf,
5954
- size_t count)
7268
+static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
59557269 {
59567270 if (single_major)
59577271 return -EINVAL;
....@@ -5959,9 +7273,8 @@
59597273 return do_rbd_add(bus, buf, count);
59607274 }
59617275
5962
-static ssize_t rbd_add_single_major(struct bus_type *bus,
5963
- const char *buf,
5964
- size_t count)
7276
+static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
7277
+ size_t count)
59657278 {
59667279 return do_rbd_add(bus, buf, count);
59677280 }
....@@ -6067,9 +7380,7 @@
60677380 return count;
60687381 }
60697382
6070
-static ssize_t rbd_remove(struct bus_type *bus,
6071
- const char *buf,
6072
- size_t count)
7383
+static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
60737384 {
60747385 if (single_major)
60757386 return -EINVAL;
....@@ -6077,9 +7388,8 @@
60777388 return do_rbd_remove(bus, buf, count);
60787389 }
60797390
6080
-static ssize_t rbd_remove_single_major(struct bus_type *bus,
6081
- const char *buf,
6082
- size_t count)
7391
+static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
7392
+ size_t count)
60837393 {
60847394 return do_rbd_remove(bus, buf, count);
60857395 }
....@@ -6088,7 +7398,7 @@
60887398 * create control files in sysfs
60897399 * /sys/bus/rbd/...
60907400 */
6091
-static int rbd_sysfs_init(void)
7401
+static int __init rbd_sysfs_init(void)
60927402 {
60937403 int ret;
60947404
....@@ -6103,13 +7413,13 @@
61037413 return ret;
61047414 }
61057415
6106
-static void rbd_sysfs_cleanup(void)
7416
+static void __exit rbd_sysfs_cleanup(void)
61077417 {
61087418 bus_unregister(&rbd_bus_type);
61097419 device_unregister(&rbd_root_dev);
61107420 }
61117421
6112
-static int rbd_slab_init(void)
7422
+static int __init rbd_slab_init(void)
61137423 {
61147424 rbd_assert(!rbd_img_request_cache);
61157425 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);