forked from ~ljy/RK356X_SDK_RELEASE

hc
2023-12-11 072de836f53be56a70cecf70b43ae43b7ce17376
kernel/drivers/block/rbd.c
....@@ -34,7 +34,7 @@
3434 #include <linux/ceph/cls_lock_client.h>
3535 #include <linux/ceph/striper.h>
3636 #include <linux/ceph/decode.h>
37
-#include <linux/parser.h>
37
+#include <linux/fs_parser.h>
3838 #include <linux/bsearch.h>
3939
4040 #include <linux/kernel.h>
....@@ -115,12 +115,18 @@
115115 #define RBD_FEATURE_LAYERING (1ULL<<0)
116116 #define RBD_FEATURE_STRIPINGV2 (1ULL<<1)
117117 #define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2)
118
+#define RBD_FEATURE_OBJECT_MAP (1ULL<<3)
119
+#define RBD_FEATURE_FAST_DIFF (1ULL<<4)
120
+#define RBD_FEATURE_DEEP_FLATTEN (1ULL<<5)
118121 #define RBD_FEATURE_DATA_POOL (1ULL<<7)
119122 #define RBD_FEATURE_OPERATIONS (1ULL<<8)
120123
121124 #define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \
122125 RBD_FEATURE_STRIPINGV2 | \
123126 RBD_FEATURE_EXCLUSIVE_LOCK | \
127
+ RBD_FEATURE_OBJECT_MAP | \
128
+ RBD_FEATURE_FAST_DIFF | \
129
+ RBD_FEATURE_DEEP_FLATTEN | \
124130 RBD_FEATURE_DATA_POOL | \
125131 RBD_FEATURE_OPERATIONS)
126132
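The three feature bits added above (OBJECT_MAP, FAST_DIFF, DEEP_FLATTEN) extend what the kernel client advertises through RBD_FEATURES_ALL. For context, a hedged sketch of how such bits are typically checked at map time; RBD_FEATURES_SUPPORTED and the surrounding probe logic are assumed here, not shown in this hunk:

    static int rbd_check_features_sketch(struct rbd_device *rbd_dev)
    {
            u64 unsup = rbd_dev->header.features & ~RBD_FEATURES_SUPPORTED;

            if (unsup) {
                    rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
                             unsup);
                    return -ENXIO;   /* refuse to map the image */
            }
            return 0;
    }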
....@@ -201,6 +207,11 @@
201207 struct list_head node;
202208 };
203209
210
+struct pending_result {
211
+ int result; /* first nonzero result */
212
+ int num_pending;
213
+};
214
+
204215 struct rbd_img_request;
205216
206217 enum obj_request_type {
....@@ -214,34 +225,69 @@
214225 OBJ_OP_READ = 1,
215226 OBJ_OP_WRITE,
216227 OBJ_OP_DISCARD,
228
+ OBJ_OP_ZEROOUT,
229
+};
230
+
231
+#define RBD_OBJ_FLAG_DELETION (1U << 0)
232
+#define RBD_OBJ_FLAG_COPYUP_ENABLED (1U << 1)
233
+#define RBD_OBJ_FLAG_COPYUP_ZEROS (1U << 2)
234
+#define RBD_OBJ_FLAG_MAY_EXIST (1U << 3)
235
+#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT (1U << 4)
236
+
237
+enum rbd_obj_read_state {
238
+ RBD_OBJ_READ_START = 1,
239
+ RBD_OBJ_READ_OBJECT,
240
+ RBD_OBJ_READ_PARENT,
217241 };
218242
219243 /*
220244 * Writes go through the following state machine to deal with
221245 * layering:
222246 *
223
- *                       need copyup
224
- * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
225
- *        |        ^                            |
226
- *        v        \------------------------------/
227
- *      done
228
- *        ^
229
- *        |
230
- * RBD_OBJ_WRITE_FLAT
247
+ *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
248
+ *            .                 |                                    .
249
+ *            .                 v                                    .
250
+ *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
251
+ *            .                 |                    .               .
252
+ *            .                 v                    v (deep-copyup  .
253
+ *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
254
+ * flattened) v                 |                    .               .
255
+ *            .                 v                    .               .
256
+ *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
257
+ *                              |                        not needed) v
258
+ *                              v                                    .
259
+ *                            done . . . . . . . . . . . . . . . . . .
260
+ *                              ^
261
+ *                              |
262
+ *                     RBD_OBJ_WRITE_FLAT
231263 *
232264 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
233
- * there is a parent or not.
265
+ * assert_exists guard is needed or not (in some cases it's not needed
266
+ * even if there is a parent).
234267 */
235268 enum rbd_obj_write_state {
236
- RBD_OBJ_WRITE_FLAT = 1,
237
- RBD_OBJ_WRITE_GUARD,
269
+ RBD_OBJ_WRITE_START = 1,
270
+ RBD_OBJ_WRITE_PRE_OBJECT_MAP,
271
+ RBD_OBJ_WRITE_OBJECT,
272
+ __RBD_OBJ_WRITE_COPYUP,
238273 RBD_OBJ_WRITE_COPYUP,
274
+ RBD_OBJ_WRITE_POST_OBJECT_MAP,
275
+};
276
+
277
+enum rbd_obj_copyup_state {
278
+ RBD_OBJ_COPYUP_START = 1,
279
+ RBD_OBJ_COPYUP_READ_PARENT,
280
+ __RBD_OBJ_COPYUP_OBJECT_MAPS,
281
+ RBD_OBJ_COPYUP_OBJECT_MAPS,
282
+ __RBD_OBJ_COPYUP_WRITE_OBJECT,
283
+ RBD_OBJ_COPYUP_WRITE_OBJECT,
239284 };
240285
241286 struct rbd_obj_request {
242287 struct ceph_object_extent ex;
288
+ unsigned int flags; /* RBD_OBJ_FLAG_* */
243289 union {
244
- bool tried_parent; /* for reads */
290
+ enum rbd_obj_read_state read_state; /* for reads */
245291 enum rbd_obj_write_state write_state; /* for writes */
246292 };
247293
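The read/write/copyup enums and RBD_OBJ_FLAG_* bits introduced above are driven as per-object state machines. A deliberately simplified, illustrative step function for the read path only; the helper names rbd_obj_send_object_read() and rbd_obj_read_from_parent() are invented for this sketch and the real transitions live elsewhere in the file:

    static bool rbd_obj_advance_read_sketch(struct rbd_obj_request *obj_req,
                                            int *result)
    {
            switch (obj_req->read_state) {
            case RBD_OBJ_READ_START:
                    rbd_obj_send_object_read(obj_req);   /* hypothetical helper */
                    obj_req->read_state = RBD_OBJ_READ_OBJECT;
                    return false;                        /* wait for the OSD reply */
            case RBD_OBJ_READ_OBJECT:
                    if (*result == -ENOENT && obj_req->num_img_extents) {
                            /* object missing in a layered image: read the parent */
                            rbd_obj_read_from_parent(obj_req); /* hypothetical helper */
                            obj_req->read_state = RBD_OBJ_READ_PARENT;
                            return false;
                    }
                    return true;                         /* done, result stands */
            case RBD_OBJ_READ_PARENT:
                    return true;
            default:
                    BUG();
            }
    }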
....@@ -257,14 +303,15 @@
257303 u32 bvec_idx;
258304 };
259305 };
306
+
307
+ enum rbd_obj_copyup_state copyup_state;
260308 struct bio_vec *copyup_bvecs;
261309 u32 copyup_bvec_count;
262310
263
- struct ceph_osd_request *osd_req;
311
+ struct list_head osd_reqs; /* w/ r_private_item */
264312
265
- u64 xferred; /* bytes transferred */
266
- int result;
267
-
313
+ struct mutex state_mutex;
314
+ struct pending_result pending;
268315 struct kref kref;
269316 };
270317
....@@ -273,28 +320,32 @@
273320 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
274321 };
275322
323
+enum rbd_img_state {
324
+ RBD_IMG_START = 1,
325
+ RBD_IMG_EXCLUSIVE_LOCK,
326
+ __RBD_IMG_OBJECT_REQUESTS,
327
+ RBD_IMG_OBJECT_REQUESTS,
328
+};
329
+
276330 struct rbd_img_request {
277331 struct rbd_device *rbd_dev;
278332 enum obj_operation_type op_type;
279333 enum obj_request_type data_type;
280334 unsigned long flags;
335
+ enum rbd_img_state state;
281336 union {
282337 u64 snap_id; /* for reads */
283338 struct ceph_snap_context *snapc; /* for writes */
284339 };
285
- union {
286
- struct request *rq; /* block request */
287
- struct rbd_obj_request *obj_request; /* obj req initiator */
288
- };
289
- spinlock_t completion_lock;
290
- u64 xferred;/* aggregate bytes transferred */
291
- int result; /* first nonzero obj_request result */
340
+ struct rbd_obj_request *obj_request; /* obj req initiator */
292341
342
+ struct list_head lock_item;
293343 struct list_head object_extents; /* obj_req.ex structs */
294
- u32 obj_request_count;
295
- u32 pending_count;
296344
297
- struct kref kref;
345
+ struct mutex state_mutex;
346
+ struct pending_result pending;
347
+ struct work_struct work;
348
+ int work_result;
298349 };
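With the new state/state_mutex/pending fields, rbd_img_request becomes a small state machine of its own, and work/work_result let completions re-enter it from process context. A minimal sketch of that deferral pattern, assuming helper bodies that are not visible in this excerpt:

    static void rbd_img_handle_request_work(struct work_struct *work)
    {
            struct rbd_img_request *img_req =
                container_of(work, struct rbd_img_request, work);

            rbd_img_handle_request(img_req, img_req->work_result);
    }

    static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
    {
            INIT_WORK(&img_req->work, rbd_img_handle_request_work);
            img_req->work_result = result;
            queue_work(rbd_wq, &img_req->work);   /* rbd_wq is declared further down */
    }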
299350
300351 #define for_each_obj_request(ireq, oreq) \
....@@ -322,7 +373,6 @@
322373
323374 struct rbd_mapping {
324375 u64 size;
325
- u64 features;
326376 };
327377
328378 /*
....@@ -367,7 +417,17 @@
367417 struct work_struct released_lock_work;
368418 struct delayed_work lock_dwork;
369419 struct work_struct unlock_work;
370
- wait_queue_head_t lock_waitq;
420
+ spinlock_t lock_lists_lock;
421
+ struct list_head acquiring_list;
422
+ struct list_head running_list;
423
+ struct completion acquire_wait;
424
+ int acquire_err;
425
+ struct completion releasing_wait;
426
+
427
+ spinlock_t object_map_lock;
428
+ u8 *object_map;
429
+ u64 object_map_size; /* in objects */
430
+ u64 object_map_flags;
371431
372432 struct workqueue_struct *task_wq;
373433
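object_map added above is the in-memory copy of the on-disk object map: two bits of state per backing object, packed most-significant-object-first within each byte (see the BITS_PER_OBJ/OBJS_PER_BYTE helpers introduced later in this diff). A worked example of the addressing math:

    objno = 5, OBJS_PER_BYTE = 4, BITS_PER_OBJ = 2
    index = 5 / 4               = 1
    shift = (4 - 5 % 4 - 1) * 2 = 4
    state = (object_map[1] >> 4) & 0x3

So a 1 GiB image with 4 MiB objects (256 objects) needs only a 64-byte object_map, and object_map_size is 256.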
....@@ -395,12 +455,11 @@
395455 * Flag bits for rbd_dev->flags:
396456 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
397457 * by rbd_dev->lock
398
- * - BLACKLISTED is protected by rbd_dev->lock_rwsem
399458 */
400459 enum rbd_dev_flags {
401
- RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
460
+ RBD_DEV_FLAG_EXISTS, /* rbd_dev_device_setup() ran */
402461 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
403
- RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
462
+ RBD_DEV_FLAG_READONLY, /* -o ro or snapshot */
404463 };
405464
406465 static DEFINE_MUTEX(client_mutex); /* Serialize client creation */
....@@ -421,6 +480,10 @@
421480
422481 static struct workqueue_struct *rbd_wq;
423482
483
+static struct ceph_snap_context rbd_empty_snapc = {
484
+ .nref = REFCOUNT_INIT(1),
485
+};
486
+
424487 /*
425488 * single-major requires >= 0.75 version of userspace rbd utility.
426489 */
....@@ -428,14 +491,13 @@
428491 module_param(single_major, bool, 0444);
429492 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
430493
431
-static ssize_t rbd_add(struct bus_type *bus, const char *buf,
432
- size_t count);
433
-static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
434
- size_t count);
435
-static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
436
- size_t count);
437
-static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
438
- size_t count);
494
+static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
495
+static ssize_t remove_store(struct bus_type *bus, const char *buf,
496
+ size_t count);
497
+static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
498
+ size_t count);
499
+static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
500
+ size_t count);
439501 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
440502
441503 static int rbd_dev_id_to_minor(int dev_id)
....@@ -448,8 +510,20 @@
448510 return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
449511 }
450512
513
+static bool rbd_is_ro(struct rbd_device *rbd_dev)
514
+{
515
+ return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
516
+}
517
+
518
+static bool rbd_is_snap(struct rbd_device *rbd_dev)
519
+{
520
+ return rbd_dev->spec->snap_id != CEPH_NOSNAP;
521
+}
522
+
451523 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
452524 {
525
+ lockdep_assert_held(&rbd_dev->lock_rwsem);
526
+
453527 return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
454528 rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
455529 }
....@@ -464,16 +538,16 @@
464538 return is_lock_owner;
465539 }
466540
467
-static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
541
+static ssize_t supported_features_show(struct bus_type *bus, char *buf)
468542 {
469543 return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
470544 }
471545
472
-static BUS_ATTR(add, 0200, NULL, rbd_add);
473
-static BUS_ATTR(remove, 0200, NULL, rbd_remove);
474
-static BUS_ATTR(add_single_major, 0200, NULL, rbd_add_single_major);
475
-static BUS_ATTR(remove_single_major, 0200, NULL, rbd_remove_single_major);
476
-static BUS_ATTR(supported_features, 0444, rbd_supported_features_show, NULL);
546
+static BUS_ATTR_WO(add);
547
+static BUS_ATTR_WO(remove);
548
+static BUS_ATTR_WO(add_single_major);
549
+static BUS_ATTR_WO(remove_single_major);
550
+static BUS_ATTR_RO(supported_features);
477551
478552 static struct attribute *rbd_bus_attrs[] = {
479553 &bus_attr_add.attr,
....@@ -565,8 +639,26 @@
565639 u64 snap_id);
566640 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
567641 u8 *order, u64 *snap_size);
568
-static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
569
- u64 *snap_features);
642
+static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
643
+
644
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
645
+static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
646
+
647
+/*
648
+ * Return true if nothing else is pending.
649
+ */
650
+static bool pending_result_dec(struct pending_result *pending, int *result)
651
+{
652
+ rbd_assert(pending->num_pending > 0);
653
+
654
+ if (*result && !pending->result)
655
+ pending->result = *result;
656
+ if (--pending->num_pending)
657
+ return false;
658
+
659
+ *result = pending->result;
660
+ return true;
661
+}
570662
571663 static int rbd_open(struct block_device *bdev, fmode_t mode)
572664 {
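pending_result_dec() above is the aggregation primitive used by the object and image request state machines: a parent sets pending.num_pending to the fan-out count before submitting sub-requests, each completion feeds its result in, and only the last caller proceeds with the first nonzero result. A hedged usage sketch with the locking and surrounding state machine simplified:

    static void rbd_sub_request_done_sketch(struct rbd_img_request *img_req,
                                            int result)
    {
            bool done;

            mutex_lock(&img_req->state_mutex);
            done = pending_result_dec(&img_req->pending, &result);
            mutex_unlock(&img_req->state_mutex);

            if (done)   /* last completion carries the aggregated result */
                    rbd_img_handle_request(img_req, result);
    }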
....@@ -607,9 +699,16 @@
607699 if (get_user(ro, (int __user *)arg))
608700 return -EFAULT;
609701
610
- /* Snapshots can't be marked read-write */
611
- if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
612
- return -EROFS;
702
+ /*
703
+ * Both images mapped read-only and snapshots can't be marked
704
+ * read-write.
705
+ */
706
+ if (!ro) {
707
+ if (rbd_is_ro(rbd_dev))
708
+ return -EROFS;
709
+
710
+ rbd_assert(!rbd_is_snap(rbd_dev));
711
+ }
613712
614713 /* Let blkdev_roset() handle it */
615714 return -ENOTTY;
....@@ -733,121 +832,74 @@
733832 */
734833 enum {
735834 Opt_queue_depth,
835
+ Opt_alloc_size,
736836 Opt_lock_timeout,
737
- Opt_last_int,
738837 /* int args above */
739838 Opt_pool_ns,
740
- Opt_last_string,
839
+ Opt_compression_hint,
741840 /* string args above */
742841 Opt_read_only,
743842 Opt_read_write,
744843 Opt_lock_on_read,
745844 Opt_exclusive,
746845 Opt_notrim,
747
- Opt_err
748846 };
749847
750
-static match_table_t rbd_opts_tokens = {
751
- {Opt_queue_depth, "queue_depth=%d"},
752
- {Opt_lock_timeout, "lock_timeout=%d"},
753
- /* int args above */
754
- {Opt_pool_ns, "_pool_ns=%s"},
755
- /* string args above */
756
- {Opt_read_only, "read_only"},
757
- {Opt_read_only, "ro"}, /* Alternate spelling */
758
- {Opt_read_write, "read_write"},
759
- {Opt_read_write, "rw"}, /* Alternate spelling */
760
- {Opt_lock_on_read, "lock_on_read"},
761
- {Opt_exclusive, "exclusive"},
762
- {Opt_notrim, "notrim"},
763
- {Opt_err, NULL}
848
+enum {
849
+ Opt_compression_hint_none,
850
+ Opt_compression_hint_compressible,
851
+ Opt_compression_hint_incompressible,
852
+};
853
+
854
+static const struct constant_table rbd_param_compression_hint[] = {
855
+ {"none", Opt_compression_hint_none},
856
+ {"compressible", Opt_compression_hint_compressible},
857
+ {"incompressible", Opt_compression_hint_incompressible},
858
+ {}
859
+};
860
+
861
+static const struct fs_parameter_spec rbd_parameters[] = {
862
+ fsparam_u32 ("alloc_size", Opt_alloc_size),
863
+ fsparam_enum ("compression_hint", Opt_compression_hint,
864
+ rbd_param_compression_hint),
865
+ fsparam_flag ("exclusive", Opt_exclusive),
866
+ fsparam_flag ("lock_on_read", Opt_lock_on_read),
867
+ fsparam_u32 ("lock_timeout", Opt_lock_timeout),
868
+ fsparam_flag ("notrim", Opt_notrim),
869
+ fsparam_string ("_pool_ns", Opt_pool_ns),
870
+ fsparam_u32 ("queue_depth", Opt_queue_depth),
871
+ fsparam_flag ("read_only", Opt_read_only),
872
+ fsparam_flag ("read_write", Opt_read_write),
873
+ fsparam_flag ("ro", Opt_read_only),
874
+ fsparam_flag ("rw", Opt_read_write),
875
+ {}
764876 };
765877
766878 struct rbd_options {
767879 int queue_depth;
880
+ int alloc_size;
768881 unsigned long lock_timeout;
769882 bool read_only;
770883 bool lock_on_read;
771884 bool exclusive;
772885 bool trim;
886
+
887
+ u32 alloc_hint_flags; /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
773888 };
774889
775890 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
891
+#define RBD_ALLOC_SIZE_DEFAULT (64 * 1024)
776892 #define RBD_LOCK_TIMEOUT_DEFAULT 0 /* no timeout */
777893 #define RBD_READ_ONLY_DEFAULT false
778894 #define RBD_LOCK_ON_READ_DEFAULT false
779895 #define RBD_EXCLUSIVE_DEFAULT false
780896 #define RBD_TRIM_DEFAULT true
781897
782
-struct parse_rbd_opts_ctx {
898
+struct rbd_parse_opts_ctx {
783899 struct rbd_spec *spec;
900
+ struct ceph_options *copts;
784901 struct rbd_options *opts;
785902 };
786
-
787
-static int parse_rbd_opts_token(char *c, void *private)
788
-{
789
- struct parse_rbd_opts_ctx *pctx = private;
790
- substring_t argstr[MAX_OPT_ARGS];
791
- int token, intval, ret;
792
-
793
- token = match_token(c, rbd_opts_tokens, argstr);
794
- if (token < Opt_last_int) {
795
- ret = match_int(&argstr[0], &intval);
796
- if (ret < 0) {
797
- pr_err("bad option arg (not int) at '%s'\n", c);
798
- return ret;
799
- }
800
- dout("got int token %d val %d\n", token, intval);
801
- } else if (token > Opt_last_int && token < Opt_last_string) {
802
- dout("got string token %d val %s\n", token, argstr[0].from);
803
- } else {
804
- dout("got token %d\n", token);
805
- }
806
-
807
- switch (token) {
808
- case Opt_queue_depth:
809
- if (intval < 1) {
810
- pr_err("queue_depth out of range\n");
811
- return -EINVAL;
812
- }
813
- pctx->opts->queue_depth = intval;
814
- break;
815
- case Opt_lock_timeout:
816
- /* 0 is "wait forever" (i.e. infinite timeout) */
817
- if (intval < 0 || intval > INT_MAX / 1000) {
818
- pr_err("lock_timeout out of range\n");
819
- return -EINVAL;
820
- }
821
- pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
822
- break;
823
- case Opt_pool_ns:
824
- kfree(pctx->spec->pool_ns);
825
- pctx->spec->pool_ns = match_strdup(argstr);
826
- if (!pctx->spec->pool_ns)
827
- return -ENOMEM;
828
- break;
829
- case Opt_read_only:
830
- pctx->opts->read_only = true;
831
- break;
832
- case Opt_read_write:
833
- pctx->opts->read_only = false;
834
- break;
835
- case Opt_lock_on_read:
836
- pctx->opts->lock_on_read = true;
837
- break;
838
- case Opt_exclusive:
839
- pctx->opts->exclusive = true;
840
- break;
841
- case Opt_notrim:
842
- pctx->opts->trim = false;
843
- break;
844
- default:
845
- /* libceph prints "bad option" msg */
846
- return -EINVAL;
847
- }
848
-
849
- return 0;
850
-}
851903
852904 static char* obj_op_name(enum obj_operation_type op_type)
853905 {
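The fs_parameter_spec table above replaces the old match_table_t parser removed in this hunk. For context, a sketch of how such a table is typically consumed; rbd_parse_param() and the "rbd" log prefix are assumptions for the example, not code shown here:

    static int rbd_parse_param(struct rbd_parse_opts_ctx *pctx,
                               struct fs_parameter *param)
    {
            struct rbd_options *opt = pctx->opts;
            struct fs_parse_result result;
            struct p_log log = {.prefix = "rbd"};
            int token;

            token = __fs_parse(&log, rbd_parameters, param, &result);
            if (token < 0)
                    return token;

            switch (token) {
            case Opt_queue_depth:
                    if (result.uint_32 < 1)
                            return inval_plog(&log, "queue_depth out of range");
                    opt->queue_depth = result.uint_32;
                    break;
            case Opt_read_only:
                    opt->read_only = true;
                    break;
            case Opt_notrim:
                    opt->trim = false;
                    break;
            default:
                    /* remaining tokens are handled along the same lines */
                    break;
            }
            return 0;
    }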
....@@ -858,6 +910,8 @@
858910 return "write";
859911 case OBJ_OP_DISCARD:
860912 return "discard";
913
+ case OBJ_OP_ZEROOUT:
914
+ return "zeroout";
861915 default:
862916 return "???";
863917 }
....@@ -891,23 +945,6 @@
891945 kref_put(&rbdc->kref, rbd_client_release);
892946 }
893947
894
-static int wait_for_latest_osdmap(struct ceph_client *client)
895
-{
896
- u64 newest_epoch;
897
- int ret;
898
-
899
- ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch);
900
- if (ret)
901
- return ret;
902
-
903
- if (client->osdc.osdmap->epoch >= newest_epoch)
904
- return 0;
905
-
906
- ceph_osdc_maybe_request_map(&client->osdc);
907
- return ceph_monc_wait_osdmap(&client->monc, newest_epoch,
908
- client->options->mount_timeout);
909
-}
910
-
911948 /*
912949 * Get a ceph client with specific addr and configuration, if one does
913950 * not exist create it. Either way, ceph_opts is consumed by this
....@@ -918,7 +955,7 @@
918955 struct rbd_client *rbdc;
919956 int ret;
920957
921
- mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
958
+ mutex_lock(&client_mutex);
922959 rbdc = rbd_client_find(ceph_opts);
923960 if (rbdc) {
924961 ceph_destroy_options(ceph_opts);
....@@ -927,7 +964,8 @@
927964 * Using an existing client. Make sure ->pg_pools is up to
928965 * date before we look up the pool id in do_rbd_add().
929966 */
930
- ret = wait_for_latest_osdmap(rbdc->client);
967
+ ret = ceph_wait_for_latest_osdmap(rbdc->client,
968
+ rbdc->client->options->mount_timeout);
931969 if (ret) {
932970 rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
933971 rbd_put_client(rbdc);
....@@ -1213,51 +1251,23 @@
12131251 return 0;
12141252 }
12151253
1216
-static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1217
- u64 *snap_features)
1218
-{
1219
- rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1220
- if (snap_id == CEPH_NOSNAP) {
1221
- *snap_features = rbd_dev->header.features;
1222
- } else if (rbd_dev->image_format == 1) {
1223
- *snap_features = 0; /* No features for format 1 */
1224
- } else {
1225
- u64 features = 0;
1226
- int ret;
1227
-
1228
- ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1229
- if (ret)
1230
- return ret;
1231
-
1232
- *snap_features = features;
1233
- }
1234
- return 0;
1235
-}
1236
-
12371254 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
12381255 {
12391256 u64 snap_id = rbd_dev->spec->snap_id;
12401257 u64 size = 0;
1241
- u64 features = 0;
12421258 int ret;
12431259
12441260 ret = rbd_snap_size(rbd_dev, snap_id, &size);
12451261 if (ret)
12461262 return ret;
1247
- ret = rbd_snap_features(rbd_dev, snap_id, &features);
1248
- if (ret)
1249
- return ret;
12501263
12511264 rbd_dev->mapping.size = size;
1252
- rbd_dev->mapping.features = features;
1253
-
12541265 return 0;
12551266 }
12561267
12571268 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
12581269 {
12591270 rbd_dev->mapping.size = 0;
1260
- rbd_dev->mapping.features = 0;
12611271 }
12621272
12631273 static void zero_bvec(struct bio_vec *bv)
....@@ -1300,6 +1310,8 @@
13001310 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
13011311 u32 bytes)
13021312 {
1313
+ dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1314
+
13031315 switch (obj_req->img_request->data_type) {
13041316 case OBJ_REQUEST_BIO:
13051317 zero_bios(&obj_req->bio_pos, off, bytes);
....@@ -1309,7 +1321,7 @@
13091321 zero_bvecs(&obj_req->bvec_pos, off, bytes);
13101322 break;
13111323 default:
1312
- rbd_assert(0);
1324
+ BUG();
13131325 }
13141326 }
13151327
....@@ -1322,22 +1334,6 @@
13221334 kref_put(&obj_request->kref, rbd_obj_request_destroy);
13231335 }
13241336
1325
-static void rbd_img_request_get(struct rbd_img_request *img_request)
1326
-{
1327
- dout("%s: img %p (was %d)\n", __func__, img_request,
1328
- kref_read(&img_request->kref));
1329
- kref_get(&img_request->kref);
1330
-}
1331
-
1332
-static void rbd_img_request_destroy(struct kref *kref);
1333
-static void rbd_img_request_put(struct rbd_img_request *img_request)
1334
-{
1335
- rbd_assert(img_request != NULL);
1336
- dout("%s: img %p (was %d)\n", __func__, img_request,
1337
- kref_read(&img_request->kref));
1338
- kref_put(&img_request->kref, rbd_img_request_destroy);
1339
-}
1340
-
13411337 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
13421338 struct rbd_obj_request *obj_request)
13431339 {
....@@ -1345,8 +1341,6 @@
13451341
13461342 /* Image request now owns object's original reference */
13471343 obj_request->img_request = img_request;
1348
- img_request->obj_request_count++;
1349
- img_request->pending_count++;
13501344 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
13511345 }
13521346
....@@ -1355,19 +1349,17 @@
13551349 {
13561350 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
13571351 list_del(&obj_request->ex.oe_item);
1358
- rbd_assert(img_request->obj_request_count > 0);
1359
- img_request->obj_request_count--;
13601352 rbd_assert(obj_request->img_request == img_request);
13611353 rbd_obj_request_put(obj_request);
13621354 }
13631355
1364
-static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1356
+static void rbd_osd_submit(struct ceph_osd_request *osd_req)
13651357 {
1366
- struct ceph_osd_request *osd_req = obj_request->osd_req;
1358
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
13671359
1368
- dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1369
- obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
1370
- obj_request->ex.oe_len, osd_req);
1360
+ dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1361
+ __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1362
+ obj_req->ex.oe_off, obj_req->ex.oe_len);
13711363 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
13721364 }
13731365
....@@ -1379,18 +1371,10 @@
13791371 static void img_request_layered_set(struct rbd_img_request *img_request)
13801372 {
13811373 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1382
- smp_mb();
1383
-}
1384
-
1385
-static void img_request_layered_clear(struct rbd_img_request *img_request)
1386
-{
1387
- clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1388
- smp_mb();
13891374 }
13901375
13911376 static bool img_request_layered_test(struct rbd_img_request *img_request)
13921377 {
1393
- smp_mb();
13941378 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
13951379 }
13961380
....@@ -1410,6 +1394,19 @@
14101394 rbd_dev->layout.object_size;
14111395 }
14121396
1397
+/*
1398
+ * Must be called after rbd_obj_calc_img_extents().
1399
+ */
1400
+static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
1401
+{
1402
+ if (!obj_req->num_img_extents ||
1403
+ (rbd_obj_is_entire(obj_req) &&
1404
+ !obj_req->img_request->snapc->num_snaps))
1405
+ return false;
1406
+
1407
+ return true;
1408
+}
1409
+
14131410 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
14141411 {
14151412 return ceph_file_extents_bytes(obj_req->img_extents,
....@@ -1423,47 +1420,47 @@
14231420 return false;
14241421 case OBJ_OP_WRITE:
14251422 case OBJ_OP_DISCARD:
1423
+ case OBJ_OP_ZEROOUT:
14261424 return true;
14271425 default:
14281426 BUG();
14291427 }
14301428 }
14311429
1432
-static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);
1433
-
14341430 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
14351431 {
14361432 struct rbd_obj_request *obj_req = osd_req->r_priv;
1433
+ int result;
14371434
14381435 dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
14391436 osd_req->r_result, obj_req);
1440
- rbd_assert(osd_req == obj_req->osd_req);
14411437
1442
- obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
1443
- if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
1444
- obj_req->xferred = osd_req->r_result;
1438
+ /*
1439
+ * Writes aren't allowed to return a data payload. In some
1440
+ * guarded write cases (e.g. stat + zero on an empty object)
1441
+ * a stat response makes it through, but we don't care.
1442
+ */
1443
+ if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1444
+ result = 0;
14451445 else
1446
- /*
1447
- * Writes aren't allowed to return a data payload. In some
1448
- * guarded write cases (e.g. stat + zero on an empty object)
1449
- * a stat response makes it through, but we don't care.
1450
- */
1451
- obj_req->xferred = 0;
1446
+ result = osd_req->r_result;
14521447
1453
- rbd_obj_handle_request(obj_req);
1448
+ rbd_obj_handle_request(obj_req, result);
14541449 }
14551450
1456
-static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1451
+static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
14571452 {
1458
- struct ceph_osd_request *osd_req = obj_request->osd_req;
1453
+ struct rbd_obj_request *obj_request = osd_req->r_priv;
1454
+ struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1455
+ struct ceph_options *opt = rbd_dev->rbd_client->client->options;
14591456
1460
- osd_req->r_flags = CEPH_OSD_FLAG_READ;
1457
+ osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
14611458 osd_req->r_snapid = obj_request->img_request->snap_id;
14621459 }
14631460
1464
-static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1461
+static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
14651462 {
1466
- struct ceph_osd_request *osd_req = obj_request->osd_req;
1463
+ struct rbd_obj_request *obj_request = osd_req->r_priv;
14671464
14681465 osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
14691466 ktime_get_real_ts64(&osd_req->r_mtime);
....@@ -1471,21 +1468,21 @@
14711468 }
14721469
14731470 static struct ceph_osd_request *
1474
-rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
1471
+__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1472
+ struct ceph_snap_context *snapc, int num_ops)
14751473 {
1476
- struct rbd_img_request *img_req = obj_req->img_request;
1477
- struct rbd_device *rbd_dev = img_req->rbd_dev;
1474
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
14781475 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
14791476 struct ceph_osd_request *req;
14801477 const char *name_format = rbd_dev->image_format == 1 ?
14811478 RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1479
+ int ret;
14821480
1483
- req = ceph_osdc_alloc_request(osdc,
1484
- (rbd_img_is_write(img_req) ? img_req->snapc : NULL),
1485
- num_ops, false, GFP_NOIO);
1481
+ req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
14861482 if (!req)
1487
- return NULL;
1483
+ return ERR_PTR(-ENOMEM);
14881484
1485
+ list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
14891486 req->r_callback = rbd_osd_req_callback;
14901487 req->r_priv = obj_req;
14911488
....@@ -1496,23 +1493,20 @@
14961493 ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
14971494 req->r_base_oloc.pool = rbd_dev->layout.pool_id;
14981495
1499
- if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1500
- rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
1501
- goto err_req;
1502
-
1503
- if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1504
- goto err_req;
1496
+ ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1497
+ rbd_dev->header.object_prefix,
1498
+ obj_req->ex.oe_objno);
1499
+ if (ret)
1500
+ return ERR_PTR(ret);
15051501
15061502 return req;
1507
-
1508
-err_req:
1509
- ceph_osdc_put_request(req);
1510
- return NULL;
15111503 }
15121504
1513
-static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1505
+static struct ceph_osd_request *
1506
+rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
15141507 {
1515
- ceph_osdc_put_request(osd_req);
1508
+ return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1509
+ num_ops);
15161510 }
15171511
15181512 static struct rbd_obj_request *rbd_obj_request_create(void)
....@@ -1524,6 +1518,8 @@
15241518 return NULL;
15251519
15261520 ceph_object_extent_init(&obj_request->ex);
1521
+ INIT_LIST_HEAD(&obj_request->osd_reqs);
1522
+ mutex_init(&obj_request->state_mutex);
15271523 kref_init(&obj_request->kref);
15281524
15291525 dout("%s %p\n", __func__, obj_request);
....@@ -1533,14 +1529,19 @@
15331529 static void rbd_obj_request_destroy(struct kref *kref)
15341530 {
15351531 struct rbd_obj_request *obj_request;
1532
+ struct ceph_osd_request *osd_req;
15361533 u32 i;
15371534
15381535 obj_request = container_of(kref, struct rbd_obj_request, kref);
15391536
15401537 dout("%s: obj %p\n", __func__, obj_request);
15411538
1542
- if (obj_request->osd_req)
1543
- rbd_osd_req_destroy(obj_request->osd_req);
1539
+ while (!list_empty(&obj_request->osd_reqs)) {
1540
+ osd_req = list_first_entry(&obj_request->osd_reqs,
1541
+ struct ceph_osd_request, r_private_item);
1542
+ list_del_init(&osd_req->r_private_item);
1543
+ ceph_osdc_put_request(osd_req);
1544
+ }
15441545
15451546 switch (obj_request->img_request->data_type) {
15461547 case OBJ_REQUEST_NODATA:
....@@ -1551,7 +1552,7 @@
15511552 kfree(obj_request->bvec_pos.bvecs);
15521553 break;
15531554 default:
1554
- rbd_assert(0);
1555
+ BUG();
15551556 }
15561557
15571558 kfree(obj_request->img_extents);
....@@ -1617,10 +1618,8 @@
16171618 if (!rbd_dev->parent_spec)
16181619 return false;
16191620
1620
- down_read(&rbd_dev->header_rwsem);
16211621 if (rbd_dev->parent_overlap)
16221622 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1623
- up_read(&rbd_dev->header_rwsem);
16241623
16251624 if (counter < 0)
16261625 rbd_warn(rbd_dev, "parent reference overflow");
....@@ -1628,64 +1627,525 @@
16281627 return counter > 0;
16291628 }
16301629
1631
-/*
1632
- * Caller is responsible for filling in the list of object requests
1633
- * that comprises the image request, and the Linux request pointer
1634
- * (if there is one).
1635
- */
1636
-static struct rbd_img_request *rbd_img_request_create(
1637
- struct rbd_device *rbd_dev,
1638
- enum obj_operation_type op_type,
1639
- struct ceph_snap_context *snapc)
1630
+static void rbd_img_request_init(struct rbd_img_request *img_request,
1631
+ struct rbd_device *rbd_dev,
1632
+ enum obj_operation_type op_type)
16401633 {
1641
- struct rbd_img_request *img_request;
1642
-
1643
- img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
1644
- if (!img_request)
1645
- return NULL;
1634
+ memset(img_request, 0, sizeof(*img_request));
16461635
16471636 img_request->rbd_dev = rbd_dev;
16481637 img_request->op_type = op_type;
1649
- if (!rbd_img_is_write(img_request))
1650
- img_request->snap_id = rbd_dev->spec->snap_id;
1651
- else
1652
- img_request->snapc = snapc;
16531638
1654
- if (rbd_dev_parent_get(rbd_dev))
1655
- img_request_layered_set(img_request);
1656
-
1657
- spin_lock_init(&img_request->completion_lock);
1639
+ INIT_LIST_HEAD(&img_request->lock_item);
16581640 INIT_LIST_HEAD(&img_request->object_extents);
1659
- kref_init(&img_request->kref);
1660
-
1661
- dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
1662
- obj_op_name(op_type), img_request);
1663
- return img_request;
1641
+ mutex_init(&img_request->state_mutex);
16641642 }
16651643
1666
-static void rbd_img_request_destroy(struct kref *kref)
1644
+static void rbd_img_capture_header(struct rbd_img_request *img_req)
16671645 {
1668
- struct rbd_img_request *img_request;
1646
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
1647
+
1648
+ lockdep_assert_held(&rbd_dev->header_rwsem);
1649
+
1650
+ if (rbd_img_is_write(img_req))
1651
+ img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1652
+ else
1653
+ img_req->snap_id = rbd_dev->spec->snap_id;
1654
+
1655
+ if (rbd_dev_parent_get(rbd_dev))
1656
+ img_request_layered_set(img_req);
1657
+}
1658
+
1659
+static void rbd_img_request_destroy(struct rbd_img_request *img_request)
1660
+{
16691661 struct rbd_obj_request *obj_request;
16701662 struct rbd_obj_request *next_obj_request;
16711663
1672
- img_request = container_of(kref, struct rbd_img_request, kref);
1673
-
16741664 dout("%s: img %p\n", __func__, img_request);
16751665
1666
+ WARN_ON(!list_empty(&img_request->lock_item));
16761667 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
16771668 rbd_img_obj_request_del(img_request, obj_request);
1678
- rbd_assert(img_request->obj_request_count == 0);
16791669
1680
- if (img_request_layered_test(img_request)) {
1681
- img_request_layered_clear(img_request);
1670
+ if (img_request_layered_test(img_request))
16821671 rbd_dev_parent_put(img_request->rbd_dev);
1683
- }
16841672
16851673 if (rbd_img_is_write(img_request))
16861674 ceph_put_snap_context(img_request->snapc);
16871675
1688
- kmem_cache_free(rbd_img_request_cache, img_request);
1676
+ if (test_bit(IMG_REQ_CHILD, &img_request->flags))
1677
+ kmem_cache_free(rbd_img_request_cache, img_request);
1678
+}
1679
+
1680
+#define BITS_PER_OBJ 2
1681
+#define OBJS_PER_BYTE (BITS_PER_BYTE / BITS_PER_OBJ)
1682
+#define OBJ_MASK ((1 << BITS_PER_OBJ) - 1)
1683
+
1684
+static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1685
+ u64 *index, u8 *shift)
1686
+{
1687
+ u32 off;
1688
+
1689
+ rbd_assert(objno < rbd_dev->object_map_size);
1690
+ *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1691
+ *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1692
+}
1693
+
1694
+static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1695
+{
1696
+ u64 index;
1697
+ u8 shift;
1698
+
1699
+ lockdep_assert_held(&rbd_dev->object_map_lock);
1700
+ __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1701
+ return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1702
+}
1703
+
1704
+static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1705
+{
1706
+ u64 index;
1707
+ u8 shift;
1708
+ u8 *p;
1709
+
1710
+ lockdep_assert_held(&rbd_dev->object_map_lock);
1711
+ rbd_assert(!(val & ~OBJ_MASK));
1712
+
1713
+ __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1714
+ p = &rbd_dev->object_map[index];
1715
+ *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1716
+}
1717
+
1718
+static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1719
+{
1720
+ u8 state;
1721
+
1722
+ spin_lock(&rbd_dev->object_map_lock);
1723
+ state = __rbd_object_map_get(rbd_dev, objno);
1724
+ spin_unlock(&rbd_dev->object_map_lock);
1725
+ return state;
1726
+}
1727
+
1728
+static bool use_object_map(struct rbd_device *rbd_dev)
1729
+{
1730
+ /*
1731
+ * An image mapped read-only can't use the object map -- it isn't
1732
+ * loaded because the header lock isn't acquired. Someone else can
1733
+ * write to the image and update the object map behind our back.
1734
+ *
1735
+ * A snapshot can't be written to, so using the object map is always
1736
+ * safe.
1737
+ */
1738
+ if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1739
+ return false;
1740
+
1741
+ return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1742
+ !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1743
+}
1744
+
1745
+static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1746
+{
1747
+ u8 state;
1748
+
1749
+ /* fall back to default logic if object map is disabled or invalid */
1750
+ if (!use_object_map(rbd_dev))
1751
+ return true;
1752
+
1753
+ state = rbd_object_map_get(rbd_dev, objno);
1754
+ return state != OBJECT_NONEXISTENT;
1755
+}
1756
+
1757
+static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1758
+ struct ceph_object_id *oid)
1759
+{
1760
+ if (snap_id == CEPH_NOSNAP)
1761
+ ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1762
+ rbd_dev->spec->image_id);
1763
+ else
1764
+ ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1765
+ rbd_dev->spec->image_id, snap_id);
1766
+}
1767
+
1768
+static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1769
+{
1770
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1771
+ CEPH_DEFINE_OID_ONSTACK(oid);
1772
+ u8 lock_type;
1773
+ char *lock_tag;
1774
+ struct ceph_locker *lockers;
1775
+ u32 num_lockers;
1776
+ bool broke_lock = false;
1777
+ int ret;
1778
+
1779
+ rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1780
+
1781
+again:
1782
+ ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1783
+ CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1784
+ if (ret != -EBUSY || broke_lock) {
1785
+ if (ret == -EEXIST)
1786
+ ret = 0; /* already locked by myself */
1787
+ if (ret)
1788
+ rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1789
+ return ret;
1790
+ }
1791
+
1792
+ ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1793
+ RBD_LOCK_NAME, &lock_type, &lock_tag,
1794
+ &lockers, &num_lockers);
1795
+ if (ret) {
1796
+ if (ret == -ENOENT)
1797
+ goto again;
1798
+
1799
+ rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1800
+ return ret;
1801
+ }
1802
+
1803
+ kfree(lock_tag);
1804
+ if (num_lockers == 0)
1805
+ goto again;
1806
+
1807
+ rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1808
+ ENTITY_NAME(lockers[0].id.name));
1809
+
1810
+ ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1811
+ RBD_LOCK_NAME, lockers[0].id.cookie,
1812
+ &lockers[0].id.name);
1813
+ ceph_free_lockers(lockers, num_lockers);
1814
+ if (ret) {
1815
+ if (ret == -ENOENT)
1816
+ goto again;
1817
+
1818
+ rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1819
+ return ret;
1820
+ }
1821
+
1822
+ broke_lock = true;
1823
+ goto again;
1824
+}
1825
+
1826
+static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1827
+{
1828
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1829
+ CEPH_DEFINE_OID_ONSTACK(oid);
1830
+ int ret;
1831
+
1832
+ rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1833
+
1834
+ ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1835
+ "");
1836
+ if (ret && ret != -ENOENT)
1837
+ rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1838
+}
1839
+
1840
+static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1841
+{
1842
+ u8 struct_v;
1843
+ u32 struct_len;
1844
+ u32 header_len;
1845
+ void *header_end;
1846
+ int ret;
1847
+
1848
+ ceph_decode_32_safe(p, end, header_len, e_inval);
1849
+ header_end = *p + header_len;
1850
+
1851
+ ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1852
+ &struct_len);
1853
+ if (ret)
1854
+ return ret;
1855
+
1856
+ ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1857
+
1858
+ *p = header_end;
1859
+ return 0;
1860
+
1861
+e_inval:
1862
+ return -EINVAL;
1863
+}
1864
+
1865
+static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1866
+{
1867
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1868
+ CEPH_DEFINE_OID_ONSTACK(oid);
1869
+ struct page **pages;
1870
+ void *p, *end;
1871
+ size_t reply_len;
1872
+ u64 num_objects;
1873
+ u64 object_map_bytes;
1874
+ u64 object_map_size;
1875
+ int num_pages;
1876
+ int ret;
1877
+
1878
+ rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1879
+
1880
+ num_objects = ceph_get_num_objects(&rbd_dev->layout,
1881
+ rbd_dev->mapping.size);
1882
+ object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1883
+ BITS_PER_BYTE);
1884
+ num_pages = calc_pages_for(0, object_map_bytes) + 1;
1885
+ pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1886
+ if (IS_ERR(pages))
1887
+ return PTR_ERR(pages);
1888
+
1889
+ reply_len = num_pages * PAGE_SIZE;
1890
+ rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1891
+ ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1892
+ "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1893
+ NULL, 0, pages, &reply_len);
1894
+ if (ret)
1895
+ goto out;
1896
+
1897
+ p = page_address(pages[0]);
1898
+ end = p + min(reply_len, (size_t)PAGE_SIZE);
1899
+ ret = decode_object_map_header(&p, end, &object_map_size);
1900
+ if (ret)
1901
+ goto out;
1902
+
1903
+ if (object_map_size != num_objects) {
1904
+ rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
1905
+ object_map_size, num_objects);
1906
+ ret = -EINVAL;
1907
+ goto out;
1908
+ }
1909
+
1910
+ if (offset_in_page(p) + object_map_bytes > reply_len) {
1911
+ ret = -EINVAL;
1912
+ goto out;
1913
+ }
1914
+
1915
+ rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
1916
+ if (!rbd_dev->object_map) {
1917
+ ret = -ENOMEM;
1918
+ goto out;
1919
+ }
1920
+
1921
+ rbd_dev->object_map_size = object_map_size;
1922
+ ceph_copy_from_page_vector(pages, rbd_dev->object_map,
1923
+ offset_in_page(p), object_map_bytes);
1924
+
1925
+out:
1926
+ ceph_release_page_vector(pages, num_pages);
1927
+ return ret;
1928
+}
1929
+
1930
+static void rbd_object_map_free(struct rbd_device *rbd_dev)
1931
+{
1932
+ kvfree(rbd_dev->object_map);
1933
+ rbd_dev->object_map = NULL;
1934
+ rbd_dev->object_map_size = 0;
1935
+}
1936
+
1937
+static int rbd_object_map_load(struct rbd_device *rbd_dev)
1938
+{
1939
+ int ret;
1940
+
1941
+ ret = __rbd_object_map_load(rbd_dev);
1942
+ if (ret)
1943
+ return ret;
1944
+
1945
+ ret = rbd_dev_v2_get_flags(rbd_dev);
1946
+ if (ret) {
1947
+ rbd_object_map_free(rbd_dev);
1948
+ return ret;
1949
+ }
1950
+
1951
+ if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
1952
+ rbd_warn(rbd_dev, "object map is invalid");
1953
+
1954
+ return 0;
1955
+}
1956
+
1957
+static int rbd_object_map_open(struct rbd_device *rbd_dev)
1958
+{
1959
+ int ret;
1960
+
1961
+ ret = rbd_object_map_lock(rbd_dev);
1962
+ if (ret)
1963
+ return ret;
1964
+
1965
+ ret = rbd_object_map_load(rbd_dev);
1966
+ if (ret) {
1967
+ rbd_object_map_unlock(rbd_dev);
1968
+ return ret;
1969
+ }
1970
+
1971
+ return 0;
1972
+}
1973
+
1974
+static void rbd_object_map_close(struct rbd_device *rbd_dev)
1975
+{
1976
+ rbd_object_map_free(rbd_dev);
1977
+ rbd_object_map_unlock(rbd_dev);
1978
+}
1979
+
1980
+/*
1981
+ * This function needs snap_id (or more precisely just something to
1982
+ * distinguish between HEAD and snapshot object maps), new_state and
1983
+ * current_state that were passed to rbd_object_map_update().
1984
+ *
1985
+ * To avoid allocating and stashing a context we piggyback on the OSD
1986
+ * request. A HEAD update has two ops (assert_locked). For new_state
1987
+ * and current_state we decode our own object_map_update op, encoded in
1988
+ * rbd_cls_object_map_update().
1989
+ */
1990
+static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
1991
+ struct ceph_osd_request *osd_req)
1992
+{
1993
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1994
+ struct ceph_osd_data *osd_data;
1995
+ u64 objno;
1996
+ u8 state, new_state, current_state;
1997
+ bool has_current_state;
1998
+ void *p;
1999
+
2000
+ if (osd_req->r_result)
2001
+ return osd_req->r_result;
2002
+
2003
+ /*
2004
+ * Nothing to do for a snapshot object map.
2005
+ */
2006
+ if (osd_req->r_num_ops == 1)
2007
+ return 0;
2008
+
2009
+ /*
2010
+ * Update in-memory HEAD object map.
2011
+ */
2012
+ rbd_assert(osd_req->r_num_ops == 2);
2013
+ osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
2014
+ rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
2015
+
2016
+ p = page_address(osd_data->pages[0]);
2017
+ objno = ceph_decode_64(&p);
2018
+ rbd_assert(objno == obj_req->ex.oe_objno);
2019
+ rbd_assert(ceph_decode_64(&p) == objno + 1);
2020
+ new_state = ceph_decode_8(&p);
2021
+ has_current_state = ceph_decode_8(&p);
2022
+ if (has_current_state)
2023
+ current_state = ceph_decode_8(&p);
2024
+
2025
+ spin_lock(&rbd_dev->object_map_lock);
2026
+ state = __rbd_object_map_get(rbd_dev, objno);
2027
+ if (!has_current_state || current_state == state ||
2028
+ (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
2029
+ __rbd_object_map_set(rbd_dev, objno, new_state);
2030
+ spin_unlock(&rbd_dev->object_map_lock);
2031
+
2032
+ return 0;
2033
+}
2034
+
2035
+static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
2036
+{
2037
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
2038
+ int result;
2039
+
2040
+ dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
2041
+ osd_req->r_result, obj_req);
2042
+
2043
+ result = rbd_object_map_update_finish(obj_req, osd_req);
2044
+ rbd_obj_handle_request(obj_req, result);
2045
+}
2046
+
2047
+static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
2048
+{
2049
+ u8 state = rbd_object_map_get(rbd_dev, objno);
2050
+
2051
+ if (state == new_state ||
2052
+ (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2053
+ (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2054
+ return false;
2055
+
2056
+ return true;
2057
+}
2058
+
2059
+static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2060
+ int which, u64 objno, u8 new_state,
2061
+ const u8 *current_state)
2062
+{
2063
+ struct page **pages;
2064
+ void *p, *start;
2065
+ int ret;
2066
+
2067
+ ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2068
+ if (ret)
2069
+ return ret;
2070
+
2071
+ pages = ceph_alloc_page_vector(1, GFP_NOIO);
2072
+ if (IS_ERR(pages))
2073
+ return PTR_ERR(pages);
2074
+
2075
+ p = start = page_address(pages[0]);
2076
+ ceph_encode_64(&p, objno);
2077
+ ceph_encode_64(&p, objno + 1);
2078
+ ceph_encode_8(&p, new_state);
2079
+ if (current_state) {
2080
+ ceph_encode_8(&p, 1);
2081
+ ceph_encode_8(&p, *current_state);
2082
+ } else {
2083
+ ceph_encode_8(&p, 0);
2084
+ }
2085
+
2086
+ osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2087
+ false, true);
2088
+ return 0;
2089
+}
2090
+
2091
+/*
2092
+ * Return:
2093
+ * 0 - object map update sent
2094
+ * 1 - object map update isn't needed
2095
+ * <0 - error
2096
+ */
2097
+static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2098
+ u8 new_state, const u8 *current_state)
2099
+{
2100
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2101
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2102
+ struct ceph_osd_request *req;
2103
+ int num_ops = 1;
2104
+ int which = 0;
2105
+ int ret;
2106
+
2107
+ if (snap_id == CEPH_NOSNAP) {
2108
+ if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2109
+ return 1;
2110
+
2111
+ num_ops++; /* assert_locked */
2112
+ }
2113
+
2114
+ req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2115
+ if (!req)
2116
+ return -ENOMEM;
2117
+
2118
+ list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2119
+ req->r_callback = rbd_object_map_callback;
2120
+ req->r_priv = obj_req;
2121
+
2122
+ rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2123
+ ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2124
+ req->r_flags = CEPH_OSD_FLAG_WRITE;
2125
+ ktime_get_real_ts64(&req->r_mtime);
2126
+
2127
+ if (snap_id == CEPH_NOSNAP) {
2128
+ /*
2129
+ * Protect against possible race conditions during lock
2130
+ * ownership transitions.
2131
+ */
2132
+ ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2133
+ CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2134
+ if (ret)
2135
+ return ret;
2136
+ }
2137
+
2138
+ ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2139
+ new_state, current_state);
2140
+ if (ret)
2141
+ return ret;
2142
+
2143
+ ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2144
+ if (ret)
2145
+ return ret;
2146
+
2147
+ ceph_osdc_start_request(osdc, req, false);
2148
+ return 0;
16892149 }
16902150
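rbd_object_map_update() above returns 0 when an update was sent (rbd_object_map_callback() later re-enters the state machine), 1 when no update is needed, and a negative errno on failure. An illustrative caller reduced to the bare call; the function name and the choice of new_state are assumptions for the example:

    static int rbd_obj_write_pre_object_map_sketch(struct rbd_obj_request *obj_req)
    {
            u8 new_state = (obj_req->flags & RBD_OBJ_FLAG_DELETION) ?
                                    OBJECT_PENDING : OBJECT_EXISTS;

            /* >0: nothing sent, advance immediately; 0: wait for the callback */
            return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
    }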
16912151 static void prune_extents(struct ceph_file_extent *img_extents,
....@@ -1735,11 +2195,13 @@
17352195 return 0;
17362196 }
17372197
1738
-static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
2198
+static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
17392199 {
2200
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
2201
+
17402202 switch (obj_req->img_request->data_type) {
17412203 case OBJ_REQUEST_BIO:
1742
- osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
2204
+ osd_req_op_extent_osd_data_bio(osd_req, which,
17432205 &obj_req->bio_pos,
17442206 obj_req->ex.oe_len);
17452207 break;
....@@ -1748,30 +2210,15 @@
17482210 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
17492211 obj_req->ex.oe_len);
17502212 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
1751
- osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
2213
+ osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
17522214 &obj_req->bvec_pos);
17532215 break;
17542216 default:
1755
- rbd_assert(0);
2217
+ BUG();
17562218 }
17572219 }
17582220
1759
-static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
1760
-{
1761
- obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
1762
- if (!obj_req->osd_req)
1763
- return -ENOMEM;
1764
-
1765
- osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
1766
- obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
1767
- rbd_osd_req_setup_data(obj_req, 0);
1768
-
1769
- rbd_osd_req_format_read(obj_req);
1770
- return 0;
1771
-}
1772
-
1773
-static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
1774
- unsigned int which)
2221
+static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
17752222 {
17762223 struct page **pages;
17772224
....@@ -1787,39 +2234,61 @@
17872234 if (IS_ERR(pages))
17882235 return PTR_ERR(pages);
17892236
1790
- osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
1791
- osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
2237
+ osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2238
+ osd_req_op_raw_data_in_pages(osd_req, which, pages,
17922239 8 + sizeof(struct ceph_timespec),
17932240 0, false, true);
17942241 return 0;
17952242 }
17962243
1797
-static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
1798
- unsigned int which)
2244
+static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2245
+ u32 bytes)
17992246 {
2247
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
2248
+ int ret;
2249
+
2250
+ ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2251
+ if (ret)
2252
+ return ret;
2253
+
2254
+ osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2255
+ obj_req->copyup_bvec_count, bytes);
2256
+ return 0;
2257
+}
2258
+
2259
+static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2260
+{
2261
+ obj_req->read_state = RBD_OBJ_READ_START;
2262
+ return 0;
2263
+}
2264
+
2265
+static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2266
+ int which)
2267
+{
2268
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
18002269 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
18012270 u16 opcode;
18022271
1803
- osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
1804
- rbd_dev->layout.object_size,
1805
- rbd_dev->layout.object_size);
2272
+ if (!use_object_map(rbd_dev) ||
2273
+ !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2274
+ osd_req_op_alloc_hint_init(osd_req, which++,
2275
+ rbd_dev->layout.object_size,
2276
+ rbd_dev->layout.object_size,
2277
+ rbd_dev->opts->alloc_hint_flags);
2278
+ }
18062279
18072280 if (rbd_obj_is_entire(obj_req))
18082281 opcode = CEPH_OSD_OP_WRITEFULL;
18092282 else
18102283 opcode = CEPH_OSD_OP_WRITE;
18112284
1812
- osd_req_op_extent_init(obj_req->osd_req, which, opcode,
2285
+ osd_req_op_extent_init(osd_req, which, opcode,
18132286 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
1814
- rbd_osd_req_setup_data(obj_req, which++);
1815
-
1816
- rbd_assert(which == obj_req->osd_req->r_num_ops);
1817
- rbd_osd_req_format_write(obj_req);
2287
+ rbd_osd_setup_data(osd_req, which);
18182288 }
18192289
1820
-static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
2290
+static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
18212291 {
1822
- unsigned int num_osd_ops, which = 0;
18232292 int ret;
18242293
18252294 /* reverse map the entire object onto the parent */
....@@ -1827,61 +2296,107 @@
18272296 if (ret)
18282297 return ret;
18292298
1830
- if (obj_req->num_img_extents) {
1831
- obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1832
- num_osd_ops = 3; /* stat + setallochint + write/writefull */
1833
- } else {
1834
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1835
- num_osd_ops = 2; /* setallochint + write/writefull */
1836
- }
2299
+ if (rbd_obj_copyup_enabled(obj_req))
2300
+ obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
18372301
1838
- obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1839
- if (!obj_req->osd_req)
1840
- return -ENOMEM;
1841
-
1842
- if (obj_req->num_img_extents) {
1843
- ret = __rbd_obj_setup_stat(obj_req, which++);
1844
- if (ret)
1845
- return ret;
1846
- }
1847
-
1848
- __rbd_obj_setup_write(obj_req, which);
2302
+ obj_req->write_state = RBD_OBJ_WRITE_START;
18492303 return 0;
18502304 }
18512305
1852
-static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
1853
- unsigned int which)
2306
+static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
18542307 {
2308
+ return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2309
+ CEPH_OSD_OP_ZERO;
2310
+}
2311
+
2312
+static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2313
+ int which)
2314
+{
2315
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
2316
+
2317
+ if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2318
+ rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2319
+ osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2320
+ } else {
2321
+ osd_req_op_extent_init(osd_req, which,
2322
+ truncate_or_zero_opcode(obj_req),
2323
+ obj_req->ex.oe_off, obj_req->ex.oe_len,
2324
+ 0, 0);
2325
+ }
2326
+}
2327
+
2328
+static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2329
+{
2330
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2331
+ u64 off, next_off;
2332
+ int ret;
2333
+
2334
+ /*
2335
+ * Align the range to alloc_size boundary and punt on discards
2336
+ * that are too small to free up any space.
2337
+ *
2338
+ * alloc_size == object_size && is_tail() is a special case for
2339
+ * filestore with filestore_punch_hole = false, needed to allow
2340
+ * truncate (in addition to delete).
2341
+ */
2342
+ if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2343
+ !rbd_obj_is_tail(obj_req)) {
2344
+ off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2345
+ next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2346
+ rbd_dev->opts->alloc_size);
2347
+ if (off >= next_off)
2348
+ return 1;
2349
+
2350
+ dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2351
+ obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2352
+ off, next_off - off);
2353
+ obj_req->ex.oe_off = off;
2354
+ obj_req->ex.oe_len = next_off - off;
2355
+ }
2356
+
2357
+ /* reverse map the entire object onto the parent */
2358
+ ret = rbd_obj_calc_img_extents(obj_req, true);
2359
+ if (ret)
2360
+ return ret;
2361
+
2362
+ obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2363
+ if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2364
+ obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2365
+
2366
+ obj_req->write_state = RBD_OBJ_WRITE_START;
2367
+ return 0;
2368
+}
2369
+
2370
+static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2371
+ int which)
2372
+{
2373
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
18552374 u16 opcode;
18562375
18572376 if (rbd_obj_is_entire(obj_req)) {
18582377 if (obj_req->num_img_extents) {
1859
- osd_req_op_init(obj_req->osd_req, which++,
1860
- CEPH_OSD_OP_CREATE, 0);
2378
+ if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2379
+ osd_req_op_init(osd_req, which++,
2380
+ CEPH_OSD_OP_CREATE, 0);
18612381 opcode = CEPH_OSD_OP_TRUNCATE;
18622382 } else {
1863
- osd_req_op_init(obj_req->osd_req, which++,
2383
+ rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2384
+ osd_req_op_init(osd_req, which++,
18642385 CEPH_OSD_OP_DELETE, 0);
18652386 opcode = 0;
18662387 }
1867
- } else if (rbd_obj_is_tail(obj_req)) {
1868
- opcode = CEPH_OSD_OP_TRUNCATE;
18692388 } else {
1870
- opcode = CEPH_OSD_OP_ZERO;
2389
+ opcode = truncate_or_zero_opcode(obj_req);
18712390 }
18722391
18732392 if (opcode)
1874
- osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
2393
+ osd_req_op_extent_init(osd_req, which, opcode,
18752394 obj_req->ex.oe_off, obj_req->ex.oe_len,
18762395 0, 0);
1877
-
1878
- rbd_assert(which == obj_req->osd_req->r_num_ops);
1879
- rbd_osd_req_format_write(obj_req);
18802396 }
18812397
1882
-static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
2398
+static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
18832399 {
1884
- unsigned int num_osd_ops, which = 0;
18852400 int ret;
18862401
18872402 /* reverse map the entire object onto the parent */
....@@ -1889,64 +2404,98 @@
18892404 if (ret)
18902405 return ret;
18912406
1892
- if (rbd_obj_is_entire(obj_req)) {
1893
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1894
- if (obj_req->num_img_extents)
1895
- num_osd_ops = 2; /* create + truncate */
1896
- else
1897
- num_osd_ops = 1; /* delete */
1898
- } else {
1899
- if (obj_req->num_img_extents) {
1900
- obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1901
- num_osd_ops = 2; /* stat + truncate/zero */
1902
- } else {
1903
- obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1904
- num_osd_ops = 1; /* truncate/zero */
1905
- }
2407
+ if (rbd_obj_copyup_enabled(obj_req))
2408
+ obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2409
+ if (!obj_req->num_img_extents) {
2410
+ obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2411
+ if (rbd_obj_is_entire(obj_req))
2412
+ obj_req->flags |= RBD_OBJ_FLAG_DELETION;
19062413 }
19072414
1908
- obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1909
- if (!obj_req->osd_req)
1910
- return -ENOMEM;
1911
-
1912
- if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
1913
- ret = __rbd_obj_setup_stat(obj_req, which++);
1914
- if (ret)
1915
- return ret;
1916
- }
1917
-
1918
- __rbd_obj_setup_discard(obj_req, which);
2415
+ obj_req->write_state = RBD_OBJ_WRITE_START;
19192416 return 0;
19202417 }
19212418
2419
+static int count_write_ops(struct rbd_obj_request *obj_req)
2420
+{
2421
+ struct rbd_img_request *img_req = obj_req->img_request;
2422
+
2423
+ switch (img_req->op_type) {
2424
+ case OBJ_OP_WRITE:
2425
+ if (!use_object_map(img_req->rbd_dev) ||
2426
+ !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2427
+ return 2; /* setallochint + write/writefull */
2428
+
2429
+ return 1; /* write/writefull */
2430
+ case OBJ_OP_DISCARD:
2431
+ return 1; /* delete/truncate/zero */
2432
+ case OBJ_OP_ZEROOUT:
2433
+ if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2434
+ !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2435
+ return 2; /* create + truncate */
2436
+
2437
+ return 1; /* delete/truncate/zero */
2438
+ default:
2439
+ BUG();
2440
+ }
2441
+}
2442
+
2443
+static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2444
+ int which)
2445
+{
2446
+ struct rbd_obj_request *obj_req = osd_req->r_priv;
2447
+
2448
+ switch (obj_req->img_request->op_type) {
2449
+ case OBJ_OP_WRITE:
2450
+ __rbd_osd_setup_write_ops(osd_req, which);
2451
+ break;
2452
+ case OBJ_OP_DISCARD:
2453
+ __rbd_osd_setup_discard_ops(osd_req, which);
2454
+ break;
2455
+ case OBJ_OP_ZEROOUT:
2456
+ __rbd_osd_setup_zeroout_ops(osd_req, which);
2457
+ break;
2458
+ default:
2459
+ BUG();
2460
+ }
2461
+}
2462
+
 /*
- * For each object request in @img_req, allocate an OSD request, add
- * individual OSD ops and prepare them for submission. The number of
- * OSD ops depends on op_type and the overlap point (if any).
+ * Prune the list of object requests (adjust offset and/or length, drop
+ * redundant requests). Prepare object request state machines and image
+ * request state machine for execution.
  */
 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
 {
-	struct rbd_obj_request *obj_req;
+	struct rbd_obj_request *obj_req, *next_obj_req;
 	int ret;
 
-	for_each_obj_request(img_req, obj_req) {
+	for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
 		switch (img_req->op_type) {
 		case OBJ_OP_READ:
-			ret = rbd_obj_setup_read(obj_req);
+			ret = rbd_obj_init_read(obj_req);
 			break;
 		case OBJ_OP_WRITE:
-			ret = rbd_obj_setup_write(obj_req);
+			ret = rbd_obj_init_write(obj_req);
 			break;
 		case OBJ_OP_DISCARD:
-			ret = rbd_obj_setup_discard(obj_req);
+			ret = rbd_obj_init_discard(obj_req);
+			break;
+		case OBJ_OP_ZEROOUT:
+			ret = rbd_obj_init_zeroout(obj_req);
 			break;
 		default:
-			rbd_assert(0);
+			BUG();
 		}
-		if (ret)
+		if (ret < 0)
 			return ret;
+		if (ret > 0) {
+			rbd_img_obj_request_del(img_req, obj_req);
+			continue;
+		}
 	}
 
+	img_req->state = RBD_IMG_START;
 	return 0;
 }
19522501
....@@ -2235,32 +2784,78 @@
22352784 &it);
22362785 }
22372786
2238
-static void rbd_img_request_submit(struct rbd_img_request *img_request)
2787
+static void rbd_img_handle_request_work(struct work_struct *work)
22392788 {
2240
- struct rbd_obj_request *obj_request;
2789
+ struct rbd_img_request *img_req =
2790
+ container_of(work, struct rbd_img_request, work);
22412791
2242
- dout("%s: img %p\n", __func__, img_request);
2792
+ rbd_img_handle_request(img_req, img_req->work_result);
2793
+}
22432794
2244
- rbd_img_request_get(img_request);
2245
- for_each_obj_request(img_request, obj_request)
2246
- rbd_obj_request_submit(obj_request);
2795
+static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2796
+{
2797
+ INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2798
+ img_req->work_result = result;
2799
+ queue_work(rbd_wq, &img_req->work);
2800
+}
22472801
2248
- rbd_img_request_put(img_request);
2802
+static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2803
+{
2804
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2805
+
2806
+ if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2807
+ obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2808
+ return true;
2809
+ }
2810
+
2811
+ dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2812
+ obj_req->ex.oe_objno);
2813
+ return false;
2814
+}
2815
+
2816
+static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2817
+{
2818
+ struct ceph_osd_request *osd_req;
2819
+ int ret;
2820
+
2821
+ osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2822
+ if (IS_ERR(osd_req))
2823
+ return PTR_ERR(osd_req);
2824
+
2825
+ osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2826
+ obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2827
+ rbd_osd_setup_data(osd_req, 0);
2828
+ rbd_osd_format_read(osd_req);
2829
+
2830
+ ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2831
+ if (ret)
2832
+ return ret;
2833
+
2834
+ rbd_osd_submit(osd_req);
2835
+ return 0;
22492836 }
22502837
22512838 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
22522839 {
22532840 struct rbd_img_request *img_req = obj_req->img_request;
2841
+ struct rbd_device *parent = img_req->rbd_dev->parent;
22542842 struct rbd_img_request *child_img_req;
22552843 int ret;
22562844
2257
- child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2258
- OBJ_OP_READ, NULL);
2845
+ child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
22592846 if (!child_img_req)
22602847 return -ENOMEM;
22612848
2849
+ rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
22622850 __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
22632851 child_img_req->obj_request = obj_req;
2852
+
2853
+ down_read(&parent->header_rwsem);
2854
+ rbd_img_capture_header(child_img_req);
2855
+ up_read(&parent->header_rwsem);
2856
+
2857
+ dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2858
+ obj_req);
22642859
22652860 if (!rbd_img_is_write(img_req)) {
22662861 switch (img_req->data_type) {
....@@ -2278,7 +2873,7 @@
22782873 &obj_req->bvec_pos);
22792874 break;
22802875 default:
2281
- rbd_assert(0);
2876
+ BUG();
22822877 }
22832878 } else {
22842879 ret = rbd_img_fill_from_bvecs(child_img_req,
....@@ -2287,55 +2882,159 @@
22872882 obj_req->copyup_bvecs);
22882883 }
22892884 if (ret) {
2290
- rbd_img_request_put(child_img_req);
2885
+ rbd_img_request_destroy(child_img_req);
22912886 return ret;
22922887 }
22932888
2294
- rbd_img_request_submit(child_img_req);
2889
+ /* avoid parent chain recursion */
2890
+ rbd_img_schedule(child_img_req, 0);
22952891 return 0;
22962892 }
22972893
2298
-static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
2894
+static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
22992895 {
23002896 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
23012897 int ret;
23022898
2303
- if (obj_req->result == -ENOENT &&
2304
- rbd_dev->parent_overlap && !obj_req->tried_parent) {
2305
- /* reverse map this object extent onto the parent */
2306
- ret = rbd_obj_calc_img_extents(obj_req, false);
2899
+again:
2900
+ switch (obj_req->read_state) {
2901
+ case RBD_OBJ_READ_START:
2902
+ rbd_assert(!*result);
2903
+
2904
+ if (!rbd_obj_may_exist(obj_req)) {
2905
+ *result = -ENOENT;
2906
+ obj_req->read_state = RBD_OBJ_READ_OBJECT;
2907
+ goto again;
2908
+ }
2909
+
2910
+ ret = rbd_obj_read_object(obj_req);
23072911 if (ret) {
2308
- obj_req->result = ret;
2912
+ *result = ret;
23092913 return true;
23102914 }
2311
-
2312
- if (obj_req->num_img_extents) {
2313
- obj_req->tried_parent = true;
2314
- ret = rbd_obj_read_from_parent(obj_req);
2915
+ obj_req->read_state = RBD_OBJ_READ_OBJECT;
2916
+ return false;
2917
+ case RBD_OBJ_READ_OBJECT:
2918
+ if (*result == -ENOENT && rbd_dev->parent_overlap) {
2919
+ /* reverse map this object extent onto the parent */
2920
+ ret = rbd_obj_calc_img_extents(obj_req, false);
23152921 if (ret) {
2316
- obj_req->result = ret;
2922
+ *result = ret;
23172923 return true;
23182924 }
2319
- return false;
2925
+ if (obj_req->num_img_extents) {
2926
+ ret = rbd_obj_read_from_parent(obj_req);
2927
+ if (ret) {
2928
+ *result = ret;
2929
+ return true;
2930
+ }
2931
+ obj_req->read_state = RBD_OBJ_READ_PARENT;
2932
+ return false;
2933
+ }
23202934 }
2935
+
2936
+ /*
2937
+ * -ENOENT means a hole in the image -- zero-fill the entire
2938
+ * length of the request. A short read also implies zero-fill
2939
+ * to the end of the request.
2940
+ */
2941
+ if (*result == -ENOENT) {
2942
+ rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
2943
+ *result = 0;
2944
+ } else if (*result >= 0) {
2945
+ if (*result < obj_req->ex.oe_len)
2946
+ rbd_obj_zero_range(obj_req, *result,
2947
+ obj_req->ex.oe_len - *result);
2948
+ else
2949
+ rbd_assert(*result == obj_req->ex.oe_len);
2950
+ *result = 0;
2951
+ }
2952
+ return true;
2953
+ case RBD_OBJ_READ_PARENT:
2954
+ /*
2955
+ * The parent image is read only up to the overlap -- zero-fill
2956
+ * from the overlap to the end of the request.
2957
+ */
2958
+ if (!*result) {
2959
+ u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
2960
+
2961
+ if (obj_overlap < obj_req->ex.oe_len)
2962
+ rbd_obj_zero_range(obj_req, obj_overlap,
2963
+ obj_req->ex.oe_len - obj_overlap);
2964
+ }
2965
+ return true;
2966
+ default:
2967
+ BUG();
2968
+ }
2969
+}
2970
+
2971
+static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
2972
+{
2973
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2974
+
2975
+ if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
2976
+ obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2977
+
2978
+ if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
2979
+ (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
2980
+ dout("%s %p noop for nonexistent\n", __func__, obj_req);
2981
+ return true;
23212982 }
23222983
2323
- /*
2324
- * -ENOENT means a hole in the image -- zero-fill the entire
2325
- * length of the request. A short read also implies zero-fill
2326
- * to the end of the request. In both cases we update xferred
2327
- * count to indicate the whole request was satisfied.
2328
- */
2329
- if (obj_req->result == -ENOENT ||
2330
- (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
2331
- rbd_assert(!obj_req->xferred || !obj_req->result);
2332
- rbd_obj_zero_range(obj_req, obj_req->xferred,
2333
- obj_req->ex.oe_len - obj_req->xferred);
2334
- obj_req->result = 0;
2335
- obj_req->xferred = obj_req->ex.oe_len;
2984
+ return false;
2985
+}
2986
+
2987
+/*
2988
+ * Return:
2989
+ * 0 - object map update sent
2990
+ * 1 - object map update isn't needed
2991
+ * <0 - error
2992
+ */
2993
+static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
2994
+{
2995
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2996
+ u8 new_state;
2997
+
2998
+ if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
2999
+ return 1;
3000
+
3001
+ if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3002
+ new_state = OBJECT_PENDING;
3003
+ else
3004
+ new_state = OBJECT_EXISTS;
3005
+
3006
+ return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
3007
+}
3008
+
3009
+static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
3010
+{
3011
+ struct ceph_osd_request *osd_req;
3012
+ int num_ops = count_write_ops(obj_req);
3013
+ int which = 0;
3014
+ int ret;
3015
+
3016
+ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
3017
+ num_ops++; /* stat */
3018
+
3019
+ osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3020
+ if (IS_ERR(osd_req))
3021
+ return PTR_ERR(osd_req);
3022
+
3023
+ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3024
+ ret = rbd_osd_setup_stat(osd_req, which++);
3025
+ if (ret)
3026
+ return ret;
23363027 }
23373028
2338
- return true;
3029
+ rbd_osd_setup_write_ops(osd_req, which);
3030
+ rbd_osd_format_write(osd_req);
3031
+
3032
+ ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3033
+ if (ret)
3034
+ return ret;
3035
+
3036
+ rbd_osd_submit(osd_req);
3037
+ return 0;
23393038 }
23403039
23413040 /*
....@@ -2356,56 +3055,66 @@
23563055 return true;
23573056 }
23583057
2359
-static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
3058
+#define MODS_ONLY U32_MAX
3059
+
3060
+static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3061
+ u32 bytes)
23603062 {
2361
- unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
3063
+ struct ceph_osd_request *osd_req;
23623064 int ret;
23633065
23643066 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
2365
- rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
2366
- rbd_osd_req_destroy(obj_req->osd_req);
3067
+ rbd_assert(bytes > 0 && bytes != MODS_ONLY);
23673068
2368
- /*
2369
- * Create a copyup request with the same number of OSD ops as
2370
- * the original request. The original request was stat + op(s),
2371
- * the new copyup request will be copyup + the same op(s).
2372
- */
2373
- obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
2374
- if (!obj_req->osd_req)
2375
- return -ENOMEM;
3069
+ osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3070
+ if (IS_ERR(osd_req))
3071
+ return PTR_ERR(osd_req);
23763072
2377
- ret = osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
2378
- "copyup");
3073
+ ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
23793074 if (ret)
23803075 return ret;
23813076
2382
- /*
2383
- * Only send non-zero copyup data to save some I/O and network
2384
- * bandwidth -- zero copyup data is equivalent to the object not
2385
- * existing.
2386
- */
2387
- if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
2388
- dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
2389
- bytes = 0;
2390
- }
2391
- osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
2392
- obj_req->copyup_bvecs,
2393
- obj_req->copyup_bvec_count,
2394
- bytes);
3077
+ rbd_osd_format_write(osd_req);
23953078
2396
- switch (obj_req->img_request->op_type) {
2397
- case OBJ_OP_WRITE:
2398
- __rbd_obj_setup_write(obj_req, 1);
2399
- break;
2400
- case OBJ_OP_DISCARD:
2401
- rbd_assert(!rbd_obj_is_entire(obj_req));
2402
- __rbd_obj_setup_discard(obj_req, 1);
2403
- break;
2404
- default:
2405
- rbd_assert(0);
3079
+ ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3080
+ if (ret)
3081
+ return ret;
3082
+
3083
+ rbd_osd_submit(osd_req);
3084
+ return 0;
3085
+}
3086
+
3087
+static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3088
+ u32 bytes)
3089
+{
3090
+ struct ceph_osd_request *osd_req;
3091
+ int num_ops = count_write_ops(obj_req);
3092
+ int which = 0;
3093
+ int ret;
3094
+
3095
+ dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3096
+
3097
+ if (bytes != MODS_ONLY)
3098
+ num_ops++; /* copyup */
3099
+
3100
+ osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3101
+ if (IS_ERR(osd_req))
3102
+ return PTR_ERR(osd_req);
3103
+
3104
+ if (bytes != MODS_ONLY) {
3105
+ ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3106
+ if (ret)
3107
+ return ret;
24063108 }
24073109
2408
- rbd_obj_request_submit(obj_req);
3110
+ rbd_osd_setup_write_ops(osd_req, which);
3111
+ rbd_osd_format_write(osd_req);
3112
+
3113
+ ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3114
+ if (ret)
3115
+ return ret;
3116
+
3117
+ rbd_osd_submit(osd_req);
24093118 return 0;
24103119 }
24113120
@@ -2437,7 +3146,12 @@
 	return 0;
 }
 
-static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
+/*
+ * The target object doesn't exist. Read the data for the entire
+ * target object up to the overlap point (if any) from the parent,
+ * so we can use it for a copyup.
+ */
+static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
 {
 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
 	int ret;
@@ -2448,174 +3162,496 @@
 	if (!obj_req->num_img_extents) {
 		/*
 		 * The overlap has become 0 (most likely because the
-		 * image has been flattened). Use rbd_obj_issue_copyup()
-		 * to re-submit the original write request -- the copyup
-		 * operation itself will be a no-op, since someone must
-		 * have populated the child object while we weren't
-		 * looking. Move to WRITE_FLAT state as we'll be done
-		 * with the operation once the null copyup completes.
+		 * image has been flattened). Re-submit the original write
+		 * request -- pass MODS_ONLY since the copyup isn't needed
+		 * anymore.
 		 */
-		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-		return rbd_obj_issue_copyup(obj_req, 0);
+		return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
 	}
 
 	ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
 	if (ret)
 		return ret;
 
-	obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
 	return rbd_obj_read_from_parent(obj_req);
 }
 
-static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
+static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
 {
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	struct ceph_snap_context *snapc = obj_req->img_request->snapc;
+	u8 new_state;
+	u32 i;
+	int ret;
+
+	rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
+
+	if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+		return;
+
+	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
+		return;
+
+	for (i = 0; i < snapc->num_snaps; i++) {
+		if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
+		    i + 1 < snapc->num_snaps)
+			new_state = OBJECT_EXISTS_CLEAN;
+		else
+			new_state = OBJECT_EXISTS;
+
+		ret = rbd_object_map_update(obj_req, snapc->snaps[i],
+					    new_state, NULL);
+		if (ret < 0) {
+			obj_req->pending.result = ret;
+			return;
+		}
+
+		rbd_assert(!ret);
+		obj_req->pending.num_pending++;
+	}
+}
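Both the object-map updates above and the copyup writes that follow are accounted for with the pending_result pattern (struct pending_result records the first nonzero result and a count of outstanding sub-requests). pending_result_dec() itself is defined outside this excerpt; going by how it is called here, it presumably reduces to this sketch:

/* sketch only -- the real helper lives elsewhere in this file */
static bool pending_result_dec(struct pending_result *pending, int *result)
{
	rbd_assert(pending->num_pending > 0);

	if (*result && !pending->result)
		pending->result = *result;	/* remember the first error */
	if (--pending->num_pending)
		return false;			/* still waiting for other sub-requests */

	*result = pending->result;
	return true;				/* all sub-requests completed */
}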
+
+static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
+{
+	u32 bytes = rbd_obj_img_extents_bytes(obj_req);
+	int ret;
+
+	rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
+
+	/*
+	 * Only send non-zero copyup data to save some I/O and network
+	 * bandwidth -- zero copyup data is equivalent to the object not
+	 * existing.
+	 */
+	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
+		bytes = 0;
+
+	if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
+		/*
+		 * Send a copyup request with an empty snapshot context to
+		 * deep-copyup the object through all existing snapshots.
+		 * A second request with the current snapshot context will be
+		 * sent for the actual modification.
+		 */
+		ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
+		if (ret) {
+			obj_req->pending.result = ret;
+			return;
+		}
+
+		obj_req->pending.num_pending++;
+		bytes = MODS_ONLY;
+	}
+
+	ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
+	if (ret) {
+		obj_req->pending.result = ret;
+		return;
+	}
+
+	obj_req->pending.num_pending++;
+}
3255
+static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3256
+{
3257
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
24723258 int ret;
24733259
24743260 again:
2475
- switch (obj_req->write_state) {
2476
- case RBD_OBJ_WRITE_GUARD:
2477
- rbd_assert(!obj_req->xferred);
2478
- if (obj_req->result == -ENOENT) {
2479
- /*
2480
- * The target object doesn't exist. Read the data for
2481
- * the entire target object up to the overlap point (if
2482
- * any) from the parent, so we can use it for a copyup.
2483
- */
2484
- ret = rbd_obj_handle_write_guard(obj_req);
2485
- if (ret) {
2486
- obj_req->result = ret;
2487
- return true;
2488
- }
2489
- return false;
2490
- }
2491
- /* fall through */
2492
- case RBD_OBJ_WRITE_FLAT:
2493
- if (!obj_req->result)
2494
- /*
2495
- * There is no such thing as a successful short
2496
- * write -- indicate the whole request was satisfied.
2497
- */
2498
- obj_req->xferred = obj_req->ex.oe_len;
2499
- return true;
2500
- case RBD_OBJ_WRITE_COPYUP:
2501
- obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2502
- if (obj_req->result)
2503
- goto again;
3261
+ switch (obj_req->copyup_state) {
3262
+ case RBD_OBJ_COPYUP_START:
3263
+ rbd_assert(!*result);
25043264
2505
- rbd_assert(obj_req->xferred);
2506
- ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
3265
+ ret = rbd_obj_copyup_read_parent(obj_req);
25073266 if (ret) {
2508
- obj_req->result = ret;
2509
- obj_req->xferred = 0;
3267
+ *result = ret;
25103268 return true;
25113269 }
3270
+ if (obj_req->num_img_extents)
3271
+ obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3272
+ else
3273
+ obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
25123274 return false;
3275
+ case RBD_OBJ_COPYUP_READ_PARENT:
3276
+ if (*result)
3277
+ return true;
3278
+
3279
+ if (is_zero_bvecs(obj_req->copyup_bvecs,
3280
+ rbd_obj_img_extents_bytes(obj_req))) {
3281
+ dout("%s %p detected zeros\n", __func__, obj_req);
3282
+ obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3283
+ }
3284
+
3285
+ rbd_obj_copyup_object_maps(obj_req);
3286
+ if (!obj_req->pending.num_pending) {
3287
+ *result = obj_req->pending.result;
3288
+ obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3289
+ goto again;
3290
+ }
3291
+ obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3292
+ return false;
3293
+ case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3294
+ if (!pending_result_dec(&obj_req->pending, result))
3295
+ return false;
3296
+ fallthrough;
3297
+ case RBD_OBJ_COPYUP_OBJECT_MAPS:
3298
+ if (*result) {
3299
+ rbd_warn(rbd_dev, "snap object map update failed: %d",
3300
+ *result);
3301
+ return true;
3302
+ }
3303
+
3304
+ rbd_obj_copyup_write_object(obj_req);
3305
+ if (!obj_req->pending.num_pending) {
3306
+ *result = obj_req->pending.result;
3307
+ obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3308
+ goto again;
3309
+ }
3310
+ obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3311
+ return false;
3312
+ case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3313
+ if (!pending_result_dec(&obj_req->pending, result))
3314
+ return false;
3315
+ fallthrough;
3316
+ case RBD_OBJ_COPYUP_WRITE_OBJECT:
3317
+ return true;
25133318 default:
25143319 BUG();
25153320 }
25163321 }
25173322
25183323 /*
2519
- * Returns true if @obj_req is completed, or false otherwise.
3324
+ * Return:
3325
+ * 0 - object map update sent
3326
+ * 1 - object map update isn't needed
3327
+ * <0 - error
25203328 */
2521
-static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
3329
+static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
25223330 {
2523
- switch (obj_req->img_request->op_type) {
2524
- case OBJ_OP_READ:
2525
- return rbd_obj_handle_read(obj_req);
2526
- case OBJ_OP_WRITE:
2527
- return rbd_obj_handle_write(obj_req);
2528
- case OBJ_OP_DISCARD:
2529
- if (rbd_obj_handle_write(obj_req)) {
2530
- /*
2531
- * Hide -ENOENT from delete/truncate/zero -- discarding
2532
- * a non-existent object is not a problem.
2533
- */
2534
- if (obj_req->result == -ENOENT) {
2535
- obj_req->result = 0;
2536
- obj_req->xferred = obj_req->ex.oe_len;
2537
- }
3331
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3332
+ u8 current_state = OBJECT_PENDING;
3333
+
3334
+ if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3335
+ return 1;
3336
+
3337
+ if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3338
+ return 1;
3339
+
3340
+ return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3341
+ &current_state);
3342
+}
3343
+
3344
+static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3345
+{
3346
+ struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3347
+ int ret;
3348
+
3349
+again:
3350
+ switch (obj_req->write_state) {
3351
+ case RBD_OBJ_WRITE_START:
3352
+ rbd_assert(!*result);
3353
+
3354
+ if (rbd_obj_write_is_noop(obj_req))
3355
+ return true;
3356
+
3357
+ ret = rbd_obj_write_pre_object_map(obj_req);
3358
+ if (ret < 0) {
3359
+ *result = ret;
25383360 return true;
25393361 }
3362
+ obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3363
+ if (ret > 0)
3364
+ goto again;
25403365 return false;
3366
+ case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3367
+ if (*result) {
3368
+ rbd_warn(rbd_dev, "pre object map update failed: %d",
3369
+ *result);
3370
+ return true;
3371
+ }
3372
+ ret = rbd_obj_write_object(obj_req);
3373
+ if (ret) {
3374
+ *result = ret;
3375
+ return true;
3376
+ }
3377
+ obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3378
+ return false;
3379
+ case RBD_OBJ_WRITE_OBJECT:
3380
+ if (*result == -ENOENT) {
3381
+ if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3382
+ *result = 0;
3383
+ obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3384
+ obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3385
+ goto again;
3386
+ }
3387
+ /*
3388
+ * On a non-existent object:
3389
+ * delete - -ENOENT, truncate/zero - 0
3390
+ */
3391
+ if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3392
+ *result = 0;
3393
+ }
3394
+ if (*result)
3395
+ return true;
3396
+
3397
+ obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3398
+ goto again;
3399
+ case __RBD_OBJ_WRITE_COPYUP:
3400
+ if (!rbd_obj_advance_copyup(obj_req, result))
3401
+ return false;
3402
+ fallthrough;
3403
+ case RBD_OBJ_WRITE_COPYUP:
3404
+ if (*result) {
3405
+ rbd_warn(rbd_dev, "copyup failed: %d", *result);
3406
+ return true;
3407
+ }
3408
+ ret = rbd_obj_write_post_object_map(obj_req);
3409
+ if (ret < 0) {
3410
+ *result = ret;
3411
+ return true;
3412
+ }
3413
+ obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3414
+ if (ret > 0)
3415
+ goto again;
3416
+ return false;
3417
+ case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3418
+ if (*result)
3419
+ rbd_warn(rbd_dev, "post object map update failed: %d",
3420
+ *result);
3421
+ return true;
25413422 default:
25423423 BUG();
25433424 }
25443425 }
25453426
2546
-static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
3427
+/*
3428
+ * Return true if @obj_req is completed.
3429
+ */
3430
+static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3431
+ int *result)
25473432 {
25483433 struct rbd_img_request *img_req = obj_req->img_request;
3434
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3435
+ bool done;
25493436
2550
- rbd_assert((!obj_req->result &&
2551
- obj_req->xferred == obj_req->ex.oe_len) ||
2552
- (obj_req->result < 0 && !obj_req->xferred));
2553
- if (!obj_req->result) {
2554
- img_req->xferred += obj_req->xferred;
2555
- return;
2556
- }
3437
+ mutex_lock(&obj_req->state_mutex);
3438
+ if (!rbd_img_is_write(img_req))
3439
+ done = rbd_obj_advance_read(obj_req, result);
3440
+ else
3441
+ done = rbd_obj_advance_write(obj_req, result);
3442
+ mutex_unlock(&obj_req->state_mutex);
25573443
2558
- rbd_warn(img_req->rbd_dev,
2559
- "%s at objno %llu %llu~%llu result %d xferred %llu",
2560
- obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
2561
- obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
2562
- obj_req->xferred);
2563
- if (!img_req->result) {
2564
- img_req->result = obj_req->result;
2565
- img_req->xferred = 0;
3444
+ if (done && *result) {
3445
+ rbd_assert(*result < 0);
3446
+ rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3447
+ obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3448
+ obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
25663449 }
3450
+ return done;
25673451 }
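All of the advance functions driven from here share one shape: each call consumes the result of the previous step, then either submits the next asynchronous step and returns false, or finishes (first nonzero result wins) and returns true; steps that need no I/O are advanced inline via goto again. A self-contained toy illustration of that pattern (every name below is a placeholder, nothing here is a driver symbol):

#include <stdbool.h>
#include <stdio.h>

enum demo_state { DEMO_START = 1, DEMO_WAIT_IO, DEMO_DONE };

struct demo_machine {
	enum demo_state state;
};

static bool demo_advance(struct demo_machine *m, int *result)
{
again:
	switch (m->state) {
	case DEMO_START:
		/* kick off asynchronous work here, then wait for its callback */
		m->state = DEMO_WAIT_IO;
		return false;
	case DEMO_WAIT_IO:
		if (*result)
			return true;	/* first error completes the machine */
		m->state = DEMO_DONE;
		goto again;		/* nothing async left: advance inline */
	case DEMO_DONE:
		return true;
	}
	return true;
}

int main(void)
{
	struct demo_machine m = { .state = DEMO_START };
	int result = 0;

	/* first call: work submitted, machine not done yet */
	printf("done=%d\n", demo_advance(&m, &result));
	/* the completion callback would make this call with the I/O result */
	printf("done=%d result=%d\n", demo_advance(&m, &result), result);
	return 0;
}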
25683452
2569
-static void rbd_img_end_child_request(struct rbd_img_request *img_req)
3453
+/*
3454
+ * This is open-coded in rbd_img_handle_request() to avoid parent chain
3455
+ * recursion.
3456
+ */
3457
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
25703458 {
2571
- struct rbd_obj_request *obj_req = img_req->obj_request;
2572
-
2573
- rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
2574
- rbd_assert((!img_req->result &&
2575
- img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
2576
- (img_req->result < 0 && !img_req->xferred));
2577
-
2578
- obj_req->result = img_req->result;
2579
- obj_req->xferred = img_req->xferred;
2580
- rbd_img_request_put(img_req);
3459
+ if (__rbd_obj_handle_request(obj_req, &result))
3460
+ rbd_img_handle_request(obj_req->img_request, result);
25813461 }
25823462
2583
-static void rbd_img_end_request(struct rbd_img_request *img_req)
3463
+static bool need_exclusive_lock(struct rbd_img_request *img_req)
25843464 {
3465
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3466
+
3467
+ if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3468
+ return false;
3469
+
3470
+ if (rbd_is_ro(rbd_dev))
3471
+ return false;
3472
+
25853473 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
2586
- rbd_assert((!img_req->result &&
2587
- img_req->xferred == blk_rq_bytes(img_req->rq)) ||
2588
- (img_req->result < 0 && !img_req->xferred));
3474
+ if (rbd_dev->opts->lock_on_read ||
3475
+ (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3476
+ return true;
25893477
2590
- blk_mq_end_request(img_req->rq,
2591
- errno_to_blk_status(img_req->result));
2592
- rbd_img_request_put(img_req);
3478
+ return rbd_img_is_write(img_req);
25933479 }
25943480
2595
-static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
3481
+static bool rbd_lock_add_request(struct rbd_img_request *img_req)
25963482 {
2597
- struct rbd_img_request *img_req;
3483
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3484
+ bool locked;
3485
+
3486
+ lockdep_assert_held(&rbd_dev->lock_rwsem);
3487
+ locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3488
+ spin_lock(&rbd_dev->lock_lists_lock);
3489
+ rbd_assert(list_empty(&img_req->lock_item));
3490
+ if (!locked)
3491
+ list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3492
+ else
3493
+ list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3494
+ spin_unlock(&rbd_dev->lock_lists_lock);
3495
+ return locked;
3496
+}
3497
+
3498
+static void rbd_lock_del_request(struct rbd_img_request *img_req)
3499
+{
3500
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3501
+ bool need_wakeup;
3502
+
3503
+ lockdep_assert_held(&rbd_dev->lock_rwsem);
3504
+ spin_lock(&rbd_dev->lock_lists_lock);
3505
+ rbd_assert(!list_empty(&img_req->lock_item));
3506
+ list_del_init(&img_req->lock_item);
3507
+ need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3508
+ list_empty(&rbd_dev->running_list));
3509
+ spin_unlock(&rbd_dev->lock_lists_lock);
3510
+ if (need_wakeup)
3511
+ complete(&rbd_dev->releasing_wait);
3512
+}
3513
+
3514
+static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3515
+{
3516
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3517
+
3518
+ if (!need_exclusive_lock(img_req))
3519
+ return 1;
3520
+
3521
+ if (rbd_lock_add_request(img_req))
3522
+ return 1;
3523
+
3524
+ if (rbd_dev->opts->exclusive) {
3525
+ WARN_ON(1); /* lock got released? */
3526
+ return -EROFS;
3527
+ }
3528
+
3529
+ /*
3530
+ * Note the use of mod_delayed_work() in rbd_acquire_lock()
3531
+ * and cancel_delayed_work() in wake_lock_waiters().
3532
+ */
3533
+ dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3534
+ queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3535
+ return 0;
3536
+}
3537
+
3538
+static void rbd_img_object_requests(struct rbd_img_request *img_req)
3539
+{
3540
+ struct rbd_obj_request *obj_req;
3541
+
3542
+ rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3543
+
3544
+ for_each_obj_request(img_req, obj_req) {
3545
+ int result = 0;
3546
+
3547
+ if (__rbd_obj_handle_request(obj_req, &result)) {
3548
+ if (result) {
3549
+ img_req->pending.result = result;
3550
+ return;
3551
+ }
3552
+ } else {
3553
+ img_req->pending.num_pending++;
3554
+ }
3555
+ }
3556
+}
3557
+
3558
+static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3559
+{
3560
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3561
+ int ret;
25983562
25993563 again:
2600
- if (!__rbd_obj_handle_request(obj_req))
2601
- return;
3564
+ switch (img_req->state) {
3565
+ case RBD_IMG_START:
3566
+ rbd_assert(!*result);
26023567
2603
- img_req = obj_req->img_request;
2604
- spin_lock(&img_req->completion_lock);
2605
- rbd_obj_end_request(obj_req);
2606
- rbd_assert(img_req->pending_count);
2607
- if (--img_req->pending_count) {
2608
- spin_unlock(&img_req->completion_lock);
2609
- return;
3568
+ ret = rbd_img_exclusive_lock(img_req);
3569
+ if (ret < 0) {
3570
+ *result = ret;
3571
+ return true;
3572
+ }
3573
+ img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3574
+ if (ret > 0)
3575
+ goto again;
3576
+ return false;
3577
+ case RBD_IMG_EXCLUSIVE_LOCK:
3578
+ if (*result)
3579
+ return true;
3580
+
3581
+ rbd_assert(!need_exclusive_lock(img_req) ||
3582
+ __rbd_is_lock_owner(rbd_dev));
3583
+
3584
+ rbd_img_object_requests(img_req);
3585
+ if (!img_req->pending.num_pending) {
3586
+ *result = img_req->pending.result;
3587
+ img_req->state = RBD_IMG_OBJECT_REQUESTS;
3588
+ goto again;
3589
+ }
3590
+ img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3591
+ return false;
3592
+ case __RBD_IMG_OBJECT_REQUESTS:
3593
+ if (!pending_result_dec(&img_req->pending, result))
3594
+ return false;
3595
+ fallthrough;
3596
+ case RBD_IMG_OBJECT_REQUESTS:
3597
+ return true;
3598
+ default:
3599
+ BUG();
3600
+ }
3601
+}
3602
+
3603
+/*
3604
+ * Return true if @img_req is completed.
3605
+ */
3606
+static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3607
+ int *result)
3608
+{
3609
+ struct rbd_device *rbd_dev = img_req->rbd_dev;
3610
+ bool done;
3611
+
3612
+ if (need_exclusive_lock(img_req)) {
3613
+ down_read(&rbd_dev->lock_rwsem);
3614
+ mutex_lock(&img_req->state_mutex);
3615
+ done = rbd_img_advance(img_req, result);
3616
+ if (done)
3617
+ rbd_lock_del_request(img_req);
3618
+ mutex_unlock(&img_req->state_mutex);
3619
+ up_read(&rbd_dev->lock_rwsem);
3620
+ } else {
3621
+ mutex_lock(&img_req->state_mutex);
3622
+ done = rbd_img_advance(img_req, result);
3623
+ mutex_unlock(&img_req->state_mutex);
26103624 }
26113625
2612
- spin_unlock(&img_req->completion_lock);
3626
+ if (done && *result) {
3627
+ rbd_assert(*result < 0);
3628
+ rbd_warn(rbd_dev, "%s%s result %d",
3629
+ test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3630
+ obj_op_name(img_req->op_type), *result);
3631
+ }
3632
+ return done;
3633
+}
3634
+
3635
+static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3636
+{
3637
+again:
3638
+ if (!__rbd_img_handle_request(img_req, &result))
3639
+ return;
3640
+
26133641 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
2614
- obj_req = img_req->obj_request;
2615
- rbd_img_end_child_request(img_req);
2616
- goto again;
3642
+ struct rbd_obj_request *obj_req = img_req->obj_request;
3643
+
3644
+ rbd_img_request_destroy(img_req);
3645
+ if (__rbd_obj_handle_request(obj_req, &result)) {
3646
+ img_req = obj_req->img_request;
3647
+ goto again;
3648
+ }
3649
+ } else {
3650
+ struct request *rq = blk_mq_rq_from_pdu(img_req);
3651
+
3652
+ rbd_img_request_destroy(img_req);
3653
+ blk_mq_end_request(rq, errno_to_blk_status(result));
26173654 }
2618
- rbd_img_end_request(img_req);
26193655 }
26203656
26213657 static const struct rbd_client_id rbd_empty_cid;
....@@ -2660,6 +3696,7 @@
26603696 {
26613697 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
26623698
3699
+ rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
26633700 strcpy(rbd_dev->lock_cookie, cookie);
26643701 rbd_set_owner_cid(rbd_dev, &cid);
26653702 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
....@@ -2684,7 +3721,6 @@
26843721 if (ret)
26853722 return ret;
26863723
2687
- rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
26883724 __rbd_lock(rbd_dev, cookie);
26893725 return 0;
26903726 }
....@@ -2703,7 +3739,7 @@
27033739 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
27043740 RBD_LOCK_NAME, rbd_dev->lock_cookie);
27053741 if (ret && ret != -ENOENT)
2706
- rbd_warn(rbd_dev, "failed to unlock: %d", ret);
3742
+ rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
27073743
27083744 /* treat errors as the image is unlocked */
27093745 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
....@@ -2739,11 +3775,7 @@
27393775 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
27403776 enum rbd_notify_op notify_op)
27413777 {
2742
- struct page **reply_pages;
2743
- size_t reply_len;
2744
-
2745
- __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
2746
- ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3778
+ __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
27473779 }
27483780
27493781 static void rbd_notify_acquired_lock(struct work_struct *work)
@@ -2830,15 +3862,34 @@
 	goto out;
 }
 
-static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
+/*
+ * Either image request state machine(s) or rbd_add_acquire_lock()
+ * (i.e. "rbd map").
+ */
+static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
 {
-	dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
+	struct rbd_img_request *img_req;
+
+	dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
+	lockdep_assert_held_write(&rbd_dev->lock_rwsem);
 
 	cancel_delayed_work(&rbd_dev->lock_dwork);
-	if (wake_all)
-		wake_up_all(&rbd_dev->lock_waitq);
-	else
-		wake_up(&rbd_dev->lock_waitq);
+	if (!completion_done(&rbd_dev->acquire_wait)) {
+		rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
+			   list_empty(&rbd_dev->running_list));
+		rbd_dev->acquire_err = result;
+		complete_all(&rbd_dev->acquire_wait);
+		return;
+	}
+
+	list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
+		mutex_lock(&img_req->state_mutex);
+		rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
+		rbd_img_schedule(img_req, result);
+		mutex_unlock(&img_req->state_mutex);
+	}
+
+	list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
 }
28433894
28443895 static int get_lock_owner_info(struct rbd_device *rbd_dev,
....@@ -2953,19 +4004,16 @@
29534004 goto again;
29544005
29554006 ret = find_watcher(rbd_dev, lockers);
2956
- if (ret) {
2957
- if (ret > 0)
2958
- ret = 0; /* have to request lock */
2959
- goto out;
2960
- }
4007
+ if (ret)
4008
+ goto out; /* request lock or error */
29614009
2962
- rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
4010
+ rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
29634011 ENTITY_NAME(lockers[0].id.name));
29644012
2965
- ret = ceph_monc_blacklist_add(&client->monc,
4013
+ ret = ceph_monc_blocklist_add(&client->monc,
29664014 &lockers[0].info.addr);
29674015 if (ret) {
2968
- rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
4016
+ rbd_warn(rbd_dev, "blocklist of %s%llu failed: %d",
29694017 ENTITY_NAME(lockers[0].id.name), ret);
29704018 goto out;
29714019 }
....@@ -2986,53 +4034,90 @@
29864034 return ret;
29874035 }
29884036
2989
-/*
2990
- * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
2991
- */
2992
-static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
2993
- int *pret)
4037
+static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
29944038 {
2995
- enum rbd_lock_state lock_state;
4039
+ int ret;
4040
+
4041
+ if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4042
+ ret = rbd_object_map_open(rbd_dev);
4043
+ if (ret)
4044
+ return ret;
4045
+ }
4046
+
4047
+ return 0;
4048
+}
4049
+
4050
+/*
4051
+ * Return:
4052
+ * 0 - lock acquired
4053
+ * 1 - caller should call rbd_request_lock()
4054
+ * <0 - error
4055
+ */
4056
+static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4057
+{
4058
+ int ret;
29964059
29974060 down_read(&rbd_dev->lock_rwsem);
29984061 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
29994062 rbd_dev->lock_state);
30004063 if (__rbd_is_lock_owner(rbd_dev)) {
3001
- lock_state = rbd_dev->lock_state;
30024064 up_read(&rbd_dev->lock_rwsem);
3003
- return lock_state;
4065
+ return 0;
30044066 }
30054067
30064068 up_read(&rbd_dev->lock_rwsem);
30074069 down_write(&rbd_dev->lock_rwsem);
30084070 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
30094071 rbd_dev->lock_state);
3010
- if (!__rbd_is_lock_owner(rbd_dev)) {
3011
- *pret = rbd_try_lock(rbd_dev);
3012
- if (*pret)
3013
- rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
4072
+ if (__rbd_is_lock_owner(rbd_dev)) {
4073
+ up_write(&rbd_dev->lock_rwsem);
4074
+ return 0;
30144075 }
30154076
3016
- lock_state = rbd_dev->lock_state;
4077
+ ret = rbd_try_lock(rbd_dev);
4078
+ if (ret < 0) {
4079
+ rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4080
+ if (ret == -EBLOCKLISTED)
4081
+ goto out;
4082
+
4083
+ ret = 1; /* request lock anyway */
4084
+ }
4085
+ if (ret > 0) {
4086
+ up_write(&rbd_dev->lock_rwsem);
4087
+ return ret;
4088
+ }
4089
+
4090
+ rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4091
+ rbd_assert(list_empty(&rbd_dev->running_list));
4092
+
4093
+ ret = rbd_post_acquire_action(rbd_dev);
4094
+ if (ret) {
4095
+ rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4096
+ /*
4097
+ * Can't stay in RBD_LOCK_STATE_LOCKED because
4098
+ * rbd_lock_add_request() would let the request through,
4099
+ * assuming that e.g. object map is locked and loaded.
4100
+ */
4101
+ rbd_unlock(rbd_dev);
4102
+ }
4103
+
4104
+out:
4105
+ wake_lock_waiters(rbd_dev, ret);
30174106 up_write(&rbd_dev->lock_rwsem);
3018
- return lock_state;
4107
+ return ret;
30194108 }
30204109
30214110 static void rbd_acquire_lock(struct work_struct *work)
30224111 {
30234112 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
30244113 struct rbd_device, lock_dwork);
3025
- enum rbd_lock_state lock_state;
3026
- int ret = 0;
4114
+ int ret;
30274115
30284116 dout("%s rbd_dev %p\n", __func__, rbd_dev);
30294117 again:
3030
- lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3031
- if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3032
- if (lock_state == RBD_LOCK_STATE_LOCKED)
3033
- wake_requests(rbd_dev, true);
3034
- dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3035
- rbd_dev, lock_state, ret);
4118
+ ret = rbd_try_acquire_lock(rbd_dev);
4119
+ if (ret <= 0) {
4120
+ dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
30364121 return;
30374122 }
30384123
....@@ -3041,16 +4126,9 @@
30414126 goto again; /* treat this as a dead client */
30424127 } else if (ret == -EROFS) {
30434128 rbd_warn(rbd_dev, "peer will not release lock");
3044
- /*
3045
- * If this is rbd_add_acquire_lock(), we want to fail
3046
- * immediately -- reuse BLACKLISTED flag. Otherwise we
3047
- * want to block.
3048
- */
3049
- if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3050
- set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3051
- /* wake "rbd map --exclusive" process */
3052
- wake_requests(rbd_dev, false);
3053
- }
4129
+ down_write(&rbd_dev->lock_rwsem);
4130
+ wake_lock_waiters(rbd_dev, ret);
4131
+ up_write(&rbd_dev->lock_rwsem);
30544132 } else if (ret < 0) {
30554133 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
30564134 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
....@@ -3060,50 +4138,72 @@
30604138 * lock owner acked, but resend if we don't see them
30614139 * release the lock
30624140 */
3063
- dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
4141
+ dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
30644142 rbd_dev);
30654143 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
30664144 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
30674145 }
30684146 }
30694147
3070
-/*
3071
- * lock_rwsem must be held for write
3072
- */
3073
-static bool rbd_release_lock(struct rbd_device *rbd_dev)
4148
+static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
30744149 {
3075
- dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3076
- rbd_dev->lock_state);
4150
+ dout("%s rbd_dev %p\n", __func__, rbd_dev);
4151
+ lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4152
+
30774153 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
30784154 return false;
30794155
3080
- rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3081
- downgrade_write(&rbd_dev->lock_rwsem);
30824156 /*
30834157 * Ensure that all in-flight IO is flushed.
3084
- *
3085
- * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3086
- * may be shared with other devices.
30874158 */
3088
- ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3089
- up_read(&rbd_dev->lock_rwsem);
4159
+ rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4160
+ rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4161
+ if (list_empty(&rbd_dev->running_list))
4162
+ return true;
4163
+
4164
+ up_write(&rbd_dev->lock_rwsem);
4165
+ wait_for_completion(&rbd_dev->releasing_wait);
30904166
30914167 down_write(&rbd_dev->lock_rwsem);
3092
- dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3093
- rbd_dev->lock_state);
30944168 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
30954169 return false;
30964170
4171
+ rbd_assert(list_empty(&rbd_dev->running_list));
4172
+ return true;
4173
+}
4174
+
4175
+static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4176
+{
4177
+ if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4178
+ rbd_object_map_close(rbd_dev);
4179
+}
4180
+
4181
+static void __rbd_release_lock(struct rbd_device *rbd_dev)
4182
+{
4183
+ rbd_assert(list_empty(&rbd_dev->running_list));
4184
+
4185
+ rbd_pre_release_action(rbd_dev);
30974186 rbd_unlock(rbd_dev);
4187
+}
4188
+
4189
+/*
4190
+ * lock_rwsem must be held for write
4191
+ */
4192
+static void rbd_release_lock(struct rbd_device *rbd_dev)
4193
+{
4194
+ if (!rbd_quiesce_lock(rbd_dev))
4195
+ return;
4196
+
4197
+ __rbd_release_lock(rbd_dev);
4198
+
30984199 /*
30994200 * Give others a chance to grab the lock - we would re-acquire
3100
- * almost immediately if we got new IO during ceph_osdc_sync()
3101
- * otherwise. We need to ack our own notifications, so this
3102
- * lock_dwork will be requeued from rbd_wait_state_locked()
3103
- * after wake_requests() in rbd_handle_released_lock().
4201
+ * almost immediately if we got new IO while draining the running
4202
+ * list otherwise. We need to ack our own notifications, so this
4203
+ * lock_dwork will be requeued from rbd_handle_released_lock() by
4204
+ * way of maybe_kick_acquire().
31044205 */
31054206 cancel_delayed_work(&rbd_dev->lock_dwork);
3106
- return true;
31074207 }
31084208
31094209 static void rbd_release_lock_work(struct work_struct *work)
....@@ -3114,6 +4214,23 @@
31144214 down_write(&rbd_dev->lock_rwsem);
31154215 rbd_release_lock(rbd_dev);
31164216 up_write(&rbd_dev->lock_rwsem);
4217
+}
4218
+
4219
+static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4220
+{
4221
+ bool have_requests;
4222
+
4223
+ dout("%s rbd_dev %p\n", __func__, rbd_dev);
4224
+ if (__rbd_is_lock_owner(rbd_dev))
4225
+ return;
4226
+
4227
+ spin_lock(&rbd_dev->lock_lists_lock);
4228
+ have_requests = !list_empty(&rbd_dev->acquiring_list);
4229
+ spin_unlock(&rbd_dev->lock_lists_lock);
4230
+ if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4231
+ dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4232
+ mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4233
+ }
31174234 }
31184235
31194236 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
....@@ -3131,22 +4248,17 @@
31314248 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
31324249 down_write(&rbd_dev->lock_rwsem);
31334250 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3134
- /*
3135
- * we already know that the remote client is
3136
- * the owner
3137
- */
3138
- up_write(&rbd_dev->lock_rwsem);
3139
- return;
4251
+ dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n",
4252
+ __func__, rbd_dev, cid.gid, cid.handle);
4253
+ } else {
4254
+ rbd_set_owner_cid(rbd_dev, &cid);
31404255 }
3141
-
3142
- rbd_set_owner_cid(rbd_dev, &cid);
31434256 downgrade_write(&rbd_dev->lock_rwsem);
31444257 } else {
31454258 down_read(&rbd_dev->lock_rwsem);
31464259 }
31474260
3148
- if (!__rbd_is_lock_owner(rbd_dev))
3149
- wake_requests(rbd_dev, false);
4261
+ maybe_kick_acquire(rbd_dev);
31504262 up_read(&rbd_dev->lock_rwsem);
31514263 }
31524264
....@@ -3165,21 +4277,18 @@
31654277 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
31664278 down_write(&rbd_dev->lock_rwsem);
31674279 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3168
- dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
4280
+ dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n",
31694281 __func__, rbd_dev, cid.gid, cid.handle,
31704282 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3171
- up_write(&rbd_dev->lock_rwsem);
3172
- return;
4283
+ } else {
4284
+ rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
31734285 }
3174
-
3175
- rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
31764286 downgrade_write(&rbd_dev->lock_rwsem);
31774287 } else {
31784288 down_read(&rbd_dev->lock_rwsem);
31794289 }
31804290
3181
- if (!__rbd_is_lock_owner(rbd_dev))
3182
- wake_requests(rbd_dev, false);
4291
+ maybe_kick_acquire(rbd_dev);
31834292 up_read(&rbd_dev->lock_rwsem);
31844293 }
31854294
....@@ -3433,7 +4542,6 @@
34334542 */
34344543 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
34354544 {
3436
- WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
34374545 cancel_tasks_sync(rbd_dev);
34384546
34394547 mutex_lock(&rbd_dev->watch_mutex);
....@@ -3455,7 +4563,8 @@
34554563 char cookie[32];
34564564 int ret;
34574565
3458
- WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
4566
+ if (!rbd_quiesce_lock(rbd_dev))
4567
+ return;
34594568
34604569 format_lock_cookie(rbd_dev, cookie);
34614570 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
....@@ -3471,11 +4580,11 @@
34714580 * Lock cookie cannot be updated on older OSDs, so do
34724581 * a manual release and queue an acquire.
34734582 */
3474
- if (rbd_release_lock(rbd_dev))
3475
- queue_delayed_work(rbd_dev->task_wq,
3476
- &rbd_dev->lock_dwork, 0);
4583
+ __rbd_release_lock(rbd_dev);
4584
+ queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
34774585 } else {
34784586 __rbd_lock(rbd_dev, cookie);
4587
+ wake_lock_waiters(rbd_dev, 0);
34794588 }
34804589 }
34814590
....@@ -3496,15 +4605,18 @@
34964605 ret = __rbd_register_watch(rbd_dev);
34974606 if (ret) {
34984607 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3499
- if (ret == -EBLACKLISTED || ret == -ENOENT) {
3500
- set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3501
- wake_requests(rbd_dev, true);
3502
- } else {
4608
+ if (ret != -EBLOCKLISTED && ret != -ENOENT) {
35034609 queue_delayed_work(rbd_dev->task_wq,
35044610 &rbd_dev->watch_dwork,
35054611 RBD_RETRY_DELAY);
4612
+ mutex_unlock(&rbd_dev->watch_mutex);
4613
+ return;
35064614 }
4615
+
35074616 mutex_unlock(&rbd_dev->watch_mutex);
4617
+ down_write(&rbd_dev->lock_rwsem);
4618
+ wake_lock_waiters(rbd_dev, ret);
4619
+ up_write(&rbd_dev->lock_rwsem);
35084620 return;
35094621 }
35104622
....@@ -3567,7 +4679,7 @@
35674679
35684680 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
35694681 CEPH_OSD_FLAG_READ, req_page, outbound_size,
3570
- reply_page, &inbound_size);
4682
+ &reply_page, &inbound_size);
35714683 if (!ret) {
35724684 memcpy(inbound, page_address(reply_page), inbound_size);
35734685 ret = inbound_size;
....@@ -3579,71 +4691,74 @@
35794691 return ret;
35804692 }
35814693
3582
-/*
3583
- * lock_rwsem must be held for read
3584
- */
3585
-static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire)
3586
-{
3587
- DEFINE_WAIT(wait);
3588
- unsigned long timeout;
3589
- int ret = 0;
3590
-
3591
- if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
3592
- return -EBLACKLISTED;
3593
-
3594
- if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3595
- return 0;
3596
-
3597
- if (!may_acquire) {
3598
- rbd_warn(rbd_dev, "exclusive lock required");
3599
- return -EROFS;
3600
- }
3601
-
3602
- do {
3603
- /*
3604
- * Note the use of mod_delayed_work() in rbd_acquire_lock()
3605
- * and cancel_delayed_work() in wake_requests().
3606
- */
3607
- dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3608
- queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3609
- prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
3610
- TASK_UNINTERRUPTIBLE);
3611
- up_read(&rbd_dev->lock_rwsem);
3612
- timeout = schedule_timeout(ceph_timeout_jiffies(
3613
- rbd_dev->opts->lock_timeout));
3614
- down_read(&rbd_dev->lock_rwsem);
3615
- if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
3616
- ret = -EBLACKLISTED;
3617
- break;
3618
- }
3619
- if (!timeout) {
3620
- rbd_warn(rbd_dev, "timed out waiting for lock");
3621
- ret = -ETIMEDOUT;
3622
- break;
3623
- }
3624
- } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3625
-
3626
- finish_wait(&rbd_dev->lock_waitq, &wait);
3627
- return ret;
3628
-}
3629
-
36304694 static void rbd_queue_workfn(struct work_struct *work)
36314695 {
3632
- struct request *rq = blk_mq_rq_from_pdu(work);
3633
- struct rbd_device *rbd_dev = rq->q->queuedata;
3634
- struct rbd_img_request *img_request;
3635
- struct ceph_snap_context *snapc = NULL;
4696
+ struct rbd_img_request *img_request =
4697
+ container_of(work, struct rbd_img_request, work);
4698
+ struct rbd_device *rbd_dev = img_request->rbd_dev;
4699
+ enum obj_operation_type op_type = img_request->op_type;
4700
+ struct request *rq = blk_mq_rq_from_pdu(img_request);
36364701 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
36374702 u64 length = blk_rq_bytes(rq);
3638
- enum obj_operation_type op_type;
36394703 u64 mapping_size;
3640
- bool must_be_locked;
36414704 int result;
36424705
3643
- switch (req_op(rq)) {
4706
+ /* Ignore/skip any zero-length requests */
4707
+ if (!length) {
4708
+ dout("%s: zero-length request\n", __func__);
4709
+ result = 0;
4710
+ goto err_img_request;
4711
+ }
4712
+
4713
+ blk_mq_start_request(rq);
4714
+
4715
+ down_read(&rbd_dev->header_rwsem);
4716
+ mapping_size = rbd_dev->mapping.size;
4717
+ rbd_img_capture_header(img_request);
4718
+ up_read(&rbd_dev->header_rwsem);
4719
+
4720
+ if (offset + length > mapping_size) {
4721
+ rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4722
+ length, mapping_size);
4723
+ result = -EIO;
4724
+ goto err_img_request;
4725
+ }
4726
+
4727
+ dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4728
+ img_request, obj_op_name(op_type), offset, length);
4729
+
4730
+ if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4731
+ result = rbd_img_fill_nodata(img_request, offset, length);
4732
+ else
4733
+ result = rbd_img_fill_from_bio(img_request, offset, length,
4734
+ rq->bio);
4735
+ if (result)
4736
+ goto err_img_request;
4737
+
4738
+ rbd_img_handle_request(img_request, 0);
4739
+ return;
4740
+
4741
+err_img_request:
4742
+ rbd_img_request_destroy(img_request);
4743
+ if (result)
4744
+ rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4745
+ obj_op_name(op_type), length, offset, result);
4746
+ blk_mq_end_request(rq, errno_to_blk_status(result));
4747
+}
4748
+
4749
+static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4750
+ const struct blk_mq_queue_data *bd)
4751
+{
4752
+ struct rbd_device *rbd_dev = hctx->queue->queuedata;
4753
+ struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
4754
+ enum obj_operation_type op_type;
4755
+
4756
+ switch (req_op(bd->rq)) {
36444757 case REQ_OP_DISCARD:
3645
- case REQ_OP_WRITE_ZEROES:
36464758 op_type = OBJ_OP_DISCARD;
4759
+ break;
4760
+ case REQ_OP_WRITE_ZEROES:
4761
+ op_type = OBJ_OP_ZEROOUT;
36474762 break;
36484763 case REQ_OP_WRITE:
36494764 op_type = OBJ_OP_WRITE;
....@@ -3652,112 +4767,23 @@
36524767 op_type = OBJ_OP_READ;
36534768 break;
36544769 default:
3655
- dout("%s: non-fs request type %d\n", __func__, req_op(rq));
3656
- result = -EIO;
3657
- goto err;
4770
+ rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq));
4771
+ return BLK_STS_IOERR;
36584772 }
36594773
3660
- /* Ignore/skip any zero-length requests */
4774
+ rbd_img_request_init(img_req, rbd_dev, op_type);
36614775
3662
- if (!length) {
3663
- dout("%s: zero-length request\n", __func__);
3664
- result = 0;
3665
- goto err_rq;
4776
+ if (rbd_img_is_write(img_req)) {
4777
+ if (rbd_is_ro(rbd_dev)) {
4778
+ rbd_warn(rbd_dev, "%s on read-only mapping",
4779
+ obj_op_name(img_req->op_type));
4780
+ return BLK_STS_IOERR;
4781
+ }
4782
+ rbd_assert(!rbd_is_snap(rbd_dev));
36664783 }
36674784
3668
- rbd_assert(op_type == OBJ_OP_READ ||
3669
- rbd_dev->spec->snap_id == CEPH_NOSNAP);
3670
-
3671
- /*
3672
- * Quit early if the mapped snapshot no longer exists. It's
3673
- * still possible the snapshot will have disappeared by the
3674
- * time our request arrives at the osd, but there's no sense in
3675
- * sending it if we already know.
3676
- */
3677
- if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3678
- dout("request for non-existent snapshot");
3679
- rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3680
- result = -ENXIO;
3681
- goto err_rq;
3682
- }
3683
-
3684
- if (offset && length > U64_MAX - offset + 1) {
3685
- rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3686
- length);
3687
- result = -EINVAL;
3688
- goto err_rq; /* Shouldn't happen */
3689
- }
3690
-
3691
- blk_mq_start_request(rq);
3692
-
3693
- down_read(&rbd_dev->header_rwsem);
3694
- mapping_size = rbd_dev->mapping.size;
3695
- if (op_type != OBJ_OP_READ) {
3696
- snapc = rbd_dev->header.snapc;
3697
- ceph_get_snap_context(snapc);
3698
- }
3699
- up_read(&rbd_dev->header_rwsem);
3700
-
3701
- if (offset + length > mapping_size) {
3702
- rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
3703
- length, mapping_size);
3704
- result = -EIO;
3705
- goto err_rq;
3706
- }
3707
-
3708
- must_be_locked =
3709
- (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
3710
- (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
3711
- if (must_be_locked) {
3712
- down_read(&rbd_dev->lock_rwsem);
3713
- result = rbd_wait_state_locked(rbd_dev,
3714
- !rbd_dev->opts->exclusive);
3715
- if (result)
3716
- goto err_unlock;
3717
- }
3718
-
3719
- img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
3720
- if (!img_request) {
3721
- result = -ENOMEM;
3722
- goto err_unlock;
3723
- }
3724
- img_request->rq = rq;
3725
- snapc = NULL; /* img_request consumes a ref */
3726
-
3727
- if (op_type == OBJ_OP_DISCARD)
3728
- result = rbd_img_fill_nodata(img_request, offset, length);
3729
- else
3730
- result = rbd_img_fill_from_bio(img_request, offset, length,
3731
- rq->bio);
3732
- if (result)
3733
- goto err_img_request;
3734
-
3735
- rbd_img_request_submit(img_request);
3736
- if (must_be_locked)
3737
- up_read(&rbd_dev->lock_rwsem);
3738
- return;
3739
-
3740
-err_img_request:
3741
- rbd_img_request_put(img_request);
3742
-err_unlock:
3743
- if (must_be_locked)
3744
- up_read(&rbd_dev->lock_rwsem);
3745
-err_rq:
3746
- if (result)
3747
- rbd_warn(rbd_dev, "%s %llx at %llx result %d",
3748
- obj_op_name(op_type), length, offset, result);
3749
- ceph_put_snap_context(snapc);
3750
-err:
3751
- blk_mq_end_request(rq, errno_to_blk_status(result));
3752
-}
3753
-
3754
-static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
3755
- const struct blk_mq_queue_data *bd)
3756
-{
3757
- struct request *rq = bd->rq;
3758
- struct work_struct *work = blk_mq_rq_to_pdu(rq);
3759
-
3760
- queue_work(rbd_wq, work);
4785
+ INIT_WORK(&img_req->work, rbd_queue_workfn);
4786
+ queue_work(rbd_wq, &img_req->work);
37614787 return BLK_STS_OK;
37624788 }
37634789
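The rework above drops the per-request work_struct and instead embeds the whole rbd_img_request in the blk-mq per-request private data area (tag_set.cmd_size), converting between request and payload with blk_mq_rq_to_pdu()/blk_mq_rq_from_pdu(). A minimal userspace sketch of that layout trick follows; fake_request, fake_pdu, rq_to_pdu() and rq_from_pdu() are illustrative stand-ins, not the actual blk-mq helpers.

#include <stdio.h>
#include <stdlib.h>

/* stand-in for struct request: the driver payload lives right behind it */
struct fake_request {
	unsigned long tag;
	/* cmd_size bytes of driver data follow immediately */
};

struct fake_pdu {		/* stand-in for struct rbd_img_request */
	int op_type;
};

/* same pointer arithmetic idea as blk_mq_rq_to_pdu()/blk_mq_rq_from_pdu() */
static void *rq_to_pdu(struct fake_request *rq)
{
	return rq + 1;
}

static struct fake_request *rq_from_pdu(void *pdu)
{
	return (struct fake_request *)pdu - 1;
}

int main(void)
{
	size_t cmd_size = sizeof(struct fake_pdu);
	struct fake_request *rq = calloc(1, sizeof(*rq) + cmd_size);

	struct fake_pdu *pdu = rq_to_pdu(rq);
	pdu->op_type = 1;

	/* the work function only sees the payload and recovers the request */
	printf("recovered rq tag %lu, op %d\n",
	       rq_from_pdu(pdu)->tag, pdu->op_type);
	free(rq);
	return 0;
}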
....@@ -3789,10 +4815,6 @@
37894815 ceph_oloc_copy(&req->r_base_oloc, oloc);
37904816 req->r_flags = CEPH_OSD_FLAG_READ;
37914817
3792
- ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
3793
- if (ret)
3794
- goto out_req;
3795
-
37964818 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
37974819 if (IS_ERR(pages)) {
37984820 ret = PTR_ERR(pages);
....@@ -3802,6 +4824,10 @@
38024824 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
38034825 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
38044826 true);
4827
+
4828
+ ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4829
+ if (ret)
4830
+ goto out_req;
38054831
38064832 ceph_osdc_start_request(osdc, req, false);
38074833 ret = ceph_osdc_wait_request(osdc, req);
....@@ -3873,25 +4899,6 @@
38734899 return ret;
38744900 }
38754901
3876
-/*
3877
- * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3878
- * has disappeared from the (just updated) snapshot context.
3879
- */
3880
-static void rbd_exists_validate(struct rbd_device *rbd_dev)
3881
-{
3882
- u64 snap_id;
3883
-
3884
- if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3885
- return;
3886
-
3887
- snap_id = rbd_dev->spec->snap_id;
3888
- if (snap_id == CEPH_NOSNAP)
3889
- return;
3890
-
3891
- if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3892
- clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3893
-}
3894
-
38954902 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
38964903 {
38974904 sector_t size;
....@@ -3906,7 +4913,7 @@
39064913 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
39074914 dout("setting size to %llu sectors", (unsigned long long)size);
39084915 set_capacity(rbd_dev->disk, size);
3909
- revalidate_disk(rbd_dev->disk);
4916
+ revalidate_disk_size(rbd_dev->disk, true);
39104917 }
39114918 }
39124919
....@@ -3932,12 +4939,8 @@
39324939 goto out;
39334940 }
39344941
3935
- if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
3936
- rbd_dev->mapping.size = rbd_dev->header.image_size;
3937
- } else {
3938
- /* validate mapped snapshot's EXISTS flag */
3939
- rbd_exists_validate(rbd_dev);
3940
- }
4942
+ rbd_assert(!rbd_is_snap(rbd_dev));
4943
+ rbd_dev->mapping.size = rbd_dev->header.image_size;
39414944
39424945 out:
39434946 up_write(&rbd_dev->header_rwsem);
....@@ -3947,18 +4950,8 @@
39474950 return ret;
39484951 }
39494952
3950
-static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
3951
- unsigned int hctx_idx, unsigned int numa_node)
3952
-{
3953
- struct work_struct *work = blk_mq_rq_to_pdu(rq);
3954
-
3955
- INIT_WORK(work, rbd_queue_workfn);
3956
- return 0;
3957
-}
3958
-
39594953 static const struct blk_mq_ops rbd_mq_ops = {
39604954 .queue_rq = rbd_queue_rq,
3961
- .init_request = rbd_init_request,
39624955 };
39634956
39644957 static int rbd_init_disk(struct rbd_device *rbd_dev)
....@@ -3989,9 +4982,9 @@
39894982 rbd_dev->tag_set.ops = &rbd_mq_ops;
39904983 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
39914984 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
3992
- rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
3993
- rbd_dev->tag_set.nr_hw_queues = 1;
3994
- rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4985
+ rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
4986
+ rbd_dev->tag_set.nr_hw_queues = num_present_cpus();
4987
+ rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request);
39954988
39964989 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
39974990 if (err)
....@@ -4010,18 +5003,18 @@
40105003 q->limits.max_sectors = queue_max_hw_sectors(q);
40115004 blk_queue_max_segments(q, USHRT_MAX);
40125005 blk_queue_max_segment_size(q, UINT_MAX);
4013
- blk_queue_io_min(q, objset_bytes);
4014
- blk_queue_io_opt(q, objset_bytes);
5006
+ blk_queue_io_min(q, rbd_dev->opts->alloc_size);
5007
+ blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
40155008
40165009 if (rbd_dev->opts->trim) {
40175010 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
4018
- q->limits.discard_granularity = objset_bytes;
5011
+ q->limits.discard_granularity = rbd_dev->opts->alloc_size;
40195012 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
40205013 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
40215014 }
40225015
40235016 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4024
- q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
5017
+ blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, q);
40255018
40265019 /*
40275020 * disk_release() expects a queue ref from add_disk() and will
....@@ -4059,17 +5052,12 @@
40595052 (unsigned long long)rbd_dev->mapping.size);
40605053 }
40615054
4062
-/*
4063
- * Note this shows the features for whatever's mapped, which is not
4064
- * necessarily the base image.
4065
- */
40665055 static ssize_t rbd_features_show(struct device *dev,
40675056 struct device_attribute *attr, char *buf)
40685057 {
40695058 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
40705059
4071
- return sprintf(buf, "0x%016llx\n",
4072
- (unsigned long long)rbd_dev->mapping.features);
5060
+ return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
40735061 }
40745062
40755063 static ssize_t rbd_major_show(struct device *dev,
....@@ -4414,7 +5402,13 @@
44145402 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
44155403 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
44165404 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4417
- init_waitqueue_head(&rbd_dev->lock_waitq);
5405
+ spin_lock_init(&rbd_dev->lock_lists_lock);
5406
+ INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5407
+ INIT_LIST_HEAD(&rbd_dev->running_list);
5408
+ init_completion(&rbd_dev->acquire_wait);
5409
+ init_completion(&rbd_dev->releasing_wait);
5410
+
5411
+ spin_lock_init(&rbd_dev->object_map_lock);
44185412
44195413 rbd_dev->dev.bus = &rbd_bus_type;
44205414 rbd_dev->dev.type = &rbd_device_type;
....@@ -4521,17 +5515,20 @@
45215515
45225516 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
45235517 {
5518
+ size_t size;
45245519 void *reply_buf;
45255520 int ret;
45265521 void *p;
45275522
4528
- reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
5523
+ /* Response will be an encoded string, which includes a length */
5524
+ size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5525
+ reply_buf = kzalloc(size, GFP_KERNEL);
45295526 if (!reply_buf)
45305527 return -ENOMEM;
45315528
45325529 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
45335530 &rbd_dev->header_oloc, "get_object_prefix",
4534
- NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
5531
+ NULL, 0, reply_buf, size);
45355532 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
45365533 if (ret < 0)
45375534 goto out;
....@@ -4554,9 +5551,12 @@
45545551 }
45555552
45565553 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4557
- u64 *snap_features)
5554
+ bool read_only, u64 *snap_features)
45585555 {
4559
- __le64 snapid = cpu_to_le64(snap_id);
5556
+ struct {
5557
+ __le64 snap_id;
5558
+ u8 read_only;
5559
+ } features_in;
45605560 struct {
45615561 __le64 features;
45625562 __le64 incompat;
....@@ -4564,9 +5564,12 @@
45645564 u64 unsup;
45655565 int ret;
45665566
5567
+ features_in.snap_id = cpu_to_le64(snap_id);
5568
+ features_in.read_only = read_only;
5569
+
45675570 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
45685571 &rbd_dev->header_oloc, "get_features",
4569
- &snapid, sizeof(snapid),
5572
+ &features_in, sizeof(features_in),
45705573 &features_buf, sizeof(features_buf));
45715574 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
45725575 if (ret < 0)
....@@ -4594,7 +5597,34 @@
45945597 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
45955598 {
45965599 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4597
- &rbd_dev->header.features);
5600
+ rbd_is_ro(rbd_dev),
5601
+ &rbd_dev->header.features);
5602
+}
5603
+
5604
+/*
5605
+ * These are generic image flags, but since they are used only for
5606
+ * object map, store them in rbd_dev->object_map_flags.
5607
+ *
5608
+ * For the same reason, this function is called only on object map
5609
+ * (re)load and not on header refresh.
5610
+ */
5611
+static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5612
+{
5613
+ __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5614
+ __le64 flags;
5615
+ int ret;
5616
+
5617
+ ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5618
+ &rbd_dev->header_oloc, "get_flags",
5619
+ &snapid, sizeof(snapid),
5620
+ &flags, sizeof(flags));
5621
+ if (ret < 0)
5622
+ return ret;
5623
+ if (ret < sizeof(flags))
5624
+ return -EBADMSG;
5625
+
5626
+ rbd_dev->object_map_flags = le64_to_cpu(flags);
5627
+ return 0;
45985628 }
45995629
46005630 struct parent_image_info {
....@@ -4654,7 +5684,7 @@
46545684
46555685 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
46565686 "rbd", "parent_get", CEPH_OSD_FLAG_READ,
4657
- req_page, sizeof(u64), reply_page, &reply_len);
5687
+ req_page, sizeof(u64), &reply_page, &reply_len);
46585688 if (ret)
46595689 return ret == -EOPNOTSUPP ? 1 : ret;
46605690
....@@ -4666,7 +5696,7 @@
46665696
46675697 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
46685698 "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
4669
- req_page, sizeof(u64), reply_page, &reply_len);
5699
+ req_page, sizeof(u64), &reply_page, &reply_len);
46705700 if (ret)
46715701 return ret;
46725702
....@@ -4697,7 +5727,7 @@
46975727
46985728 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
46995729 "rbd", "get_parent", CEPH_OSD_FLAG_READ,
4700
- req_page, sizeof(u64), reply_page, &reply_len);
5730
+ req_page, sizeof(u64), &reply_page, &reply_len);
47015731 if (ret)
47025732 return ret;
47035733
....@@ -5275,6 +6305,141 @@
52756305 return dup;
52766306 }
52776307
6308
+static int rbd_parse_param(struct fs_parameter *param,
6309
+ struct rbd_parse_opts_ctx *pctx)
6310
+{
6311
+ struct rbd_options *opt = pctx->opts;
6312
+ struct fs_parse_result result;
6313
+ struct p_log log = {.prefix = "rbd"};
6314
+ int token, ret;
6315
+
6316
+ ret = ceph_parse_param(param, pctx->copts, NULL);
6317
+ if (ret != -ENOPARAM)
6318
+ return ret;
6319
+
6320
+ token = __fs_parse(&log, rbd_parameters, param, &result);
6321
+ dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
6322
+ if (token < 0) {
6323
+ if (token == -ENOPARAM)
6324
+ return inval_plog(&log, "Unknown parameter '%s'",
6325
+ param->key);
6326
+ return token;
6327
+ }
6328
+
6329
+ switch (token) {
6330
+ case Opt_queue_depth:
6331
+ if (result.uint_32 < 1)
6332
+ goto out_of_range;
6333
+ opt->queue_depth = result.uint_32;
6334
+ break;
6335
+ case Opt_alloc_size:
6336
+ if (result.uint_32 < SECTOR_SIZE)
6337
+ goto out_of_range;
6338
+ if (!is_power_of_2(result.uint_32))
6339
+ return inval_plog(&log, "alloc_size must be a power of 2");
6340
+ opt->alloc_size = result.uint_32;
6341
+ break;
6342
+ case Opt_lock_timeout:
6343
+ /* 0 is "wait forever" (i.e. infinite timeout) */
6344
+ if (result.uint_32 > INT_MAX / 1000)
6345
+ goto out_of_range;
6346
+ opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000);
6347
+ break;
6348
+ case Opt_pool_ns:
6349
+ kfree(pctx->spec->pool_ns);
6350
+ pctx->spec->pool_ns = param->string;
6351
+ param->string = NULL;
6352
+ break;
6353
+ case Opt_compression_hint:
6354
+ switch (result.uint_32) {
6355
+ case Opt_compression_hint_none:
6356
+ opt->alloc_hint_flags &=
6357
+ ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
6358
+ CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
6359
+ break;
6360
+ case Opt_compression_hint_compressible:
6361
+ opt->alloc_hint_flags |=
6362
+ CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6363
+ opt->alloc_hint_flags &=
6364
+ ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6365
+ break;
6366
+ case Opt_compression_hint_incompressible:
6367
+ opt->alloc_hint_flags |=
6368
+ CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6369
+ opt->alloc_hint_flags &=
6370
+ ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6371
+ break;
6372
+ default:
6373
+ BUG();
6374
+ }
6375
+ break;
6376
+ case Opt_read_only:
6377
+ opt->read_only = true;
6378
+ break;
6379
+ case Opt_read_write:
6380
+ opt->read_only = false;
6381
+ break;
6382
+ case Opt_lock_on_read:
6383
+ opt->lock_on_read = true;
6384
+ break;
6385
+ case Opt_exclusive:
6386
+ opt->exclusive = true;
6387
+ break;
6388
+ case Opt_notrim:
6389
+ opt->trim = false;
6390
+ break;
6391
+ default:
6392
+ BUG();
6393
+ }
6394
+
6395
+ return 0;
6396
+
6397
+out_of_range:
6398
+ return inval_plog(&log, "%s out of range", param->key);
6399
+}
6400
+
6401
+/*
6402
+ * This duplicates most of generic_parse_monolithic(), untying it from
6403
+ * fs_context and skipping standard superblock and security options.
6404
+ */
6405
+static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx)
6406
+{
6407
+ char *key;
6408
+ int ret = 0;
6409
+
6410
+ dout("%s '%s'\n", __func__, options);
6411
+ while ((key = strsep(&options, ",")) != NULL) {
6412
+ if (*key) {
6413
+ struct fs_parameter param = {
6414
+ .key = key,
6415
+ .type = fs_value_is_flag,
6416
+ };
6417
+ char *value = strchr(key, '=');
6418
+ size_t v_len = 0;
6419
+
6420
+ if (value) {
6421
+ if (value == key)
6422
+ continue;
6423
+ *value++ = 0;
6424
+ v_len = strlen(value);
6425
+ param.string = kmemdup_nul(value, v_len,
6426
+ GFP_KERNEL);
6427
+ if (!param.string)
6428
+ return -ENOMEM;
6429
+ param.type = fs_value_is_string;
6430
+ }
6431
+ param.size = v_len;
6432
+
6433
+ ret = rbd_parse_param(&param, pctx);
6434
+ kfree(param.string);
6435
+ if (ret)
6436
+ break;
6437
+ }
6438
+ }
6439
+
6440
+ return ret;
6441
+}
6442
+
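rbd_parse_options() above walks a comma-separated "key" / "key=value" string and hands each piece to rbd_parse_param() as an fs_parameter. A self-contained userspace sketch of the same splitting loop is shown below; handle_param() and parse_options() are hypothetical stand-ins, and the kmemdup_nul()/fs_parameter packaging of the kernel version is omitted.

#include <stdio.h>
#include <string.h>

/* hypothetical stand-in for rbd_parse_param(): just report what was found */
static int handle_param(const char *key, const char *value)
{
	printf("param '%s' = '%s'\n", key, value ? value : "(flag)");
	return 0;
}

/* same shape as the strsep() loop in rbd_parse_options() */
static int parse_options(char *options)
{
	char *key;
	int ret = 0;

	while ((key = strsep(&options, ",")) != NULL) {
		if (!*key)
			continue;	/* skip empty pieces, e.g. ",," */

		char *value = strchr(key, '=');
		if (value) {
			if (value == key)
				continue;	/* "=foo" has no key; ignored */
			*value++ = '\0';
		}
		ret = handle_param(key, value);
		if (ret)
			break;
	}
	return ret;
}

int main(void)
{
	char opts[] = "queue_depth=128,read_only,alloc_size=65536";
	return parse_options(opts);
}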
52786443 /*
52796444 * Parse the options provided for an "rbd add" (i.e., rbd image
52806445 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
....@@ -5326,8 +6491,7 @@
53266491 const char *mon_addrs;
53276492 char *snap_name;
53286493 size_t mon_addrs_size;
5329
- struct parse_rbd_opts_ctx pctx = { 0 };
5330
- struct ceph_options *copts;
6494
+ struct rbd_parse_opts_ctx pctx = { 0 };
53316495 int ret;
53326496
53336497 /* The first four tokens are required */
....@@ -5338,7 +6502,7 @@
53386502 return -EINVAL;
53396503 }
53406504 mon_addrs = buf;
5341
- mon_addrs_size = len + 1;
6505
+ mon_addrs_size = len;
53426506 buf += len;
53436507
53446508 ret = -EINVAL;
....@@ -5388,6 +6552,10 @@
53886552 *(snap_name + len) = '\0';
53896553 pctx.spec->snap_name = snap_name;
53906554
6555
+ pctx.copts = ceph_alloc_options();
6556
+ if (!pctx.copts)
6557
+ goto out_mem;
6558
+
53916559 /* Initialize all rbd options to the defaults */
53926560
53936561 pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
....@@ -5396,32 +6564,33 @@
53966564
53976565 pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
53986566 pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6567
+ pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
53996568 pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
54006569 pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
54016570 pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
54026571 pctx.opts->trim = RBD_TRIM_DEFAULT;
54036572
5404
- copts = ceph_parse_options(options, mon_addrs,
5405
- mon_addrs + mon_addrs_size - 1,
5406
- parse_rbd_opts_token, &pctx);
5407
- if (IS_ERR(copts)) {
5408
- ret = PTR_ERR(copts);
6573
+ ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL);
6574
+ if (ret)
54096575 goto out_err;
5410
- }
5411
- kfree(options);
54126576
5413
- *ceph_opts = copts;
6577
+ ret = rbd_parse_options(options, &pctx);
6578
+ if (ret)
6579
+ goto out_err;
6580
+
6581
+ *ceph_opts = pctx.copts;
54146582 *opts = pctx.opts;
54156583 *rbd_spec = pctx.spec;
5416
-
6584
+ kfree(options);
54176585 return 0;
6586
+
54186587 out_mem:
54196588 ret = -ENOMEM;
54206589 out_err:
54216590 kfree(pctx.opts);
6591
+ ceph_destroy_options(pctx.copts);
54226592 rbd_spec_put(pctx.spec);
54236593 kfree(options);
5424
-
54256594 return ret;
54266595 }
54276596
....@@ -5429,28 +6598,52 @@
54296598 {
54306599 down_write(&rbd_dev->lock_rwsem);
54316600 if (__rbd_is_lock_owner(rbd_dev))
5432
- rbd_unlock(rbd_dev);
6601
+ __rbd_release_lock(rbd_dev);
54336602 up_write(&rbd_dev->lock_rwsem);
54346603 }
54356604
6605
+/*
6606
+ * If the wait is interrupted, an error is returned even if the lock
6607
+ * was successfully acquired. rbd_dev_image_unlock() will release it
6608
+ * if needed.
6609
+ */
54366610 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
54376611 {
5438
- int ret;
6612
+ long ret;
54396613
54406614 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6615
+ if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6616
+ return 0;
6617
+
54416618 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
54426619 return -EINVAL;
54436620 }
54446621
5445
- /* FIXME: "rbd map --exclusive" should be in interruptible */
5446
- down_read(&rbd_dev->lock_rwsem);
5447
- ret = rbd_wait_state_locked(rbd_dev, true);
5448
- up_read(&rbd_dev->lock_rwsem);
5449
- if (ret) {
5450
- rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5451
- return -EROFS;
6622
+ if (rbd_is_ro(rbd_dev))
6623
+ return 0;
6624
+
6625
+ rbd_assert(!rbd_is_lock_owner(rbd_dev));
6626
+ queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6627
+ ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6628
+ ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6629
+ if (ret > 0) {
6630
+ ret = rbd_dev->acquire_err;
6631
+ } else {
6632
+ cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6633
+ if (!ret)
6634
+ ret = -ETIMEDOUT;
54526635 }
54536636
6637
+ if (ret) {
6638
+ rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6639
+ return ret;
6640
+ }
6641
+
6642
+ /*
6643
+ * The lock may have been released by now, unless automatic lock
6644
+ * transitions are disabled.
6645
+ */
6646
+ rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
54546647 return 0;
54556648 }
54566649
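The new rbd_add_acquire_lock() folds the usual completion-timeout return convention (positive = completed with jiffies to spare, 0 = timed out, negative = killed) into a single error code. A small sketch of just that mapping, under the assumption that lock_dwork records its outcome in acquire_err; map_wait_result() is a hypothetical helper and the cancel_delayed_work_sync() step has no userspace analogue here.

#include <errno.h>
#include <stdio.h>

static long map_wait_result(long wait_ret, int acquire_err)
{
	if (wait_ret > 0)
		return acquire_err;	/* completed: lock_dwork recorded the outcome */
	if (wait_ret == 0)
		return -ETIMEDOUT;	/* lock_timeout elapsed */
	return wait_ret;		/* wait was interrupted (kernel: -ERESTARTSYS) */
}

int main(void)
{
	printf("%ld %ld %ld\n",
	       map_wait_result(5, 0),		/* acquired */
	       map_wait_result(0, 0),		/* timed out */
	       map_wait_result(-EINTR, 0));	/* interrupted */
	return 0;
}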
....@@ -5500,7 +6693,6 @@
55006693 dout("rbd id object name is %s\n", oid.name);
55016694
55026695 /* Response will be an encoded string, which includes a length */
5503
-
55046696 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
55056697 response = kzalloc(size, GFP_NOIO);
55066698 if (!response) {
....@@ -5512,7 +6704,7 @@
55126704
55136705 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
55146706 "get_id", NULL, 0,
5515
- response, RBD_IMAGE_ID_LEN_MAX);
6707
+ response, size);
55166708 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
55176709 if (ret == -ENOENT) {
55186710 image_id = kstrdup("", GFP_KERNEL);
....@@ -5548,6 +6740,8 @@
55486740 struct rbd_image_header *header;
55496741
55506742 rbd_dev_parent_put(rbd_dev);
6743
+ rbd_object_map_free(rbd_dev);
6744
+ rbd_dev_mapping_clear(rbd_dev);
55516745
55526746 /* Free dynamic fields from the header, then zero it out */
55536747
....@@ -5631,6 +6825,8 @@
56316825 __rbd_get_client(rbd_dev->rbd_client);
56326826 rbd_spec_get(rbd_dev->parent_spec);
56336827
6828
+ __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6829
+
56346830 ret = rbd_dev_image_probe(parent, depth);
56356831 if (ret < 0)
56366832 goto out_err;
....@@ -5648,7 +6844,6 @@
56486844 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
56496845 {
56506846 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5651
- rbd_dev_mapping_clear(rbd_dev);
56526847 rbd_free_disk(rbd_dev);
56536848 if (!single_major)
56546849 unregister_blkdev(rbd_dev->major, rbd_dev->name);
....@@ -5682,23 +6877,17 @@
56826877 if (ret)
56836878 goto err_out_blkdev;
56846879
5685
- ret = rbd_dev_mapping_set(rbd_dev);
5686
- if (ret)
5687
- goto err_out_disk;
5688
-
56896880 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5690
- set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
6881
+ set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
56916882
56926883 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
56936884 if (ret)
5694
- goto err_out_mapping;
6885
+ goto err_out_disk;
56956886
56966887 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
56976888 up_write(&rbd_dev->header_rwsem);
56986889 return 0;
56996890
5700
-err_out_mapping:
5701
- rbd_dev_mapping_clear(rbd_dev);
57026891 err_out_disk:
57036892 rbd_free_disk(rbd_dev);
57046893 err_out_blkdev:
....@@ -5727,9 +6916,27 @@
57276916 return ret;
57286917 }
57296918
6919
+static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
6920
+{
6921
+ if (!is_snap) {
6922
+ pr_info("image %s/%s%s%s does not exist\n",
6923
+ rbd_dev->spec->pool_name,
6924
+ rbd_dev->spec->pool_ns ?: "",
6925
+ rbd_dev->spec->pool_ns ? "/" : "",
6926
+ rbd_dev->spec->image_name);
6927
+ } else {
6928
+ pr_info("snap %s/%s%s%s@%s does not exist\n",
6929
+ rbd_dev->spec->pool_name,
6930
+ rbd_dev->spec->pool_ns ?: "",
6931
+ rbd_dev->spec->pool_ns ? "/" : "",
6932
+ rbd_dev->spec->image_name,
6933
+ rbd_dev->spec->snap_name);
6934
+ }
6935
+}
6936
+
57306937 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
57316938 {
5732
- if (rbd_dev->opts)
6939
+ if (!rbd_is_ro(rbd_dev))
57336940 rbd_unregister_watch(rbd_dev);
57346941
57356942 rbd_dev_unprobe(rbd_dev);
....@@ -5749,6 +6956,7 @@
57496956 */
57506957 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
57516958 {
6959
+ bool need_watch = !rbd_is_ro(rbd_dev);
57526960 int ret;
57536961
57546962 /*
....@@ -5765,15 +6973,11 @@
57656973 if (ret)
57666974 goto err_out_format;
57676975
5768
- if (!depth) {
6976
+ if (need_watch) {
57696977 ret = rbd_register_watch(rbd_dev);
57706978 if (ret) {
57716979 if (ret == -ENOENT)
5772
- pr_info("image %s/%s%s%s does not exist\n",
5773
- rbd_dev->spec->pool_name,
5774
- rbd_dev->spec->pool_ns ?: "",
5775
- rbd_dev->spec->pool_ns ? "/" : "",
5776
- rbd_dev->spec->image_name);
6980
+ rbd_print_dne(rbd_dev, false);
57776981 goto err_out_format;
57786982 }
57796983 }
....@@ -5782,8 +6986,11 @@
57826986 down_write(&rbd_dev->header_rwsem);
57836987
57846988 ret = rbd_dev_header_info(rbd_dev);
5785
- if (ret)
6989
+ if (ret) {
6990
+ if (ret == -ENOENT && !need_watch)
6991
+ rbd_print_dne(rbd_dev, false);
57866992 goto err_out_probe;
6993
+ }
57876994
57886995 /*
57896996 * If this image is the one being mapped, we have pool name and
....@@ -5797,27 +7004,25 @@
57977004 ret = rbd_spec_fill_names(rbd_dev);
57987005 if (ret) {
57997006 if (ret == -ENOENT)
5800
- pr_info("snap %s/%s%s%s@%s does not exist\n",
5801
- rbd_dev->spec->pool_name,
5802
- rbd_dev->spec->pool_ns ?: "",
5803
- rbd_dev->spec->pool_ns ? "/" : "",
5804
- rbd_dev->spec->image_name,
5805
- rbd_dev->spec->snap_name);
7007
+ rbd_print_dne(rbd_dev, true);
58067008 goto err_out_probe;
7009
+ }
7010
+
7011
+ ret = rbd_dev_mapping_set(rbd_dev);
7012
+ if (ret)
7013
+ goto err_out_probe;
7014
+
7015
+ if (rbd_is_snap(rbd_dev) &&
7016
+ (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
7017
+ ret = rbd_object_map_load(rbd_dev);
7018
+ if (ret)
7019
+ goto err_out_probe;
58077020 }
58087021
58097022 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
58107023 ret = rbd_dev_v2_parent_info(rbd_dev);
58117024 if (ret)
58127025 goto err_out_probe;
5813
-
5814
- /*
5815
- * Need to warn users if this image is the one being
5816
- * mapped and has a parent.
5817
- */
5818
- if (!depth && rbd_dev->parent_spec)
5819
- rbd_warn(rbd_dev,
5820
- "WARNING: kernel layering is EXPERIMENTAL!");
58217026 }
58227027
58237028 ret = rbd_dev_probe_parent(rbd_dev, depth);
....@@ -5831,7 +7036,7 @@
58317036 err_out_probe:
58327037 if (!depth)
58337038 up_write(&rbd_dev->header_rwsem);
5834
- if (!depth)
7039
+ if (need_watch)
58357040 rbd_unregister_watch(rbd_dev);
58367041 rbd_dev_unprobe(rbd_dev);
58377042 err_out_format:
....@@ -5887,6 +7092,11 @@
58877092 spec = NULL; /* rbd_dev now owns this */
58887093 rbd_opts = NULL; /* rbd_dev now owns this */
58897094
7095
+ /* if we are mapping a snapshot it will be a read-only mapping */
7096
+ if (rbd_dev->opts->read_only ||
7097
+ strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7098
+ __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7099
+
58907100 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
58917101 if (!rbd_dev->config_info) {
58927102 rc = -ENOMEM;
....@@ -5897,19 +7107,19 @@
58977107 if (rc < 0)
58987108 goto err_out_rbd_dev;
58997109
5900
- /* If we are mapping a snapshot it must be marked read-only */
5901
- if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5902
- rbd_dev->opts->read_only = true;
7110
+ if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7111
+ rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7112
+ rbd_dev->layout.object_size);
7113
+ rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7114
+ }
59037115
59047116 rc = rbd_dev_device_setup(rbd_dev);
59057117 if (rc)
59067118 goto err_out_image_probe;
59077119
5908
- if (rbd_dev->opts->exclusive) {
5909
- rc = rbd_add_acquire_lock(rbd_dev);
5910
- if (rc)
5911
- goto err_out_device_setup;
5912
- }
7120
+ rc = rbd_add_acquire_lock(rbd_dev);
7121
+ if (rc)
7122
+ goto err_out_image_lock;
59137123
59147124 /* Everything's ready. Announce the disk to the world. */
59157125
....@@ -5917,7 +7127,7 @@
59177127 if (rc)
59187128 goto err_out_image_lock;
59197129
5920
- add_disk(rbd_dev->disk);
7130
+ device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL);
59217131 /* see rbd_init_disk() */
59227132 blk_put_queue(rbd_dev->disk->queue);
59237133
....@@ -5935,7 +7145,6 @@
59357145
59367146 err_out_image_lock:
59377147 rbd_dev_image_unlock(rbd_dev);
5938
-err_out_device_setup:
59397148 rbd_dev_device_release(rbd_dev);
59407149 err_out_image_probe:
59417150 rbd_dev_image_release(rbd_dev);
....@@ -5949,9 +7158,7 @@
59497158 goto out;
59507159 }
59517160
5952
-static ssize_t rbd_add(struct bus_type *bus,
5953
- const char *buf,
5954
- size_t count)
7161
+static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
59557162 {
59567163 if (single_major)
59577164 return -EINVAL;
....@@ -5959,9 +7166,8 @@
59597166 return do_rbd_add(bus, buf, count);
59607167 }
59617168
5962
-static ssize_t rbd_add_single_major(struct bus_type *bus,
5963
- const char *buf,
5964
- size_t count)
7169
+static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
7170
+ size_t count)
59657171 {
59667172 return do_rbd_add(bus, buf, count);
59677173 }
....@@ -6067,9 +7273,7 @@
60677273 return count;
60687274 }
60697275
6070
-static ssize_t rbd_remove(struct bus_type *bus,
6071
- const char *buf,
6072
- size_t count)
7276
+static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
60737277 {
60747278 if (single_major)
60757279 return -EINVAL;
....@@ -6077,9 +7281,8 @@
60777281 return do_rbd_remove(bus, buf, count);
60787282 }
60797283
6080
-static ssize_t rbd_remove_single_major(struct bus_type *bus,
6081
- const char *buf,
6082
- size_t count)
7284
+static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
7285
+ size_t count)
60837286 {
60847287 return do_rbd_remove(bus, buf, count);
60857288 }
....@@ -6088,7 +7291,7 @@
60887291 * create control files in sysfs
60897292 * /sys/bus/rbd/...
60907293 */
6091
-static int rbd_sysfs_init(void)
7294
+static int __init rbd_sysfs_init(void)
60927295 {
60937296 int ret;
60947297
....@@ -6103,13 +7306,13 @@
61037306 return ret;
61047307 }
61057308
6106
-static void rbd_sysfs_cleanup(void)
7309
+static void __exit rbd_sysfs_cleanup(void)
61077310 {
61087311 bus_unregister(&rbd_bus_type);
61097312 device_unregister(&rbd_root_dev);
61107313 }
61117314
6112
-static int rbd_slab_init(void)
7315
+static int __init rbd_slab_init(void)
61137316 {
61147317 rbd_assert(!rbd_img_request_cache);
61157318 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);