From 9370bb92b2d16684ee45cf24e879c93c509162da Mon Sep 17 00:00:00 2001 From: hc <hc@nodka.com> Date: Thu, 19 Dec 2024 01:47:39 +0000 Subject: [PATCH] add wifi6 8852be driver --- kernel/drivers/block/rbd.c | 3744 ++++++++++++++++++++++++++++++++++++++++------------------- 1 files changed, 2,527 insertions(+), 1,217 deletions(-) diff --git a/kernel/drivers/block/rbd.c b/kernel/drivers/block/rbd.c index 9f1265c..b0f7930 100644 --- a/kernel/drivers/block/rbd.c +++ b/kernel/drivers/block/rbd.c @@ -34,7 +34,7 @@ #include <linux/ceph/cls_lock_client.h> #include <linux/ceph/striper.h> #include <linux/ceph/decode.h> -#include <linux/parser.h> +#include <linux/fs_parser.h> #include <linux/bsearch.h> #include <linux/kernel.h> @@ -115,12 +115,18 @@ #define RBD_FEATURE_LAYERING (1ULL<<0) #define RBD_FEATURE_STRIPINGV2 (1ULL<<1) #define RBD_FEATURE_EXCLUSIVE_LOCK (1ULL<<2) +#define RBD_FEATURE_OBJECT_MAP (1ULL<<3) +#define RBD_FEATURE_FAST_DIFF (1ULL<<4) +#define RBD_FEATURE_DEEP_FLATTEN (1ULL<<5) #define RBD_FEATURE_DATA_POOL (1ULL<<7) #define RBD_FEATURE_OPERATIONS (1ULL<<8) #define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ RBD_FEATURE_STRIPINGV2 | \ RBD_FEATURE_EXCLUSIVE_LOCK | \ + RBD_FEATURE_OBJECT_MAP | \ + RBD_FEATURE_FAST_DIFF | \ + RBD_FEATURE_DEEP_FLATTEN | \ RBD_FEATURE_DATA_POOL | \ RBD_FEATURE_OPERATIONS) @@ -201,6 +207,11 @@ struct list_head node; }; +struct pending_result { + int result; /* first nonzero result */ + int num_pending; +}; + struct rbd_img_request; enum obj_request_type { @@ -214,34 +225,69 @@ OBJ_OP_READ = 1, OBJ_OP_WRITE, OBJ_OP_DISCARD, + OBJ_OP_ZEROOUT, +}; + +#define RBD_OBJ_FLAG_DELETION (1U << 0) +#define RBD_OBJ_FLAG_COPYUP_ENABLED (1U << 1) +#define RBD_OBJ_FLAG_COPYUP_ZEROS (1U << 2) +#define RBD_OBJ_FLAG_MAY_EXIST (1U << 3) +#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT (1U << 4) + +enum rbd_obj_read_state { + RBD_OBJ_READ_START = 1, + RBD_OBJ_READ_OBJECT, + RBD_OBJ_READ_PARENT, }; /* * Writes go through the following state machine to deal with * layering: * - * need copyup - * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP - * | ^ | - * v \------------------------------/ - * done - * ^ - * | - * RBD_OBJ_WRITE_FLAT + * . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . . + * . | . + * . v . + * . RBD_OBJ_WRITE_READ_FROM_PARENT. . . . + * . | . . + * . v v (deep-copyup . + * (image . RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC . not needed) . + * flattened) v | . . + * . v . . + * . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . . (copyup . + * | not needed) v + * v . + * done . . . . . . . . . . . . . . . . . . + * ^ + * | + * RBD_OBJ_WRITE_FLAT * * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether - * there is a parent or not. + * assert_exists guard is needed or not (in some cases it's not needed + * even if there is a parent). */ enum rbd_obj_write_state { - RBD_OBJ_WRITE_FLAT = 1, - RBD_OBJ_WRITE_GUARD, + RBD_OBJ_WRITE_START = 1, + RBD_OBJ_WRITE_PRE_OBJECT_MAP, + RBD_OBJ_WRITE_OBJECT, + __RBD_OBJ_WRITE_COPYUP, RBD_OBJ_WRITE_COPYUP, + RBD_OBJ_WRITE_POST_OBJECT_MAP, +}; + +enum rbd_obj_copyup_state { + RBD_OBJ_COPYUP_START = 1, + RBD_OBJ_COPYUP_READ_PARENT, + __RBD_OBJ_COPYUP_OBJECT_MAPS, + RBD_OBJ_COPYUP_OBJECT_MAPS, + __RBD_OBJ_COPYUP_WRITE_OBJECT, + RBD_OBJ_COPYUP_WRITE_OBJECT, }; struct rbd_obj_request { struct ceph_object_extent ex; + unsigned int flags; /* RBD_OBJ_FLAG_* */ union { - bool tried_parent; /* for reads */ + enum rbd_obj_read_state read_state; /* for reads */ enum rbd_obj_write_state write_state; /* for writes */ }; @@ -257,14 +303,15 @@ u32 bvec_idx; }; }; + + enum rbd_obj_copyup_state copyup_state; struct bio_vec *copyup_bvecs; u32 copyup_bvec_count; - struct ceph_osd_request *osd_req; + struct list_head osd_reqs; /* w/ r_private_item */ - u64 xferred; /* bytes transferred */ - int result; - + struct mutex state_mutex; + struct pending_result pending; struct kref kref; }; @@ -273,28 +320,32 @@ IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */ }; +enum rbd_img_state { + RBD_IMG_START = 1, + RBD_IMG_EXCLUSIVE_LOCK, + __RBD_IMG_OBJECT_REQUESTS, + RBD_IMG_OBJECT_REQUESTS, +}; + struct rbd_img_request { struct rbd_device *rbd_dev; enum obj_operation_type op_type; enum obj_request_type data_type; unsigned long flags; + enum rbd_img_state state; union { u64 snap_id; /* for reads */ struct ceph_snap_context *snapc; /* for writes */ }; - union { - struct request *rq; /* block request */ - struct rbd_obj_request *obj_request; /* obj req initiator */ - }; - spinlock_t completion_lock; - u64 xferred;/* aggregate bytes transferred */ - int result; /* first nonzero obj_request result */ + struct rbd_obj_request *obj_request; /* obj req initiator */ + struct list_head lock_item; struct list_head object_extents; /* obj_req.ex structs */ - u32 obj_request_count; - u32 pending_count; - struct kref kref; + struct mutex state_mutex; + struct pending_result pending; + struct work_struct work; + int work_result; }; #define for_each_obj_request(ireq, oreq) \ @@ -322,7 +373,6 @@ struct rbd_mapping { u64 size; - u64 features; }; /* @@ -367,7 +417,17 @@ struct work_struct released_lock_work; struct delayed_work lock_dwork; struct work_struct unlock_work; - wait_queue_head_t lock_waitq; + spinlock_t lock_lists_lock; + struct list_head acquiring_list; + struct list_head running_list; + struct completion acquire_wait; + int acquire_err; + struct completion releasing_wait; + + spinlock_t object_map_lock; + u8 *object_map; + u64 object_map_size; /* in objects */ + u64 object_map_flags; struct workqueue_struct *task_wq; @@ -395,12 +455,11 @@ * Flag bits for rbd_dev->flags: * - REMOVING (which is coupled with rbd_dev->open_count) is protected * by rbd_dev->lock - * - BLACKLISTED is protected by rbd_dev->lock_rwsem */ enum rbd_dev_flags { - RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */ + RBD_DEV_FLAG_EXISTS, /* rbd_dev_device_setup() ran */ RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ - RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */ + RBD_DEV_FLAG_READONLY, /* -o ro or snapshot */ }; static DEFINE_MUTEX(client_mutex); /* Serialize client creation */ @@ -421,6 +480,10 @@ static struct workqueue_struct *rbd_wq; +static struct ceph_snap_context rbd_empty_snapc = { + .nref = REFCOUNT_INIT(1), +}; + /* * single-major requires >= 0.75 version of userspace rbd utility. */ @@ -428,14 +491,13 @@ module_param(single_major, bool, 0444); MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)"); -static ssize_t rbd_add(struct bus_type *bus, const char *buf, - size_t count); -static ssize_t rbd_remove(struct bus_type *bus, const char *buf, - size_t count); -static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf, - size_t count); -static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf, - size_t count); +static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count); +static ssize_t remove_store(struct bus_type *bus, const char *buf, + size_t count); +static ssize_t add_single_major_store(struct bus_type *bus, const char *buf, + size_t count); +static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf, + size_t count); static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth); static int rbd_dev_id_to_minor(int dev_id) @@ -448,8 +510,20 @@ return minor >> RBD_SINGLE_MAJOR_PART_SHIFT; } +static bool rbd_is_ro(struct rbd_device *rbd_dev) +{ + return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags); +} + +static bool rbd_is_snap(struct rbd_device *rbd_dev) +{ + return rbd_dev->spec->snap_id != CEPH_NOSNAP; +} + static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev) { + lockdep_assert_held(&rbd_dev->lock_rwsem); + return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED || rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING; } @@ -464,16 +538,16 @@ return is_lock_owner; } -static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf) +static ssize_t supported_features_show(struct bus_type *bus, char *buf) { return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED); } -static BUS_ATTR(add, 0200, NULL, rbd_add); -static BUS_ATTR(remove, 0200, NULL, rbd_remove); -static BUS_ATTR(add_single_major, 0200, NULL, rbd_add_single_major); -static BUS_ATTR(remove_single_major, 0200, NULL, rbd_remove_single_major); -static BUS_ATTR(supported_features, 0444, rbd_supported_features_show, NULL); +static BUS_ATTR_WO(add); +static BUS_ATTR_WO(remove); +static BUS_ATTR_WO(add_single_major); +static BUS_ATTR_WO(remove_single_major); +static BUS_ATTR_RO(supported_features); static struct attribute *rbd_bus_attrs[] = { &bus_attr_add.attr, @@ -558,15 +632,32 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev); static int rbd_dev_refresh(struct rbd_device *rbd_dev); -static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev); -static int rbd_dev_header_info(struct rbd_device *rbd_dev); -static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev); +static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev, + struct rbd_image_header *header); static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u64 snap_id); static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, u8 *order, u64 *snap_size); -static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, - u64 *snap_features); +static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev); + +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result); +static void rbd_img_handle_request(struct rbd_img_request *img_req, int result); + +/* + * Return true if nothing else is pending. + */ +static bool pending_result_dec(struct pending_result *pending, int *result) +{ + rbd_assert(pending->num_pending > 0); + + if (*result && !pending->result) + pending->result = *result; + if (--pending->num_pending) + return false; + + *result = pending->result; + return true; +} static int rbd_open(struct block_device *bdev, fmode_t mode) { @@ -607,9 +698,16 @@ if (get_user(ro, (int __user *)arg)) return -EFAULT; - /* Snapshots can't be marked read-write */ - if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro) - return -EROFS; + /* + * Both images mapped read-only and snapshots can't be marked + * read-write. + */ + if (!ro) { + if (rbd_is_ro(rbd_dev)) + return -EROFS; + + rbd_assert(!rbd_is_snap(rbd_dev)); + } /* Let blkdev_roset() handle it */ return -ENOTTY; @@ -733,121 +831,74 @@ */ enum { Opt_queue_depth, + Opt_alloc_size, Opt_lock_timeout, - Opt_last_int, /* int args above */ Opt_pool_ns, - Opt_last_string, + Opt_compression_hint, /* string args above */ Opt_read_only, Opt_read_write, Opt_lock_on_read, Opt_exclusive, Opt_notrim, - Opt_err }; -static match_table_t rbd_opts_tokens = { - {Opt_queue_depth, "queue_depth=%d"}, - {Opt_lock_timeout, "lock_timeout=%d"}, - /* int args above */ - {Opt_pool_ns, "_pool_ns=%s"}, - /* string args above */ - {Opt_read_only, "read_only"}, - {Opt_read_only, "ro"}, /* Alternate spelling */ - {Opt_read_write, "read_write"}, - {Opt_read_write, "rw"}, /* Alternate spelling */ - {Opt_lock_on_read, "lock_on_read"}, - {Opt_exclusive, "exclusive"}, - {Opt_notrim, "notrim"}, - {Opt_err, NULL} +enum { + Opt_compression_hint_none, + Opt_compression_hint_compressible, + Opt_compression_hint_incompressible, +}; + +static const struct constant_table rbd_param_compression_hint[] = { + {"none", Opt_compression_hint_none}, + {"compressible", Opt_compression_hint_compressible}, + {"incompressible", Opt_compression_hint_incompressible}, + {} +}; + +static const struct fs_parameter_spec rbd_parameters[] = { + fsparam_u32 ("alloc_size", Opt_alloc_size), + fsparam_enum ("compression_hint", Opt_compression_hint, + rbd_param_compression_hint), + fsparam_flag ("exclusive", Opt_exclusive), + fsparam_flag ("lock_on_read", Opt_lock_on_read), + fsparam_u32 ("lock_timeout", Opt_lock_timeout), + fsparam_flag ("notrim", Opt_notrim), + fsparam_string ("_pool_ns", Opt_pool_ns), + fsparam_u32 ("queue_depth", Opt_queue_depth), + fsparam_flag ("read_only", Opt_read_only), + fsparam_flag ("read_write", Opt_read_write), + fsparam_flag ("ro", Opt_read_only), + fsparam_flag ("rw", Opt_read_write), + {} }; struct rbd_options { int queue_depth; + int alloc_size; unsigned long lock_timeout; bool read_only; bool lock_on_read; bool exclusive; bool trim; + + u32 alloc_hint_flags; /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */ }; #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ +#define RBD_ALLOC_SIZE_DEFAULT (64 * 1024) #define RBD_LOCK_TIMEOUT_DEFAULT 0 /* no timeout */ #define RBD_READ_ONLY_DEFAULT false #define RBD_LOCK_ON_READ_DEFAULT false #define RBD_EXCLUSIVE_DEFAULT false #define RBD_TRIM_DEFAULT true -struct parse_rbd_opts_ctx { +struct rbd_parse_opts_ctx { struct rbd_spec *spec; + struct ceph_options *copts; struct rbd_options *opts; }; - -static int parse_rbd_opts_token(char *c, void *private) -{ - struct parse_rbd_opts_ctx *pctx = private; - substring_t argstr[MAX_OPT_ARGS]; - int token, intval, ret; - - token = match_token(c, rbd_opts_tokens, argstr); - if (token < Opt_last_int) { - ret = match_int(&argstr[0], &intval); - if (ret < 0) { - pr_err("bad option arg (not int) at '%s'\n", c); - return ret; - } - dout("got int token %d val %d\n", token, intval); - } else if (token > Opt_last_int && token < Opt_last_string) { - dout("got string token %d val %s\n", token, argstr[0].from); - } else { - dout("got token %d\n", token); - } - - switch (token) { - case Opt_queue_depth: - if (intval < 1) { - pr_err("queue_depth out of range\n"); - return -EINVAL; - } - pctx->opts->queue_depth = intval; - break; - case Opt_lock_timeout: - /* 0 is "wait forever" (i.e. infinite timeout) */ - if (intval < 0 || intval > INT_MAX / 1000) { - pr_err("lock_timeout out of range\n"); - return -EINVAL; - } - pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000); - break; - case Opt_pool_ns: - kfree(pctx->spec->pool_ns); - pctx->spec->pool_ns = match_strdup(argstr); - if (!pctx->spec->pool_ns) - return -ENOMEM; - break; - case Opt_read_only: - pctx->opts->read_only = true; - break; - case Opt_read_write: - pctx->opts->read_only = false; - break; - case Opt_lock_on_read: - pctx->opts->lock_on_read = true; - break; - case Opt_exclusive: - pctx->opts->exclusive = true; - break; - case Opt_notrim: - pctx->opts->trim = false; - break; - default: - /* libceph prints "bad option" msg */ - return -EINVAL; - } - - return 0; -} static char* obj_op_name(enum obj_operation_type op_type) { @@ -858,6 +909,8 @@ return "write"; case OBJ_OP_DISCARD: return "discard"; + case OBJ_OP_ZEROOUT: + return "zeroout"; default: return "???"; } @@ -891,23 +944,6 @@ kref_put(&rbdc->kref, rbd_client_release); } -static int wait_for_latest_osdmap(struct ceph_client *client) -{ - u64 newest_epoch; - int ret; - - ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch); - if (ret) - return ret; - - if (client->osdc.osdmap->epoch >= newest_epoch) - return 0; - - ceph_osdc_maybe_request_map(&client->osdc); - return ceph_monc_wait_osdmap(&client->monc, newest_epoch, - client->options->mount_timeout); -} - /* * Get a ceph client with specific addr and configuration, if one does * not exist create it. Either way, ceph_opts is consumed by this @@ -918,7 +954,7 @@ struct rbd_client *rbdc; int ret; - mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING); + mutex_lock(&client_mutex); rbdc = rbd_client_find(ceph_opts); if (rbdc) { ceph_destroy_options(ceph_opts); @@ -927,7 +963,8 @@ * Using an existing client. Make sure ->pg_pools is up to * date before we look up the pool id in do_rbd_add(). */ - ret = wait_for_latest_osdmap(rbdc->client); + ret = ceph_wait_for_latest_osdmap(rbdc->client, + rbdc->client->options->mount_timeout); if (ret) { rbd_warn(NULL, "failed to get latest osdmap: %d", ret); rbd_put_client(rbdc); @@ -1009,15 +1046,24 @@ RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); } +static void rbd_image_header_cleanup(struct rbd_image_header *header) +{ + kfree(header->object_prefix); + ceph_put_snap_context(header->snapc); + kfree(header->snap_sizes); + kfree(header->snap_names); + + memset(header, 0, sizeof(*header)); +} + /* * Fill an rbd image header with information from the given format 1 * on-disk header. */ -static int rbd_header_from_disk(struct rbd_device *rbd_dev, - struct rbd_image_header_ondisk *ondisk) +static int rbd_header_from_disk(struct rbd_image_header *header, + struct rbd_image_header_ondisk *ondisk, + bool first_time) { - struct rbd_image_header *header = &rbd_dev->header; - bool first_time = header->object_prefix == NULL; struct ceph_snap_context *snapc; char *object_prefix = NULL; char *snap_names = NULL; @@ -1084,11 +1130,6 @@ if (first_time) { header->object_prefix = object_prefix; header->obj_order = ondisk->options.order; - rbd_init_layout(rbd_dev); - } else { - ceph_put_snap_context(header->snapc); - kfree(header->snap_names); - kfree(header->snap_sizes); } /* The remaining fields always get updated (when we refresh) */ @@ -1213,51 +1254,23 @@ return 0; } -static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id, - u64 *snap_features) -{ - rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); - if (snap_id == CEPH_NOSNAP) { - *snap_features = rbd_dev->header.features; - } else if (rbd_dev->image_format == 1) { - *snap_features = 0; /* No features for format 1 */ - } else { - u64 features = 0; - int ret; - - ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features); - if (ret) - return ret; - - *snap_features = features; - } - return 0; -} - static int rbd_dev_mapping_set(struct rbd_device *rbd_dev) { u64 snap_id = rbd_dev->spec->snap_id; u64 size = 0; - u64 features = 0; int ret; ret = rbd_snap_size(rbd_dev, snap_id, &size); if (ret) return ret; - ret = rbd_snap_features(rbd_dev, snap_id, &features); - if (ret) - return ret; rbd_dev->mapping.size = size; - rbd_dev->mapping.features = features; - return 0; } static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev) { rbd_dev->mapping.size = 0; - rbd_dev->mapping.features = 0; } static void zero_bvec(struct bio_vec *bv) @@ -1300,6 +1313,8 @@ static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off, u32 bytes) { + dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes); + switch (obj_req->img_request->data_type) { case OBJ_REQUEST_BIO: zero_bios(&obj_req->bio_pos, off, bytes); @@ -1309,7 +1324,7 @@ zero_bvecs(&obj_req->bvec_pos, off, bytes); break; default: - rbd_assert(0); + BUG(); } } @@ -1322,22 +1337,6 @@ kref_put(&obj_request->kref, rbd_obj_request_destroy); } -static void rbd_img_request_get(struct rbd_img_request *img_request) -{ - dout("%s: img %p (was %d)\n", __func__, img_request, - kref_read(&img_request->kref)); - kref_get(&img_request->kref); -} - -static void rbd_img_request_destroy(struct kref *kref); -static void rbd_img_request_put(struct rbd_img_request *img_request) -{ - rbd_assert(img_request != NULL); - dout("%s: img %p (was %d)\n", __func__, img_request, - kref_read(&img_request->kref)); - kref_put(&img_request->kref, rbd_img_request_destroy); -} - static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, struct rbd_obj_request *obj_request) { @@ -1345,8 +1344,6 @@ /* Image request now owns object's original reference */ obj_request->img_request = img_request; - img_request->obj_request_count++; - img_request->pending_count++; dout("%s: img %p obj %p\n", __func__, img_request, obj_request); } @@ -1355,19 +1352,17 @@ { dout("%s: img %p obj %p\n", __func__, img_request, obj_request); list_del(&obj_request->ex.oe_item); - rbd_assert(img_request->obj_request_count > 0); - img_request->obj_request_count--; rbd_assert(obj_request->img_request == img_request); rbd_obj_request_put(obj_request); } -static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) +static void rbd_osd_submit(struct ceph_osd_request *osd_req) { - struct ceph_osd_request *osd_req = obj_request->osd_req; + struct rbd_obj_request *obj_req = osd_req->r_priv; - dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__, - obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off, - obj_request->ex.oe_len, osd_req); + dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n", + __func__, osd_req, obj_req, obj_req->ex.oe_objno, + obj_req->ex.oe_off, obj_req->ex.oe_len); ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); } @@ -1379,18 +1374,10 @@ static void img_request_layered_set(struct rbd_img_request *img_request) { set_bit(IMG_REQ_LAYERED, &img_request->flags); - smp_mb(); -} - -static void img_request_layered_clear(struct rbd_img_request *img_request) -{ - clear_bit(IMG_REQ_LAYERED, &img_request->flags); - smp_mb(); } static bool img_request_layered_test(struct rbd_img_request *img_request) { - smp_mb(); return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; } @@ -1410,6 +1397,35 @@ rbd_dev->layout.object_size; } +/* + * Must be called after rbd_obj_calc_img_extents(). + */ +static void rbd_obj_set_copyup_enabled(struct rbd_obj_request *obj_req) +{ + rbd_assert(obj_req->img_request->snapc); + + if (obj_req->img_request->op_type == OBJ_OP_DISCARD) { + dout("%s %p objno %llu discard\n", __func__, obj_req, + obj_req->ex.oe_objno); + return; + } + + if (!obj_req->num_img_extents) { + dout("%s %p objno %llu not overlapping\n", __func__, obj_req, + obj_req->ex.oe_objno); + return; + } + + if (rbd_obj_is_entire(obj_req) && + !obj_req->img_request->snapc->num_snaps) { + dout("%s %p objno %llu entire\n", __func__, obj_req, + obj_req->ex.oe_objno); + return; + } + + obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED; +} + static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req) { return ceph_file_extents_bytes(obj_req->img_extents, @@ -1423,47 +1439,47 @@ return false; case OBJ_OP_WRITE: case OBJ_OP_DISCARD: + case OBJ_OP_ZEROOUT: return true; default: BUG(); } } -static void rbd_obj_handle_request(struct rbd_obj_request *obj_req); - static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) { struct rbd_obj_request *obj_req = osd_req->r_priv; + int result; dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, osd_req->r_result, obj_req); - rbd_assert(osd_req == obj_req->osd_req); - obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0; - if (!obj_req->result && !rbd_img_is_write(obj_req->img_request)) - obj_req->xferred = osd_req->r_result; + /* + * Writes aren't allowed to return a data payload. In some + * guarded write cases (e.g. stat + zero on an empty object) + * a stat response makes it through, but we don't care. + */ + if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request)) + result = 0; else - /* - * Writes aren't allowed to return a data payload. In some - * guarded write cases (e.g. stat + zero on an empty object) - * a stat response makes it through, but we don't care. - */ - obj_req->xferred = 0; + result = osd_req->r_result; - rbd_obj_handle_request(obj_req); + rbd_obj_handle_request(obj_req, result); } -static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) +static void rbd_osd_format_read(struct ceph_osd_request *osd_req) { - struct ceph_osd_request *osd_req = obj_request->osd_req; + struct rbd_obj_request *obj_request = osd_req->r_priv; + struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; + struct ceph_options *opt = rbd_dev->rbd_client->client->options; - osd_req->r_flags = CEPH_OSD_FLAG_READ; + osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica; osd_req->r_snapid = obj_request->img_request->snap_id; } -static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) +static void rbd_osd_format_write(struct ceph_osd_request *osd_req) { - struct ceph_osd_request *osd_req = obj_request->osd_req; + struct rbd_obj_request *obj_request = osd_req->r_priv; osd_req->r_flags = CEPH_OSD_FLAG_WRITE; ktime_get_real_ts64(&osd_req->r_mtime); @@ -1471,21 +1487,21 @@ } static struct ceph_osd_request * -rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops) +__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, + struct ceph_snap_context *snapc, int num_ops) { - struct rbd_img_request *img_req = obj_req->img_request; - struct rbd_device *rbd_dev = img_req->rbd_dev; + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_request *req; const char *name_format = rbd_dev->image_format == 1 ? RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; + int ret; - req = ceph_osdc_alloc_request(osdc, - (rbd_img_is_write(img_req) ? img_req->snapc : NULL), - num_ops, false, GFP_NOIO); + req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); if (!req) - return NULL; + return ERR_PTR(-ENOMEM); + list_add_tail(&req->r_private_item, &obj_req->osd_reqs); req->r_callback = rbd_osd_req_callback; req->r_priv = obj_req; @@ -1496,23 +1512,21 @@ ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc); req->r_base_oloc.pool = rbd_dev->layout.pool_id; - if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, - rbd_dev->header.object_prefix, obj_req->ex.oe_objno)) - goto err_req; - - if (ceph_osdc_alloc_messages(req, GFP_NOIO)) - goto err_req; + ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, + rbd_dev->header.object_prefix, + obj_req->ex.oe_objno); + if (ret) + return ERR_PTR(ret); return req; - -err_req: - ceph_osdc_put_request(req); - return NULL; } -static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) +static struct ceph_osd_request * +rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops) { - ceph_osdc_put_request(osd_req); + rbd_assert(obj_req->img_request->snapc); + return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc, + num_ops); } static struct rbd_obj_request *rbd_obj_request_create(void) @@ -1524,6 +1538,8 @@ return NULL; ceph_object_extent_init(&obj_request->ex); + INIT_LIST_HEAD(&obj_request->osd_reqs); + mutex_init(&obj_request->state_mutex); kref_init(&obj_request->kref); dout("%s %p\n", __func__, obj_request); @@ -1533,14 +1549,19 @@ static void rbd_obj_request_destroy(struct kref *kref) { struct rbd_obj_request *obj_request; + struct ceph_osd_request *osd_req; u32 i; obj_request = container_of(kref, struct rbd_obj_request, kref); dout("%s: obj %p\n", __func__, obj_request); - if (obj_request->osd_req) - rbd_osd_req_destroy(obj_request->osd_req); + while (!list_empty(&obj_request->osd_reqs)) { + osd_req = list_first_entry(&obj_request->osd_reqs, + struct ceph_osd_request, r_private_item); + list_del_init(&osd_req->r_private_item); + ceph_osdc_put_request(osd_req); + } switch (obj_request->img_request->data_type) { case OBJ_REQUEST_NODATA: @@ -1551,7 +1572,7 @@ kfree(obj_request->bvec_pos.bvecs); break; default: - rbd_assert(0); + BUG(); } kfree(obj_request->img_extents); @@ -1617,10 +1638,8 @@ if (!rbd_dev->parent_spec) return false; - down_read(&rbd_dev->header_rwsem); if (rbd_dev->parent_overlap) counter = atomic_inc_return_safe(&rbd_dev->parent_ref); - up_read(&rbd_dev->header_rwsem); if (counter < 0) rbd_warn(rbd_dev, "parent reference overflow"); @@ -1628,64 +1647,528 @@ return counter > 0; } -/* - * Caller is responsible for filling in the list of object requests - * that comprises the image request, and the Linux request pointer - * (if there is one). - */ -static struct rbd_img_request *rbd_img_request_create( - struct rbd_device *rbd_dev, - enum obj_operation_type op_type, - struct ceph_snap_context *snapc) +static void rbd_img_request_init(struct rbd_img_request *img_request, + struct rbd_device *rbd_dev, + enum obj_operation_type op_type) { - struct rbd_img_request *img_request; - - img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO); - if (!img_request) - return NULL; + memset(img_request, 0, sizeof(*img_request)); img_request->rbd_dev = rbd_dev; img_request->op_type = op_type; - if (!rbd_img_is_write(img_request)) - img_request->snap_id = rbd_dev->spec->snap_id; - else - img_request->snapc = snapc; - if (rbd_dev_parent_get(rbd_dev)) - img_request_layered_set(img_request); - - spin_lock_init(&img_request->completion_lock); + INIT_LIST_HEAD(&img_request->lock_item); INIT_LIST_HEAD(&img_request->object_extents); - kref_init(&img_request->kref); - - dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev, - obj_op_name(op_type), img_request); - return img_request; + mutex_init(&img_request->state_mutex); } -static void rbd_img_request_destroy(struct kref *kref) +/* + * Only snap_id is captured here, for reads. For writes, snapshot + * context is captured in rbd_img_object_requests() after exclusive + * lock is ensured to be held. + */ +static void rbd_img_capture_header(struct rbd_img_request *img_req) { - struct rbd_img_request *img_request; + struct rbd_device *rbd_dev = img_req->rbd_dev; + + lockdep_assert_held(&rbd_dev->header_rwsem); + + if (!rbd_img_is_write(img_req)) + img_req->snap_id = rbd_dev->spec->snap_id; + + if (rbd_dev_parent_get(rbd_dev)) + img_request_layered_set(img_req); +} + +static void rbd_img_request_destroy(struct rbd_img_request *img_request) +{ struct rbd_obj_request *obj_request; struct rbd_obj_request *next_obj_request; - img_request = container_of(kref, struct rbd_img_request, kref); - dout("%s: img %p\n", __func__, img_request); + WARN_ON(!list_empty(&img_request->lock_item)); for_each_obj_request_safe(img_request, obj_request, next_obj_request) rbd_img_obj_request_del(img_request, obj_request); - rbd_assert(img_request->obj_request_count == 0); - if (img_request_layered_test(img_request)) { - img_request_layered_clear(img_request); + if (img_request_layered_test(img_request)) rbd_dev_parent_put(img_request->rbd_dev); - } if (rbd_img_is_write(img_request)) ceph_put_snap_context(img_request->snapc); - kmem_cache_free(rbd_img_request_cache, img_request); + if (test_bit(IMG_REQ_CHILD, &img_request->flags)) + kmem_cache_free(rbd_img_request_cache, img_request); +} + +#define BITS_PER_OBJ 2 +#define OBJS_PER_BYTE (BITS_PER_BYTE / BITS_PER_OBJ) +#define OBJ_MASK ((1 << BITS_PER_OBJ) - 1) + +static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno, + u64 *index, u8 *shift) +{ + u32 off; + + rbd_assert(objno < rbd_dev->object_map_size); + *index = div_u64_rem(objno, OBJS_PER_BYTE, &off); + *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ; +} + +static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno) +{ + u64 index; + u8 shift; + + lockdep_assert_held(&rbd_dev->object_map_lock); + __rbd_object_map_index(rbd_dev, objno, &index, &shift); + return (rbd_dev->object_map[index] >> shift) & OBJ_MASK; +} + +static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val) +{ + u64 index; + u8 shift; + u8 *p; + + lockdep_assert_held(&rbd_dev->object_map_lock); + rbd_assert(!(val & ~OBJ_MASK)); + + __rbd_object_map_index(rbd_dev, objno, &index, &shift); + p = &rbd_dev->object_map[index]; + *p = (*p & ~(OBJ_MASK << shift)) | (val << shift); +} + +static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno) +{ + u8 state; + + spin_lock(&rbd_dev->object_map_lock); + state = __rbd_object_map_get(rbd_dev, objno); + spin_unlock(&rbd_dev->object_map_lock); + return state; +} + +static bool use_object_map(struct rbd_device *rbd_dev) +{ + /* + * An image mapped read-only can't use the object map -- it isn't + * loaded because the header lock isn't acquired. Someone else can + * write to the image and update the object map behind our back. + * + * A snapshot can't be written to, so using the object map is always + * safe. + */ + if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev)) + return false; + + return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) && + !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)); +} + +static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno) +{ + u8 state; + + /* fall back to default logic if object map is disabled or invalid */ + if (!use_object_map(rbd_dev)) + return true; + + state = rbd_object_map_get(rbd_dev, objno); + return state != OBJECT_NONEXISTENT; +} + +static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id, + struct ceph_object_id *oid) +{ + if (snap_id == CEPH_NOSNAP) + ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX, + rbd_dev->spec->image_id); + else + ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX, + rbd_dev->spec->image_id, snap_id); +} + +static int rbd_object_map_lock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + u8 lock_type; + char *lock_tag; + struct ceph_locker *lockers; + u32 num_lockers; + bool broke_lock = false; + int ret; + + rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid); + +again: + ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, + CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0); + if (ret != -EBUSY || broke_lock) { + if (ret == -EEXIST) + ret = 0; /* already locked by myself */ + if (ret) + rbd_warn(rbd_dev, "failed to lock object map: %d", ret); + return ret; + } + + ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, &lock_type, &lock_tag, + &lockers, &num_lockers); + if (ret) { + if (ret == -ENOENT) + goto again; + + rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret); + return ret; + } + + kfree(lock_tag); + if (num_lockers == 0) + goto again; + + rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu", + ENTITY_NAME(lockers[0].id.name)); + + ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, lockers[0].id.cookie, + &lockers[0].id.name); + ceph_free_lockers(lockers, num_lockers); + if (ret) { + if (ret == -ENOENT) + goto again; + + rbd_warn(rbd_dev, "failed to break object map lock: %d", ret); + return ret; + } + + broke_lock = true; + goto again; +} + +static void rbd_object_map_unlock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + int ret; + + rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid); + + ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, + ""); + if (ret && ret != -ENOENT) + rbd_warn(rbd_dev, "failed to unlock object map: %d", ret); +} + +static int decode_object_map_header(void **p, void *end, u64 *object_map_size) +{ + u8 struct_v; + u32 struct_len; + u32 header_len; + void *header_end; + int ret; + + ceph_decode_32_safe(p, end, header_len, e_inval); + header_end = *p + header_len; + + ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v, + &struct_len); + if (ret) + return ret; + + ceph_decode_64_safe(p, end, *object_map_size, e_inval); + + *p = header_end; + return 0; + +e_inval: + return -EINVAL; +} + +static int __rbd_object_map_load(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + CEPH_DEFINE_OID_ONSTACK(oid); + struct page **pages; + void *p, *end; + size_t reply_len; + u64 num_objects; + u64 object_map_bytes; + u64 object_map_size; + int num_pages; + int ret; + + rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size); + + num_objects = ceph_get_num_objects(&rbd_dev->layout, + rbd_dev->mapping.size); + object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ, + BITS_PER_BYTE); + num_pages = calc_pages_for(0, object_map_bytes) + 1; + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + reply_len = num_pages * PAGE_SIZE; + rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid); + ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc, + "rbd", "object_map_load", CEPH_OSD_FLAG_READ, + NULL, 0, pages, &reply_len); + if (ret) + goto out; + + p = page_address(pages[0]); + end = p + min(reply_len, (size_t)PAGE_SIZE); + ret = decode_object_map_header(&p, end, &object_map_size); + if (ret) + goto out; + + if (object_map_size != num_objects) { + rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu", + object_map_size, num_objects); + ret = -EINVAL; + goto out; + } + + if (offset_in_page(p) + object_map_bytes > reply_len) { + ret = -EINVAL; + goto out; + } + + rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL); + if (!rbd_dev->object_map) { + ret = -ENOMEM; + goto out; + } + + rbd_dev->object_map_size = object_map_size; + ceph_copy_from_page_vector(pages, rbd_dev->object_map, + offset_in_page(p), object_map_bytes); + +out: + ceph_release_page_vector(pages, num_pages); + return ret; +} + +static void rbd_object_map_free(struct rbd_device *rbd_dev) +{ + kvfree(rbd_dev->object_map); + rbd_dev->object_map = NULL; + rbd_dev->object_map_size = 0; +} + +static int rbd_object_map_load(struct rbd_device *rbd_dev) +{ + int ret; + + ret = __rbd_object_map_load(rbd_dev); + if (ret) + return ret; + + ret = rbd_dev_v2_get_flags(rbd_dev); + if (ret) { + rbd_object_map_free(rbd_dev); + return ret; + } + + if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID) + rbd_warn(rbd_dev, "object map is invalid"); + + return 0; +} + +static int rbd_object_map_open(struct rbd_device *rbd_dev) +{ + int ret; + + ret = rbd_object_map_lock(rbd_dev); + if (ret) + return ret; + + ret = rbd_object_map_load(rbd_dev); + if (ret) { + rbd_object_map_unlock(rbd_dev); + return ret; + } + + return 0; +} + +static void rbd_object_map_close(struct rbd_device *rbd_dev) +{ + rbd_object_map_free(rbd_dev); + rbd_object_map_unlock(rbd_dev); +} + +/* + * This function needs snap_id (or more precisely just something to + * distinguish between HEAD and snapshot object maps), new_state and + * current_state that were passed to rbd_object_map_update(). + * + * To avoid allocating and stashing a context we piggyback on the OSD + * request. A HEAD update has two ops (assert_locked). For new_state + * and current_state we decode our own object_map_update op, encoded in + * rbd_cls_object_map_update(). + */ +static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req, + struct ceph_osd_request *osd_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_osd_data *osd_data; + u64 objno; + u8 state, new_state, current_state; + bool has_current_state; + void *p; + + if (osd_req->r_result) + return osd_req->r_result; + + /* + * Nothing to do for a snapshot object map. + */ + if (osd_req->r_num_ops == 1) + return 0; + + /* + * Update in-memory HEAD object map. + */ + rbd_assert(osd_req->r_num_ops == 2); + osd_data = osd_req_op_data(osd_req, 1, cls, request_data); + rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES); + + p = page_address(osd_data->pages[0]); + objno = ceph_decode_64(&p); + rbd_assert(objno == obj_req->ex.oe_objno); + rbd_assert(ceph_decode_64(&p) == objno + 1); + new_state = ceph_decode_8(&p); + has_current_state = ceph_decode_8(&p); + if (has_current_state) + current_state = ceph_decode_8(&p); + + spin_lock(&rbd_dev->object_map_lock); + state = __rbd_object_map_get(rbd_dev, objno); + if (!has_current_state || current_state == state || + (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN)) + __rbd_object_map_set(rbd_dev, objno, new_state); + spin_unlock(&rbd_dev->object_map_lock); + + return 0; +} + +static void rbd_object_map_callback(struct ceph_osd_request *osd_req) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + int result; + + dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, + osd_req->r_result, obj_req); + + result = rbd_object_map_update_finish(obj_req, osd_req); + rbd_obj_handle_request(obj_req, result); +} + +static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state) +{ + u8 state = rbd_object_map_get(rbd_dev, objno); + + if (state == new_state || + (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) || + (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING)) + return false; + + return true; +} + +static int rbd_cls_object_map_update(struct ceph_osd_request *req, + int which, u64 objno, u8 new_state, + const u8 *current_state) +{ + struct page **pages; + void *p, *start; + int ret; + + ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update"); + if (ret) + return ret; + + pages = ceph_alloc_page_vector(1, GFP_NOIO); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + p = start = page_address(pages[0]); + ceph_encode_64(&p, objno); + ceph_encode_64(&p, objno + 1); + ceph_encode_8(&p, new_state); + if (current_state) { + ceph_encode_8(&p, 1); + ceph_encode_8(&p, *current_state); + } else { + ceph_encode_8(&p, 0); + } + + osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0, + false, true); + return 0; +} + +/* + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error + */ +static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id, + u8 new_state, const u8 *current_state) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_osd_request *req; + int num_ops = 1; + int which = 0; + int ret; + + if (snap_id == CEPH_NOSNAP) { + if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state)) + return 1; + + num_ops++; /* assert_locked */ + } + + req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + list_add_tail(&req->r_private_item, &obj_req->osd_reqs); + req->r_callback = rbd_object_map_callback; + req->r_priv = obj_req; + + rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid); + ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc); + req->r_flags = CEPH_OSD_FLAG_WRITE; + ktime_get_real_ts64(&req->r_mtime); + + if (snap_id == CEPH_NOSNAP) { + /* + * Protect against possible race conditions during lock + * ownership transitions. + */ + ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME, + CEPH_CLS_LOCK_EXCLUSIVE, "", ""); + if (ret) + return ret; + } + + ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno, + new_state, current_state); + if (ret) + return ret; + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + return ret; + + ceph_osdc_start_request(osdc, req, false); + return 0; } static void prune_extents(struct ceph_file_extent *img_extents, @@ -1735,11 +2218,13 @@ return 0; } -static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) +static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which) { + struct rbd_obj_request *obj_req = osd_req->r_priv; + switch (obj_req->img_request->data_type) { case OBJ_REQUEST_BIO: - osd_req_op_extent_osd_data_bio(obj_req->osd_req, which, + osd_req_op_extent_osd_data_bio(osd_req, which, &obj_req->bio_pos, obj_req->ex.oe_len); break; @@ -1748,30 +2233,15 @@ rbd_assert(obj_req->bvec_pos.iter.bi_size == obj_req->ex.oe_len); rbd_assert(obj_req->bvec_idx == obj_req->bvec_count); - osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which, + osd_req_op_extent_osd_data_bvec_pos(osd_req, which, &obj_req->bvec_pos); break; default: - rbd_assert(0); + BUG(); } } -static int rbd_obj_setup_read(struct rbd_obj_request *obj_req) -{ - obj_req->osd_req = rbd_osd_req_create(obj_req, 1); - if (!obj_req->osd_req) - return -ENOMEM; - - osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ, - obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); - rbd_osd_req_setup_data(obj_req, 0); - - rbd_osd_req_format_read(obj_req); - return 0; -} - -static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req, - unsigned int which) +static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which) { struct page **pages; @@ -1787,39 +2257,61 @@ if (IS_ERR(pages)) return PTR_ERR(pages); - osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0); - osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages, + osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0); + osd_req_op_raw_data_in_pages(osd_req, which, pages, 8 + sizeof(struct ceph_timespec), 0, false, true); return 0; } -static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req, - unsigned int which) +static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which, + u32 bytes) { + struct rbd_obj_request *obj_req = osd_req->r_priv; + int ret; + + ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup"); + if (ret) + return ret; + + osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs, + obj_req->copyup_bvec_count, bytes); + return 0; +} + +static int rbd_obj_init_read(struct rbd_obj_request *obj_req) +{ + obj_req->read_state = RBD_OBJ_READ_START; + return 0; +} + +static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req, + int which) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; u16 opcode; - osd_req_op_alloc_hint_init(obj_req->osd_req, which++, - rbd_dev->layout.object_size, - rbd_dev->layout.object_size); + if (!use_object_map(rbd_dev) || + !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) { + osd_req_op_alloc_hint_init(osd_req, which++, + rbd_dev->layout.object_size, + rbd_dev->layout.object_size, + rbd_dev->opts->alloc_hint_flags); + } if (rbd_obj_is_entire(obj_req)) opcode = CEPH_OSD_OP_WRITEFULL; else opcode = CEPH_OSD_OP_WRITE; - osd_req_op_extent_init(obj_req->osd_req, which, opcode, + osd_req_op_extent_init(osd_req, which, opcode, obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); - rbd_osd_req_setup_data(obj_req, which++); - - rbd_assert(which == obj_req->osd_req->r_num_ops); - rbd_osd_req_format_write(obj_req); + rbd_osd_setup_data(osd_req, which); } -static int rbd_obj_setup_write(struct rbd_obj_request *obj_req) +static int rbd_obj_init_write(struct rbd_obj_request *obj_req) { - unsigned int num_osd_ops, which = 0; int ret; /* reverse map the entire object onto the parent */ @@ -1827,61 +2319,104 @@ if (ret) return ret; - if (obj_req->num_img_extents) { - obj_req->write_state = RBD_OBJ_WRITE_GUARD; - num_osd_ops = 3; /* stat + setallochint + write/writefull */ - } else { - obj_req->write_state = RBD_OBJ_WRITE_FLAT; - num_osd_ops = 2; /* setallochint + write/writefull */ - } - - obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); - if (!obj_req->osd_req) - return -ENOMEM; - - if (obj_req->num_img_extents) { - ret = __rbd_obj_setup_stat(obj_req, which++); - if (ret) - return ret; - } - - __rbd_obj_setup_write(obj_req, which); + obj_req->write_state = RBD_OBJ_WRITE_START; return 0; } -static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req, - unsigned int which) +static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req) { + return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE : + CEPH_OSD_OP_ZERO; +} + +static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req, + int which) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + + if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) { + rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION); + osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0); + } else { + osd_req_op_extent_init(osd_req, which, + truncate_or_zero_opcode(obj_req), + obj_req->ex.oe_off, obj_req->ex.oe_len, + 0, 0); + } +} + +static int rbd_obj_init_discard(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u64 off, next_off; + int ret; + + /* + * Align the range to alloc_size boundary and punt on discards + * that are too small to free up any space. + * + * alloc_size == object_size && is_tail() is a special case for + * filestore with filestore_punch_hole = false, needed to allow + * truncate (in addition to delete). + */ + if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size || + !rbd_obj_is_tail(obj_req)) { + off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size); + next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len, + rbd_dev->opts->alloc_size); + if (off >= next_off) + return 1; + + dout("%s %p %llu~%llu -> %llu~%llu\n", __func__, + obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len, + off, next_off - off); + obj_req->ex.oe_off = off; + obj_req->ex.oe_len = next_off - off; + } + + /* reverse map the entire object onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, true); + if (ret) + return ret; + + obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT; + if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) + obj_req->flags |= RBD_OBJ_FLAG_DELETION; + + obj_req->write_state = RBD_OBJ_WRITE_START; + return 0; +} + +static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req, + int which) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; u16 opcode; if (rbd_obj_is_entire(obj_req)) { if (obj_req->num_img_extents) { - osd_req_op_init(obj_req->osd_req, which++, - CEPH_OSD_OP_CREATE, 0); + if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)) + osd_req_op_init(osd_req, which++, + CEPH_OSD_OP_CREATE, 0); opcode = CEPH_OSD_OP_TRUNCATE; } else { - osd_req_op_init(obj_req->osd_req, which++, + rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION); + osd_req_op_init(osd_req, which++, CEPH_OSD_OP_DELETE, 0); opcode = 0; } - } else if (rbd_obj_is_tail(obj_req)) { - opcode = CEPH_OSD_OP_TRUNCATE; } else { - opcode = CEPH_OSD_OP_ZERO; + opcode = truncate_or_zero_opcode(obj_req); } if (opcode) - osd_req_op_extent_init(obj_req->osd_req, which++, opcode, + osd_req_op_extent_init(osd_req, which, opcode, obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); - - rbd_assert(which == obj_req->osd_req->r_num_ops); - rbd_osd_req_format_write(obj_req); } -static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) +static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req) { - unsigned int num_osd_ops, which = 0; int ret; /* reverse map the entire object onto the parent */ @@ -1889,64 +2424,96 @@ if (ret) return ret; - if (rbd_obj_is_entire(obj_req)) { - obj_req->write_state = RBD_OBJ_WRITE_FLAT; - if (obj_req->num_img_extents) - num_osd_ops = 2; /* create + truncate */ - else - num_osd_ops = 1; /* delete */ - } else { - if (obj_req->num_img_extents) { - obj_req->write_state = RBD_OBJ_WRITE_GUARD; - num_osd_ops = 2; /* stat + truncate/zero */ - } else { - obj_req->write_state = RBD_OBJ_WRITE_FLAT; - num_osd_ops = 1; /* truncate/zero */ - } + if (!obj_req->num_img_extents) { + obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT; + if (rbd_obj_is_entire(obj_req)) + obj_req->flags |= RBD_OBJ_FLAG_DELETION; } - obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); - if (!obj_req->osd_req) - return -ENOMEM; - - if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) { - ret = __rbd_obj_setup_stat(obj_req, which++); - if (ret) - return ret; - } - - __rbd_obj_setup_discard(obj_req, which); + obj_req->write_state = RBD_OBJ_WRITE_START; return 0; } +static int count_write_ops(struct rbd_obj_request *obj_req) +{ + struct rbd_img_request *img_req = obj_req->img_request; + + switch (img_req->op_type) { + case OBJ_OP_WRITE: + if (!use_object_map(img_req->rbd_dev) || + !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) + return 2; /* setallochint + write/writefull */ + + return 1; /* write/writefull */ + case OBJ_OP_DISCARD: + return 1; /* delete/truncate/zero */ + case OBJ_OP_ZEROOUT: + if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents && + !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)) + return 2; /* create + truncate */ + + return 1; /* delete/truncate/zero */ + default: + BUG(); + } +} + +static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req, + int which) +{ + struct rbd_obj_request *obj_req = osd_req->r_priv; + + switch (obj_req->img_request->op_type) { + case OBJ_OP_WRITE: + __rbd_osd_setup_write_ops(osd_req, which); + break; + case OBJ_OP_DISCARD: + __rbd_osd_setup_discard_ops(osd_req, which); + break; + case OBJ_OP_ZEROOUT: + __rbd_osd_setup_zeroout_ops(osd_req, which); + break; + default: + BUG(); + } +} + /* - * For each object request in @img_req, allocate an OSD request, add - * individual OSD ops and prepare them for submission. The number of - * OSD ops depends on op_type and the overlap point (if any). + * Prune the list of object requests (adjust offset and/or length, drop + * redundant requests). Prepare object request state machines and image + * request state machine for execution. */ static int __rbd_img_fill_request(struct rbd_img_request *img_req) { - struct rbd_obj_request *obj_req; + struct rbd_obj_request *obj_req, *next_obj_req; int ret; - for_each_obj_request(img_req, obj_req) { + for_each_obj_request_safe(img_req, obj_req, next_obj_req) { switch (img_req->op_type) { case OBJ_OP_READ: - ret = rbd_obj_setup_read(obj_req); + ret = rbd_obj_init_read(obj_req); break; case OBJ_OP_WRITE: - ret = rbd_obj_setup_write(obj_req); + ret = rbd_obj_init_write(obj_req); break; case OBJ_OP_DISCARD: - ret = rbd_obj_setup_discard(obj_req); + ret = rbd_obj_init_discard(obj_req); + break; + case OBJ_OP_ZEROOUT: + ret = rbd_obj_init_zeroout(obj_req); break; default: - rbd_assert(0); + BUG(); } - if (ret) + if (ret < 0) return ret; + if (ret > 0) { + rbd_img_obj_request_del(img_req, obj_req); + continue; + } } + img_req->state = RBD_IMG_START; return 0; } @@ -2235,32 +2802,78 @@ &it); } -static void rbd_img_request_submit(struct rbd_img_request *img_request) +static void rbd_img_handle_request_work(struct work_struct *work) { - struct rbd_obj_request *obj_request; + struct rbd_img_request *img_req = + container_of(work, struct rbd_img_request, work); - dout("%s: img %p\n", __func__, img_request); + rbd_img_handle_request(img_req, img_req->work_result); +} - rbd_img_request_get(img_request); - for_each_obj_request(img_request, obj_request) - rbd_obj_request_submit(obj_request); +static void rbd_img_schedule(struct rbd_img_request *img_req, int result) +{ + INIT_WORK(&img_req->work, rbd_img_handle_request_work); + img_req->work_result = result; + queue_work(rbd_wq, &img_req->work); +} - rbd_img_request_put(img_request); +static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + + if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) { + obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST; + return true; + } + + dout("%s %p objno %llu assuming dne\n", __func__, obj_req, + obj_req->ex.oe_objno); + return false; +} + +static int rbd_obj_read_object(struct rbd_obj_request *obj_req) +{ + struct ceph_osd_request *osd_req; + int ret; + + osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); + + osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ, + obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0); + rbd_osd_setup_data(osd_req, 0); + rbd_osd_format_read(osd_req); + + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); + if (ret) + return ret; + + rbd_osd_submit(osd_req); + return 0; } static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req) { struct rbd_img_request *img_req = obj_req->img_request; + struct rbd_device *parent = img_req->rbd_dev->parent; struct rbd_img_request *child_img_req; int ret; - child_img_req = rbd_img_request_create(img_req->rbd_dev->parent, - OBJ_OP_READ, NULL); + child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO); if (!child_img_req) return -ENOMEM; + rbd_img_request_init(child_img_req, parent, OBJ_OP_READ); __set_bit(IMG_REQ_CHILD, &child_img_req->flags); child_img_req->obj_request = obj_req; + + down_read(&parent->header_rwsem); + rbd_img_capture_header(child_img_req); + up_read(&parent->header_rwsem); + + dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req, + obj_req); if (!rbd_img_is_write(img_req)) { switch (img_req->data_type) { @@ -2278,7 +2891,7 @@ &obj_req->bvec_pos); break; default: - rbd_assert(0); + BUG(); } } else { ret = rbd_img_fill_from_bvecs(child_img_req, @@ -2287,55 +2900,159 @@ obj_req->copyup_bvecs); } if (ret) { - rbd_img_request_put(child_img_req); + rbd_img_request_destroy(child_img_req); return ret; } - rbd_img_request_submit(child_img_req); + /* avoid parent chain recursion */ + rbd_img_schedule(child_img_req, 0); return 0; } -static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req) +static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result) { struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; int ret; - if (obj_req->result == -ENOENT && - rbd_dev->parent_overlap && !obj_req->tried_parent) { - /* reverse map this object extent onto the parent */ - ret = rbd_obj_calc_img_extents(obj_req, false); +again: + switch (obj_req->read_state) { + case RBD_OBJ_READ_START: + rbd_assert(!*result); + + if (!rbd_obj_may_exist(obj_req)) { + *result = -ENOENT; + obj_req->read_state = RBD_OBJ_READ_OBJECT; + goto again; + } + + ret = rbd_obj_read_object(obj_req); if (ret) { - obj_req->result = ret; + *result = ret; return true; } - - if (obj_req->num_img_extents) { - obj_req->tried_parent = true; - ret = rbd_obj_read_from_parent(obj_req); + obj_req->read_state = RBD_OBJ_READ_OBJECT; + return false; + case RBD_OBJ_READ_OBJECT: + if (*result == -ENOENT && rbd_dev->parent_overlap) { + /* reverse map this object extent onto the parent */ + ret = rbd_obj_calc_img_extents(obj_req, false); if (ret) { - obj_req->result = ret; + *result = ret; return true; } - return false; + if (obj_req->num_img_extents) { + ret = rbd_obj_read_from_parent(obj_req); + if (ret) { + *result = ret; + return true; + } + obj_req->read_state = RBD_OBJ_READ_PARENT; + return false; + } } + + /* + * -ENOENT means a hole in the image -- zero-fill the entire + * length of the request. A short read also implies zero-fill + * to the end of the request. + */ + if (*result == -ENOENT) { + rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len); + *result = 0; + } else if (*result >= 0) { + if (*result < obj_req->ex.oe_len) + rbd_obj_zero_range(obj_req, *result, + obj_req->ex.oe_len - *result); + else + rbd_assert(*result == obj_req->ex.oe_len); + *result = 0; + } + return true; + case RBD_OBJ_READ_PARENT: + /* + * The parent image is read only up to the overlap -- zero-fill + * from the overlap to the end of the request. + */ + if (!*result) { + u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req); + + if (obj_overlap < obj_req->ex.oe_len) + rbd_obj_zero_range(obj_req, obj_overlap, + obj_req->ex.oe_len - obj_overlap); + } + return true; + default: + BUG(); + } +} + +static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + + if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) + obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST; + + if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) && + (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) { + dout("%s %p noop for nonexistent\n", __func__, obj_req); + return true; } - /* - * -ENOENT means a hole in the image -- zero-fill the entire - * length of the request. A short read also implies zero-fill - * to the end of the request. In both cases we update xferred - * count to indicate the whole request was satisfied. - */ - if (obj_req->result == -ENOENT || - (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) { - rbd_assert(!obj_req->xferred || !obj_req->result); - rbd_obj_zero_range(obj_req, obj_req->xferred, - obj_req->ex.oe_len - obj_req->xferred); - obj_req->result = 0; - obj_req->xferred = obj_req->ex.oe_len; + return false; +} + +/* + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error + */ +static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u8 new_state; + + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return 1; + + if (obj_req->flags & RBD_OBJ_FLAG_DELETION) + new_state = OBJECT_PENDING; + else + new_state = OBJECT_EXISTS; + + return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL); +} + +static int rbd_obj_write_object(struct rbd_obj_request *obj_req) +{ + struct ceph_osd_request *osd_req; + int num_ops = count_write_ops(obj_req); + int which = 0; + int ret; + + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) + num_ops++; /* stat */ + + osd_req = rbd_obj_add_osd_request(obj_req, num_ops); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); + + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) { + ret = rbd_osd_setup_stat(osd_req, which++); + if (ret) + return ret; } - return true; + rbd_osd_setup_write_ops(osd_req, which); + rbd_osd_format_write(osd_req); + + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); + if (ret) + return ret; + + rbd_osd_submit(osd_req); + return 0; } /* @@ -2356,56 +3073,66 @@ return true; } -static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) +#define MODS_ONLY U32_MAX + +static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req, + u32 bytes) { - unsigned int num_osd_ops = obj_req->osd_req->r_num_ops; + struct ceph_osd_request *osd_req; int ret; dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); - rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT); - rbd_osd_req_destroy(obj_req->osd_req); + rbd_assert(bytes > 0 && bytes != MODS_ONLY); - /* - * Create a copyup request with the same number of OSD ops as - * the original request. The original request was stat + op(s), - * the new copyup request will be copyup + the same op(s). - */ - obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops); - if (!obj_req->osd_req) - return -ENOMEM; + osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); - ret = osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd", - "copyup"); + ret = rbd_osd_setup_copyup(osd_req, 0, bytes); if (ret) return ret; - /* - * Only send non-zero copyup data to save some I/O and network - * bandwidth -- zero copyup data is equivalent to the object not - * existing. - */ - if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) { - dout("%s obj_req %p detected zeroes\n", __func__, obj_req); - bytes = 0; - } - osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0, - obj_req->copyup_bvecs, - obj_req->copyup_bvec_count, - bytes); + rbd_osd_format_write(osd_req); - switch (obj_req->img_request->op_type) { - case OBJ_OP_WRITE: - __rbd_obj_setup_write(obj_req, 1); - break; - case OBJ_OP_DISCARD: - rbd_assert(!rbd_obj_is_entire(obj_req)); - __rbd_obj_setup_discard(obj_req, 1); - break; - default: - rbd_assert(0); + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); + if (ret) + return ret; + + rbd_osd_submit(osd_req); + return 0; +} + +static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req, + u32 bytes) +{ + struct ceph_osd_request *osd_req; + int num_ops = count_write_ops(obj_req); + int which = 0; + int ret; + + dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); + + if (bytes != MODS_ONLY) + num_ops++; /* copyup */ + + osd_req = rbd_obj_add_osd_request(obj_req, num_ops); + if (IS_ERR(osd_req)) + return PTR_ERR(osd_req); + + if (bytes != MODS_ONLY) { + ret = rbd_osd_setup_copyup(osd_req, which++, bytes); + if (ret) + return ret; } - rbd_obj_request_submit(obj_req); + rbd_osd_setup_write_ops(osd_req, which); + rbd_osd_format_write(osd_req); + + ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO); + if (ret) + return ret; + + rbd_osd_submit(osd_req); return 0; } @@ -2437,7 +3164,12 @@ return 0; } -static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req) +/* + * The target object doesn't exist. Read the data for the entire + * target object up to the overlap point (if any) from the parent, + * so we can use it for a copyup. + */ +static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req) { struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; int ret; @@ -2448,174 +3180,503 @@ if (!obj_req->num_img_extents) { /* * The overlap has become 0 (most likely because the - * image has been flattened). Use rbd_obj_issue_copyup() - * to re-submit the original write request -- the copyup - * operation itself will be a no-op, since someone must - * have populated the child object while we weren't - * looking. Move to WRITE_FLAT state as we'll be done - * with the operation once the null copyup completes. + * image has been flattened). Re-submit the original write + * request -- pass MODS_ONLY since the copyup isn't needed + * anymore. */ - obj_req->write_state = RBD_OBJ_WRITE_FLAT; - return rbd_obj_issue_copyup(obj_req, 0); + return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY); } ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req)); if (ret) return ret; - obj_req->write_state = RBD_OBJ_WRITE_COPYUP; return rbd_obj_read_from_parent(obj_req); } -static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req) +static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req) { + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + struct ceph_snap_context *snapc = obj_req->img_request->snapc; + u8 new_state; + u32 i; + int ret; + + rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending); + + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return; + + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS) + return; + + for (i = 0; i < snapc->num_snaps; i++) { + if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) && + i + 1 < snapc->num_snaps) + new_state = OBJECT_EXISTS_CLEAN; + else + new_state = OBJECT_EXISTS; + + ret = rbd_object_map_update(obj_req, snapc->snaps[i], + new_state, NULL); + if (ret < 0) { + obj_req->pending.result = ret; + return; + } + + rbd_assert(!ret); + obj_req->pending.num_pending++; + } +} + +static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req) +{ + u32 bytes = rbd_obj_img_extents_bytes(obj_req); + int ret; + + rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending); + + /* + * Only send non-zero copyup data to save some I/O and network + * bandwidth -- zero copyup data is equivalent to the object not + * existing. + */ + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS) + bytes = 0; + + if (obj_req->img_request->snapc->num_snaps && bytes > 0) { + /* + * Send a copyup request with an empty snapshot context to + * deep-copyup the object through all existing snapshots. + * A second request with the current snapshot context will be + * sent for the actual modification. + */ + ret = rbd_obj_copyup_empty_snapc(obj_req, bytes); + if (ret) { + obj_req->pending.result = ret; + return; + } + + obj_req->pending.num_pending++; + bytes = MODS_ONLY; + } + + ret = rbd_obj_copyup_current_snapc(obj_req, bytes); + if (ret) { + obj_req->pending.result = ret; + return; + } + + obj_req->pending.num_pending++; +} + +static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; int ret; again: - switch (obj_req->write_state) { - case RBD_OBJ_WRITE_GUARD: - rbd_assert(!obj_req->xferred); - if (obj_req->result == -ENOENT) { - /* - * The target object doesn't exist. Read the data for - * the entire target object up to the overlap point (if - * any) from the parent, so we can use it for a copyup. - */ - ret = rbd_obj_handle_write_guard(obj_req); - if (ret) { - obj_req->result = ret; - return true; - } - return false; - } - /* fall through */ - case RBD_OBJ_WRITE_FLAT: - if (!obj_req->result) - /* - * There is no such thing as a successful short - * write -- indicate the whole request was satisfied. - */ - obj_req->xferred = obj_req->ex.oe_len; - return true; - case RBD_OBJ_WRITE_COPYUP: - obj_req->write_state = RBD_OBJ_WRITE_GUARD; - if (obj_req->result) - goto again; + switch (obj_req->copyup_state) { + case RBD_OBJ_COPYUP_START: + rbd_assert(!*result); - rbd_assert(obj_req->xferred); - ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred); + ret = rbd_obj_copyup_read_parent(obj_req); if (ret) { - obj_req->result = ret; - obj_req->xferred = 0; + *result = ret; return true; } + if (obj_req->num_img_extents) + obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT; + else + obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT; return false; + case RBD_OBJ_COPYUP_READ_PARENT: + if (*result) + return true; + + if (is_zero_bvecs(obj_req->copyup_bvecs, + rbd_obj_img_extents_bytes(obj_req))) { + dout("%s %p detected zeros\n", __func__, obj_req); + obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS; + } + + rbd_obj_copyup_object_maps(obj_req); + if (!obj_req->pending.num_pending) { + *result = obj_req->pending.result; + obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS; + goto again; + } + obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS; + return false; + case __RBD_OBJ_COPYUP_OBJECT_MAPS: + if (!pending_result_dec(&obj_req->pending, result)) + return false; + fallthrough; + case RBD_OBJ_COPYUP_OBJECT_MAPS: + if (*result) { + rbd_warn(rbd_dev, "snap object map update failed: %d", + *result); + return true; + } + + rbd_obj_copyup_write_object(obj_req); + if (!obj_req->pending.num_pending) { + *result = obj_req->pending.result; + obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT; + goto again; + } + obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT; + return false; + case __RBD_OBJ_COPYUP_WRITE_OBJECT: + if (!pending_result_dec(&obj_req->pending, result)) + return false; + fallthrough; + case RBD_OBJ_COPYUP_WRITE_OBJECT: + return true; default: BUG(); } } /* - * Returns true if @obj_req is completed, or false otherwise. + * Return: + * 0 - object map update sent + * 1 - object map update isn't needed + * <0 - error */ -static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req) +static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req) { - switch (obj_req->img_request->op_type) { - case OBJ_OP_READ: - return rbd_obj_handle_read(obj_req); - case OBJ_OP_WRITE: - return rbd_obj_handle_write(obj_req); - case OBJ_OP_DISCARD: - if (rbd_obj_handle_write(obj_req)) { - /* - * Hide -ENOENT from delete/truncate/zero -- discarding - * a non-existent object is not a problem. - */ - if (obj_req->result == -ENOENT) { - obj_req->result = 0; - obj_req->xferred = obj_req->ex.oe_len; - } + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + u8 current_state = OBJECT_PENDING; + + if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return 1; + + if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION)) + return 1; + + return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT, + ¤t_state); +} + +static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result) +{ + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; + int ret; + +again: + switch (obj_req->write_state) { + case RBD_OBJ_WRITE_START: + rbd_assert(!*result); + + rbd_obj_set_copyup_enabled(obj_req); + if (rbd_obj_write_is_noop(obj_req)) + return true; + + ret = rbd_obj_write_pre_object_map(obj_req); + if (ret < 0) { + *result = ret; return true; } + obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP; + if (ret > 0) + goto again; return false; + case RBD_OBJ_WRITE_PRE_OBJECT_MAP: + if (*result) { + rbd_warn(rbd_dev, "pre object map update failed: %d", + *result); + return true; + } + ret = rbd_obj_write_object(obj_req); + if (ret) { + *result = ret; + return true; + } + obj_req->write_state = RBD_OBJ_WRITE_OBJECT; + return false; + case RBD_OBJ_WRITE_OBJECT: + if (*result == -ENOENT) { + if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) { + *result = 0; + obj_req->copyup_state = RBD_OBJ_COPYUP_START; + obj_req->write_state = __RBD_OBJ_WRITE_COPYUP; + goto again; + } + /* + * On a non-existent object: + * delete - -ENOENT, truncate/zero - 0 + */ + if (obj_req->flags & RBD_OBJ_FLAG_DELETION) + *result = 0; + } + if (*result) + return true; + + obj_req->write_state = RBD_OBJ_WRITE_COPYUP; + goto again; + case __RBD_OBJ_WRITE_COPYUP: + if (!rbd_obj_advance_copyup(obj_req, result)) + return false; + fallthrough; + case RBD_OBJ_WRITE_COPYUP: + if (*result) { + rbd_warn(rbd_dev, "copyup failed: %d", *result); + return true; + } + ret = rbd_obj_write_post_object_map(obj_req); + if (ret < 0) { + *result = ret; + return true; + } + obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP; + if (ret > 0) + goto again; + return false; + case RBD_OBJ_WRITE_POST_OBJECT_MAP: + if (*result) + rbd_warn(rbd_dev, "post object map update failed: %d", + *result); + return true; default: BUG(); } } -static void rbd_obj_end_request(struct rbd_obj_request *obj_req) +/* + * Return true if @obj_req is completed. + */ +static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req, + int *result) { struct rbd_img_request *img_req = obj_req->img_request; + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool done; - rbd_assert((!obj_req->result && - obj_req->xferred == obj_req->ex.oe_len) || - (obj_req->result < 0 && !obj_req->xferred)); - if (!obj_req->result) { - img_req->xferred += obj_req->xferred; - return; - } + mutex_lock(&obj_req->state_mutex); + if (!rbd_img_is_write(img_req)) + done = rbd_obj_advance_read(obj_req, result); + else + done = rbd_obj_advance_write(obj_req, result); + mutex_unlock(&obj_req->state_mutex); - rbd_warn(img_req->rbd_dev, - "%s at objno %llu %llu~%llu result %d xferred %llu", - obj_op_name(img_req->op_type), obj_req->ex.oe_objno, - obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result, - obj_req->xferred); - if (!img_req->result) { - img_req->result = obj_req->result; - img_req->xferred = 0; + if (done && *result) { + rbd_assert(*result < 0); + rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d", + obj_op_name(img_req->op_type), obj_req->ex.oe_objno, + obj_req->ex.oe_off, obj_req->ex.oe_len, *result); } + return done; } -static void rbd_img_end_child_request(struct rbd_img_request *img_req) +/* + * This is open-coded in rbd_img_handle_request() to avoid parent chain + * recursion. + */ +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result) { - struct rbd_obj_request *obj_req = img_req->obj_request; - - rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags)); - rbd_assert((!img_req->result && - img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) || - (img_req->result < 0 && !img_req->xferred)); - - obj_req->result = img_req->result; - obj_req->xferred = img_req->xferred; - rbd_img_request_put(img_req); + if (__rbd_obj_handle_request(obj_req, &result)) + rbd_img_handle_request(obj_req->img_request, result); } -static void rbd_img_end_request(struct rbd_img_request *img_req) +static bool need_exclusive_lock(struct rbd_img_request *img_req) { + struct rbd_device *rbd_dev = img_req->rbd_dev; + + if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) + return false; + + if (rbd_is_ro(rbd_dev)) + return false; + rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags)); - rbd_assert((!img_req->result && - img_req->xferred == blk_rq_bytes(img_req->rq)) || - (img_req->result < 0 && !img_req->xferred)); + if (rbd_dev->opts->lock_on_read || + (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) + return true; - blk_mq_end_request(img_req->rq, - errno_to_blk_status(img_req->result)); - rbd_img_request_put(img_req); + return rbd_img_is_write(img_req); } -static void rbd_obj_handle_request(struct rbd_obj_request *obj_req) +static bool rbd_lock_add_request(struct rbd_img_request *img_req) { - struct rbd_img_request *img_req; + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool locked; + + lockdep_assert_held(&rbd_dev->lock_rwsem); + locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED; + spin_lock(&rbd_dev->lock_lists_lock); + rbd_assert(list_empty(&img_req->lock_item)); + if (!locked) + list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list); + else + list_add_tail(&img_req->lock_item, &rbd_dev->running_list); + spin_unlock(&rbd_dev->lock_lists_lock); + return locked; +} + +static void rbd_lock_del_request(struct rbd_img_request *img_req) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool need_wakeup; + + lockdep_assert_held(&rbd_dev->lock_rwsem); + spin_lock(&rbd_dev->lock_lists_lock); + rbd_assert(!list_empty(&img_req->lock_item)); + list_del_init(&img_req->lock_item); + need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING && + list_empty(&rbd_dev->running_list)); + spin_unlock(&rbd_dev->lock_lists_lock); + if (need_wakeup) + complete(&rbd_dev->releasing_wait); +} + +static int rbd_img_exclusive_lock(struct rbd_img_request *img_req) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + + if (!need_exclusive_lock(img_req)) + return 1; + + if (rbd_lock_add_request(img_req)) + return 1; + + if (rbd_dev->opts->exclusive) { + WARN_ON(1); /* lock got released? */ + return -EROFS; + } + + /* + * Note the use of mod_delayed_work() in rbd_acquire_lock() + * and cancel_delayed_work() in wake_lock_waiters(). + */ + dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + return 0; +} + +static void rbd_img_object_requests(struct rbd_img_request *img_req) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + struct rbd_obj_request *obj_req; + + rbd_assert(!img_req->pending.result && !img_req->pending.num_pending); + rbd_assert(!need_exclusive_lock(img_req) || + __rbd_is_lock_owner(rbd_dev)); + + if (rbd_img_is_write(img_req)) { + rbd_assert(!img_req->snapc); + down_read(&rbd_dev->header_rwsem); + img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc); + up_read(&rbd_dev->header_rwsem); + } + + for_each_obj_request(img_req, obj_req) { + int result = 0; + + if (__rbd_obj_handle_request(obj_req, &result)) { + if (result) { + img_req->pending.result = result; + return; + } + } else { + img_req->pending.num_pending++; + } + } +} + +static bool rbd_img_advance(struct rbd_img_request *img_req, int *result) +{ + int ret; again: - if (!__rbd_obj_handle_request(obj_req)) - return; + switch (img_req->state) { + case RBD_IMG_START: + rbd_assert(!*result); - img_req = obj_req->img_request; - spin_lock(&img_req->completion_lock); - rbd_obj_end_request(obj_req); - rbd_assert(img_req->pending_count); - if (--img_req->pending_count) { - spin_unlock(&img_req->completion_lock); - return; + ret = rbd_img_exclusive_lock(img_req); + if (ret < 0) { + *result = ret; + return true; + } + img_req->state = RBD_IMG_EXCLUSIVE_LOCK; + if (ret > 0) + goto again; + return false; + case RBD_IMG_EXCLUSIVE_LOCK: + if (*result) + return true; + + rbd_img_object_requests(img_req); + if (!img_req->pending.num_pending) { + *result = img_req->pending.result; + img_req->state = RBD_IMG_OBJECT_REQUESTS; + goto again; + } + img_req->state = __RBD_IMG_OBJECT_REQUESTS; + return false; + case __RBD_IMG_OBJECT_REQUESTS: + if (!pending_result_dec(&img_req->pending, result)) + return false; + fallthrough; + case RBD_IMG_OBJECT_REQUESTS: + return true; + default: + BUG(); + } +} + +/* + * Return true if @img_req is completed. + */ +static bool __rbd_img_handle_request(struct rbd_img_request *img_req, + int *result) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + bool done; + + if (need_exclusive_lock(img_req)) { + down_read(&rbd_dev->lock_rwsem); + mutex_lock(&img_req->state_mutex); + done = rbd_img_advance(img_req, result); + if (done) + rbd_lock_del_request(img_req); + mutex_unlock(&img_req->state_mutex); + up_read(&rbd_dev->lock_rwsem); + } else { + mutex_lock(&img_req->state_mutex); + done = rbd_img_advance(img_req, result); + mutex_unlock(&img_req->state_mutex); } - spin_unlock(&img_req->completion_lock); + if (done && *result) { + rbd_assert(*result < 0); + rbd_warn(rbd_dev, "%s%s result %d", + test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "", + obj_op_name(img_req->op_type), *result); + } + return done; +} + +static void rbd_img_handle_request(struct rbd_img_request *img_req, int result) +{ +again: + if (!__rbd_img_handle_request(img_req, &result)) + return; + if (test_bit(IMG_REQ_CHILD, &img_req->flags)) { - obj_req = img_req->obj_request; - rbd_img_end_child_request(img_req); - goto again; + struct rbd_obj_request *obj_req = img_req->obj_request; + + rbd_img_request_destroy(img_req); + if (__rbd_obj_handle_request(obj_req, &result)) { + img_req = obj_req->img_request; + goto again; + } + } else { + struct request *rq = blk_mq_rq_from_pdu(img_req); + + rbd_img_request_destroy(img_req); + blk_mq_end_request(rq, errno_to_blk_status(result)); } - rbd_img_end_request(img_req); } static const struct rbd_client_id rbd_empty_cid; @@ -2660,6 +3721,7 @@ { struct rbd_client_id cid = rbd_get_cid(rbd_dev); + rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; strcpy(rbd_dev->lock_cookie, cookie); rbd_set_owner_cid(rbd_dev, &cid); queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); @@ -2681,10 +3743,9 @@ ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie, RBD_LOCK_TAG, "", 0); - if (ret) + if (ret && ret != -EEXIST) return ret; - rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; __rbd_lock(rbd_dev, cookie); return 0; } @@ -2703,7 +3764,7 @@ ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, rbd_dev->lock_cookie); if (ret && ret != -ENOENT) - rbd_warn(rbd_dev, "failed to unlock: %d", ret); + rbd_warn(rbd_dev, "failed to unlock header: %d", ret); /* treat errors as the image is unlocked */ rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; @@ -2739,11 +3800,7 @@ static void rbd_notify_op_lock(struct rbd_device *rbd_dev, enum rbd_notify_op notify_op) { - struct page **reply_pages; - size_t reply_len; - - __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len); - ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); + __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL); } static void rbd_notify_acquired_lock(struct work_struct *work) @@ -2830,21 +3887,56 @@ goto out; } -static void wake_requests(struct rbd_device *rbd_dev, bool wake_all) +/* + * Either image request state machine(s) or rbd_add_acquire_lock() + * (i.e. "rbd map"). + */ +static void wake_lock_waiters(struct rbd_device *rbd_dev, int result) { - dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all); + struct rbd_img_request *img_req; + + dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); + lockdep_assert_held_write(&rbd_dev->lock_rwsem); cancel_delayed_work(&rbd_dev->lock_dwork); - if (wake_all) - wake_up_all(&rbd_dev->lock_waitq); - else - wake_up(&rbd_dev->lock_waitq); + if (!completion_done(&rbd_dev->acquire_wait)) { + rbd_assert(list_empty(&rbd_dev->acquiring_list) && + list_empty(&rbd_dev->running_list)); + rbd_dev->acquire_err = result; + complete_all(&rbd_dev->acquire_wait); + return; + } + + list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) { + mutex_lock(&img_req->state_mutex); + rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK); + rbd_img_schedule(img_req, result); + mutex_unlock(&img_req->state_mutex); + } + + list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list); } -static int get_lock_owner_info(struct rbd_device *rbd_dev, - struct ceph_locker **lockers, u32 *num_lockers) +static bool locker_equal(const struct ceph_locker *lhs, + const struct ceph_locker *rhs) +{ + return lhs->id.name.type == rhs->id.name.type && + lhs->id.name.num == rhs->id.name.num && + !strcmp(lhs->id.cookie, rhs->id.cookie) && + ceph_addr_equal_no_type(&lhs->info.addr, &rhs->info.addr); +} + +static void free_locker(struct ceph_locker *locker) +{ + if (locker) + ceph_free_lockers(locker, 1); +} + +static struct ceph_locker *get_lock_owner_info(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_locker *lockers; + u32 num_lockers; u8 lock_type; char *lock_tag; int ret; @@ -2853,39 +3945,45 @@ ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, - &lock_type, &lock_tag, lockers, num_lockers); - if (ret) - return ret; + &lock_type, &lock_tag, &lockers, &num_lockers); + if (ret) { + rbd_warn(rbd_dev, "failed to get header lockers: %d", ret); + return ERR_PTR(ret); + } - if (*num_lockers == 0) { + if (num_lockers == 0) { dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev); + lockers = NULL; goto out; } if (strcmp(lock_tag, RBD_LOCK_TAG)) { rbd_warn(rbd_dev, "locked by external mechanism, tag %s", lock_tag); - ret = -EBUSY; - goto out; + goto err_busy; } if (lock_type == CEPH_CLS_LOCK_SHARED) { rbd_warn(rbd_dev, "shared lock type detected"); - ret = -EBUSY; - goto out; + goto err_busy; } - if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX, + WARN_ON(num_lockers != 1); + if (strncmp(lockers[0].id.cookie, RBD_LOCK_COOKIE_PREFIX, strlen(RBD_LOCK_COOKIE_PREFIX))) { rbd_warn(rbd_dev, "locked by external mechanism, cookie %s", - (*lockers)[0].id.cookie); - ret = -EBUSY; - goto out; + lockers[0].id.cookie); + goto err_busy; } out: kfree(lock_tag); - return ret; + return lockers; + +err_busy: + kfree(lock_tag); + ceph_free_lockers(lockers, num_lockers); + return ERR_PTR(-EBUSY); } static int find_watcher(struct rbd_device *rbd_dev, @@ -2901,13 +3999,19 @@ ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, &watchers, &num_watchers); - if (ret) + if (ret) { + rbd_warn(rbd_dev, "failed to get watchers: %d", ret); return ret; + } sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); for (i = 0; i < num_watchers; i++) { - if (!memcmp(&watchers[i].addr, &locker->info.addr, - sizeof(locker->info.addr)) && + /* + * Ignore addr->type while comparing. This mimics + * entity_addr_t::get_legacy_str() + strcmp(). + */ + if (ceph_addr_equal_no_type(&watchers[i].addr, + &locker->info.addr) && watchers[i].cookie == cookie) { struct rbd_client_id cid = { .gid = le64_to_cpu(watchers[i].name.num), @@ -2935,104 +4039,160 @@ static int rbd_try_lock(struct rbd_device *rbd_dev) { struct ceph_client *client = rbd_dev->rbd_client->client; - struct ceph_locker *lockers; - u32 num_lockers; + struct ceph_locker *locker, *refreshed_locker; int ret; for (;;) { + locker = refreshed_locker = NULL; + ret = rbd_lock(rbd_dev); - if (ret != -EBUSY) - return ret; - - /* determine if the current lock holder is still alive */ - ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers); - if (ret) - return ret; - - if (num_lockers == 0) - goto again; - - ret = find_watcher(rbd_dev, lockers); - if (ret) { - if (ret > 0) - ret = 0; /* have to request lock */ + if (!ret) + goto out; + if (ret != -EBUSY) { + rbd_warn(rbd_dev, "failed to lock header: %d", ret); goto out; } - rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", - ENTITY_NAME(lockers[0].id.name)); + /* determine if the current lock holder is still alive */ + locker = get_lock_owner_info(rbd_dev); + if (IS_ERR(locker)) { + ret = PTR_ERR(locker); + locker = NULL; + goto out; + } + if (!locker) + goto again; - ret = ceph_monc_blacklist_add(&client->monc, - &lockers[0].info.addr); + ret = find_watcher(rbd_dev, locker); + if (ret) + goto out; /* request lock or error */ + + refreshed_locker = get_lock_owner_info(rbd_dev); + if (IS_ERR(refreshed_locker)) { + ret = PTR_ERR(refreshed_locker); + refreshed_locker = NULL; + goto out; + } + if (!refreshed_locker || + !locker_equal(locker, refreshed_locker)) + goto again; + + rbd_warn(rbd_dev, "breaking header lock owned by %s%llu", + ENTITY_NAME(locker->id.name)); + + ret = ceph_monc_blocklist_add(&client->monc, + &locker->info.addr); if (ret) { - rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d", - ENTITY_NAME(lockers[0].id.name), ret); + rbd_warn(rbd_dev, "failed to blocklist %s%llu: %d", + ENTITY_NAME(locker->id.name), ret); goto out; } ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, - lockers[0].id.cookie, - &lockers[0].id.name); - if (ret && ret != -ENOENT) + locker->id.cookie, &locker->id.name); + if (ret && ret != -ENOENT) { + rbd_warn(rbd_dev, "failed to break header lock: %d", + ret); goto out; + } again: - ceph_free_lockers(lockers, num_lockers); + free_locker(refreshed_locker); + free_locker(locker); } out: - ceph_free_lockers(lockers, num_lockers); + free_locker(refreshed_locker); + free_locker(locker); return ret; } -/* - * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED - */ -static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev, - int *pret) +static int rbd_post_acquire_action(struct rbd_device *rbd_dev) { - enum rbd_lock_state lock_state; + int ret; + + ret = rbd_dev_refresh(rbd_dev); + if (ret) + return ret; + + if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) { + ret = rbd_object_map_open(rbd_dev); + if (ret) + return ret; + } + + return 0; +} + +/* + * Return: + * 0 - lock acquired + * 1 - caller should call rbd_request_lock() + * <0 - error + */ +static int rbd_try_acquire_lock(struct rbd_device *rbd_dev) +{ + int ret; down_read(&rbd_dev->lock_rwsem); dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, rbd_dev->lock_state); if (__rbd_is_lock_owner(rbd_dev)) { - lock_state = rbd_dev->lock_state; up_read(&rbd_dev->lock_rwsem); - return lock_state; + return 0; } up_read(&rbd_dev->lock_rwsem); down_write(&rbd_dev->lock_rwsem); dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, rbd_dev->lock_state); - if (!__rbd_is_lock_owner(rbd_dev)) { - *pret = rbd_try_lock(rbd_dev); - if (*pret) - rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret); + if (__rbd_is_lock_owner(rbd_dev)) { + up_write(&rbd_dev->lock_rwsem); + return 0; } - lock_state = rbd_dev->lock_state; + ret = rbd_try_lock(rbd_dev); + if (ret < 0) { + rbd_warn(rbd_dev, "failed to acquire lock: %d", ret); + goto out; + } + if (ret > 0) { + up_write(&rbd_dev->lock_rwsem); + return ret; + } + + rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED); + rbd_assert(list_empty(&rbd_dev->running_list)); + + ret = rbd_post_acquire_action(rbd_dev); + if (ret) { + rbd_warn(rbd_dev, "post-acquire action failed: %d", ret); + /* + * Can't stay in RBD_LOCK_STATE_LOCKED because + * rbd_lock_add_request() would let the request through, + * assuming that e.g. object map is locked and loaded. + */ + rbd_unlock(rbd_dev); + } + +out: + wake_lock_waiters(rbd_dev, ret); up_write(&rbd_dev->lock_rwsem); - return lock_state; + return ret; } static void rbd_acquire_lock(struct work_struct *work) { struct rbd_device *rbd_dev = container_of(to_delayed_work(work), struct rbd_device, lock_dwork); - enum rbd_lock_state lock_state; - int ret = 0; + int ret; dout("%s rbd_dev %p\n", __func__, rbd_dev); again: - lock_state = rbd_try_acquire_lock(rbd_dev, &ret); - if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) { - if (lock_state == RBD_LOCK_STATE_LOCKED) - wake_requests(rbd_dev, true); - dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__, - rbd_dev, lock_state, ret); + ret = rbd_try_acquire_lock(rbd_dev); + if (ret <= 0) { + dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret); return; } @@ -3041,16 +4201,9 @@ goto again; /* treat this as a dead client */ } else if (ret == -EROFS) { rbd_warn(rbd_dev, "peer will not release lock"); - /* - * If this is rbd_add_acquire_lock(), we want to fail - * immediately -- reuse BLACKLISTED flag. Otherwise we - * want to block. - */ - if (!(rbd_dev->disk->flags & GENHD_FL_UP)) { - set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); - /* wake "rbd map --exclusive" process */ - wake_requests(rbd_dev, false); - } + down_write(&rbd_dev->lock_rwsem); + wake_lock_waiters(rbd_dev, ret); + up_write(&rbd_dev->lock_rwsem); } else if (ret < 0) { rbd_warn(rbd_dev, "error requesting lock: %d", ret); mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, @@ -3060,50 +4213,72 @@ * lock owner acked, but resend if we don't see them * release the lock */ - dout("%s rbd_dev %p requeueing lock_dwork\n", __func__, + dout("%s rbd_dev %p requeuing lock_dwork\n", __func__, rbd_dev); mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC)); } } -/* - * lock_rwsem must be held for write - */ -static bool rbd_release_lock(struct rbd_device *rbd_dev) +static bool rbd_quiesce_lock(struct rbd_device *rbd_dev) { - dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, - rbd_dev->lock_state); + dout("%s rbd_dev %p\n", __func__, rbd_dev); + lockdep_assert_held_write(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) return false; - rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; - downgrade_write(&rbd_dev->lock_rwsem); /* * Ensure that all in-flight IO is flushed. - * - * FIXME: ceph_osdc_sync() flushes the entire OSD client, which - * may be shared with other devices. */ - ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc); - up_read(&rbd_dev->lock_rwsem); + rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; + rbd_assert(!completion_done(&rbd_dev->releasing_wait)); + if (list_empty(&rbd_dev->running_list)) + return true; + + up_write(&rbd_dev->lock_rwsem); + wait_for_completion(&rbd_dev->releasing_wait); down_write(&rbd_dev->lock_rwsem); - dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, - rbd_dev->lock_state); if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) return false; + rbd_assert(list_empty(&rbd_dev->running_list)); + return true; +} + +static void rbd_pre_release_action(struct rbd_device *rbd_dev) +{ + if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) + rbd_object_map_close(rbd_dev); +} + +static void __rbd_release_lock(struct rbd_device *rbd_dev) +{ + rbd_assert(list_empty(&rbd_dev->running_list)); + + rbd_pre_release_action(rbd_dev); rbd_unlock(rbd_dev); +} + +/* + * lock_rwsem must be held for write + */ +static void rbd_release_lock(struct rbd_device *rbd_dev) +{ + if (!rbd_quiesce_lock(rbd_dev)) + return; + + __rbd_release_lock(rbd_dev); + /* * Give others a chance to grab the lock - we would re-acquire - * almost immediately if we got new IO during ceph_osdc_sync() - * otherwise. We need to ack our own notifications, so this - * lock_dwork will be requeued from rbd_wait_state_locked() - * after wake_requests() in rbd_handle_released_lock(). + * almost immediately if we got new IO while draining the running + * list otherwise. We need to ack our own notifications, so this + * lock_dwork will be requeued from rbd_handle_released_lock() by + * way of maybe_kick_acquire(). */ cancel_delayed_work(&rbd_dev->lock_dwork); - return true; } static void rbd_release_lock_work(struct work_struct *work) @@ -3114,6 +4289,23 @@ down_write(&rbd_dev->lock_rwsem); rbd_release_lock(rbd_dev); up_write(&rbd_dev->lock_rwsem); +} + +static void maybe_kick_acquire(struct rbd_device *rbd_dev) +{ + bool have_requests; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + if (__rbd_is_lock_owner(rbd_dev)) + return; + + spin_lock(&rbd_dev->lock_lists_lock); + have_requests = !list_empty(&rbd_dev->acquiring_list); + spin_unlock(&rbd_dev->lock_lists_lock); + if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) { + dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev); + mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + } } static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, @@ -3131,22 +4323,17 @@ if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { down_write(&rbd_dev->lock_rwsem); if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { - /* - * we already know that the remote client is - * the owner - */ - up_write(&rbd_dev->lock_rwsem); - return; + dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n", + __func__, rbd_dev, cid.gid, cid.handle); + } else { + rbd_set_owner_cid(rbd_dev, &cid); } - - rbd_set_owner_cid(rbd_dev, &cid); downgrade_write(&rbd_dev->lock_rwsem); } else { down_read(&rbd_dev->lock_rwsem); } - if (!__rbd_is_lock_owner(rbd_dev)) - wake_requests(rbd_dev, false); + maybe_kick_acquire(rbd_dev); up_read(&rbd_dev->lock_rwsem); } @@ -3165,21 +4352,18 @@ if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { down_write(&rbd_dev->lock_rwsem); if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { - dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n", + dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n", __func__, rbd_dev, cid.gid, cid.handle, rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle); - up_write(&rbd_dev->lock_rwsem); - return; + } else { + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); } - - rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); downgrade_write(&rbd_dev->lock_rwsem); } else { down_read(&rbd_dev->lock_rwsem); } - if (!__rbd_is_lock_owner(rbd_dev)) - wake_requests(rbd_dev, false); + maybe_kick_acquire(rbd_dev); up_read(&rbd_dev->lock_rwsem); } @@ -3433,7 +4617,6 @@ */ static void rbd_unregister_watch(struct rbd_device *rbd_dev) { - WARN_ON(waitqueue_active(&rbd_dev->lock_waitq)); cancel_tasks_sync(rbd_dev); mutex_lock(&rbd_dev->watch_mutex); @@ -3455,7 +4638,8 @@ char cookie[32]; int ret; - WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); + if (!rbd_quiesce_lock(rbd_dev)) + return; format_lock_cookie(rbd_dev, cookie); ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid, @@ -3471,11 +4655,11 @@ * Lock cookie cannot be updated on older OSDs, so do * a manual release and queue an acquire. */ - if (rbd_release_lock(rbd_dev)) - queue_delayed_work(rbd_dev->task_wq, - &rbd_dev->lock_dwork, 0); + __rbd_release_lock(rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); } else { __rbd_lock(rbd_dev, cookie); + wake_lock_waiters(rbd_dev, 0); } } @@ -3496,15 +4680,18 @@ ret = __rbd_register_watch(rbd_dev); if (ret) { rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); - if (ret == -EBLACKLISTED || ret == -ENOENT) { - set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); - wake_requests(rbd_dev, true); - } else { + if (ret != -EBLOCKLISTED && ret != -ENOENT) { queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, RBD_RETRY_DELAY); + mutex_unlock(&rbd_dev->watch_mutex); + return; } + mutex_unlock(&rbd_dev->watch_mutex); + down_write(&rbd_dev->lock_rwsem); + wake_lock_waiters(rbd_dev, ret); + up_write(&rbd_dev->lock_rwsem); return; } @@ -3567,7 +4754,7 @@ ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name, CEPH_OSD_FLAG_READ, req_page, outbound_size, - reply_page, &inbound_size); + &reply_page, &inbound_size); if (!ret) { memcpy(inbound, page_address(reply_page), inbound_size); ret = inbound_size; @@ -3579,71 +4766,74 @@ return ret; } -/* - * lock_rwsem must be held for read - */ -static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire) -{ - DEFINE_WAIT(wait); - unsigned long timeout; - int ret = 0; - - if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) - return -EBLACKLISTED; - - if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) - return 0; - - if (!may_acquire) { - rbd_warn(rbd_dev, "exclusive lock required"); - return -EROFS; - } - - do { - /* - * Note the use of mod_delayed_work() in rbd_acquire_lock() - * and cancel_delayed_work() in wake_requests(). - */ - dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); - queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); - prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait, - TASK_UNINTERRUPTIBLE); - up_read(&rbd_dev->lock_rwsem); - timeout = schedule_timeout(ceph_timeout_jiffies( - rbd_dev->opts->lock_timeout)); - down_read(&rbd_dev->lock_rwsem); - if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { - ret = -EBLACKLISTED; - break; - } - if (!timeout) { - rbd_warn(rbd_dev, "timed out waiting for lock"); - ret = -ETIMEDOUT; - break; - } - } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); - - finish_wait(&rbd_dev->lock_waitq, &wait); - return ret; -} - static void rbd_queue_workfn(struct work_struct *work) { - struct request *rq = blk_mq_rq_from_pdu(work); - struct rbd_device *rbd_dev = rq->q->queuedata; - struct rbd_img_request *img_request; - struct ceph_snap_context *snapc = NULL; + struct rbd_img_request *img_request = + container_of(work, struct rbd_img_request, work); + struct rbd_device *rbd_dev = img_request->rbd_dev; + enum obj_operation_type op_type = img_request->op_type; + struct request *rq = blk_mq_rq_from_pdu(img_request); u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT; u64 length = blk_rq_bytes(rq); - enum obj_operation_type op_type; u64 mapping_size; - bool must_be_locked; int result; - switch (req_op(rq)) { + /* Ignore/skip any zero-length requests */ + if (!length) { + dout("%s: zero-length request\n", __func__); + result = 0; + goto err_img_request; + } + + blk_mq_start_request(rq); + + down_read(&rbd_dev->header_rwsem); + mapping_size = rbd_dev->mapping.size; + rbd_img_capture_header(img_request); + up_read(&rbd_dev->header_rwsem); + + if (offset + length > mapping_size) { + rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset, + length, mapping_size); + result = -EIO; + goto err_img_request; + } + + dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev, + img_request, obj_op_name(op_type), offset, length); + + if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT) + result = rbd_img_fill_nodata(img_request, offset, length); + else + result = rbd_img_fill_from_bio(img_request, offset, length, + rq->bio); + if (result) + goto err_img_request; + + rbd_img_handle_request(img_request, 0); + return; + +err_img_request: + rbd_img_request_destroy(img_request); + if (result) + rbd_warn(rbd_dev, "%s %llx at %llx result %d", + obj_op_name(op_type), length, offset, result); + blk_mq_end_request(rq, errno_to_blk_status(result)); +} + +static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx, + const struct blk_mq_queue_data *bd) +{ + struct rbd_device *rbd_dev = hctx->queue->queuedata; + struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq); + enum obj_operation_type op_type; + + switch (req_op(bd->rq)) { case REQ_OP_DISCARD: - case REQ_OP_WRITE_ZEROES: op_type = OBJ_OP_DISCARD; + break; + case REQ_OP_WRITE_ZEROES: + op_type = OBJ_OP_ZEROOUT; break; case REQ_OP_WRITE: op_type = OBJ_OP_WRITE; @@ -3652,112 +4842,23 @@ op_type = OBJ_OP_READ; break; default: - dout("%s: non-fs request type %d\n", __func__, req_op(rq)); - result = -EIO; - goto err; + rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq)); + return BLK_STS_IOERR; } - /* Ignore/skip any zero-length requests */ + rbd_img_request_init(img_req, rbd_dev, op_type); - if (!length) { - dout("%s: zero-length request\n", __func__); - result = 0; - goto err_rq; + if (rbd_img_is_write(img_req)) { + if (rbd_is_ro(rbd_dev)) { + rbd_warn(rbd_dev, "%s on read-only mapping", + obj_op_name(img_req->op_type)); + return BLK_STS_IOERR; + } + rbd_assert(!rbd_is_snap(rbd_dev)); } - rbd_assert(op_type == OBJ_OP_READ || - rbd_dev->spec->snap_id == CEPH_NOSNAP); - - /* - * Quit early if the mapped snapshot no longer exists. It's - * still possible the snapshot will have disappeared by the - * time our request arrives at the osd, but there's no sense in - * sending it if we already know. - */ - if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { - dout("request for non-existent snapshot"); - rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); - result = -ENXIO; - goto err_rq; - } - - if (offset && length > U64_MAX - offset + 1) { - rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset, - length); - result = -EINVAL; - goto err_rq; /* Shouldn't happen */ - } - - blk_mq_start_request(rq); - - down_read(&rbd_dev->header_rwsem); - mapping_size = rbd_dev->mapping.size; - if (op_type != OBJ_OP_READ) { - snapc = rbd_dev->header.snapc; - ceph_get_snap_context(snapc); - } - up_read(&rbd_dev->header_rwsem); - - if (offset + length > mapping_size) { - rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset, - length, mapping_size); - result = -EIO; - goto err_rq; - } - - must_be_locked = - (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && - (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read); - if (must_be_locked) { - down_read(&rbd_dev->lock_rwsem); - result = rbd_wait_state_locked(rbd_dev, - !rbd_dev->opts->exclusive); - if (result) - goto err_unlock; - } - - img_request = rbd_img_request_create(rbd_dev, op_type, snapc); - if (!img_request) { - result = -ENOMEM; - goto err_unlock; - } - img_request->rq = rq; - snapc = NULL; /* img_request consumes a ref */ - - if (op_type == OBJ_OP_DISCARD) - result = rbd_img_fill_nodata(img_request, offset, length); - else - result = rbd_img_fill_from_bio(img_request, offset, length, - rq->bio); - if (result) - goto err_img_request; - - rbd_img_request_submit(img_request); - if (must_be_locked) - up_read(&rbd_dev->lock_rwsem); - return; - -err_img_request: - rbd_img_request_put(img_request); -err_unlock: - if (must_be_locked) - up_read(&rbd_dev->lock_rwsem); -err_rq: - if (result) - rbd_warn(rbd_dev, "%s %llx at %llx result %d", - obj_op_name(op_type), length, offset, result); - ceph_put_snap_context(snapc); -err: - blk_mq_end_request(rq, errno_to_blk_status(result)); -} - -static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx, - const struct blk_mq_queue_data *bd) -{ - struct request *rq = bd->rq; - struct work_struct *work = blk_mq_rq_to_pdu(rq); - - queue_work(rbd_wq, work); + INIT_WORK(&img_req->work, rbd_queue_workfn); + queue_work(rbd_wq, &img_req->work); return BLK_STS_OK; } @@ -3789,10 +4890,6 @@ ceph_oloc_copy(&req->r_base_oloc, oloc); req->r_flags = CEPH_OSD_FLAG_READ; - ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); - if (ret) - goto out_req; - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); if (IS_ERR(pages)) { ret = PTR_ERR(pages); @@ -3802,6 +4899,10 @@ osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0); osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, true); + + ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); + if (ret) + goto out_req; ceph_osdc_start_request(osdc, req, false); ret = ceph_osdc_wait_request(osdc, req); @@ -3818,7 +4919,9 @@ * return, the rbd_dev->header field will contain up-to-date * information about the image. */ -static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev) +static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev, + struct rbd_image_header *header, + bool first_time) { struct rbd_image_header_ondisk *ondisk = NULL; u32 snap_count = 0; @@ -3866,30 +4969,11 @@ snap_count = le32_to_cpu(ondisk->snap_count); } while (snap_count != want_count); - ret = rbd_header_from_disk(rbd_dev, ondisk); + ret = rbd_header_from_disk(header, ondisk, first_time); out: kfree(ondisk); return ret; -} - -/* - * Clear the rbd device's EXISTS flag if the snapshot it's mapped to - * has disappeared from the (just updated) snapshot context. - */ -static void rbd_exists_validate(struct rbd_device *rbd_dev) -{ - u64 snap_id; - - if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) - return; - - snap_id = rbd_dev->spec->snap_id; - if (snap_id == CEPH_NOSNAP) - return; - - if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX) - clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); } static void rbd_dev_update_size(struct rbd_device *rbd_dev) @@ -3906,59 +4990,12 @@ size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE; dout("setting size to %llu sectors", (unsigned long long)size); set_capacity(rbd_dev->disk, size); - revalidate_disk(rbd_dev->disk); + revalidate_disk_size(rbd_dev->disk, true); } -} - -static int rbd_dev_refresh(struct rbd_device *rbd_dev) -{ - u64 mapping_size; - int ret; - - down_write(&rbd_dev->header_rwsem); - mapping_size = rbd_dev->mapping.size; - - ret = rbd_dev_header_info(rbd_dev); - if (ret) - goto out; - - /* - * If there is a parent, see if it has disappeared due to the - * mapped image getting flattened. - */ - if (rbd_dev->parent) { - ret = rbd_dev_v2_parent_info(rbd_dev); - if (ret) - goto out; - } - - if (rbd_dev->spec->snap_id == CEPH_NOSNAP) { - rbd_dev->mapping.size = rbd_dev->header.image_size; - } else { - /* validate mapped snapshot's EXISTS flag */ - rbd_exists_validate(rbd_dev); - } - -out: - up_write(&rbd_dev->header_rwsem); - if (!ret && mapping_size != rbd_dev->mapping.size) - rbd_dev_update_size(rbd_dev); - - return ret; -} - -static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq, - unsigned int hctx_idx, unsigned int numa_node) -{ - struct work_struct *work = blk_mq_rq_to_pdu(rq); - - INIT_WORK(work, rbd_queue_workfn); - return 0; } static const struct blk_mq_ops rbd_mq_ops = { .queue_rq = rbd_queue_rq, - .init_request = rbd_init_request, }; static int rbd_init_disk(struct rbd_device *rbd_dev) @@ -3989,9 +5026,9 @@ rbd_dev->tag_set.ops = &rbd_mq_ops; rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth; rbd_dev->tag_set.numa_node = NUMA_NO_NODE; - rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE; - rbd_dev->tag_set.nr_hw_queues = 1; - rbd_dev->tag_set.cmd_size = sizeof(struct work_struct); + rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE; + rbd_dev->tag_set.nr_hw_queues = num_present_cpus(); + rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request); err = blk_mq_alloc_tag_set(&rbd_dev->tag_set); if (err) @@ -4010,18 +5047,18 @@ q->limits.max_sectors = queue_max_hw_sectors(q); blk_queue_max_segments(q, USHRT_MAX); blk_queue_max_segment_size(q, UINT_MAX); - blk_queue_io_min(q, objset_bytes); - blk_queue_io_opt(q, objset_bytes); + blk_queue_io_min(q, rbd_dev->opts->alloc_size); + blk_queue_io_opt(q, rbd_dev->opts->alloc_size); if (rbd_dev->opts->trim) { blk_queue_flag_set(QUEUE_FLAG_DISCARD, q); - q->limits.discard_granularity = objset_bytes; + q->limits.discard_granularity = rbd_dev->opts->alloc_size; blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT); blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT); } if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC)) - q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES; + blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, q); /* * disk_release() expects a queue ref from add_disk() and will @@ -4059,17 +5096,12 @@ (unsigned long long)rbd_dev->mapping.size); } -/* - * Note this shows the features for whatever's mapped, which is not - * necessarily the base image. - */ static ssize_t rbd_features_show(struct device *dev, struct device_attribute *attr, char *buf) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - return sprintf(buf, "0x%016llx\n", - (unsigned long long)rbd_dev->mapping.features); + return sprintf(buf, "0x%016llx\n", rbd_dev->header.features); } static ssize_t rbd_major_show(struct device *dev, @@ -4381,8 +5413,7 @@ module_put(THIS_MODULE); } -static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, - struct rbd_spec *spec) +static struct rbd_device *__rbd_dev_create(struct rbd_spec *spec) { struct rbd_device *rbd_dev; @@ -4414,15 +5445,18 @@ INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock); INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); - init_waitqueue_head(&rbd_dev->lock_waitq); + spin_lock_init(&rbd_dev->lock_lists_lock); + INIT_LIST_HEAD(&rbd_dev->acquiring_list); + INIT_LIST_HEAD(&rbd_dev->running_list); + init_completion(&rbd_dev->acquire_wait); + init_completion(&rbd_dev->releasing_wait); + + spin_lock_init(&rbd_dev->object_map_lock); rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; rbd_dev->dev.parent = &rbd_root_dev; device_initialize(&rbd_dev->dev); - - rbd_dev->rbd_client = rbdc; - rbd_dev->spec = spec; return rbd_dev; } @@ -4436,11 +5470,9 @@ { struct rbd_device *rbd_dev; - rbd_dev = __rbd_dev_create(rbdc, spec); + rbd_dev = __rbd_dev_create(spec); if (!rbd_dev) return NULL; - - rbd_dev->opts = opts; /* get an id and fill in device name */ rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0, @@ -4457,6 +5489,10 @@ /* we have a ref from do_rbd_add() */ __module_get(THIS_MODULE); + + rbd_dev->rbd_client = rbdc; + rbd_dev->spec = spec; + rbd_dev->opts = opts; dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id); return rbd_dev; @@ -4512,41 +5548,39 @@ return 0; } -static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev) +static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev, + char **pobject_prefix) { - return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, - &rbd_dev->header.obj_order, - &rbd_dev->header.image_size); -} - -static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) -{ + size_t size; void *reply_buf; + char *object_prefix; int ret; void *p; - reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL); + /* Response will be an encoded string, which includes a length */ + size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX; + reply_buf = kzalloc(size, GFP_KERNEL); if (!reply_buf) return -ENOMEM; ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, &rbd_dev->header_oloc, "get_object_prefix", - NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX); + NULL, 0, reply_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out; p = reply_buf; - rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, - p + ret, NULL, GFP_NOIO); + object_prefix = ceph_extract_encoded_string(&p, p + ret, NULL, + GFP_NOIO); + if (IS_ERR(object_prefix)) { + ret = PTR_ERR(object_prefix); + goto out; + } ret = 0; - if (IS_ERR(rbd_dev->header.object_prefix)) { - ret = PTR_ERR(rbd_dev->header.object_prefix); - rbd_dev->header.object_prefix = NULL; - } else { - dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); - } + *pobject_prefix = object_prefix; + dout(" object_prefix = %s\n", object_prefix); out: kfree(reply_buf); @@ -4554,9 +5588,12 @@ } static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, - u64 *snap_features) + bool read_only, u64 *snap_features) { - __le64 snapid = cpu_to_le64(snap_id); + struct { + __le64 snap_id; + u8 read_only; + } features_in; struct { __le64 features; __le64 incompat; @@ -4564,9 +5601,12 @@ u64 unsup; int ret; + features_in.snap_id = cpu_to_le64(snap_id); + features_in.read_only = read_only; + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, &rbd_dev->header_oloc, "get_features", - &snapid, sizeof(snapid), + &features_in, sizeof(features_in), &features_buf, sizeof(features_buf)); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) @@ -4591,10 +5631,30 @@ return 0; } -static int rbd_dev_v2_features(struct rbd_device *rbd_dev) +/* + * These are generic image flags, but since they are used only for + * object map, store them in rbd_dev->object_map_flags. + * + * For the same reason, this function is called only on object map + * (re)load and not on header refresh. + */ +static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev) { - return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, - &rbd_dev->header.features); + __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id); + __le64 flags; + int ret; + + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_flags", + &snapid, sizeof(snapid), + &flags, sizeof(flags)); + if (ret < 0) + return ret; + if (ret < sizeof(flags)) + return -EBADMSG; + + rbd_dev->object_map_flags = le64_to_cpu(flags); + return 0; } struct parent_image_info { @@ -4606,6 +5666,14 @@ bool has_overlap; u64 overlap; }; + +static void rbd_parent_info_cleanup(struct parent_image_info *pii) +{ + kfree(pii->pool_ns); + kfree(pii->image_id); + + memset(pii, 0, sizeof(*pii)); +} /* * The caller is responsible for @pii. @@ -4654,7 +5722,7 @@ ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "parent_get", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), reply_page, &reply_len); + req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret == -EOPNOTSUPP ? 1 : ret; @@ -4666,7 +5734,7 @@ ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), reply_page, &reply_len); + req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret; @@ -4676,6 +5744,9 @@ if (pii->has_overlap) ceph_decode_64_safe(&p, end, pii->overlap, e_inval); + dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n", + __func__, pii->pool_id, pii->pool_ns, pii->image_id, pii->snap_id, + pii->has_overlap, pii->overlap); return 0; e_inval: @@ -4697,7 +5768,7 @@ ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "get_parent", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), reply_page, &reply_len); + req_page, sizeof(u64), &reply_page, &reply_len); if (ret) return ret; @@ -4714,14 +5785,17 @@ pii->has_overlap = true; ceph_decode_64_safe(&p, end, pii->overlap, e_inval); + dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n", + __func__, pii->pool_id, pii->pool_ns, pii->image_id, pii->snap_id, + pii->has_overlap, pii->overlap); return 0; e_inval: return -EINVAL; } -static int get_parent_info(struct rbd_device *rbd_dev, - struct parent_image_info *pii) +static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev, + struct parent_image_info *pii) { struct page *req_page, *reply_page; void *p; @@ -4749,7 +5823,7 @@ return ret; } -static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) +static int rbd_dev_setup_parent(struct rbd_device *rbd_dev) { struct rbd_spec *parent_spec; struct parent_image_info pii = { 0 }; @@ -4759,37 +5833,12 @@ if (!parent_spec) return -ENOMEM; - ret = get_parent_info(rbd_dev, &pii); + ret = rbd_dev_v2_parent_info(rbd_dev, &pii); if (ret) goto out_err; - dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n", - __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id, - pii.has_overlap, pii.overlap); - - if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) { - /* - * Either the parent never existed, or we have - * record of it but the image got flattened so it no - * longer has a parent. When the parent of a - * layered image disappears we immediately set the - * overlap to 0. The effect of this is that all new - * requests will be treated as if the image had no - * parent. - * - * If !pii.has_overlap, the parent image spec is not - * applicable. It's there to avoid duplication in each - * snapshot record. - */ - if (rbd_dev->parent_overlap) { - rbd_dev->parent_overlap = 0; - rbd_dev_parent_put(rbd_dev); - pr_info("%s: clone image has been flattened\n", - rbd_dev->disk->disk_name); - } - + if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) goto out; /* No parent? No problem. */ - } /* The ceph file layout needs to fit pool id in 32 bits */ @@ -4801,58 +5850,46 @@ } /* - * The parent won't change (except when the clone is - * flattened, already handled that). So we only need to - * record the parent spec we have not already done so. + * The parent won't change except when the clone is flattened, + * so we only need to record the parent image spec once. */ - if (!rbd_dev->parent_spec) { - parent_spec->pool_id = pii.pool_id; - if (pii.pool_ns && *pii.pool_ns) { - parent_spec->pool_ns = pii.pool_ns; - pii.pool_ns = NULL; - } - parent_spec->image_id = pii.image_id; - pii.image_id = NULL; - parent_spec->snap_id = pii.snap_id; - - rbd_dev->parent_spec = parent_spec; - parent_spec = NULL; /* rbd_dev now owns this */ + parent_spec->pool_id = pii.pool_id; + if (pii.pool_ns && *pii.pool_ns) { + parent_spec->pool_ns = pii.pool_ns; + pii.pool_ns = NULL; } + parent_spec->image_id = pii.image_id; + pii.image_id = NULL; + parent_spec->snap_id = pii.snap_id; + + rbd_assert(!rbd_dev->parent_spec); + rbd_dev->parent_spec = parent_spec; + parent_spec = NULL; /* rbd_dev now owns this */ /* - * We always update the parent overlap. If it's zero we issue - * a warning, as we will proceed as if there was no parent. + * Record the parent overlap. If it's zero, issue a warning as + * we will proceed as if there is no parent. */ - if (!pii.overlap) { - if (parent_spec) { - /* refresh, careful to warn just once */ - if (rbd_dev->parent_overlap) - rbd_warn(rbd_dev, - "clone now standalone (overlap became 0)"); - } else { - /* initial probe */ - rbd_warn(rbd_dev, "clone is standalone (overlap 0)"); - } - } + if (!pii.overlap) + rbd_warn(rbd_dev, "clone is standalone (overlap 0)"); rbd_dev->parent_overlap = pii.overlap; out: ret = 0; out_err: - kfree(pii.pool_ns); - kfree(pii.image_id); + rbd_parent_info_cleanup(&pii); rbd_spec_put(parent_spec); return ret; } -static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) +static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev, + u64 *stripe_unit, u64 *stripe_count) { struct { __le64 stripe_unit; __le64 stripe_count; } __attribute__ ((packed)) striping_info_buf = { 0 }; size_t size = sizeof (striping_info_buf); - void *p; int ret; ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, @@ -4864,27 +5901,33 @@ if (ret < size) return -ERANGE; - p = &striping_info_buf; - rbd_dev->header.stripe_unit = ceph_decode_64(&p); - rbd_dev->header.stripe_count = ceph_decode_64(&p); + *stripe_unit = le64_to_cpu(striping_info_buf.stripe_unit); + *stripe_count = le64_to_cpu(striping_info_buf.stripe_count); + dout(" stripe_unit = %llu stripe_count = %llu\n", *stripe_unit, + *stripe_count); + return 0; } -static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev) +static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev, s64 *data_pool_id) { - __le64 data_pool_id; + __le64 data_pool_buf; int ret; ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, &rbd_dev->header_oloc, "get_data_pool", - NULL, 0, &data_pool_id, sizeof(data_pool_id)); + NULL, 0, &data_pool_buf, + sizeof(data_pool_buf)); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) return ret; - if (ret < sizeof(data_pool_id)) + if (ret < sizeof(data_pool_buf)) return -EBADMSG; - rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id); - WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL); + *data_pool_id = le64_to_cpu(data_pool_buf); + dout(" data_pool_id = %lld\n", *data_pool_id); + WARN_ON(*data_pool_id == CEPH_NOPOOL); + return 0; } @@ -5076,7 +6119,8 @@ return ret; } -static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) +static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, + struct ceph_snap_context **psnapc) { size_t size; int ret; @@ -5137,9 +6181,7 @@ for (i = 0; i < snap_count; i++) snapc->snaps[i] = ceph_decode_64(&p); - ceph_put_snap_context(rbd_dev->header.snapc); - rbd_dev->header.snapc = snapc; - + *psnapc = snapc; dout(" snap context seq = %llu, snap_count = %u\n", (unsigned long long)seq, (unsigned int)snap_count); out: @@ -5188,38 +6230,42 @@ return snap_name; } -static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev) +static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev, + struct rbd_image_header *header, + bool first_time) { - bool first_time = rbd_dev->header.object_prefix == NULL; int ret; - ret = rbd_dev_v2_image_size(rbd_dev); + ret = _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, + first_time ? &header->obj_order : NULL, + &header->image_size); if (ret) return ret; if (first_time) { - ret = rbd_dev_v2_header_onetime(rbd_dev); + ret = rbd_dev_v2_header_onetime(rbd_dev, header); if (ret) return ret; } - ret = rbd_dev_v2_snap_context(rbd_dev); - if (ret && first_time) { - kfree(rbd_dev->header.object_prefix); - rbd_dev->header.object_prefix = NULL; - } + ret = rbd_dev_v2_snap_context(rbd_dev, &header->snapc); + if (ret) + return ret; - return ret; + return 0; } -static int rbd_dev_header_info(struct rbd_device *rbd_dev) +static int rbd_dev_header_info(struct rbd_device *rbd_dev, + struct rbd_image_header *header, + bool first_time) { rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + rbd_assert(!header->object_prefix && !header->snapc); if (rbd_dev->image_format == 1) - return rbd_dev_v1_header_info(rbd_dev); + return rbd_dev_v1_header_info(rbd_dev, header, first_time); - return rbd_dev_v2_header_info(rbd_dev); + return rbd_dev_v2_header_info(rbd_dev, header, first_time); } /* @@ -5275,6 +6321,141 @@ return dup; } +static int rbd_parse_param(struct fs_parameter *param, + struct rbd_parse_opts_ctx *pctx) +{ + struct rbd_options *opt = pctx->opts; + struct fs_parse_result result; + struct p_log log = {.prefix = "rbd"}; + int token, ret; + + ret = ceph_parse_param(param, pctx->copts, NULL); + if (ret != -ENOPARAM) + return ret; + + token = __fs_parse(&log, rbd_parameters, param, &result); + dout("%s fs_parse '%s' token %d\n", __func__, param->key, token); + if (token < 0) { + if (token == -ENOPARAM) + return inval_plog(&log, "Unknown parameter '%s'", + param->key); + return token; + } + + switch (token) { + case Opt_queue_depth: + if (result.uint_32 < 1) + goto out_of_range; + opt->queue_depth = result.uint_32; + break; + case Opt_alloc_size: + if (result.uint_32 < SECTOR_SIZE) + goto out_of_range; + if (!is_power_of_2(result.uint_32)) + return inval_plog(&log, "alloc_size must be a power of 2"); + opt->alloc_size = result.uint_32; + break; + case Opt_lock_timeout: + /* 0 is "wait forever" (i.e. infinite timeout) */ + if (result.uint_32 > INT_MAX / 1000) + goto out_of_range; + opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000); + break; + case Opt_pool_ns: + kfree(pctx->spec->pool_ns); + pctx->spec->pool_ns = param->string; + param->string = NULL; + break; + case Opt_compression_hint: + switch (result.uint_32) { + case Opt_compression_hint_none: + opt->alloc_hint_flags &= + ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE | + CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE); + break; + case Opt_compression_hint_compressible: + opt->alloc_hint_flags |= + CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE; + opt->alloc_hint_flags &= + ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE; + break; + case Opt_compression_hint_incompressible: + opt->alloc_hint_flags |= + CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE; + opt->alloc_hint_flags &= + ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE; + break; + default: + BUG(); + } + break; + case Opt_read_only: + opt->read_only = true; + break; + case Opt_read_write: + opt->read_only = false; + break; + case Opt_lock_on_read: + opt->lock_on_read = true; + break; + case Opt_exclusive: + opt->exclusive = true; + break; + case Opt_notrim: + opt->trim = false; + break; + default: + BUG(); + } + + return 0; + +out_of_range: + return inval_plog(&log, "%s out of range", param->key); +} + +/* + * This duplicates most of generic_parse_monolithic(), untying it from + * fs_context and skipping standard superblock and security options. + */ +static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx) +{ + char *key; + int ret = 0; + + dout("%s '%s'\n", __func__, options); + while ((key = strsep(&options, ",")) != NULL) { + if (*key) { + struct fs_parameter param = { + .key = key, + .type = fs_value_is_flag, + }; + char *value = strchr(key, '='); + size_t v_len = 0; + + if (value) { + if (value == key) + continue; + *value++ = 0; + v_len = strlen(value); + param.string = kmemdup_nul(value, v_len, + GFP_KERNEL); + if (!param.string) + return -ENOMEM; + param.type = fs_value_is_string; + } + param.size = v_len; + + ret = rbd_parse_param(¶m, pctx); + kfree(param.string); + if (ret) + break; + } + } + + return ret; +} + /* * Parse the options provided for an "rbd add" (i.e., rbd image * mapping) request. These arrive via a write to /sys/bus/rbd/add, @@ -5326,8 +6507,7 @@ const char *mon_addrs; char *snap_name; size_t mon_addrs_size; - struct parse_rbd_opts_ctx pctx = { 0 }; - struct ceph_options *copts; + struct rbd_parse_opts_ctx pctx = { 0 }; int ret; /* The first four tokens are required */ @@ -5338,7 +6518,7 @@ return -EINVAL; } mon_addrs = buf; - mon_addrs_size = len + 1; + mon_addrs_size = len; buf += len; ret = -EINVAL; @@ -5388,6 +6568,10 @@ *(snap_name + len) = '\0'; pctx.spec->snap_name = snap_name; + pctx.copts = ceph_alloc_options(); + if (!pctx.copts) + goto out_mem; + /* Initialize all rbd options to the defaults */ pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL); @@ -5396,32 +6580,33 @@ pctx.opts->read_only = RBD_READ_ONLY_DEFAULT; pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; + pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT; pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT; pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT; pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT; pctx.opts->trim = RBD_TRIM_DEFAULT; - copts = ceph_parse_options(options, mon_addrs, - mon_addrs + mon_addrs_size - 1, - parse_rbd_opts_token, &pctx); - if (IS_ERR(copts)) { - ret = PTR_ERR(copts); + ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL); + if (ret) goto out_err; - } - kfree(options); - *ceph_opts = copts; + ret = rbd_parse_options(options, &pctx); + if (ret) + goto out_err; + + *ceph_opts = pctx.copts; *opts = pctx.opts; *rbd_spec = pctx.spec; - + kfree(options); return 0; + out_mem: ret = -ENOMEM; out_err: kfree(pctx.opts); + ceph_destroy_options(pctx.copts); rbd_spec_put(pctx.spec); kfree(options); - return ret; } @@ -5429,28 +6614,51 @@ { down_write(&rbd_dev->lock_rwsem); if (__rbd_is_lock_owner(rbd_dev)) - rbd_unlock(rbd_dev); + __rbd_release_lock(rbd_dev); up_write(&rbd_dev->lock_rwsem); } +/* + * If the wait is interrupted, an error is returned even if the lock + * was successfully acquired. rbd_dev_image_unlock() will release it + * if needed. + */ static int rbd_add_acquire_lock(struct rbd_device *rbd_dev) { - int ret; + long ret; if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) { + if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read) + return 0; + rbd_warn(rbd_dev, "exclusive-lock feature is not enabled"); return -EINVAL; } - /* FIXME: "rbd map --exclusive" should be in interruptible */ - down_read(&rbd_dev->lock_rwsem); - ret = rbd_wait_state_locked(rbd_dev, true); - up_read(&rbd_dev->lock_rwsem); - if (ret) { - rbd_warn(rbd_dev, "failed to acquire exclusive lock"); - return -EROFS; - } + if (rbd_is_ro(rbd_dev)) + return 0; + rbd_assert(!rbd_is_lock_owner(rbd_dev)); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait, + ceph_timeout_jiffies(rbd_dev->opts->lock_timeout)); + if (ret > 0) { + ret = rbd_dev->acquire_err; + } else { + cancel_delayed_work_sync(&rbd_dev->lock_dwork); + if (!ret) + ret = -ETIMEDOUT; + + rbd_warn(rbd_dev, "failed to acquire lock: %ld", ret); + } + if (ret) + return ret; + + /* + * The lock may have been released by now, unless automatic lock + * transitions are disabled. + */ + rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev)); return 0; } @@ -5500,7 +6708,6 @@ dout("rbd id object name is %s\n", oid.name); /* Response will be an encoded string, which includes a length */ - size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX; response = kzalloc(size, GFP_NOIO); if (!response) { @@ -5512,7 +6719,7 @@ ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, "get_id", NULL, 0, - response, RBD_IMAGE_ID_LEN_MAX); + response, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret == -ENOENT) { image_id = kstrdup("", GFP_KERNEL); @@ -5545,58 +6752,49 @@ */ static void rbd_dev_unprobe(struct rbd_device *rbd_dev) { - struct rbd_image_header *header; - rbd_dev_parent_put(rbd_dev); + rbd_object_map_free(rbd_dev); + rbd_dev_mapping_clear(rbd_dev); /* Free dynamic fields from the header, then zero it out */ - header = &rbd_dev->header; - ceph_put_snap_context(header->snapc); - kfree(header->snap_sizes); - kfree(header->snap_names); - kfree(header->object_prefix); - memset(header, 0, sizeof (*header)); + rbd_image_header_cleanup(&rbd_dev->header); } -static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev) +static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev, + struct rbd_image_header *header) { int ret; - ret = rbd_dev_v2_object_prefix(rbd_dev); + ret = rbd_dev_v2_object_prefix(rbd_dev, &header->object_prefix); if (ret) - goto out_err; + return ret; /* * Get the and check features for the image. Currently the * features are assumed to never change. */ - ret = rbd_dev_v2_features(rbd_dev); + ret = _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, + rbd_is_ro(rbd_dev), &header->features); if (ret) - goto out_err; + return ret; /* If the image supports fancy striping, get its parameters */ - if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) { - ret = rbd_dev_v2_striping_info(rbd_dev); - if (ret < 0) - goto out_err; - } - - if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) { - ret = rbd_dev_v2_data_pool(rbd_dev); + if (header->features & RBD_FEATURE_STRIPINGV2) { + ret = rbd_dev_v2_striping_info(rbd_dev, &header->stripe_unit, + &header->stripe_count); if (ret) - goto out_err; + return ret; } - rbd_init_layout(rbd_dev); - return 0; + if (header->features & RBD_FEATURE_DATA_POOL) { + ret = rbd_dev_v2_data_pool(rbd_dev, &header->data_pool_id); + if (ret) + return ret; + } -out_err: - rbd_dev->header.features = 0; - kfree(rbd_dev->header.object_prefix); - rbd_dev->header.object_prefix = NULL; - return ret; + return 0; } /* @@ -5618,7 +6816,7 @@ goto out_err; } - parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec); + parent = __rbd_dev_create(rbd_dev->parent_spec); if (!parent) { ret = -ENOMEM; goto out_err; @@ -5628,8 +6826,10 @@ * Images related by parent/child relationships always share * rbd_client and spec/parent_spec, so bump their refcounts. */ - __rbd_get_client(rbd_dev->rbd_client); - rbd_spec_get(rbd_dev->parent_spec); + parent->rbd_client = __rbd_get_client(rbd_dev->rbd_client); + parent->spec = rbd_spec_get(rbd_dev->parent_spec); + + __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags); ret = rbd_dev_image_probe(parent, depth); if (ret < 0) @@ -5648,7 +6848,6 @@ static void rbd_dev_device_release(struct rbd_device *rbd_dev) { clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); - rbd_dev_mapping_clear(rbd_dev); rbd_free_disk(rbd_dev); if (!single_major) unregister_blkdev(rbd_dev->major, rbd_dev->name); @@ -5682,23 +6881,17 @@ if (ret) goto err_out_blkdev; - ret = rbd_dev_mapping_set(rbd_dev); - if (ret) - goto err_out_disk; - set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); - set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only); + set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev)); ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id); if (ret) - goto err_out_mapping; + goto err_out_disk; set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); up_write(&rbd_dev->header_rwsem); return 0; -err_out_mapping: - rbd_dev_mapping_clear(rbd_dev); err_out_disk: rbd_free_disk(rbd_dev); err_out_blkdev: @@ -5727,9 +6920,27 @@ return ret; } +static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap) +{ + if (!is_snap) { + pr_info("image %s/%s%s%s does not exist\n", + rbd_dev->spec->pool_name, + rbd_dev->spec->pool_ns ?: "", + rbd_dev->spec->pool_ns ? "/" : "", + rbd_dev->spec->image_name); + } else { + pr_info("snap %s/%s%s%s@%s does not exist\n", + rbd_dev->spec->pool_name, + rbd_dev->spec->pool_ns ?: "", + rbd_dev->spec->pool_ns ? "/" : "", + rbd_dev->spec->image_name, + rbd_dev->spec->snap_name); + } +} + static void rbd_dev_image_release(struct rbd_device *rbd_dev) { - if (rbd_dev->opts) + if (!rbd_is_ro(rbd_dev)) rbd_unregister_watch(rbd_dev); rbd_dev_unprobe(rbd_dev); @@ -5749,6 +6960,7 @@ */ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) { + bool need_watch = !rbd_is_ro(rbd_dev); int ret; /* @@ -5765,15 +6977,11 @@ if (ret) goto err_out_format; - if (!depth) { + if (need_watch) { ret = rbd_register_watch(rbd_dev); if (ret) { if (ret == -ENOENT) - pr_info("image %s/%s%s%s does not exist\n", - rbd_dev->spec->pool_name, - rbd_dev->spec->pool_ns ?: "", - rbd_dev->spec->pool_ns ? "/" : "", - rbd_dev->spec->image_name); + rbd_print_dne(rbd_dev, false); goto err_out_format; } } @@ -5781,9 +6989,14 @@ if (!depth) down_write(&rbd_dev->header_rwsem); - ret = rbd_dev_header_info(rbd_dev); - if (ret) + ret = rbd_dev_header_info(rbd_dev, &rbd_dev->header, true); + if (ret) { + if (ret == -ENOENT && !need_watch) + rbd_print_dne(rbd_dev, false); goto err_out_probe; + } + + rbd_init_layout(rbd_dev); /* * If this image is the one being mapped, we have pool name and @@ -5797,27 +7010,25 @@ ret = rbd_spec_fill_names(rbd_dev); if (ret) { if (ret == -ENOENT) - pr_info("snap %s/%s%s%s@%s does not exist\n", - rbd_dev->spec->pool_name, - rbd_dev->spec->pool_ns ?: "", - rbd_dev->spec->pool_ns ? "/" : "", - rbd_dev->spec->image_name, - rbd_dev->spec->snap_name); + rbd_print_dne(rbd_dev, true); goto err_out_probe; } - if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { - ret = rbd_dev_v2_parent_info(rbd_dev); + ret = rbd_dev_mapping_set(rbd_dev); + if (ret) + goto err_out_probe; + + if (rbd_is_snap(rbd_dev) && + (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) { + ret = rbd_object_map_load(rbd_dev); if (ret) goto err_out_probe; + } - /* - * Need to warn users if this image is the one being - * mapped and has a parent. - */ - if (!depth && rbd_dev->parent_spec) - rbd_warn(rbd_dev, - "WARNING: kernel layering is EXPERIMENTAL!"); + if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { + ret = rbd_dev_setup_parent(rbd_dev); + if (ret) + goto err_out_probe; } ret = rbd_dev_probe_parent(rbd_dev, depth); @@ -5831,13 +7042,114 @@ err_out_probe: if (!depth) up_write(&rbd_dev->header_rwsem); - if (!depth) + if (need_watch) rbd_unregister_watch(rbd_dev); rbd_dev_unprobe(rbd_dev); err_out_format: rbd_dev->image_format = 0; kfree(rbd_dev->spec->image_id); rbd_dev->spec->image_id = NULL; + return ret; +} + +static void rbd_dev_update_header(struct rbd_device *rbd_dev, + struct rbd_image_header *header) +{ + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + rbd_assert(rbd_dev->header.object_prefix); /* !first_time */ + + if (rbd_dev->header.image_size != header->image_size) { + rbd_dev->header.image_size = header->image_size; + + if (!rbd_is_snap(rbd_dev)) { + rbd_dev->mapping.size = header->image_size; + rbd_dev_update_size(rbd_dev); + } + } + + ceph_put_snap_context(rbd_dev->header.snapc); + rbd_dev->header.snapc = header->snapc; + header->snapc = NULL; + + if (rbd_dev->image_format == 1) { + kfree(rbd_dev->header.snap_names); + rbd_dev->header.snap_names = header->snap_names; + header->snap_names = NULL; + + kfree(rbd_dev->header.snap_sizes); + rbd_dev->header.snap_sizes = header->snap_sizes; + header->snap_sizes = NULL; + } +} + +static void rbd_dev_update_parent(struct rbd_device *rbd_dev, + struct parent_image_info *pii) +{ + if (pii->pool_id == CEPH_NOPOOL || !pii->has_overlap) { + /* + * Either the parent never existed, or we have + * record of it but the image got flattened so it no + * longer has a parent. When the parent of a + * layered image disappears we immediately set the + * overlap to 0. The effect of this is that all new + * requests will be treated as if the image had no + * parent. + * + * If !pii.has_overlap, the parent image spec is not + * applicable. It's there to avoid duplication in each + * snapshot record. + */ + if (rbd_dev->parent_overlap) { + rbd_dev->parent_overlap = 0; + rbd_dev_parent_put(rbd_dev); + pr_info("%s: clone has been flattened\n", + rbd_dev->disk->disk_name); + } + } else { + rbd_assert(rbd_dev->parent_spec); + + /* + * Update the parent overlap. If it became zero, issue + * a warning as we will proceed as if there is no parent. + */ + if (!pii->overlap && rbd_dev->parent_overlap) + rbd_warn(rbd_dev, + "clone has become standalone (overlap 0)"); + rbd_dev->parent_overlap = pii->overlap; + } +} + +static int rbd_dev_refresh(struct rbd_device *rbd_dev) +{ + struct rbd_image_header header = { 0 }; + struct parent_image_info pii = { 0 }; + int ret; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + ret = rbd_dev_header_info(rbd_dev, &header, false); + if (ret) + goto out; + + /* + * If there is a parent, see if it has disappeared due to the + * mapped image getting flattened. + */ + if (rbd_dev->parent) { + ret = rbd_dev_v2_parent_info(rbd_dev, &pii); + if (ret) + goto out; + } + + down_write(&rbd_dev->header_rwsem); + rbd_dev_update_header(rbd_dev, &header); + if (rbd_dev->parent) + rbd_dev_update_parent(rbd_dev, &pii); + up_write(&rbd_dev->header_rwsem); + +out: + rbd_parent_info_cleanup(&pii); + rbd_image_header_cleanup(&header); return ret; } @@ -5887,6 +7199,11 @@ spec = NULL; /* rbd_dev now owns this */ rbd_opts = NULL; /* rbd_dev now owns this */ + /* if we are mapping a snapshot it will be a read-only mapping */ + if (rbd_dev->opts->read_only || + strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME)) + __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags); + rbd_dev->config_info = kstrdup(buf, GFP_KERNEL); if (!rbd_dev->config_info) { rc = -ENOMEM; @@ -5897,19 +7214,19 @@ if (rc < 0) goto err_out_rbd_dev; - /* If we are mapping a snapshot it must be marked read-only */ - if (rbd_dev->spec->snap_id != CEPH_NOSNAP) - rbd_dev->opts->read_only = true; + if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) { + rbd_warn(rbd_dev, "alloc_size adjusted to %u", + rbd_dev->layout.object_size); + rbd_dev->opts->alloc_size = rbd_dev->layout.object_size; + } rc = rbd_dev_device_setup(rbd_dev); if (rc) goto err_out_image_probe; - if (rbd_dev->opts->exclusive) { - rc = rbd_add_acquire_lock(rbd_dev); - if (rc) - goto err_out_device_setup; - } + rc = rbd_add_acquire_lock(rbd_dev); + if (rc) + goto err_out_image_lock; /* Everything's ready. Announce the disk to the world. */ @@ -5917,7 +7234,7 @@ if (rc) goto err_out_image_lock; - add_disk(rbd_dev->disk); + device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL); /* see rbd_init_disk() */ blk_put_queue(rbd_dev->disk->queue); @@ -5935,7 +7252,6 @@ err_out_image_lock: rbd_dev_image_unlock(rbd_dev); -err_out_device_setup: rbd_dev_device_release(rbd_dev); err_out_image_probe: rbd_dev_image_release(rbd_dev); @@ -5949,9 +7265,7 @@ goto out; } -static ssize_t rbd_add(struct bus_type *bus, - const char *buf, - size_t count) +static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count) { if (single_major) return -EINVAL; @@ -5959,9 +7273,8 @@ return do_rbd_add(bus, buf, count); } -static ssize_t rbd_add_single_major(struct bus_type *bus, - const char *buf, - size_t count) +static ssize_t add_single_major_store(struct bus_type *bus, const char *buf, + size_t count) { return do_rbd_add(bus, buf, count); } @@ -6067,9 +7380,7 @@ return count; } -static ssize_t rbd_remove(struct bus_type *bus, - const char *buf, - size_t count) +static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count) { if (single_major) return -EINVAL; @@ -6077,9 +7388,8 @@ return do_rbd_remove(bus, buf, count); } -static ssize_t rbd_remove_single_major(struct bus_type *bus, - const char *buf, - size_t count) +static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf, + size_t count) { return do_rbd_remove(bus, buf, count); } @@ -6088,7 +7398,7 @@ * create control files in sysfs * /sys/bus/rbd/... */ -static int rbd_sysfs_init(void) +static int __init rbd_sysfs_init(void) { int ret; @@ -6103,13 +7413,13 @@ return ret; } -static void rbd_sysfs_cleanup(void) +static void __exit rbd_sysfs_cleanup(void) { bus_unregister(&rbd_bus_type); device_unregister(&rbd_root_dev); } -static int rbd_slab_init(void) +static int __init rbd_slab_init(void) { rbd_assert(!rbd_img_request_cache); rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0); -- Gitblit v1.6.2