From 9370bb92b2d16684ee45cf24e879c93c509162da Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Thu, 19 Dec 2024 01:47:39 +0000
Subject: [PATCH] rbd: add object map support and rework request state machines
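
This updates drivers/block/rbd.c: it converts option parsing from
linux/parser.h to fs_parser (adding the alloc_size and compression_hint
options), adds the OBJECT_MAP, FAST_DIFF and DEEP_FLATTEN feature bits
together with object map load/lock/update support, introduces
OBJ_OP_ZEROOUT, and reworks the object and image request handling around
explicit state machines (rbd_obj_read_state, rbd_obj_write_state,
rbd_obj_copyup_state, rbd_img_state).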

---
 kernel/drivers/block/rbd.c | 3744 ++++++++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 2527 insertions(+), 1217 deletions(-)

diff --git a/kernel/drivers/block/rbd.c b/kernel/drivers/block/rbd.c
index 9f1265c..b0f7930 100644
--- a/kernel/drivers/block/rbd.c
+++ b/kernel/drivers/block/rbd.c
@@ -34,7 +34,7 @@
 #include <linux/ceph/cls_lock_client.h>
 #include <linux/ceph/striper.h>
 #include <linux/ceph/decode.h>
-#include <linux/parser.h>
+#include <linux/fs_parser.h>
 #include <linux/bsearch.h>
 
 #include <linux/kernel.h>
@@ -115,12 +115,18 @@
 #define RBD_FEATURE_LAYERING		(1ULL<<0)
 #define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
 #define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
+#define RBD_FEATURE_OBJECT_MAP		(1ULL<<3)
+#define RBD_FEATURE_FAST_DIFF		(1ULL<<4)
+#define RBD_FEATURE_DEEP_FLATTEN	(1ULL<<5)
 #define RBD_FEATURE_DATA_POOL		(1ULL<<7)
 #define RBD_FEATURE_OPERATIONS		(1ULL<<8)
 
 #define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
 				 RBD_FEATURE_STRIPINGV2 |	\
 				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
+				 RBD_FEATURE_OBJECT_MAP |	\
+				 RBD_FEATURE_FAST_DIFF |	\
+				 RBD_FEATURE_DEEP_FLATTEN |	\
 				 RBD_FEATURE_DATA_POOL |	\
 				 RBD_FEATURE_OPERATIONS)
 
@@ -201,6 +207,11 @@
 	struct list_head	node;
 };
 
+struct pending_result {
+	int			result;		/* first nonzero result */
+	int			num_pending;
+};
+
 struct rbd_img_request;
 
 enum obj_request_type {
@@ -214,34 +225,69 @@
 	OBJ_OP_READ = 1,
 	OBJ_OP_WRITE,
 	OBJ_OP_DISCARD,
+	OBJ_OP_ZEROOUT,
+};
+
+#define RBD_OBJ_FLAG_DELETION			(1U << 0)
+#define RBD_OBJ_FLAG_COPYUP_ENABLED		(1U << 1)
+#define RBD_OBJ_FLAG_COPYUP_ZEROS		(1U << 2)
+#define RBD_OBJ_FLAG_MAY_EXIST			(1U << 3)
+#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT	(1U << 4)
+
+enum rbd_obj_read_state {
+	RBD_OBJ_READ_START = 1,
+	RBD_OBJ_READ_OBJECT,
+	RBD_OBJ_READ_PARENT,
 };
 
 /*
  * Writes go through the following state machine to deal with
  * layering:
  *
- *                       need copyup
- * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
- *        |     ^                              |
- *        v     \------------------------------/
- *      done
- *        ^
- *        |
- * RBD_OBJ_WRITE_FLAT
+ *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
+ *            .                 |                                    .
+ *            .                 v                                    .
+ *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
+ *            .                 |                    .               .
+ *            .                 v                    v (deep-copyup  .
+ *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
+ * flattened) v                 |                    .               .
+ *            .                 v                    .               .
+ *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
+ *                              |                        not needed) v
+ *                              v                                    .
+ *                            done . . . . . . . . . . . . . . . . . .
+ *                              ^
+ *                              |
+ *                     RBD_OBJ_WRITE_FLAT
  *
  * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
- * there is a parent or not.
+ * assert_exists guard is needed or not (in some cases it's not needed
+ * even if there is a parent).
  */
 enum rbd_obj_write_state {
-	RBD_OBJ_WRITE_FLAT = 1,
-	RBD_OBJ_WRITE_GUARD,
+	RBD_OBJ_WRITE_START = 1,
+	RBD_OBJ_WRITE_PRE_OBJECT_MAP,
+	RBD_OBJ_WRITE_OBJECT,
+	__RBD_OBJ_WRITE_COPYUP,
 	RBD_OBJ_WRITE_COPYUP,
+	RBD_OBJ_WRITE_POST_OBJECT_MAP,
+};
+
+enum rbd_obj_copyup_state {
+	RBD_OBJ_COPYUP_START = 1,
+	RBD_OBJ_COPYUP_READ_PARENT,
+	__RBD_OBJ_COPYUP_OBJECT_MAPS,
+	RBD_OBJ_COPYUP_OBJECT_MAPS,
+	__RBD_OBJ_COPYUP_WRITE_OBJECT,
+	RBD_OBJ_COPYUP_WRITE_OBJECT,
 };
 
 struct rbd_obj_request {
 	struct ceph_object_extent ex;
+	unsigned int		flags;	/* RBD_OBJ_FLAG_* */
 	union {
-		bool			tried_parent;	/* for reads */
+		enum rbd_obj_read_state	 read_state;	/* for reads */
 		enum rbd_obj_write_state write_state;	/* for writes */
 	};
 
@@ -257,14 +303,15 @@
 			u32			bvec_idx;
 		};
 	};
+
+	enum rbd_obj_copyup_state copyup_state;
 	struct bio_vec		*copyup_bvecs;
 	u32			copyup_bvec_count;
 
-	struct ceph_osd_request	*osd_req;
+	struct list_head	osd_reqs;	/* w/ r_private_item */
 
-	u64			xferred;	/* bytes transferred */
-	int			result;
-
+	struct mutex		state_mutex;
+	struct pending_result	pending;
 	struct kref		kref;
 };
 
@@ -273,28 +320,32 @@
 	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
 };
 
+enum rbd_img_state {
+	RBD_IMG_START = 1,
+	RBD_IMG_EXCLUSIVE_LOCK,
+	__RBD_IMG_OBJECT_REQUESTS,
+	RBD_IMG_OBJECT_REQUESTS,
+};
+
 struct rbd_img_request {
 	struct rbd_device	*rbd_dev;
 	enum obj_operation_type	op_type;
 	enum obj_request_type	data_type;
 	unsigned long		flags;
+	enum rbd_img_state	state;
 	union {
 		u64			snap_id;	/* for reads */
 		struct ceph_snap_context *snapc;	/* for writes */
 	};
-	union {
-		struct request		*rq;		/* block request */
-		struct rbd_obj_request	*obj_request;	/* obj req initiator */
-	};
-	spinlock_t		completion_lock;
-	u64			xferred;/* aggregate bytes transferred */
-	int			result;	/* first nonzero obj_request result */
+	struct rbd_obj_request	*obj_request;	/* obj req initiator */
 
+	struct list_head	lock_item;
 	struct list_head	object_extents;	/* obj_req.ex structs */
-	u32			obj_request_count;
-	u32			pending_count;
 
-	struct kref		kref;
+	struct mutex		state_mutex;
+	struct pending_result	pending;
+	struct work_struct	work;
+	int			work_result;
 };
 
 #define for_each_obj_request(ireq, oreq) \
@@ -322,7 +373,6 @@
 
 struct rbd_mapping {
 	u64                     size;
-	u64                     features;
 };
 
 /*
@@ -367,7 +417,17 @@
 	struct work_struct	released_lock_work;
 	struct delayed_work	lock_dwork;
 	struct work_struct	unlock_work;
-	wait_queue_head_t	lock_waitq;
+	spinlock_t		lock_lists_lock;
+	struct list_head	acquiring_list;
+	struct list_head	running_list;
+	struct completion	acquire_wait;
+	int			acquire_err;
+	struct completion	releasing_wait;
+
+	spinlock_t		object_map_lock;
+	u8			*object_map;
+	u64			object_map_size;	/* in objects */
+	u64			object_map_flags;
 
 	struct workqueue_struct	*task_wq;
 
@@ -395,12 +455,11 @@
  * Flag bits for rbd_dev->flags:
  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
  *   by rbd_dev->lock
- * - BLACKLISTED is protected by rbd_dev->lock_rwsem
  */
 enum rbd_dev_flags {
-	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
+	RBD_DEV_FLAG_EXISTS,	/* rbd_dev_device_setup() ran */
 	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
-	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
+	RBD_DEV_FLAG_READONLY,  /* -o ro or snapshot */
 };
 
 static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */
@@ -421,6 +480,10 @@
 
 static struct workqueue_struct *rbd_wq;
 
+static struct ceph_snap_context rbd_empty_snapc = {
+	.nref = REFCOUNT_INIT(1),
+};
+
 /*
  * single-major requires >= 0.75 version of userspace rbd utility.
  */
@@ -428,14 +491,13 @@
 module_param(single_major, bool, 0444);
 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
 
-static ssize_t rbd_add(struct bus_type *bus, const char *buf,
-		       size_t count);
-static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
-			  size_t count);
-static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
-				    size_t count);
-static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
-				       size_t count);
+static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
+static ssize_t remove_store(struct bus_type *bus, const char *buf,
+			    size_t count);
+static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
+				      size_t count);
+static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
+					 size_t count);
 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
 
 static int rbd_dev_id_to_minor(int dev_id)
@@ -448,8 +510,20 @@
 	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 }
 
+static bool rbd_is_ro(struct rbd_device *rbd_dev)
+{
+	return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
+}
+
+static bool rbd_is_snap(struct rbd_device *rbd_dev)
+{
+	return rbd_dev->spec->snap_id != CEPH_NOSNAP;
+}
+
 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 {
+	lockdep_assert_held(&rbd_dev->lock_rwsem);
+
 	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
 	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
 }
@@ -464,16 +538,16 @@
 	return is_lock_owner;
 }
 
-static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
+static ssize_t supported_features_show(struct bus_type *bus, char *buf)
 {
 	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
 }
 
-static BUS_ATTR(add, 0200, NULL, rbd_add);
-static BUS_ATTR(remove, 0200, NULL, rbd_remove);
-static BUS_ATTR(add_single_major, 0200, NULL, rbd_add_single_major);
-static BUS_ATTR(remove_single_major, 0200, NULL, rbd_remove_single_major);
-static BUS_ATTR(supported_features, 0444, rbd_supported_features_show, NULL);
+static BUS_ATTR_WO(add);
+static BUS_ATTR_WO(remove);
+static BUS_ATTR_WO(add_single_major);
+static BUS_ATTR_WO(remove_single_major);
+static BUS_ATTR_RO(supported_features);
 
 static struct attribute *rbd_bus_attrs[] = {
 	&bus_attr_add.attr,
@@ -558,15 +632,32 @@
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
-static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
-static int rbd_dev_header_info(struct rbd_device *rbd_dev);
-static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev,
+				     struct rbd_image_header *header);
 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 					u64 snap_id);
 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
 				u8 *order, u64 *snap_size);
-static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
-		u64 *snap_features);
+static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
+
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
+static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
+
+/*
+ * Return true if nothing else is pending.
+ */
+static bool pending_result_dec(struct pending_result *pending, int *result)
+{
+	rbd_assert(pending->num_pending > 0);
+
+	if (*result && !pending->result)
+		pending->result = *result;
+	if (--pending->num_pending)
+		return false;
+
+	*result = pending->result;
+	return true;
+}
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
@@ -607,9 +698,16 @@
 	if (get_user(ro, (int __user *)arg))
 		return -EFAULT;
 
-	/* Snapshots can't be marked read-write */
-	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
-		return -EROFS;
+	/*
+	 * Both images mapped read-only and snapshots can't be marked
+	 * read-write.
+	 */
+	if (!ro) {
+		if (rbd_is_ro(rbd_dev))
+			return -EROFS;
+
+		rbd_assert(!rbd_is_snap(rbd_dev));
+	}
 
 	/* Let blkdev_roset() handle it */
 	return -ENOTTY;
@@ -733,121 +831,74 @@
  */
 enum {
 	Opt_queue_depth,
+	Opt_alloc_size,
 	Opt_lock_timeout,
-	Opt_last_int,
 	/* int args above */
 	Opt_pool_ns,
-	Opt_last_string,
+	Opt_compression_hint,
 	/* string args above */
 	Opt_read_only,
 	Opt_read_write,
 	Opt_lock_on_read,
 	Opt_exclusive,
 	Opt_notrim,
-	Opt_err
 };
 
-static match_table_t rbd_opts_tokens = {
-	{Opt_queue_depth, "queue_depth=%d"},
-	{Opt_lock_timeout, "lock_timeout=%d"},
-	/* int args above */
-	{Opt_pool_ns, "_pool_ns=%s"},
-	/* string args above */
-	{Opt_read_only, "read_only"},
-	{Opt_read_only, "ro"},		/* Alternate spelling */
-	{Opt_read_write, "read_write"},
-	{Opt_read_write, "rw"},		/* Alternate spelling */
-	{Opt_lock_on_read, "lock_on_read"},
-	{Opt_exclusive, "exclusive"},
-	{Opt_notrim, "notrim"},
-	{Opt_err, NULL}
+enum {
+	Opt_compression_hint_none,
+	Opt_compression_hint_compressible,
+	Opt_compression_hint_incompressible,
+};
+
+static const struct constant_table rbd_param_compression_hint[] = {
+	{"none",		Opt_compression_hint_none},
+	{"compressible",	Opt_compression_hint_compressible},
+	{"incompressible",	Opt_compression_hint_incompressible},
+	{}
+};
+
+static const struct fs_parameter_spec rbd_parameters[] = {
+	fsparam_u32	("alloc_size",			Opt_alloc_size),
+	fsparam_enum	("compression_hint",		Opt_compression_hint,
+			 rbd_param_compression_hint),
+	fsparam_flag	("exclusive",			Opt_exclusive),
+	fsparam_flag	("lock_on_read",		Opt_lock_on_read),
+	fsparam_u32	("lock_timeout",		Opt_lock_timeout),
+	fsparam_flag	("notrim",			Opt_notrim),
+	fsparam_string	("_pool_ns",			Opt_pool_ns),
+	fsparam_u32	("queue_depth",			Opt_queue_depth),
+	fsparam_flag	("read_only",			Opt_read_only),
+	fsparam_flag	("read_write",			Opt_read_write),
+	fsparam_flag	("ro",				Opt_read_only),
+	fsparam_flag	("rw",				Opt_read_write),
+	{}
 };
 
 struct rbd_options {
 	int	queue_depth;
+	int	alloc_size;
 	unsigned long	lock_timeout;
 	bool	read_only;
 	bool	lock_on_read;
 	bool	exclusive;
 	bool	trim;
+
+	u32 alloc_hint_flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
 };
 
 #define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
+#define RBD_ALLOC_SIZE_DEFAULT	(64 * 1024)
 #define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
 #define RBD_READ_ONLY_DEFAULT	false
 #define RBD_LOCK_ON_READ_DEFAULT false
 #define RBD_EXCLUSIVE_DEFAULT	false
 #define RBD_TRIM_DEFAULT	true
 
-struct parse_rbd_opts_ctx {
+struct rbd_parse_opts_ctx {
 	struct rbd_spec		*spec;
+	struct ceph_options	*copts;
 	struct rbd_options	*opts;
 };
-
-static int parse_rbd_opts_token(char *c, void *private)
-{
-	struct parse_rbd_opts_ctx *pctx = private;
-	substring_t argstr[MAX_OPT_ARGS];
-	int token, intval, ret;
-
-	token = match_token(c, rbd_opts_tokens, argstr);
-	if (token < Opt_last_int) {
-		ret = match_int(&argstr[0], &intval);
-		if (ret < 0) {
-			pr_err("bad option arg (not int) at '%s'\n", c);
-			return ret;
-		}
-		dout("got int token %d val %d\n", token, intval);
-	} else if (token > Opt_last_int && token < Opt_last_string) {
-		dout("got string token %d val %s\n", token, argstr[0].from);
-	} else {
-		dout("got token %d\n", token);
-	}
-
-	switch (token) {
-	case Opt_queue_depth:
-		if (intval < 1) {
-			pr_err("queue_depth out of range\n");
-			return -EINVAL;
-		}
-		pctx->opts->queue_depth = intval;
-		break;
-	case Opt_lock_timeout:
-		/* 0 is "wait forever" (i.e. infinite timeout) */
-		if (intval < 0 || intval > INT_MAX / 1000) {
-			pr_err("lock_timeout out of range\n");
-			return -EINVAL;
-		}
-		pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
-		break;
-	case Opt_pool_ns:
-		kfree(pctx->spec->pool_ns);
-		pctx->spec->pool_ns = match_strdup(argstr);
-		if (!pctx->spec->pool_ns)
-			return -ENOMEM;
-		break;
-	case Opt_read_only:
-		pctx->opts->read_only = true;
-		break;
-	case Opt_read_write:
-		pctx->opts->read_only = false;
-		break;
-	case Opt_lock_on_read:
-		pctx->opts->lock_on_read = true;
-		break;
-	case Opt_exclusive:
-		pctx->opts->exclusive = true;
-		break;
-	case Opt_notrim:
-		pctx->opts->trim = false;
-		break;
-	default:
-		/* libceph prints "bad option" msg */
-		return -EINVAL;
-	}
-
-	return 0;
-}
 
 static char* obj_op_name(enum obj_operation_type op_type)
 {
@@ -858,6 +909,8 @@
 		return "write";
 	case OBJ_OP_DISCARD:
 		return "discard";
+	case OBJ_OP_ZEROOUT:
+		return "zeroout";
 	default:
 		return "???";
 	}
@@ -891,23 +944,6 @@
 		kref_put(&rbdc->kref, rbd_client_release);
 }
 
-static int wait_for_latest_osdmap(struct ceph_client *client)
-{
-	u64 newest_epoch;
-	int ret;
-
-	ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch);
-	if (ret)
-		return ret;
-
-	if (client->osdc.osdmap->epoch >= newest_epoch)
-		return 0;
-
-	ceph_osdc_maybe_request_map(&client->osdc);
-	return ceph_monc_wait_osdmap(&client->monc, newest_epoch,
-				     client->options->mount_timeout);
-}
-
 /*
  * Get a ceph client with specific addr and configuration, if one does
  * not exist create it.  Either way, ceph_opts is consumed by this
@@ -918,7 +954,7 @@
 	struct rbd_client *rbdc;
 	int ret;
 
-	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
+	mutex_lock(&client_mutex);
 	rbdc = rbd_client_find(ceph_opts);
 	if (rbdc) {
 		ceph_destroy_options(ceph_opts);
@@ -927,7 +963,8 @@
 		 * Using an existing client.  Make sure ->pg_pools is up to
 		 * date before we look up the pool id in do_rbd_add().
 		 */
-		ret = wait_for_latest_osdmap(rbdc->client);
+		ret = ceph_wait_for_latest_osdmap(rbdc->client,
+					rbdc->client->options->mount_timeout);
 		if (ret) {
 			rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
 			rbd_put_client(rbdc);
@@ -1009,15 +1046,24 @@
 	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
 }
 
+static void rbd_image_header_cleanup(struct rbd_image_header *header)
+{
+	kfree(header->object_prefix);
+	ceph_put_snap_context(header->snapc);
+	kfree(header->snap_sizes);
+	kfree(header->snap_names);
+
+	memset(header, 0, sizeof(*header));
+}
+
 /*
  * Fill an rbd image header with information from the given format 1
  * on-disk header.
  */
-static int rbd_header_from_disk(struct rbd_device *rbd_dev,
-				 struct rbd_image_header_ondisk *ondisk)
+static int rbd_header_from_disk(struct rbd_image_header *header,
+				struct rbd_image_header_ondisk *ondisk,
+				bool first_time)
 {
-	struct rbd_image_header *header = &rbd_dev->header;
-	bool first_time = header->object_prefix == NULL;
 	struct ceph_snap_context *snapc;
 	char *object_prefix = NULL;
 	char *snap_names = NULL;
@@ -1084,11 +1130,6 @@
 	if (first_time) {
 		header->object_prefix = object_prefix;
 		header->obj_order = ondisk->options.order;
-		rbd_init_layout(rbd_dev);
-	} else {
-		ceph_put_snap_context(header->snapc);
-		kfree(header->snap_names);
-		kfree(header->snap_sizes);
 	}
 
 	/* The remaining fields always get updated (when we refresh) */
@@ -1213,51 +1254,23 @@
 	return 0;
 }
 
-static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
-			u64 *snap_features)
-{
-	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
-	if (snap_id == CEPH_NOSNAP) {
-		*snap_features = rbd_dev->header.features;
-	} else if (rbd_dev->image_format == 1) {
-		*snap_features = 0;	/* No features for format 1 */
-	} else {
-		u64 features = 0;
-		int ret;
-
-		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
-		if (ret)
-			return ret;
-
-		*snap_features = features;
-	}
-	return 0;
-}
-
 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
 {
 	u64 snap_id = rbd_dev->spec->snap_id;
 	u64 size = 0;
-	u64 features = 0;
 	int ret;
 
 	ret = rbd_snap_size(rbd_dev, snap_id, &size);
 	if (ret)
 		return ret;
-	ret = rbd_snap_features(rbd_dev, snap_id, &features);
-	if (ret)
-		return ret;
 
 	rbd_dev->mapping.size = size;
-	rbd_dev->mapping.features = features;
-
 	return 0;
 }
 
 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
 {
 	rbd_dev->mapping.size = 0;
-	rbd_dev->mapping.features = 0;
 }
 
 static void zero_bvec(struct bio_vec *bv)
@@ -1300,6 +1313,8 @@
 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
 			       u32 bytes)
 {
+	dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
+
 	switch (obj_req->img_request->data_type) {
 	case OBJ_REQUEST_BIO:
 		zero_bios(&obj_req->bio_pos, off, bytes);
@@ -1309,7 +1324,7 @@
 		zero_bvecs(&obj_req->bvec_pos, off, bytes);
 		break;
 	default:
-		rbd_assert(0);
+		BUG();
 	}
 }
 
@@ -1322,22 +1337,6 @@
 	kref_put(&obj_request->kref, rbd_obj_request_destroy);
 }
 
-static void rbd_img_request_get(struct rbd_img_request *img_request)
-{
-	dout("%s: img %p (was %d)\n", __func__, img_request,
-	     kref_read(&img_request->kref));
-	kref_get(&img_request->kref);
-}
-
-static void rbd_img_request_destroy(struct kref *kref);
-static void rbd_img_request_put(struct rbd_img_request *img_request)
-{
-	rbd_assert(img_request != NULL);
-	dout("%s: img %p (was %d)\n", __func__, img_request,
-		kref_read(&img_request->kref));
-	kref_put(&img_request->kref, rbd_img_request_destroy);
-}
-
 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
 					struct rbd_obj_request *obj_request)
 {
@@ -1345,8 +1344,6 @@
 
 	/* Image request now owns object's original reference */
 	obj_request->img_request = img_request;
-	img_request->obj_request_count++;
-	img_request->pending_count++;
 	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
 }
 
@@ -1355,19 +1352,17 @@
 {
 	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
 	list_del(&obj_request->ex.oe_item);
-	rbd_assert(img_request->obj_request_count > 0);
-	img_request->obj_request_count--;
 	rbd_assert(obj_request->img_request == img_request);
 	rbd_obj_request_put(obj_request);
 }
 
-static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
+static void rbd_osd_submit(struct ceph_osd_request *osd_req)
 {
-	struct ceph_osd_request *osd_req = obj_request->osd_req;
+	struct rbd_obj_request *obj_req = osd_req->r_priv;
 
-	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
-	     obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
-	     obj_request->ex.oe_len, osd_req);
+	dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
+	     __func__, osd_req, obj_req, obj_req->ex.oe_objno,
+	     obj_req->ex.oe_off, obj_req->ex.oe_len);
 	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
 }
 
@@ -1379,18 +1374,10 @@
 static void img_request_layered_set(struct rbd_img_request *img_request)
 {
 	set_bit(IMG_REQ_LAYERED, &img_request->flags);
-	smp_mb();
-}
-
-static void img_request_layered_clear(struct rbd_img_request *img_request)
-{
-	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
-	smp_mb();
 }
 
 static bool img_request_layered_test(struct rbd_img_request *img_request)
 {
-	smp_mb();
 	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
 }
 
@@ -1410,6 +1397,35 @@
 					rbd_dev->layout.object_size;
 }
 
+/*
+ * Must be called after rbd_obj_calc_img_extents().
+ */
+static void rbd_obj_set_copyup_enabled(struct rbd_obj_request *obj_req)
+{
+	rbd_assert(obj_req->img_request->snapc);
+
+	if (obj_req->img_request->op_type == OBJ_OP_DISCARD) {
+		dout("%s %p objno %llu discard\n", __func__, obj_req,
+		     obj_req->ex.oe_objno);
+		return;
+	}
+
+	if (!obj_req->num_img_extents) {
+		dout("%s %p objno %llu not overlapping\n", __func__, obj_req,
+		     obj_req->ex.oe_objno);
+		return;
+	}
+
+	if (rbd_obj_is_entire(obj_req) &&
+	    !obj_req->img_request->snapc->num_snaps) {
+		dout("%s %p objno %llu entire\n", __func__, obj_req,
+		     obj_req->ex.oe_objno);
+		return;
+	}
+
+	obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
+}
+
 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
 {
 	return ceph_file_extents_bytes(obj_req->img_extents,
@@ -1423,47 +1439,47 @@
 		return false;
 	case OBJ_OP_WRITE:
 	case OBJ_OP_DISCARD:
+	case OBJ_OP_ZEROOUT:
 		return true;
 	default:
 		BUG();
 	}
 }
 
-static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);
-
 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
 {
 	struct rbd_obj_request *obj_req = osd_req->r_priv;
+	int result;
 
 	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
 	     osd_req->r_result, obj_req);
-	rbd_assert(osd_req == obj_req->osd_req);
 
-	obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
-	if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
-		obj_req->xferred = osd_req->r_result;
+	/*
+	 * Writes aren't allowed to return a data payload.  In some
+	 * guarded write cases (e.g. stat + zero on an empty object)
+	 * a stat response makes it through, but we don't care.
+	 */
+	if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
+		result = 0;
 	else
-		/*
-		 * Writes aren't allowed to return a data payload.  In some
-		 * guarded write cases (e.g. stat + zero on an empty object)
-		 * a stat response makes it through, but we don't care.
-		 */
-		obj_req->xferred = 0;
+		result = osd_req->r_result;
 
-	rbd_obj_handle_request(obj_req);
+	rbd_obj_handle_request(obj_req, result);
 }
 
-static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
+static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
 {
-	struct ceph_osd_request *osd_req = obj_request->osd_req;
+	struct rbd_obj_request *obj_request = osd_req->r_priv;
+	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
+	struct ceph_options *opt = rbd_dev->rbd_client->client->options;
 
-	osd_req->r_flags = CEPH_OSD_FLAG_READ;
+	osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
 	osd_req->r_snapid = obj_request->img_request->snap_id;
 }
 
-static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
+static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
 {
-	struct ceph_osd_request *osd_req = obj_request->osd_req;
+	struct rbd_obj_request *obj_request = osd_req->r_priv;
 
 	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
 	ktime_get_real_ts64(&osd_req->r_mtime);
@@ -1471,21 +1487,21 @@
 }
 
 static struct ceph_osd_request *
-rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
+__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
+			  struct ceph_snap_context *snapc, int num_ops)
 {
-	struct rbd_img_request *img_req = obj_req->img_request;
-	struct rbd_device *rbd_dev = img_req->rbd_dev;
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct ceph_osd_request *req;
 	const char *name_format = rbd_dev->image_format == 1 ?
 				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
+	int ret;
 
-	req = ceph_osdc_alloc_request(osdc,
-			(rbd_img_is_write(img_req) ? img_req->snapc : NULL),
-			num_ops, false, GFP_NOIO);
+	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
 	if (!req)
-		return NULL;
+		return ERR_PTR(-ENOMEM);
 
+	list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
 	req->r_callback = rbd_osd_req_callback;
 	req->r_priv = obj_req;
 
@@ -1496,23 +1512,21 @@
 	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
 	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
 
-	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
-			rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
-		goto err_req;
-
-	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
-		goto err_req;
+	ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
+			       rbd_dev->header.object_prefix,
+			       obj_req->ex.oe_objno);
+	if (ret)
+		return ERR_PTR(ret);
 
 	return req;
-
-err_req:
-	ceph_osdc_put_request(req);
-	return NULL;
 }
 
-static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
+static struct ceph_osd_request *
+rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
 {
-	ceph_osdc_put_request(osd_req);
+	rbd_assert(obj_req->img_request->snapc);
+	return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
+					 num_ops);
 }
 
 static struct rbd_obj_request *rbd_obj_request_create(void)
@@ -1524,6 +1538,8 @@
 		return NULL;
 
 	ceph_object_extent_init(&obj_request->ex);
+	INIT_LIST_HEAD(&obj_request->osd_reqs);
+	mutex_init(&obj_request->state_mutex);
 	kref_init(&obj_request->kref);
 
 	dout("%s %p\n", __func__, obj_request);
@@ -1533,14 +1549,19 @@
 static void rbd_obj_request_destroy(struct kref *kref)
 {
 	struct rbd_obj_request *obj_request;
+	struct ceph_osd_request *osd_req;
 	u32 i;
 
 	obj_request = container_of(kref, struct rbd_obj_request, kref);
 
 	dout("%s: obj %p\n", __func__, obj_request);
 
-	if (obj_request->osd_req)
-		rbd_osd_req_destroy(obj_request->osd_req);
+	while (!list_empty(&obj_request->osd_reqs)) {
+		osd_req = list_first_entry(&obj_request->osd_reqs,
+				    struct ceph_osd_request, r_private_item);
+		list_del_init(&osd_req->r_private_item);
+		ceph_osdc_put_request(osd_req);
+	}
 
 	switch (obj_request->img_request->data_type) {
 	case OBJ_REQUEST_NODATA:
@@ -1551,7 +1572,7 @@
 		kfree(obj_request->bvec_pos.bvecs);
 		break;
 	default:
-		rbd_assert(0);
+		BUG();
 	}
 
 	kfree(obj_request->img_extents);
@@ -1617,10 +1638,8 @@
 	if (!rbd_dev->parent_spec)
 		return false;
 
-	down_read(&rbd_dev->header_rwsem);
 	if (rbd_dev->parent_overlap)
 		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
-	up_read(&rbd_dev->header_rwsem);
 
 	if (counter < 0)
 		rbd_warn(rbd_dev, "parent reference overflow");
@@ -1628,64 +1647,528 @@
 	return counter > 0;
 }
 
-/*
- * Caller is responsible for filling in the list of object requests
- * that comprises the image request, and the Linux request pointer
- * (if there is one).
- */
-static struct rbd_img_request *rbd_img_request_create(
-					struct rbd_device *rbd_dev,
-					enum obj_operation_type op_type,
-					struct ceph_snap_context *snapc)
+static void rbd_img_request_init(struct rbd_img_request *img_request,
+				 struct rbd_device *rbd_dev,
+				 enum obj_operation_type op_type)
 {
-	struct rbd_img_request *img_request;
-
-	img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
-	if (!img_request)
-		return NULL;
+	memset(img_request, 0, sizeof(*img_request));
 
 	img_request->rbd_dev = rbd_dev;
 	img_request->op_type = op_type;
-	if (!rbd_img_is_write(img_request))
-		img_request->snap_id = rbd_dev->spec->snap_id;
-	else
-		img_request->snapc = snapc;
 
-	if (rbd_dev_parent_get(rbd_dev))
-		img_request_layered_set(img_request);
-
-	spin_lock_init(&img_request->completion_lock);
+	INIT_LIST_HEAD(&img_request->lock_item);
 	INIT_LIST_HEAD(&img_request->object_extents);
-	kref_init(&img_request->kref);
-
-	dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
-	     obj_op_name(op_type), img_request);
-	return img_request;
+	mutex_init(&img_request->state_mutex);
 }
 
-static void rbd_img_request_destroy(struct kref *kref)
+/*
+ * Only snap_id is captured here, for reads.  For writes, snapshot
+ * context is captured in rbd_img_object_requests() after exclusive
+ * lock is ensured to be held.
+ */
+static void rbd_img_capture_header(struct rbd_img_request *img_req)
 {
-	struct rbd_img_request *img_request;
+	struct rbd_device *rbd_dev = img_req->rbd_dev;
+
+	lockdep_assert_held(&rbd_dev->header_rwsem);
+
+	if (!rbd_img_is_write(img_req))
+		img_req->snap_id = rbd_dev->spec->snap_id;
+
+	if (rbd_dev_parent_get(rbd_dev))
+		img_request_layered_set(img_req);
+}
+
+static void rbd_img_request_destroy(struct rbd_img_request *img_request)
+{
 	struct rbd_obj_request *obj_request;
 	struct rbd_obj_request *next_obj_request;
 
-	img_request = container_of(kref, struct rbd_img_request, kref);
-
 	dout("%s: img %p\n", __func__, img_request);
 
+	WARN_ON(!list_empty(&img_request->lock_item));
 	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
 		rbd_img_obj_request_del(img_request, obj_request);
-	rbd_assert(img_request->obj_request_count == 0);
 
-	if (img_request_layered_test(img_request)) {
-		img_request_layered_clear(img_request);
+	if (img_request_layered_test(img_request))
 		rbd_dev_parent_put(img_request->rbd_dev);
-	}
 
 	if (rbd_img_is_write(img_request))
 		ceph_put_snap_context(img_request->snapc);
 
-	kmem_cache_free(rbd_img_request_cache, img_request);
+	if (test_bit(IMG_REQ_CHILD, &img_request->flags))
+		kmem_cache_free(rbd_img_request_cache, img_request);
+}
+
+#define BITS_PER_OBJ	2
+#define OBJS_PER_BYTE	(BITS_PER_BYTE / BITS_PER_OBJ)
+#define OBJ_MASK	((1 << BITS_PER_OBJ) - 1)
+
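+/*
+ * Each object's state occupies BITS_PER_OBJ == 2 bits, packed four
+ * objects per byte, most significant pair first.  For example, objno 5
+ * lands in byte 1 (index = 5 / 4) at shift 4 ((4 - 1 - 1) * 2), so its
+ * state is (object_map[1] >> 4) & OBJ_MASK.
+ */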
+static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
+				   u64 *index, u8 *shift)
+{
+	u32 off;
+
+	rbd_assert(objno < rbd_dev->object_map_size);
+	*index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
+	*shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
+}
+
+static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
+{
+	u64 index;
+	u8 shift;
+
+	lockdep_assert_held(&rbd_dev->object_map_lock);
+	__rbd_object_map_index(rbd_dev, objno, &index, &shift);
+	return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
+}
+
+static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
+{
+	u64 index;
+	u8 shift;
+	u8 *p;
+
+	lockdep_assert_held(&rbd_dev->object_map_lock);
+	rbd_assert(!(val & ~OBJ_MASK));
+
+	__rbd_object_map_index(rbd_dev, objno, &index, &shift);
+	p = &rbd_dev->object_map[index];
+	*p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
+}
+
+static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
+{
+	u8 state;
+
+	spin_lock(&rbd_dev->object_map_lock);
+	state = __rbd_object_map_get(rbd_dev, objno);
+	spin_unlock(&rbd_dev->object_map_lock);
+	return state;
+}
+
+static bool use_object_map(struct rbd_device *rbd_dev)
+{
+	/*
+	 * An image mapped read-only can't use the object map -- it isn't
+	 * loaded because the header lock isn't acquired.  Someone else can
+	 * write to the image and update the object map behind our back.
+	 *
+	 * A snapshot can't be written to, so using the object map is always
+	 * safe.
+	 */
+	if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
+		return false;
+
+	return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
+		!(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
+}
+
+static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
+{
+	u8 state;
+
+	/* fall back to default logic if object map is disabled or invalid */
+	if (!use_object_map(rbd_dev))
+		return true;
+
+	state = rbd_object_map_get(rbd_dev, objno);
+	return state != OBJECT_NONEXISTENT;
+}
+
+static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
+				struct ceph_object_id *oid)
+{
+	if (snap_id == CEPH_NOSNAP)
+		ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
+				rbd_dev->spec->image_id);
+	else
+		ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
+				rbd_dev->spec->image_id, snap_id);
+}
+
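+/*
+ * Take the exclusive cls lock on the HEAD object map object.  If it is
+ * already held by someone else, look up the lockers, break the stale
+ * lock (at most once -- tracked by broke_lock) and retry.
+ */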
+static int rbd_object_map_lock(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	CEPH_DEFINE_OID_ONSTACK(oid);
+	u8 lock_type;
+	char *lock_tag;
+	struct ceph_locker *lockers;
+	u32 num_lockers;
+	bool broke_lock = false;
+	int ret;
+
+	rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
+
+again:
+	ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
+			    CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
+	if (ret != -EBUSY || broke_lock) {
+		if (ret == -EEXIST)
+			ret = 0; /* already locked by myself */
+		if (ret)
+			rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
+		return ret;
+	}
+
+	ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
+				 RBD_LOCK_NAME, &lock_type, &lock_tag,
+				 &lockers, &num_lockers);
+	if (ret) {
+		if (ret == -ENOENT)
+			goto again;
+
+		rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
+		return ret;
+	}
+
+	kfree(lock_tag);
+	if (num_lockers == 0)
+		goto again;
+
+	rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
+		 ENTITY_NAME(lockers[0].id.name));
+
+	ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
+				  RBD_LOCK_NAME, lockers[0].id.cookie,
+				  &lockers[0].id.name);
+	ceph_free_lockers(lockers, num_lockers);
+	if (ret) {
+		if (ret == -ENOENT)
+			goto again;
+
+		rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
+		return ret;
+	}
+
+	broke_lock = true;
+	goto again;
+}
+
+static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	CEPH_DEFINE_OID_ONSTACK(oid);
+	int ret;
+
+	rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
+
+	ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
+			      "");
+	if (ret && ret != -ENOENT)
+		rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
+}
+
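+/*
+ * The object_map_load reply starts with a le32 byte count for the
+ * versioned "BitVector header", whose first decoded field is the le64
+ * map size in objects; the packed per-object state data follows it.
+ */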
+static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
+{
+	u8 struct_v;
+	u32 struct_len;
+	u32 header_len;
+	void *header_end;
+	int ret;
+
+	ceph_decode_32_safe(p, end, header_len, e_inval);
+	header_end = *p + header_len;
+
+	ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
+				  &struct_len);
+	if (ret)
+		return ret;
+
+	ceph_decode_64_safe(p, end, *object_map_size, e_inval);
+
+	*p = header_end;
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+static int __rbd_object_map_load(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	CEPH_DEFINE_OID_ONSTACK(oid);
+	struct page **pages;
+	void *p, *end;
+	size_t reply_len;
+	u64 num_objects;
+	u64 object_map_bytes;
+	u64 object_map_size;
+	int num_pages;
+	int ret;
+
+	rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
+
+	num_objects = ceph_get_num_objects(&rbd_dev->layout,
+					   rbd_dev->mapping.size);
+	object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
+					    BITS_PER_BYTE);
+	num_pages = calc_pages_for(0, object_map_bytes) + 1;
+	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+	if (IS_ERR(pages))
+		return PTR_ERR(pages);
+
+	reply_len = num_pages * PAGE_SIZE;
+	rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
+	ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
+			     "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
+			     NULL, 0, pages, &reply_len);
+	if (ret)
+		goto out;
+
+	p = page_address(pages[0]);
+	end = p + min(reply_len, (size_t)PAGE_SIZE);
+	ret = decode_object_map_header(&p, end, &object_map_size);
+	if (ret)
+		goto out;
+
+	if (object_map_size != num_objects) {
+		rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
+			 object_map_size, num_objects);
+		ret = -EINVAL;
+		goto out;
+	}
+
+	if (offset_in_page(p) + object_map_bytes > reply_len) {
+		ret = -EINVAL;
+		goto out;
+	}
+
+	rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
+	if (!rbd_dev->object_map) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	rbd_dev->object_map_size = object_map_size;
+	ceph_copy_from_page_vector(pages, rbd_dev->object_map,
+				   offset_in_page(p), object_map_bytes);
+
+out:
+	ceph_release_page_vector(pages, num_pages);
+	return ret;
+}
+
+static void rbd_object_map_free(struct rbd_device *rbd_dev)
+{
+	kvfree(rbd_dev->object_map);
+	rbd_dev->object_map = NULL;
+	rbd_dev->object_map_size = 0;
+}
+
+static int rbd_object_map_load(struct rbd_device *rbd_dev)
+{
+	int ret;
+
+	ret = __rbd_object_map_load(rbd_dev);
+	if (ret)
+		return ret;
+
+	ret = rbd_dev_v2_get_flags(rbd_dev);
+	if (ret) {
+		rbd_object_map_free(rbd_dev);
+		return ret;
+	}
+
+	if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
+		rbd_warn(rbd_dev, "object map is invalid");
+
+	return 0;
+}
+
+static int rbd_object_map_open(struct rbd_device *rbd_dev)
+{
+	int ret;
+
+	ret = rbd_object_map_lock(rbd_dev);
+	if (ret)
+		return ret;
+
+	ret = rbd_object_map_load(rbd_dev);
+	if (ret) {
+		rbd_object_map_unlock(rbd_dev);
+		return ret;
+	}
+
+	return 0;
+}
+
+static void rbd_object_map_close(struct rbd_device *rbd_dev)
+{
+	rbd_object_map_free(rbd_dev);
+	rbd_object_map_unlock(rbd_dev);
+}
+
+/*
+ * This function needs snap_id (or more precisely just something to
+ * distinguish between HEAD and snapshot object maps), new_state and
+ * current_state that were passed to rbd_object_map_update().
+ *
+ * To avoid allocating and stashing a context we piggyback on the OSD
+ * request.  A HEAD update has two ops (assert_locked).  For new_state
+ * and current_state we decode our own object_map_update op, encoded in
+ * rbd_cls_object_map_update().
+ */
+static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
+					struct ceph_osd_request *osd_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	struct ceph_osd_data *osd_data;
+	u64 objno;
+	u8 state, new_state, current_state;
+	bool has_current_state;
+	void *p;
+
+	if (osd_req->r_result)
+		return osd_req->r_result;
+
+	/*
+	 * Nothing to do for a snapshot object map.
+	 */
+	if (osd_req->r_num_ops == 1)
+		return 0;
+
+	/*
+	 * Update in-memory HEAD object map.
+	 */
+	rbd_assert(osd_req->r_num_ops == 2);
+	osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
+	rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
+
+	p = page_address(osd_data->pages[0]);
+	objno = ceph_decode_64(&p);
+	rbd_assert(objno == obj_req->ex.oe_objno);
+	rbd_assert(ceph_decode_64(&p) == objno + 1);
+	new_state = ceph_decode_8(&p);
+	has_current_state = ceph_decode_8(&p);
+	if (has_current_state)
+		current_state = ceph_decode_8(&p);
+
+	spin_lock(&rbd_dev->object_map_lock);
+	state = __rbd_object_map_get(rbd_dev, objno);
+	if (!has_current_state || current_state == state ||
+	    (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
+		__rbd_object_map_set(rbd_dev, objno, new_state);
+	spin_unlock(&rbd_dev->object_map_lock);
+
+	return 0;
+}
+
+static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
+{
+	struct rbd_obj_request *obj_req = osd_req->r_priv;
+	int result;
+
+	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
+	     osd_req->r_result, obj_req);
+
+	result = rbd_object_map_update_finish(obj_req, osd_req);
+	rbd_obj_handle_request(obj_req, result);
+}
+
+static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
+{
+	u8 state = rbd_object_map_get(rbd_dev, objno);
+
+	if (state == new_state ||
+	    (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
+	    (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
+		return false;
+
+	return true;
+}
+
+static int rbd_cls_object_map_update(struct ceph_osd_request *req,
+				     int which, u64 objno, u8 new_state,
+				     const u8 *current_state)
+{
+	struct page **pages;
+	void *p, *start;
+	int ret;
+
+	ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
+	if (ret)
+		return ret;
+
+	pages = ceph_alloc_page_vector(1, GFP_NOIO);
+	if (IS_ERR(pages))
+		return PTR_ERR(pages);
+
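+	/*
+	 * cls request payload: [start_objno, end_objno) == [objno, objno + 1),
+	 * new_state, and an optional current_state preceded by a presence
+	 * byte -- mirrored by the decode in rbd_object_map_update_finish().
+	 */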
+	p = start = page_address(pages[0]);
+	ceph_encode_64(&p, objno);
+	ceph_encode_64(&p, objno + 1);
+	ceph_encode_8(&p, new_state);
+	if (current_state) {
+		ceph_encode_8(&p, 1);
+		ceph_encode_8(&p, *current_state);
+	} else {
+		ceph_encode_8(&p, 0);
+	}
+
+	osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
+					  false, true);
+	return 0;
+}
+
+/*
+ * Return:
+ *   0 - object map update sent
+ *   1 - object map update isn't needed
+ *  <0 - error
+ */
+static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
+				 u8 new_state, const u8 *current_state)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct ceph_osd_request *req;
+	int num_ops = 1;
+	int which = 0;
+	int ret;
+
+	if (snap_id == CEPH_NOSNAP) {
+		if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
+			return 1;
+
+		num_ops++; /* assert_locked */
+	}
+
+	req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
+	if (!req)
+		return -ENOMEM;
+
+	list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
+	req->r_callback = rbd_object_map_callback;
+	req->r_priv = obj_req;
+
+	rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
+	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
+	req->r_flags = CEPH_OSD_FLAG_WRITE;
+	ktime_get_real_ts64(&req->r_mtime);
+
+	if (snap_id == CEPH_NOSNAP) {
+		/*
+		 * Protect against possible race conditions during lock
+		 * ownership transitions.
+		 */
+		ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
+					     CEPH_CLS_LOCK_EXCLUSIVE, "", "");
+		if (ret)
+			return ret;
+	}
+
+	ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
+					new_state, current_state);
+	if (ret)
+		return ret;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		return ret;
+
+	ceph_osdc_start_request(osdc, req, false);
+	return 0;
 }
 
 static void prune_extents(struct ceph_file_extent *img_extents,
@@ -1735,11 +2218,13 @@
 	return 0;
 }
 
-static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
+static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
 {
+	struct rbd_obj_request *obj_req = osd_req->r_priv;
+
 	switch (obj_req->img_request->data_type) {
 	case OBJ_REQUEST_BIO:
-		osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
+		osd_req_op_extent_osd_data_bio(osd_req, which,
 					       &obj_req->bio_pos,
 					       obj_req->ex.oe_len);
 		break;
@@ -1748,30 +2233,15 @@
 		rbd_assert(obj_req->bvec_pos.iter.bi_size ==
 							obj_req->ex.oe_len);
 		rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
-		osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
+		osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
 						    &obj_req->bvec_pos);
 		break;
 	default:
-		rbd_assert(0);
+		BUG();
 	}
 }
 
-static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
-{
-	obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
-	if (!obj_req->osd_req)
-		return -ENOMEM;
-
-	osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
-			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
-	rbd_osd_req_setup_data(obj_req, 0);
-
-	rbd_osd_req_format_read(obj_req);
-	return 0;
-}
-
-static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
-				unsigned int which)
+static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
 {
 	struct page **pages;
 
@@ -1787,39 +2257,61 @@
 	if (IS_ERR(pages))
 		return PTR_ERR(pages);
 
-	osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
-	osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
+	osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
+	osd_req_op_raw_data_in_pages(osd_req, which, pages,
 				     8 + sizeof(struct ceph_timespec),
 				     0, false, true);
 	return 0;
 }
 
-static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
-				  unsigned int which)
+static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
+				u32 bytes)
 {
+	struct rbd_obj_request *obj_req = osd_req->r_priv;
+	int ret;
+
+	ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
+	if (ret)
+		return ret;
+
+	osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
+					  obj_req->copyup_bvec_count, bytes);
+	return 0;
+}
+
+static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
+{
+	obj_req->read_state = RBD_OBJ_READ_START;
+	return 0;
+}
+
+static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
+				      int which)
+{
+	struct rbd_obj_request *obj_req = osd_req->r_priv;
 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
 	u16 opcode;
 
-	osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
-				   rbd_dev->layout.object_size,
-				   rbd_dev->layout.object_size);
+	if (!use_object_map(rbd_dev) ||
+	    !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
+		osd_req_op_alloc_hint_init(osd_req, which++,
+					   rbd_dev->layout.object_size,
+					   rbd_dev->layout.object_size,
+					   rbd_dev->opts->alloc_hint_flags);
+	}
 
 	if (rbd_obj_is_entire(obj_req))
 		opcode = CEPH_OSD_OP_WRITEFULL;
 	else
 		opcode = CEPH_OSD_OP_WRITE;
 
-	osd_req_op_extent_init(obj_req->osd_req, which, opcode,
+	osd_req_op_extent_init(osd_req, which, opcode,
 			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
-	rbd_osd_req_setup_data(obj_req, which++);
-
-	rbd_assert(which == obj_req->osd_req->r_num_ops);
-	rbd_osd_req_format_write(obj_req);
+	rbd_osd_setup_data(osd_req, which);
 }
 
-static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
+static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
 {
-	unsigned int num_osd_ops, which = 0;
 	int ret;
 
 	/* reverse map the entire object onto the parent */
@@ -1827,61 +2319,104 @@
 	if (ret)
 		return ret;
 
-	if (obj_req->num_img_extents) {
-		obj_req->write_state = RBD_OBJ_WRITE_GUARD;
-		num_osd_ops = 3; /* stat + setallochint + write/writefull */
-	} else {
-		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-		num_osd_ops = 2; /* setallochint + write/writefull */
-	}
-
-	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
-	if (!obj_req->osd_req)
-		return -ENOMEM;
-
-	if (obj_req->num_img_extents) {
-		ret = __rbd_obj_setup_stat(obj_req, which++);
-		if (ret)
-			return ret;
-	}
-
-	__rbd_obj_setup_write(obj_req, which);
+	obj_req->write_state = RBD_OBJ_WRITE_START;
 	return 0;
 }
 
-static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
-				    unsigned int which)
+static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
 {
+	return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
+					  CEPH_OSD_OP_ZERO;
+}
+
+static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
+					int which)
+{
+	struct rbd_obj_request *obj_req = osd_req->r_priv;
+
+	if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
+		rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
+		osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
+	} else {
+		osd_req_op_extent_init(osd_req, which,
+				       truncate_or_zero_opcode(obj_req),
+				       obj_req->ex.oe_off, obj_req->ex.oe_len,
+				       0, 0);
+	}
+}
+
+static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	u64 off, next_off;
+	int ret;
+
+	/*
+	 * Align the range to alloc_size boundary and punt on discards
+	 * that are too small to free up any space.
+	 *
+	 * alloc_size == object_size && is_tail() is a special case for
+	 * filestore with filestore_punch_hole = false, needed to allow
+	 * truncate (in addition to delete).
+	 */
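+	/*
+	 * e.g. with the default 64k alloc_size a 10000~100000 discard is
+	 * dropped (round_up(10000) == round_down(110000) == 65536), while
+	 * a 4096~1048576 discard is trimmed to 65536~1048576.
+	 */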
+	if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
+	    !rbd_obj_is_tail(obj_req)) {
+		off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
+		next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
+				      rbd_dev->opts->alloc_size);
+		if (off >= next_off)
+			return 1;
+
+		dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
+		     obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
+		     off, next_off - off);
+		obj_req->ex.oe_off = off;
+		obj_req->ex.oe_len = next_off - off;
+	}
+
+	/* reverse map the entire object onto the parent */
+	ret = rbd_obj_calc_img_extents(obj_req, true);
+	if (ret)
+		return ret;
+
+	obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
+	if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
+		obj_req->flags |= RBD_OBJ_FLAG_DELETION;
+
+	obj_req->write_state = RBD_OBJ_WRITE_START;
+	return 0;
+}
+
+static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
+					int which)
+{
+	struct rbd_obj_request *obj_req = osd_req->r_priv;
 	u16 opcode;
 
 	if (rbd_obj_is_entire(obj_req)) {
 		if (obj_req->num_img_extents) {
-			osd_req_op_init(obj_req->osd_req, which++,
-					CEPH_OSD_OP_CREATE, 0);
+			if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
+				osd_req_op_init(osd_req, which++,
+						CEPH_OSD_OP_CREATE, 0);
 			opcode = CEPH_OSD_OP_TRUNCATE;
 		} else {
-			osd_req_op_init(obj_req->osd_req, which++,
+			rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
+			osd_req_op_init(osd_req, which++,
 					CEPH_OSD_OP_DELETE, 0);
 			opcode = 0;
 		}
-	} else if (rbd_obj_is_tail(obj_req)) {
-		opcode = CEPH_OSD_OP_TRUNCATE;
 	} else {
-		opcode = CEPH_OSD_OP_ZERO;
+		opcode = truncate_or_zero_opcode(obj_req);
 	}
 
 	if (opcode)
-		osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
+		osd_req_op_extent_init(osd_req, which, opcode,
 				       obj_req->ex.oe_off, obj_req->ex.oe_len,
 				       0, 0);
-
-	rbd_assert(which == obj_req->osd_req->r_num_ops);
-	rbd_osd_req_format_write(obj_req);
 }
 
-static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
+static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
 {
-	unsigned int num_osd_ops, which = 0;
 	int ret;
 
 	/* reverse map the entire object onto the parent */
@@ -1889,64 +2424,96 @@
 	if (ret)
 		return ret;
 
-	if (rbd_obj_is_entire(obj_req)) {
-		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-		if (obj_req->num_img_extents)
-			num_osd_ops = 2; /* create + truncate */
-		else
-			num_osd_ops = 1; /* delete */
-	} else {
-		if (obj_req->num_img_extents) {
-			obj_req->write_state = RBD_OBJ_WRITE_GUARD;
-			num_osd_ops = 2; /* stat + truncate/zero */
-		} else {
-			obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-			num_osd_ops = 1; /* truncate/zero */
-		}
+	if (!obj_req->num_img_extents) {
+		obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
+		if (rbd_obj_is_entire(obj_req))
+			obj_req->flags |= RBD_OBJ_FLAG_DELETION;
 	}
 
-	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
-	if (!obj_req->osd_req)
-		return -ENOMEM;
-
-	if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
-		ret = __rbd_obj_setup_stat(obj_req, which++);
-		if (ret)
-			return ret;
-	}
-
-	__rbd_obj_setup_discard(obj_req, which);
+	obj_req->write_state = RBD_OBJ_WRITE_START;
 	return 0;
 }
 
+static int count_write_ops(struct rbd_obj_request *obj_req)
+{
+	struct rbd_img_request *img_req = obj_req->img_request;
+
+	switch (img_req->op_type) {
+	case OBJ_OP_WRITE:
+		if (!use_object_map(img_req->rbd_dev) ||
+		    !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
+			return 2; /* setallochint + write/writefull */
+
+		return 1; /* write/writefull */
+	case OBJ_OP_DISCARD:
+		return 1; /* delete/truncate/zero */
+	case OBJ_OP_ZEROOUT:
+		if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
+		    !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
+			return 2; /* create + truncate */
+
+		return 1; /* delete/truncate/zero */
+	default:
+		BUG();
+	}
+}
+
+static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
+				    int which)
+{
+	struct rbd_obj_request *obj_req = osd_req->r_priv;
+
+	switch (obj_req->img_request->op_type) {
+	case OBJ_OP_WRITE:
+		__rbd_osd_setup_write_ops(osd_req, which);
+		break;
+	case OBJ_OP_DISCARD:
+		__rbd_osd_setup_discard_ops(osd_req, which);
+		break;
+	case OBJ_OP_ZEROOUT:
+		__rbd_osd_setup_zeroout_ops(osd_req, which);
+		break;
+	default:
+		BUG();
+	}
+}
+
 /*
- * For each object request in @img_req, allocate an OSD request, add
- * individual OSD ops and prepare them for submission.  The number of
- * OSD ops depends on op_type and the overlap point (if any).
+ * Prune the list of object requests (adjust offset and/or length, drop
+ * redundant requests).  Prepare object request state machines and image
+ * request state machine for execution.
  */
 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
 {
-	struct rbd_obj_request *obj_req;
+	struct rbd_obj_request *obj_req, *next_obj_req;
 	int ret;
 
-	for_each_obj_request(img_req, obj_req) {
+	for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
 		switch (img_req->op_type) {
 		case OBJ_OP_READ:
-			ret = rbd_obj_setup_read(obj_req);
+			ret = rbd_obj_init_read(obj_req);
 			break;
 		case OBJ_OP_WRITE:
-			ret = rbd_obj_setup_write(obj_req);
+			ret = rbd_obj_init_write(obj_req);
 			break;
 		case OBJ_OP_DISCARD:
-			ret = rbd_obj_setup_discard(obj_req);
+			ret = rbd_obj_init_discard(obj_req);
+			break;
+		case OBJ_OP_ZEROOUT:
+			ret = rbd_obj_init_zeroout(obj_req);
 			break;
 		default:
-			rbd_assert(0);
+			BUG();
 		}
-		if (ret)
+		if (ret < 0)
 			return ret;
+		if (ret > 0) {
+			rbd_img_obj_request_del(img_req, obj_req);
+			continue;
+		}
 	}
 
+	img_req->state = RBD_IMG_START;
 	return 0;
 }
 
@@ -2235,32 +2802,78 @@
 					 &it);
 }
 
-static void rbd_img_request_submit(struct rbd_img_request *img_request)
+static void rbd_img_handle_request_work(struct work_struct *work)
 {
-	struct rbd_obj_request *obj_request;
+	struct rbd_img_request *img_req =
+	    container_of(work, struct rbd_img_request, work);
 
-	dout("%s: img %p\n", __func__, img_request);
+	rbd_img_handle_request(img_req, img_req->work_result);
+}
 
-	rbd_img_request_get(img_request);
-	for_each_obj_request(img_request, obj_request)
-		rbd_obj_request_submit(obj_request);
+static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
+{
+	INIT_WORK(&img_req->work, rbd_img_handle_request_work);
+	img_req->work_result = result;
+	queue_work(rbd_wq, &img_req->work);
+}
 
-	rbd_img_request_put(img_request);
+static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+	if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
+		obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
+		return true;
+	}
+
+	dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
+	     obj_req->ex.oe_objno);
+	return false;
+}
+
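rbd_object_map_may_exist() is referenced here but defined in an earlier hunk of
this patch.  A minimal sketch of the check it is assumed to perform -- trust the
in-memory object map only when it is usable, and otherwise treat every object as
possibly existing:

	static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
	{
		u8 state;

		/* fall back to assuming the object exists if the map is unusable */
		if (!use_object_map(rbd_dev))
			return true;

		state = rbd_object_map_get(rbd_dev, objno);
		return state != OBJECT_NONEXISTENT;
	}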
+static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
+{
+	struct ceph_osd_request *osd_req;
+	int ret;
+
+	osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
+	if (IS_ERR(osd_req))
+		return PTR_ERR(osd_req);
+
+	osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
+			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
+	rbd_osd_setup_data(osd_req, 0);
+	rbd_osd_format_read(osd_req);
+
+	ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
+	if (ret)
+		return ret;
+
+	rbd_osd_submit(osd_req);
+	return 0;
 }
 
 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
 {
 	struct rbd_img_request *img_req = obj_req->img_request;
+	struct rbd_device *parent = img_req->rbd_dev->parent;
 	struct rbd_img_request *child_img_req;
 	int ret;
 
-	child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
-					       OBJ_OP_READ, NULL);
+	child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
 	if (!child_img_req)
 		return -ENOMEM;
 
+	rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
 	__set_bit(IMG_REQ_CHILD, &child_img_req->flags);
 	child_img_req->obj_request = obj_req;
+
+	down_read(&parent->header_rwsem);
+	rbd_img_capture_header(child_img_req);
+	up_read(&parent->header_rwsem);
+
+	dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
+	     obj_req);
 
 	if (!rbd_img_is_write(img_req)) {
 		switch (img_req->data_type) {
@@ -2278,7 +2891,7 @@
 						      &obj_req->bvec_pos);
 			break;
 		default:
-			rbd_assert(0);
+			BUG();
 		}
 	} else {
 		ret = rbd_img_fill_from_bvecs(child_img_req,
@@ -2287,55 +2900,159 @@
 					      obj_req->copyup_bvecs);
 	}
 	if (ret) {
-		rbd_img_request_put(child_img_req);
+		rbd_img_request_destroy(child_img_req);
 		return ret;
 	}
 
-	rbd_img_request_submit(child_img_req);
+	/* avoid parent chain recursion */
+	rbd_img_schedule(child_img_req, 0);
 	return 0;
 }
 
-static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
+static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
 {
 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
 	int ret;
 
-	if (obj_req->result == -ENOENT &&
-	    rbd_dev->parent_overlap && !obj_req->tried_parent) {
-		/* reverse map this object extent onto the parent */
-		ret = rbd_obj_calc_img_extents(obj_req, false);
+again:
+	switch (obj_req->read_state) {
+	case RBD_OBJ_READ_START:
+		rbd_assert(!*result);
+
+		if (!rbd_obj_may_exist(obj_req)) {
+			*result = -ENOENT;
+			obj_req->read_state = RBD_OBJ_READ_OBJECT;
+			goto again;
+		}
+
+		ret = rbd_obj_read_object(obj_req);
 		if (ret) {
-			obj_req->result = ret;
+			*result = ret;
 			return true;
 		}
-
-		if (obj_req->num_img_extents) {
-			obj_req->tried_parent = true;
-			ret = rbd_obj_read_from_parent(obj_req);
+		obj_req->read_state = RBD_OBJ_READ_OBJECT;
+		return false;
+	case RBD_OBJ_READ_OBJECT:
+		if (*result == -ENOENT && rbd_dev->parent_overlap) {
+			/* reverse map this object extent onto the parent */
+			ret = rbd_obj_calc_img_extents(obj_req, false);
 			if (ret) {
-				obj_req->result = ret;
+				*result = ret;
 				return true;
 			}
-			return false;
+			if (obj_req->num_img_extents) {
+				ret = rbd_obj_read_from_parent(obj_req);
+				if (ret) {
+					*result = ret;
+					return true;
+				}
+				obj_req->read_state = RBD_OBJ_READ_PARENT;
+				return false;
+			}
 		}
+
+		/*
+		 * -ENOENT means a hole in the image -- zero-fill the entire
+		 * length of the request.  A short read also implies zero-fill
+		 * to the end of the request.
+		 */
+		if (*result == -ENOENT) {
+			rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
+			*result = 0;
+		} else if (*result >= 0) {
+			if (*result < obj_req->ex.oe_len)
+				rbd_obj_zero_range(obj_req, *result,
+						obj_req->ex.oe_len - *result);
+			else
+				rbd_assert(*result == obj_req->ex.oe_len);
+			*result = 0;
+		}
+		return true;
+	case RBD_OBJ_READ_PARENT:
+		/*
+		 * The parent image is read only up to the overlap -- zero-fill
+		 * from the overlap to the end of the request.
+		 */
+		if (!*result) {
+			u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
+
+			if (obj_overlap < obj_req->ex.oe_len)
+				rbd_obj_zero_range(obj_req, obj_overlap,
+					    obj_req->ex.oe_len - obj_overlap);
+		}
+		return true;
+	default:
+		BUG();
+	}
+}
+
+static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+	if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
+		obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
+
+	if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
+	    (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
+		dout("%s %p noop for nonexistent\n", __func__, obj_req);
+		return true;
 	}
 
-	/*
-	 * -ENOENT means a hole in the image -- zero-fill the entire
-	 * length of the request.  A short read also implies zero-fill
-	 * to the end of the request.  In both cases we update xferred
-	 * count to indicate the whole request was satisfied.
-	 */
-	if (obj_req->result == -ENOENT ||
-	    (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
-		rbd_assert(!obj_req->xferred || !obj_req->result);
-		rbd_obj_zero_range(obj_req, obj_req->xferred,
-				   obj_req->ex.oe_len - obj_req->xferred);
-		obj_req->result = 0;
-		obj_req->xferred = obj_req->ex.oe_len;
+	return false;
+}
+
+/*
+ * Return:
+ *   0 - object map update sent
+ *   1 - object map update isn't needed
+ *  <0 - error
+ */
+static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	u8 new_state;
+
+	if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+		return 1;
+
+	if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
+		new_state = OBJECT_PENDING;
+	else
+		new_state = OBJECT_EXISTS;
+
+	return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
+}
+
+static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
+{
+	struct ceph_osd_request *osd_req;
+	int num_ops = count_write_ops(obj_req);
+	int which = 0;
+	int ret;
+
+	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
+		num_ops++; /* stat */
+
+	osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
+	if (IS_ERR(osd_req))
+		return PTR_ERR(osd_req);
+
+	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
+		ret = rbd_osd_setup_stat(osd_req, which++);
+		if (ret)
+			return ret;
 	}
 
-	return true;
+	rbd_osd_setup_write_ops(osd_req, which);
+	rbd_osd_format_write(osd_req);
+
+	ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
+	if (ret)
+		return ret;
+
+	rbd_osd_submit(osd_req);
+	return 0;
 }
 
 /*
@@ -2356,56 +3073,66 @@
 	return true;
 }
 
-static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
+#define MODS_ONLY	U32_MAX
+
+static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
+				      u32 bytes)
 {
-	unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
+	struct ceph_osd_request *osd_req;
 	int ret;
 
 	dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
-	rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
-	rbd_osd_req_destroy(obj_req->osd_req);
+	rbd_assert(bytes > 0 && bytes != MODS_ONLY);
 
-	/*
-	 * Create a copyup request with the same number of OSD ops as
-	 * the original request.  The original request was stat + op(s),
-	 * the new copyup request will be copyup + the same op(s).
-	 */
-	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
-	if (!obj_req->osd_req)
-		return -ENOMEM;
+	osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
+	if (IS_ERR(osd_req))
+		return PTR_ERR(osd_req);
 
-	ret = osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
-				  "copyup");
+	ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
 	if (ret)
 		return ret;
 
-	/*
-	 * Only send non-zero copyup data to save some I/O and network
-	 * bandwidth -- zero copyup data is equivalent to the object not
-	 * existing.
-	 */
-	if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
-		dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
-		bytes = 0;
-	}
-	osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
-					  obj_req->copyup_bvecs,
-					  obj_req->copyup_bvec_count,
-					  bytes);
+	rbd_osd_format_write(osd_req);
 
-	switch (obj_req->img_request->op_type) {
-	case OBJ_OP_WRITE:
-		__rbd_obj_setup_write(obj_req, 1);
-		break;
-	case OBJ_OP_DISCARD:
-		rbd_assert(!rbd_obj_is_entire(obj_req));
-		__rbd_obj_setup_discard(obj_req, 1);
-		break;
-	default:
-		rbd_assert(0);
+	ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
+	if (ret)
+		return ret;
+
+	rbd_osd_submit(osd_req);
+	return 0;
+}
+
+static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
+					u32 bytes)
+{
+	struct ceph_osd_request *osd_req;
+	int num_ops = count_write_ops(obj_req);
+	int which = 0;
+	int ret;
+
+	dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
+
+	if (bytes != MODS_ONLY)
+		num_ops++; /* copyup */
+
+	osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
+	if (IS_ERR(osd_req))
+		return PTR_ERR(osd_req);
+
+	if (bytes != MODS_ONLY) {
+		ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
+		if (ret)
+			return ret;
 	}
 
-	rbd_obj_request_submit(obj_req);
+	rbd_osd_setup_write_ops(osd_req, which);
+	rbd_osd_format_write(osd_req);
+
+	ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
+	if (ret)
+		return ret;
+
+	rbd_osd_submit(osd_req);
 	return 0;
 }
 
@@ -2437,7 +3164,12 @@
 	return 0;
 }
 
-static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
+/*
+ * The target object doesn't exist.  Read the data for the entire
+ * target object up to the overlap point (if any) from the parent,
+ * so we can use it for a copyup.
+ */
+static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
 {
 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
 	int ret;
@@ -2448,174 +3180,503 @@
 	if (!obj_req->num_img_extents) {
 		/*
 		 * The overlap has become 0 (most likely because the
-		 * image has been flattened).  Use rbd_obj_issue_copyup()
-		 * to re-submit the original write request -- the copyup
-		 * operation itself will be a no-op, since someone must
-		 * have populated the child object while we weren't
-		 * looking.  Move to WRITE_FLAT state as we'll be done
-		 * with the operation once the null copyup completes.
+		 * image has been flattened).  Re-submit the original write
+		 * request -- pass MODS_ONLY since the copyup isn't needed
+		 * anymore.
 		 */
-		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
-		return rbd_obj_issue_copyup(obj_req, 0);
+		return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
 	}
 
 	ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
 	if (ret)
 		return ret;
 
-	obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
 	return rbd_obj_read_from_parent(obj_req);
 }
 
-static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
+static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
 {
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	struct ceph_snap_context *snapc = obj_req->img_request->snapc;
+	u8 new_state;
+	u32 i;
+	int ret;
+
+	rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
+
+	if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+		return;
+
+	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
+		return;
+
+	for (i = 0; i < snapc->num_snaps; i++) {
+		if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
+		    i + 1 < snapc->num_snaps)
+			new_state = OBJECT_EXISTS_CLEAN;
+		else
+			new_state = OBJECT_EXISTS;
+
+		ret = rbd_object_map_update(obj_req, snapc->snaps[i],
+					    new_state, NULL);
+		if (ret < 0) {
+			obj_req->pending.result = ret;
+			return;
+		}
+
+		rbd_assert(!ret);
+		obj_req->pending.num_pending++;
+	}
+}
+
+static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
+{
+	u32 bytes = rbd_obj_img_extents_bytes(obj_req);
+	int ret;
+
+	rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
+
+	/*
+	 * Only send non-zero copyup data to save some I/O and network
+	 * bandwidth -- zero copyup data is equivalent to the object not
+	 * existing.
+	 */
+	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
+		bytes = 0;
+
+	if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
+		/*
+		 * Send a copyup request with an empty snapshot context to
+		 * deep-copyup the object through all existing snapshots.
+		 * A second request with the current snapshot context will be
+		 * sent for the actual modification.
+		 */
+		ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
+		if (ret) {
+			obj_req->pending.result = ret;
+			return;
+		}
+
+		obj_req->pending.num_pending++;
+		bytes = MODS_ONLY;
+	}
+
+	ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
+	if (ret) {
+		obj_req->pending.result = ret;
+		return;
+	}
+
+	obj_req->pending.num_pending++;
+}
+
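The __RBD_OBJ_COPYUP_* states below drain the sub-requests issued above via
pending_result_dec(), which lives in an earlier hunk of this patch.  A sketch of
the contract it is assumed to implement -- latch the first nonzero result and
report completion (with that result) once the last pending sub-request finishes:

	static bool pending_result_dec(struct pending_result *pending, int *result)
	{
		rbd_assert(pending->num_pending > 0);

		/* remember the first error, ignore subsequent ones */
		if (*result && !pending->result)
			pending->result = *result;
		*result = 0;

		if (--pending->num_pending)
			return false;	/* sub-requests still outstanding */

		*result = pending->result;
		return true;
	}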
+static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
 	int ret;
 
 again:
-	switch (obj_req->write_state) {
-	case RBD_OBJ_WRITE_GUARD:
-		rbd_assert(!obj_req->xferred);
-		if (obj_req->result == -ENOENT) {
-			/*
-			 * The target object doesn't exist.  Read the data for
-			 * the entire target object up to the overlap point (if
-			 * any) from the parent, so we can use it for a copyup.
-			 */
-			ret = rbd_obj_handle_write_guard(obj_req);
-			if (ret) {
-				obj_req->result = ret;
-				return true;
-			}
-			return false;
-		}
-		/* fall through */
-	case RBD_OBJ_WRITE_FLAT:
-		if (!obj_req->result)
-			/*
-			 * There is no such thing as a successful short
-			 * write -- indicate the whole request was satisfied.
-			 */
-			obj_req->xferred = obj_req->ex.oe_len;
-		return true;
-	case RBD_OBJ_WRITE_COPYUP:
-		obj_req->write_state = RBD_OBJ_WRITE_GUARD;
-		if (obj_req->result)
-			goto again;
+	switch (obj_req->copyup_state) {
+	case RBD_OBJ_COPYUP_START:
+		rbd_assert(!*result);
 
-		rbd_assert(obj_req->xferred);
-		ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
+		ret = rbd_obj_copyup_read_parent(obj_req);
 		if (ret) {
-			obj_req->result = ret;
-			obj_req->xferred = 0;
+			*result = ret;
 			return true;
 		}
+		if (obj_req->num_img_extents)
+			obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
+		else
+			obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
 		return false;
+	case RBD_OBJ_COPYUP_READ_PARENT:
+		if (*result)
+			return true;
+
+		if (is_zero_bvecs(obj_req->copyup_bvecs,
+				  rbd_obj_img_extents_bytes(obj_req))) {
+			dout("%s %p detected zeros\n", __func__, obj_req);
+			obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
+		}
+
+		rbd_obj_copyup_object_maps(obj_req);
+		if (!obj_req->pending.num_pending) {
+			*result = obj_req->pending.result;
+			obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
+			goto again;
+		}
+		obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
+		return false;
+	case __RBD_OBJ_COPYUP_OBJECT_MAPS:
+		if (!pending_result_dec(&obj_req->pending, result))
+			return false;
+		fallthrough;
+	case RBD_OBJ_COPYUP_OBJECT_MAPS:
+		if (*result) {
+			rbd_warn(rbd_dev, "snap object map update failed: %d",
+				 *result);
+			return true;
+		}
+
+		rbd_obj_copyup_write_object(obj_req);
+		if (!obj_req->pending.num_pending) {
+			*result = obj_req->pending.result;
+			obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
+			goto again;
+		}
+		obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
+		return false;
+	case __RBD_OBJ_COPYUP_WRITE_OBJECT:
+		if (!pending_result_dec(&obj_req->pending, result))
+			return false;
+		fallthrough;
+	case RBD_OBJ_COPYUP_WRITE_OBJECT:
+		return true;
 	default:
 		BUG();
 	}
 }
 
 /*
- * Returns true if @obj_req is completed, or false otherwise.
+ * Return:
+ *   0 - object map update sent
+ *   1 - object map update isn't needed
+ *  <0 - error
  */
-static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
+static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
 {
-	switch (obj_req->img_request->op_type) {
-	case OBJ_OP_READ:
-		return rbd_obj_handle_read(obj_req);
-	case OBJ_OP_WRITE:
-		return rbd_obj_handle_write(obj_req);
-	case OBJ_OP_DISCARD:
-		if (rbd_obj_handle_write(obj_req)) {
-			/*
-			 * Hide -ENOENT from delete/truncate/zero -- discarding
-			 * a non-existent object is not a problem.
-			 */
-			if (obj_req->result == -ENOENT) {
-				obj_req->result = 0;
-				obj_req->xferred = obj_req->ex.oe_len;
-			}
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	u8 current_state = OBJECT_PENDING;
+
+	if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+		return 1;
+
+	if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
+		return 1;
+
+	return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
+				     &current_state);
+}
+
+static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
+{
+	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	int ret;
+
+again:
+	switch (obj_req->write_state) {
+	case RBD_OBJ_WRITE_START:
+		rbd_assert(!*result);
+
+		rbd_obj_set_copyup_enabled(obj_req);
+		if (rbd_obj_write_is_noop(obj_req))
+			return true;
+
+		ret = rbd_obj_write_pre_object_map(obj_req);
+		if (ret < 0) {
+			*result = ret;
 			return true;
 		}
+		obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
+		if (ret > 0)
+			goto again;
 		return false;
+	case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
+		if (*result) {
+			rbd_warn(rbd_dev, "pre object map update failed: %d",
+				 *result);
+			return true;
+		}
+		ret = rbd_obj_write_object(obj_req);
+		if (ret) {
+			*result = ret;
+			return true;
+		}
+		obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
+		return false;
+	case RBD_OBJ_WRITE_OBJECT:
+		if (*result == -ENOENT) {
+			if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
+				*result = 0;
+				obj_req->copyup_state = RBD_OBJ_COPYUP_START;
+				obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
+				goto again;
+			}
+			/*
+			 * On a non-existent object:
+			 *   delete returns -ENOENT, truncate/zero return 0
+			 */
+			if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
+				*result = 0;
+		}
+		if (*result)
+			return true;
+
+		obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
+		goto again;
+	case __RBD_OBJ_WRITE_COPYUP:
+		if (!rbd_obj_advance_copyup(obj_req, result))
+			return false;
+		fallthrough;
+	case RBD_OBJ_WRITE_COPYUP:
+		if (*result) {
+			rbd_warn(rbd_dev, "copyup failed: %d", *result);
+			return true;
+		}
+		ret = rbd_obj_write_post_object_map(obj_req);
+		if (ret < 0) {
+			*result = ret;
+			return true;
+		}
+		obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
+		if (ret > 0)
+			goto again;
+		return false;
+	case RBD_OBJ_WRITE_POST_OBJECT_MAP:
+		if (*result)
+			rbd_warn(rbd_dev, "post object map update failed: %d",
+				 *result);
+		return true;
 	default:
 		BUG();
 	}
 }
 
-static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
+/*
+ * Return true if @obj_req is completed.
+ */
+static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
+				     int *result)
 {
 	struct rbd_img_request *img_req = obj_req->img_request;
+	struct rbd_device *rbd_dev = img_req->rbd_dev;
+	bool done;
 
-	rbd_assert((!obj_req->result &&
-		    obj_req->xferred == obj_req->ex.oe_len) ||
-		   (obj_req->result < 0 && !obj_req->xferred));
-	if (!obj_req->result) {
-		img_req->xferred += obj_req->xferred;
-		return;
-	}
+	mutex_lock(&obj_req->state_mutex);
+	if (!rbd_img_is_write(img_req))
+		done = rbd_obj_advance_read(obj_req, result);
+	else
+		done = rbd_obj_advance_write(obj_req, result);
+	mutex_unlock(&obj_req->state_mutex);
 
-	rbd_warn(img_req->rbd_dev,
-		 "%s at objno %llu %llu~%llu result %d xferred %llu",
-		 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
-		 obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
-		 obj_req->xferred);
-	if (!img_req->result) {
-		img_req->result = obj_req->result;
-		img_req->xferred = 0;
+	if (done && *result) {
+		rbd_assert(*result < 0);
+		rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
+			 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
+			 obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
 	}
+	return done;
 }
 
-static void rbd_img_end_child_request(struct rbd_img_request *img_req)
+/*
+ * This is open-coded in rbd_img_handle_request() to avoid parent chain
+ * recursion.
+ */
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
 {
-	struct rbd_obj_request *obj_req = img_req->obj_request;
-
-	rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
-	rbd_assert((!img_req->result &&
-		    img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
-		   (img_req->result < 0 && !img_req->xferred));
-
-	obj_req->result = img_req->result;
-	obj_req->xferred = img_req->xferred;
-	rbd_img_request_put(img_req);
+	if (__rbd_obj_handle_request(obj_req, &result))
+		rbd_img_handle_request(obj_req->img_request, result);
 }
 
-static void rbd_img_end_request(struct rbd_img_request *img_req)
+static bool need_exclusive_lock(struct rbd_img_request *img_req)
 {
+	struct rbd_device *rbd_dev = img_req->rbd_dev;
+
+	if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
+		return false;
+
+	if (rbd_is_ro(rbd_dev))
+		return false;
+
 	rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
-	rbd_assert((!img_req->result &&
-		    img_req->xferred == blk_rq_bytes(img_req->rq)) ||
-		   (img_req->result < 0 && !img_req->xferred));
+	if (rbd_dev->opts->lock_on_read ||
+	    (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
+		return true;
 
-	blk_mq_end_request(img_req->rq,
-			   errno_to_blk_status(img_req->result));
-	rbd_img_request_put(img_req);
+	return rbd_img_is_write(img_req);
 }
 
-static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
+static bool rbd_lock_add_request(struct rbd_img_request *img_req)
 {
-	struct rbd_img_request *img_req;
+	struct rbd_device *rbd_dev = img_req->rbd_dev;
+	bool locked;
+
+	lockdep_assert_held(&rbd_dev->lock_rwsem);
+	locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
+	spin_lock(&rbd_dev->lock_lists_lock);
+	rbd_assert(list_empty(&img_req->lock_item));
+	if (!locked)
+		list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
+	else
+		list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
+	spin_unlock(&rbd_dev->lock_lists_lock);
+	return locked;
+}
+
+static void rbd_lock_del_request(struct rbd_img_request *img_req)
+{
+	struct rbd_device *rbd_dev = img_req->rbd_dev;
+	bool need_wakeup;
+
+	lockdep_assert_held(&rbd_dev->lock_rwsem);
+	spin_lock(&rbd_dev->lock_lists_lock);
+	rbd_assert(!list_empty(&img_req->lock_item));
+	list_del_init(&img_req->lock_item);
+	need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
+		       list_empty(&rbd_dev->running_list));
+	spin_unlock(&rbd_dev->lock_lists_lock);
+	if (need_wakeup)
+		complete(&rbd_dev->releasing_wait);
+}
+
+static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
+{
+	struct rbd_device *rbd_dev = img_req->rbd_dev;
+
+	if (!need_exclusive_lock(img_req))
+		return 1;
+
+	if (rbd_lock_add_request(img_req))
+		return 1;
+
+	if (rbd_dev->opts->exclusive) {
+		WARN_ON(1); /* lock got released? */
+		return -EROFS;
+	}
+
+	/*
+	 * Note the use of mod_delayed_work() in rbd_acquire_lock()
+	 * and cancel_delayed_work() in wake_lock_waiters().
+	 */
+	dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
+	queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+	return 0;
+}
+
+static void rbd_img_object_requests(struct rbd_img_request *img_req)
+{
+	struct rbd_device *rbd_dev = img_req->rbd_dev;
+	struct rbd_obj_request *obj_req;
+
+	rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
+	rbd_assert(!need_exclusive_lock(img_req) ||
+		   __rbd_is_lock_owner(rbd_dev));
+
+	if (rbd_img_is_write(img_req)) {
+		rbd_assert(!img_req->snapc);
+		down_read(&rbd_dev->header_rwsem);
+		img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
+		up_read(&rbd_dev->header_rwsem);
+	}
+
+	for_each_obj_request(img_req, obj_req) {
+		int result = 0;
+
+		if (__rbd_obj_handle_request(obj_req, &result)) {
+			if (result) {
+				img_req->pending.result = result;
+				return;
+			}
+		} else {
+			img_req->pending.num_pending++;
+		}
+	}
+}
+
+static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
+{
+	int ret;
 
 again:
-	if (!__rbd_obj_handle_request(obj_req))
-		return;
+	switch (img_req->state) {
+	case RBD_IMG_START:
+		rbd_assert(!*result);
 
-	img_req = obj_req->img_request;
-	spin_lock(&img_req->completion_lock);
-	rbd_obj_end_request(obj_req);
-	rbd_assert(img_req->pending_count);
-	if (--img_req->pending_count) {
-		spin_unlock(&img_req->completion_lock);
-		return;
+		ret = rbd_img_exclusive_lock(img_req);
+		if (ret < 0) {
+			*result = ret;
+			return true;
+		}
+		img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
+		if (ret > 0)
+			goto again;
+		return false;
+	case RBD_IMG_EXCLUSIVE_LOCK:
+		if (*result)
+			return true;
+
+		rbd_img_object_requests(img_req);
+		if (!img_req->pending.num_pending) {
+			*result = img_req->pending.result;
+			img_req->state = RBD_IMG_OBJECT_REQUESTS;
+			goto again;
+		}
+		img_req->state = __RBD_IMG_OBJECT_REQUESTS;
+		return false;
+	case __RBD_IMG_OBJECT_REQUESTS:
+		if (!pending_result_dec(&img_req->pending, result))
+			return false;
+		fallthrough;
+	case RBD_IMG_OBJECT_REQUESTS:
+		return true;
+	default:
+		BUG();
+	}
+}
+
+/*
+ * Return true if @img_req is completed.
+ */
+static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
+				     int *result)
+{
+	struct rbd_device *rbd_dev = img_req->rbd_dev;
+	bool done;
+
+	if (need_exclusive_lock(img_req)) {
+		down_read(&rbd_dev->lock_rwsem);
+		mutex_lock(&img_req->state_mutex);
+		done = rbd_img_advance(img_req, result);
+		if (done)
+			rbd_lock_del_request(img_req);
+		mutex_unlock(&img_req->state_mutex);
+		up_read(&rbd_dev->lock_rwsem);
+	} else {
+		mutex_lock(&img_req->state_mutex);
+		done = rbd_img_advance(img_req, result);
+		mutex_unlock(&img_req->state_mutex);
 	}
 
-	spin_unlock(&img_req->completion_lock);
+	if (done && *result) {
+		rbd_assert(*result < 0);
+		rbd_warn(rbd_dev, "%s%s result %d",
+		      test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
+		      obj_op_name(img_req->op_type), *result);
+	}
+	return done;
+}
+
+static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
+{
+again:
+	if (!__rbd_img_handle_request(img_req, &result))
+		return;
+
 	if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
-		obj_req = img_req->obj_request;
-		rbd_img_end_child_request(img_req);
-		goto again;
+		struct rbd_obj_request *obj_req = img_req->obj_request;
+
+		rbd_img_request_destroy(img_req);
+		if (__rbd_obj_handle_request(obj_req, &result)) {
+			img_req = obj_req->img_request;
+			goto again;
+		}
+	} else {
+		struct request *rq = blk_mq_rq_from_pdu(img_req);
+
+		rbd_img_request_destroy(img_req);
+		blk_mq_end_request(rq, errno_to_blk_status(result));
 	}
-	rbd_img_end_request(img_req);
 }
 
 static const struct rbd_client_id rbd_empty_cid;
@@ -2660,6 +3721,7 @@
 {
 	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
 
+	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
 	strcpy(rbd_dev->lock_cookie, cookie);
 	rbd_set_owner_cid(rbd_dev, &cid);
 	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
@@ -2681,10 +3743,9 @@
 	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
 			    RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
 			    RBD_LOCK_TAG, "", 0);
-	if (ret)
+	if (ret && ret != -EEXIST)
 		return ret;
 
-	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
 	__rbd_lock(rbd_dev, cookie);
 	return 0;
 }
@@ -2703,7 +3764,7 @@
 	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
 			      RBD_LOCK_NAME, rbd_dev->lock_cookie);
 	if (ret && ret != -ENOENT)
-		rbd_warn(rbd_dev, "failed to unlock: %d", ret);
+		rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
 
 	/* treat errors as the image is unlocked */
 	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
@@ -2739,11 +3800,7 @@
 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
 			       enum rbd_notify_op notify_op)
 {
-	struct page **reply_pages;
-	size_t reply_len;
-
-	__rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
-	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+	__rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
 }
 
 static void rbd_notify_acquired_lock(struct work_struct *work)
@@ -2830,21 +3887,56 @@
 	goto out;
 }
 
-static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
+/*
+ * Wake up everyone waiting on the lock: either image request state
+ * machine(s) or rbd_add_acquire_lock() (i.e. "rbd map").
+ */
+static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
 {
-	dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
+	struct rbd_img_request *img_req;
+
+	dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
+	lockdep_assert_held_write(&rbd_dev->lock_rwsem);
 
 	cancel_delayed_work(&rbd_dev->lock_dwork);
-	if (wake_all)
-		wake_up_all(&rbd_dev->lock_waitq);
-	else
-		wake_up(&rbd_dev->lock_waitq);
+	if (!completion_done(&rbd_dev->acquire_wait)) {
+		rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
+			   list_empty(&rbd_dev->running_list));
+		rbd_dev->acquire_err = result;
+		complete_all(&rbd_dev->acquire_wait);
+		return;
+	}
+
+	list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
+		mutex_lock(&img_req->state_mutex);
+		rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
+		rbd_img_schedule(img_req, result);
+		mutex_unlock(&img_req->state_mutex);
+	}
+
+	list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
 }
 
-static int get_lock_owner_info(struct rbd_device *rbd_dev,
-			       struct ceph_locker **lockers, u32 *num_lockers)
+static bool locker_equal(const struct ceph_locker *lhs,
+			 const struct ceph_locker *rhs)
+{
+	return lhs->id.name.type == rhs->id.name.type &&
+	       lhs->id.name.num == rhs->id.name.num &&
+	       !strcmp(lhs->id.cookie, rhs->id.cookie) &&
+	       ceph_addr_equal_no_type(&lhs->info.addr, &rhs->info.addr);
+}
+
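locker_equal() above and find_watcher() below both rely on
ceph_addr_equal_no_type(), a libceph helper.  A sketch of the comparison it is
assumed to make -- match the socket address and nonce while ignoring the address
type, so differently-typed addresses of the same entity compare equal:

	static inline bool ceph_addr_equal_no_type(const struct ceph_entity_addr *lhs,
						   const struct ceph_entity_addr *rhs)
	{
		return !memcmp(&lhs->in_addr, &rhs->in_addr, sizeof(lhs->in_addr)) &&
		       lhs->nonce == rhs->nonce;
	}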
+static void free_locker(struct ceph_locker *locker)
+{
+	if (locker)
+		ceph_free_lockers(locker, 1);
+}
+
+static struct ceph_locker *get_lock_owner_info(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct ceph_locker *lockers;
+	u32 num_lockers;
 	u8 lock_type;
 	char *lock_tag;
 	int ret;
@@ -2853,39 +3945,45 @@
 
 	ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
 				 &rbd_dev->header_oloc, RBD_LOCK_NAME,
-				 &lock_type, &lock_tag, lockers, num_lockers);
-	if (ret)
-		return ret;
+				 &lock_type, &lock_tag, &lockers, &num_lockers);
+	if (ret) {
+		rbd_warn(rbd_dev, "failed to get header lockers: %d", ret);
+		return ERR_PTR(ret);
+	}
 
-	if (*num_lockers == 0) {
+	if (num_lockers == 0) {
 		dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
+		lockers = NULL;
 		goto out;
 	}
 
 	if (strcmp(lock_tag, RBD_LOCK_TAG)) {
 		rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
 			 lock_tag);
-		ret = -EBUSY;
-		goto out;
+		goto err_busy;
 	}
 
 	if (lock_type == CEPH_CLS_LOCK_SHARED) {
 		rbd_warn(rbd_dev, "shared lock type detected");
-		ret = -EBUSY;
-		goto out;
+		goto err_busy;
 	}
 
-	if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
+	WARN_ON(num_lockers != 1);
+	if (strncmp(lockers[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
 		    strlen(RBD_LOCK_COOKIE_PREFIX))) {
 		rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
-			 (*lockers)[0].id.cookie);
-		ret = -EBUSY;
-		goto out;
+			 lockers[0].id.cookie);
+		goto err_busy;
 	}
 
 out:
 	kfree(lock_tag);
-	return ret;
+	return lockers;
+
+err_busy:
+	kfree(lock_tag);
+	ceph_free_lockers(lockers, num_lockers);
+	return ERR_PTR(-EBUSY);
 }
 
 static int find_watcher(struct rbd_device *rbd_dev,
@@ -2901,13 +3999,19 @@
 	ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
 				      &rbd_dev->header_oloc, &watchers,
 				      &num_watchers);
-	if (ret)
+	if (ret) {
+		rbd_warn(rbd_dev, "failed to get watchers: %d", ret);
 		return ret;
+	}
 
 	sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
 	for (i = 0; i < num_watchers; i++) {
-		if (!memcmp(&watchers[i].addr, &locker->info.addr,
-			    sizeof(locker->info.addr)) &&
+		/*
+		 * Ignore addr->type while comparing.  This mimics
+		 * entity_addr_t::get_legacy_str() + strcmp().
+		 */
+		if (ceph_addr_equal_no_type(&watchers[i].addr,
+					    &locker->info.addr) &&
 		    watchers[i].cookie == cookie) {
 			struct rbd_client_id cid = {
 				.gid = le64_to_cpu(watchers[i].name.num),
@@ -2935,104 +4039,160 @@
 static int rbd_try_lock(struct rbd_device *rbd_dev)
 {
 	struct ceph_client *client = rbd_dev->rbd_client->client;
-	struct ceph_locker *lockers;
-	u32 num_lockers;
+	struct ceph_locker *locker, *refreshed_locker;
 	int ret;
 
 	for (;;) {
+		locker = refreshed_locker = NULL;
+
 		ret = rbd_lock(rbd_dev);
-		if (ret != -EBUSY)
-			return ret;
-
-		/* determine if the current lock holder is still alive */
-		ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
-		if (ret)
-			return ret;
-
-		if (num_lockers == 0)
-			goto again;
-
-		ret = find_watcher(rbd_dev, lockers);
-		if (ret) {
-			if (ret > 0)
-				ret = 0; /* have to request lock */
+		if (!ret)
+			goto out;
+		if (ret != -EBUSY) {
+			rbd_warn(rbd_dev, "failed to lock header: %d", ret);
 			goto out;
 		}
 
-		rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
-			 ENTITY_NAME(lockers[0].id.name));
+		/* determine if the current lock holder is still alive */
+		locker = get_lock_owner_info(rbd_dev);
+		if (IS_ERR(locker)) {
+			ret = PTR_ERR(locker);
+			locker = NULL;
+			goto out;
+		}
+		if (!locker)
+			goto again;
 
-		ret = ceph_monc_blacklist_add(&client->monc,
-					      &lockers[0].info.addr);
+		ret = find_watcher(rbd_dev, locker);
+		if (ret)
+			goto out; /* request lock or error */
+
+		refreshed_locker = get_lock_owner_info(rbd_dev);
+		if (IS_ERR(refreshed_locker)) {
+			ret = PTR_ERR(refreshed_locker);
+			refreshed_locker = NULL;
+			goto out;
+		}
+		if (!refreshed_locker ||
+		    !locker_equal(locker, refreshed_locker))
+			goto again;
+
+		rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
+			 ENTITY_NAME(locker->id.name));
+
+		ret = ceph_monc_blocklist_add(&client->monc,
+					      &locker->info.addr);
 		if (ret) {
-			rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
-				 ENTITY_NAME(lockers[0].id.name), ret);
+			rbd_warn(rbd_dev, "failed to blocklist %s%llu: %d",
+				 ENTITY_NAME(locker->id.name), ret);
 			goto out;
 		}
 
 		ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
 					  &rbd_dev->header_oloc, RBD_LOCK_NAME,
-					  lockers[0].id.cookie,
-					  &lockers[0].id.name);
-		if (ret && ret != -ENOENT)
+					  locker->id.cookie, &locker->id.name);
+		if (ret && ret != -ENOENT) {
+			rbd_warn(rbd_dev, "failed to break header lock: %d",
+				 ret);
 			goto out;
+		}
 
 again:
-		ceph_free_lockers(lockers, num_lockers);
+		free_locker(refreshed_locker);
+		free_locker(locker);
 	}
 
 out:
-	ceph_free_lockers(lockers, num_lockers);
+	free_locker(refreshed_locker);
+	free_locker(locker);
 	return ret;
 }
 
-/*
- * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
- */
-static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
-						int *pret)
+static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
 {
-	enum rbd_lock_state lock_state;
+	int ret;
+
+	ret = rbd_dev_refresh(rbd_dev);
+	if (ret)
+		return ret;
+
+	if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
+		ret = rbd_object_map_open(rbd_dev);
+		if (ret)
+			return ret;
+	}
+
+	return 0;
+}
+
+/*
+ * Return:
+ *   0 - lock acquired
+ *   1 - caller should call rbd_request_lock()
+ *  <0 - error
+ */
+static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
+{
+	int ret;
 
 	down_read(&rbd_dev->lock_rwsem);
 	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
 	     rbd_dev->lock_state);
 	if (__rbd_is_lock_owner(rbd_dev)) {
-		lock_state = rbd_dev->lock_state;
 		up_read(&rbd_dev->lock_rwsem);
-		return lock_state;
+		return 0;
 	}
 
 	up_read(&rbd_dev->lock_rwsem);
 	down_write(&rbd_dev->lock_rwsem);
 	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
 	     rbd_dev->lock_state);
-	if (!__rbd_is_lock_owner(rbd_dev)) {
-		*pret = rbd_try_lock(rbd_dev);
-		if (*pret)
-			rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
+	if (__rbd_is_lock_owner(rbd_dev)) {
+		up_write(&rbd_dev->lock_rwsem);
+		return 0;
 	}
 
-	lock_state = rbd_dev->lock_state;
+	ret = rbd_try_lock(rbd_dev);
+	if (ret < 0) {
+		rbd_warn(rbd_dev, "failed to acquire lock: %d", ret);
+		goto out;
+	}
+	if (ret > 0) {
+		up_write(&rbd_dev->lock_rwsem);
+		return ret;
+	}
+
+	rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
+	rbd_assert(list_empty(&rbd_dev->running_list));
+
+	ret = rbd_post_acquire_action(rbd_dev);
+	if (ret) {
+		rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
+		/*
+		 * Can't stay in RBD_LOCK_STATE_LOCKED because
+		 * rbd_lock_add_request() would let the request through,
+		 * assuming that e.g. object map is locked and loaded.
+		 */
+		rbd_unlock(rbd_dev);
+	}
+
+out:
+	wake_lock_waiters(rbd_dev, ret);
 	up_write(&rbd_dev->lock_rwsem);
-	return lock_state;
+	return ret;
 }
 
 static void rbd_acquire_lock(struct work_struct *work)
 {
 	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
 					    struct rbd_device, lock_dwork);
-	enum rbd_lock_state lock_state;
-	int ret = 0;
+	int ret;
 
 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 again:
-	lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
-	if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
-		if (lock_state == RBD_LOCK_STATE_LOCKED)
-			wake_requests(rbd_dev, true);
-		dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
-		     rbd_dev, lock_state, ret);
+	ret = rbd_try_acquire_lock(rbd_dev);
+	if (ret <= 0) {
+		dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
 		return;
 	}
 
@@ -3041,16 +4201,9 @@
 		goto again; /* treat this as a dead client */
 	} else if (ret == -EROFS) {
 		rbd_warn(rbd_dev, "peer will not release lock");
-		/*
-		 * If this is rbd_add_acquire_lock(), we want to fail
-		 * immediately -- reuse BLACKLISTED flag.  Otherwise we
-		 * want to block.
-		 */
-		if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
-			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
-			/* wake "rbd map --exclusive" process */
-			wake_requests(rbd_dev, false);
-		}
+		down_write(&rbd_dev->lock_rwsem);
+		wake_lock_waiters(rbd_dev, ret);
+		up_write(&rbd_dev->lock_rwsem);
 	} else if (ret < 0) {
 		rbd_warn(rbd_dev, "error requesting lock: %d", ret);
 		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
@@ -3060,50 +4213,72 @@
 		 * lock owner acked, but resend if we don't see them
 		 * release the lock
 		 */
-		dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
+		dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
 		     rbd_dev);
 		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
 		    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
 	}
 }
 
-/*
- * lock_rwsem must be held for write
- */
-static bool rbd_release_lock(struct rbd_device *rbd_dev)
+static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
 {
-	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
-	     rbd_dev->lock_state);
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+	lockdep_assert_held_write(&rbd_dev->lock_rwsem);
+
 	if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
 		return false;
 
-	rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
-	downgrade_write(&rbd_dev->lock_rwsem);
 	/*
 	 * Ensure that all in-flight IO is flushed.
-	 *
-	 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
-	 * may be shared with other devices.
 	 */
-	ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
-	up_read(&rbd_dev->lock_rwsem);
+	rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
+	rbd_assert(!completion_done(&rbd_dev->releasing_wait));
+	if (list_empty(&rbd_dev->running_list))
+		return true;
+
+	up_write(&rbd_dev->lock_rwsem);
+	wait_for_completion(&rbd_dev->releasing_wait);
 
 	down_write(&rbd_dev->lock_rwsem);
-	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
-	     rbd_dev->lock_state);
 	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
 		return false;
 
+	rbd_assert(list_empty(&rbd_dev->running_list));
+	return true;
+}
+
+static void rbd_pre_release_action(struct rbd_device *rbd_dev)
+{
+	if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
+		rbd_object_map_close(rbd_dev);
+}
+
+static void __rbd_release_lock(struct rbd_device *rbd_dev)
+{
+	rbd_assert(list_empty(&rbd_dev->running_list));
+
+	rbd_pre_release_action(rbd_dev);
 	rbd_unlock(rbd_dev);
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_release_lock(struct rbd_device *rbd_dev)
+{
+	if (!rbd_quiesce_lock(rbd_dev))
+		return;
+
+	__rbd_release_lock(rbd_dev);
+
 	/*
 	 * Give others a chance to grab the lock - we would re-acquire
-	 * almost immediately if we got new IO during ceph_osdc_sync()
-	 * otherwise.  We need to ack our own notifications, so this
-	 * lock_dwork will be requeued from rbd_wait_state_locked()
-	 * after wake_requests() in rbd_handle_released_lock().
+	 * almost immediately if we got new IO while draining the running
+	 * list otherwise.  We need to ack our own notifications, so this
+	 * lock_dwork will be requeued from rbd_handle_released_lock() by
+	 * way of maybe_kick_acquire().
 	 */
 	cancel_delayed_work(&rbd_dev->lock_dwork);
-	return true;
 }
 
 static void rbd_release_lock_work(struct work_struct *work)
@@ -3114,6 +4289,23 @@
 	down_write(&rbd_dev->lock_rwsem);
 	rbd_release_lock(rbd_dev);
 	up_write(&rbd_dev->lock_rwsem);
+}
+
+static void maybe_kick_acquire(struct rbd_device *rbd_dev)
+{
+	bool have_requests;
+
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+	if (__rbd_is_lock_owner(rbd_dev))
+		return;
+
+	spin_lock(&rbd_dev->lock_lists_lock);
+	have_requests = !list_empty(&rbd_dev->acquiring_list);
+	spin_unlock(&rbd_dev->lock_lists_lock);
+	if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
+		dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
+		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+	}
 }
 
 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
@@ -3131,22 +4323,17 @@
 	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
 		down_write(&rbd_dev->lock_rwsem);
 		if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
-			/*
-			 * we already know that the remote client is
-			 * the owner
-			 */
-			up_write(&rbd_dev->lock_rwsem);
-			return;
+			dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n",
+			     __func__, rbd_dev, cid.gid, cid.handle);
+		} else {
+			rbd_set_owner_cid(rbd_dev, &cid);
 		}
-
-		rbd_set_owner_cid(rbd_dev, &cid);
 		downgrade_write(&rbd_dev->lock_rwsem);
 	} else {
 		down_read(&rbd_dev->lock_rwsem);
 	}
 
-	if (!__rbd_is_lock_owner(rbd_dev))
-		wake_requests(rbd_dev, false);
+	maybe_kick_acquire(rbd_dev);
 	up_read(&rbd_dev->lock_rwsem);
 }
 
@@ -3165,21 +4352,18 @@
 	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
 		down_write(&rbd_dev->lock_rwsem);
 		if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
-			dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
+			dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n",
 			     __func__, rbd_dev, cid.gid, cid.handle,
 			     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
-			up_write(&rbd_dev->lock_rwsem);
-			return;
+		} else {
+			rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
 		}
-
-		rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
 		downgrade_write(&rbd_dev->lock_rwsem);
 	} else {
 		down_read(&rbd_dev->lock_rwsem);
 	}
 
-	if (!__rbd_is_lock_owner(rbd_dev))
-		wake_requests(rbd_dev, false);
+	maybe_kick_acquire(rbd_dev);
 	up_read(&rbd_dev->lock_rwsem);
 }
 
@@ -3433,7 +4617,6 @@
  */
 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
 {
-	WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
 	cancel_tasks_sync(rbd_dev);
 
 	mutex_lock(&rbd_dev->watch_mutex);
@@ -3455,7 +4638,8 @@
 	char cookie[32];
 	int ret;
 
-	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+	if (!rbd_quiesce_lock(rbd_dev))
+		return;
 
 	format_lock_cookie(rbd_dev, cookie);
 	ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
@@ -3471,11 +4655,11 @@
 		 * Lock cookie cannot be updated on older OSDs, so do
 		 * a manual release and queue an acquire.
 		 */
-		if (rbd_release_lock(rbd_dev))
-			queue_delayed_work(rbd_dev->task_wq,
-					   &rbd_dev->lock_dwork, 0);
+		__rbd_release_lock(rbd_dev);
+		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
 	} else {
 		__rbd_lock(rbd_dev, cookie);
+		wake_lock_waiters(rbd_dev, 0);
 	}
 }
 
@@ -3496,15 +4680,18 @@
 	ret = __rbd_register_watch(rbd_dev);
 	if (ret) {
 		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
-		if (ret == -EBLACKLISTED || ret == -ENOENT) {
-			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
-			wake_requests(rbd_dev, true);
-		} else {
+		if (ret != -EBLOCKLISTED && ret != -ENOENT) {
 			queue_delayed_work(rbd_dev->task_wq,
 					   &rbd_dev->watch_dwork,
 					   RBD_RETRY_DELAY);
+			mutex_unlock(&rbd_dev->watch_mutex);
+			return;
 		}
+
 		mutex_unlock(&rbd_dev->watch_mutex);
+		down_write(&rbd_dev->lock_rwsem);
+		wake_lock_waiters(rbd_dev, ret);
+		up_write(&rbd_dev->lock_rwsem);
 		return;
 	}
 
@@ -3567,7 +4754,7 @@
 
 	ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
 			     CEPH_OSD_FLAG_READ, req_page, outbound_size,
-			     reply_page, &inbound_size);
+			     &reply_page, &inbound_size);
 	if (!ret) {
 		memcpy(inbound, page_address(reply_page), inbound_size);
 		ret = inbound_size;
@@ -3579,71 +4766,74 @@
 	return ret;
 }
 
-/*
- * lock_rwsem must be held for read
- */
-static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire)
-{
-	DEFINE_WAIT(wait);
-	unsigned long timeout;
-	int ret = 0;
-
-	if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
-		return -EBLACKLISTED;
-
-	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
-		return 0;
-
-	if (!may_acquire) {
-		rbd_warn(rbd_dev, "exclusive lock required");
-		return -EROFS;
-	}
-
-	do {
-		/*
-		 * Note the use of mod_delayed_work() in rbd_acquire_lock()
-		 * and cancel_delayed_work() in wake_requests().
-		 */
-		dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
-		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
-		prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
-					  TASK_UNINTERRUPTIBLE);
-		up_read(&rbd_dev->lock_rwsem);
-		timeout = schedule_timeout(ceph_timeout_jiffies(
-						rbd_dev->opts->lock_timeout));
-		down_read(&rbd_dev->lock_rwsem);
-		if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
-			ret = -EBLACKLISTED;
-			break;
-		}
-		if (!timeout) {
-			rbd_warn(rbd_dev, "timed out waiting for lock");
-			ret = -ETIMEDOUT;
-			break;
-		}
-	} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
-
-	finish_wait(&rbd_dev->lock_waitq, &wait);
-	return ret;
-}
-
 static void rbd_queue_workfn(struct work_struct *work)
 {
-	struct request *rq = blk_mq_rq_from_pdu(work);
-	struct rbd_device *rbd_dev = rq->q->queuedata;
-	struct rbd_img_request *img_request;
-	struct ceph_snap_context *snapc = NULL;
+	struct rbd_img_request *img_request =
+	    container_of(work, struct rbd_img_request, work);
+	struct rbd_device *rbd_dev = img_request->rbd_dev;
+	enum obj_operation_type op_type = img_request->op_type;
+	struct request *rq = blk_mq_rq_from_pdu(img_request);
 	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
 	u64 length = blk_rq_bytes(rq);
-	enum obj_operation_type op_type;
 	u64 mapping_size;
-	bool must_be_locked;
 	int result;
 
-	switch (req_op(rq)) {
+	/* Ignore/skip any zero-length requests */
+	if (!length) {
+		dout("%s: zero-length request\n", __func__);
+		result = 0;
+		goto err_img_request;
+	}
+
+	blk_mq_start_request(rq);
+
+	down_read(&rbd_dev->header_rwsem);
+	mapping_size = rbd_dev->mapping.size;
+	rbd_img_capture_header(img_request);
+	up_read(&rbd_dev->header_rwsem);
+
+	if (offset + length > mapping_size) {
+		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
+			 length, mapping_size);
+		result = -EIO;
+		goto err_img_request;
+	}
+
+	dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
+	     img_request, obj_op_name(op_type), offset, length);
+
+	if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
+		result = rbd_img_fill_nodata(img_request, offset, length);
+	else
+		result = rbd_img_fill_from_bio(img_request, offset, length,
+					       rq->bio);
+	if (result)
+		goto err_img_request;
+
+	rbd_img_handle_request(img_request, 0);
+	return;
+
+err_img_request:
+	rbd_img_request_destroy(img_request);
+	if (result)
+		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
+			 obj_op_name(op_type), length, offset, result);
+	blk_mq_end_request(rq, errno_to_blk_status(result));
+}
+
+static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
+		const struct blk_mq_queue_data *bd)
+{
+	struct rbd_device *rbd_dev = hctx->queue->queuedata;
+	struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
+	enum obj_operation_type op_type;
+
+	switch (req_op(bd->rq)) {
 	case REQ_OP_DISCARD:
-	case REQ_OP_WRITE_ZEROES:
 		op_type = OBJ_OP_DISCARD;
+		break;
+	case REQ_OP_WRITE_ZEROES:
+		op_type = OBJ_OP_ZEROOUT;
 		break;
 	case REQ_OP_WRITE:
 		op_type = OBJ_OP_WRITE;
@@ -3652,112 +4842,23 @@
 		op_type = OBJ_OP_READ;
 		break;
 	default:
-		dout("%s: non-fs request type %d\n", __func__, req_op(rq));
-		result = -EIO;
-		goto err;
+		rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq));
+		return BLK_STS_IOERR;
 	}
 
-	/* Ignore/skip any zero-length requests */
+	rbd_img_request_init(img_req, rbd_dev, op_type);
 
-	if (!length) {
-		dout("%s: zero-length request\n", __func__);
-		result = 0;
-		goto err_rq;
+	if (rbd_img_is_write(img_req)) {
+		if (rbd_is_ro(rbd_dev)) {
+			rbd_warn(rbd_dev, "%s on read-only mapping",
+				 obj_op_name(img_req->op_type));
+			return BLK_STS_IOERR;
+		}
+		rbd_assert(!rbd_is_snap(rbd_dev));
 	}
 
-	rbd_assert(op_type == OBJ_OP_READ ||
-		   rbd_dev->spec->snap_id == CEPH_NOSNAP);
-
-	/*
-	 * Quit early if the mapped snapshot no longer exists.  It's
-	 * still possible the snapshot will have disappeared by the
-	 * time our request arrives at the osd, but there's no sense in
-	 * sending it if we already know.
-	 */
-	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
-		dout("request for non-existent snapshot");
-		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
-		result = -ENXIO;
-		goto err_rq;
-	}
-
-	if (offset && length > U64_MAX - offset + 1) {
-		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
-			 length);
-		result = -EINVAL;
-		goto err_rq;	/* Shouldn't happen */
-	}
-
-	blk_mq_start_request(rq);
-
-	down_read(&rbd_dev->header_rwsem);
-	mapping_size = rbd_dev->mapping.size;
-	if (op_type != OBJ_OP_READ) {
-		snapc = rbd_dev->header.snapc;
-		ceph_get_snap_context(snapc);
-	}
-	up_read(&rbd_dev->header_rwsem);
-
-	if (offset + length > mapping_size) {
-		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
-			 length, mapping_size);
-		result = -EIO;
-		goto err_rq;
-	}
-
-	must_be_locked =
-	    (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
-	    (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
-	if (must_be_locked) {
-		down_read(&rbd_dev->lock_rwsem);
-		result = rbd_wait_state_locked(rbd_dev,
-					       !rbd_dev->opts->exclusive);
-		if (result)
-			goto err_unlock;
-	}
-
-	img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
-	if (!img_request) {
-		result = -ENOMEM;
-		goto err_unlock;
-	}
-	img_request->rq = rq;
-	snapc = NULL; /* img_request consumes a ref */
-
-	if (op_type == OBJ_OP_DISCARD)
-		result = rbd_img_fill_nodata(img_request, offset, length);
-	else
-		result = rbd_img_fill_from_bio(img_request, offset, length,
-					       rq->bio);
-	if (result)
-		goto err_img_request;
-
-	rbd_img_request_submit(img_request);
-	if (must_be_locked)
-		up_read(&rbd_dev->lock_rwsem);
-	return;
-
-err_img_request:
-	rbd_img_request_put(img_request);
-err_unlock:
-	if (must_be_locked)
-		up_read(&rbd_dev->lock_rwsem);
-err_rq:
-	if (result)
-		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
-			 obj_op_name(op_type), length, offset, result);
-	ceph_put_snap_context(snapc);
-err:
-	blk_mq_end_request(rq, errno_to_blk_status(result));
-}
-
-static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
-		const struct blk_mq_queue_data *bd)
-{
-	struct request *rq = bd->rq;
-	struct work_struct *work = blk_mq_rq_to_pdu(rq);
-
-	queue_work(rbd_wq, work);
+	INIT_WORK(&img_req->work, rbd_queue_workfn);
+	queue_work(rbd_wq, &img_req->work);
 	return BLK_STS_OK;
 }
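rbd_queue_rq() no longer allocates an image request: tag_set.cmd_size (set to
sizeof(struct rbd_img_request) further down) makes blk-mq reserve per-request
driver data next to each struct request, and blk_mq_rq_to_pdu() /
blk_mq_rq_from_pdu() convert between the two.  A minimal sketch of the pattern
with a hypothetical my_req structure, for illustration only:

	/* hypothetical per-request state embedded in the request PDU */
	struct my_req {
		unsigned int op;
	};

	static blk_status_t my_queue_rq(struct blk_mq_hw_ctx *hctx,
					const struct blk_mq_queue_data *bd)
	{
		/* no allocation: blk-mq reserved cmd_size bytes alongside bd->rq */
		struct my_req *r = blk_mq_rq_to_pdu(bd->rq);

		r->op = req_op(bd->rq);
		blk_mq_start_request(bd->rq);
		blk_mq_end_request(bd->rq, BLK_STS_OK);	/* complete immediately */
		return BLK_STS_OK;
	}

The same conversion in the other direction is what lets rbd_queue_workfn()
recover the struct request from the embedded img_request via
blk_mq_rq_from_pdu().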
 
@@ -3789,10 +4890,6 @@
 	ceph_oloc_copy(&req->r_base_oloc, oloc);
 	req->r_flags = CEPH_OSD_FLAG_READ;
 
-	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
-	if (ret)
-		goto out_req;
-
 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
 	if (IS_ERR(pages)) {
 		ret = PTR_ERR(pages);
@@ -3802,6 +4899,10 @@
 	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
 	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
 					 true);
+
+	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+	if (ret)
+		goto out_req;
 
 	ceph_osdc_start_request(osdc, req, false);
 	ret = ceph_osdc_wait_request(osdc, req);
@@ -3818,7 +4919,9 @@
  * return, the rbd_dev->header field will contain up-to-date
  * information about the image.
  */
-static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
+static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev,
+				  struct rbd_image_header *header,
+				  bool first_time)
 {
 	struct rbd_image_header_ondisk *ondisk = NULL;
 	u32 snap_count = 0;
@@ -3866,30 +4969,11 @@
 		snap_count = le32_to_cpu(ondisk->snap_count);
 	} while (snap_count != want_count);
 
-	ret = rbd_header_from_disk(rbd_dev, ondisk);
+	ret = rbd_header_from_disk(header, ondisk, first_time);
 out:
 	kfree(ondisk);
 
 	return ret;
-}
-
-/*
- * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
- * has disappeared from the (just updated) snapshot context.
- */
-static void rbd_exists_validate(struct rbd_device *rbd_dev)
-{
-	u64 snap_id;
-
-	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
-		return;
-
-	snap_id = rbd_dev->spec->snap_id;
-	if (snap_id == CEPH_NOSNAP)
-		return;
-
-	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
-		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 }
 
 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
@@ -3906,59 +4990,12 @@
 		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
 		dout("setting size to %llu sectors", (unsigned long long)size);
 		set_capacity(rbd_dev->disk, size);
-		revalidate_disk(rbd_dev->disk);
+		revalidate_disk_size(rbd_dev->disk, true);
 	}
-}
-
-static int rbd_dev_refresh(struct rbd_device *rbd_dev)
-{
-	u64 mapping_size;
-	int ret;
-
-	down_write(&rbd_dev->header_rwsem);
-	mapping_size = rbd_dev->mapping.size;
-
-	ret = rbd_dev_header_info(rbd_dev);
-	if (ret)
-		goto out;
-
-	/*
-	 * If there is a parent, see if it has disappeared due to the
-	 * mapped image getting flattened.
-	 */
-	if (rbd_dev->parent) {
-		ret = rbd_dev_v2_parent_info(rbd_dev);
-		if (ret)
-			goto out;
-	}
-
-	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
-		rbd_dev->mapping.size = rbd_dev->header.image_size;
-	} else {
-		/* validate mapped snapshot's EXISTS flag */
-		rbd_exists_validate(rbd_dev);
-	}
-
-out:
-	up_write(&rbd_dev->header_rwsem);
-	if (!ret && mapping_size != rbd_dev->mapping.size)
-		rbd_dev_update_size(rbd_dev);
-
-	return ret;
-}
-
-static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
-		unsigned int hctx_idx, unsigned int numa_node)
-{
-	struct work_struct *work = blk_mq_rq_to_pdu(rq);
-
-	INIT_WORK(work, rbd_queue_workfn);
-	return 0;
 }
 
 static const struct blk_mq_ops rbd_mq_ops = {
 	.queue_rq	= rbd_queue_rq,
-	.init_request	= rbd_init_request,
 };
 
 static int rbd_init_disk(struct rbd_device *rbd_dev)
@@ -3989,9 +5026,9 @@
 	rbd_dev->tag_set.ops = &rbd_mq_ops;
 	rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
 	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
-	rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
-	rbd_dev->tag_set.nr_hw_queues = 1;
-	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
+	rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
+	rbd_dev->tag_set.nr_hw_queues = num_present_cpus();
+	rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request);
 
 	err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
 	if (err)
@@ -4010,18 +5047,18 @@
 	q->limits.max_sectors = queue_max_hw_sectors(q);
 	blk_queue_max_segments(q, USHRT_MAX);
 	blk_queue_max_segment_size(q, UINT_MAX);
-	blk_queue_io_min(q, objset_bytes);
-	blk_queue_io_opt(q, objset_bytes);
+	blk_queue_io_min(q, rbd_dev->opts->alloc_size);
+	blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
 
 	if (rbd_dev->opts->trim) {
 		blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
-		q->limits.discard_granularity = objset_bytes;
+		q->limits.discard_granularity = rbd_dev->opts->alloc_size;
 		blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
 		blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
 	}
 
 	if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
-		q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
+		blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, q);
 
 	/*
 	 * disk_release() expects a queue ref from add_disk() and will
@@ -4059,17 +5096,12 @@
 		(unsigned long long)rbd_dev->mapping.size);
 }
 
-/*
- * Note this shows the features for whatever's mapped, which is not
- * necessarily the base image.
- */
 static ssize_t rbd_features_show(struct device *dev,
 			     struct device_attribute *attr, char *buf)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-	return sprintf(buf, "0x%016llx\n",
-			(unsigned long long)rbd_dev->mapping.features);
+	return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
 }
 
 static ssize_t rbd_major_show(struct device *dev,
@@ -4381,8 +5413,7 @@
 		module_put(THIS_MODULE);
 }
 
-static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
-					   struct rbd_spec *spec)
+static struct rbd_device *__rbd_dev_create(struct rbd_spec *spec)
 {
 	struct rbd_device *rbd_dev;
 
@@ -4414,15 +5445,18 @@
 	INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
 	INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
 	INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
-	init_waitqueue_head(&rbd_dev->lock_waitq);
+	spin_lock_init(&rbd_dev->lock_lists_lock);
+	INIT_LIST_HEAD(&rbd_dev->acquiring_list);
+	INIT_LIST_HEAD(&rbd_dev->running_list);
+	init_completion(&rbd_dev->acquire_wait);
+	init_completion(&rbd_dev->releasing_wait);
+
+	spin_lock_init(&rbd_dev->object_map_lock);
 
 	rbd_dev->dev.bus = &rbd_bus_type;
 	rbd_dev->dev.type = &rbd_device_type;
 	rbd_dev->dev.parent = &rbd_root_dev;
 	device_initialize(&rbd_dev->dev);
-
-	rbd_dev->rbd_client = rbdc;
-	rbd_dev->spec = spec;
 
 	return rbd_dev;
 }
@@ -4436,11 +5470,9 @@
 {
 	struct rbd_device *rbd_dev;
 
-	rbd_dev = __rbd_dev_create(rbdc, spec);
+	rbd_dev = __rbd_dev_create(spec);
 	if (!rbd_dev)
 		return NULL;
-
-	rbd_dev->opts = opts;
 
 	/* get an id and fill in device name */
 	rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
@@ -4457,6 +5489,10 @@
 
 	/* we have a ref from do_rbd_add() */
 	__module_get(THIS_MODULE);
+
+	rbd_dev->rbd_client = rbdc;
+	rbd_dev->spec = spec;
+	rbd_dev->opts = opts;
 
 	dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
 	return rbd_dev;
@@ -4512,41 +5548,39 @@
 	return 0;
 }
 
-static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
+static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev,
+				    char **pobject_prefix)
 {
-	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
-					&rbd_dev->header.obj_order,
-					&rbd_dev->header.image_size);
-}
-
-static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
-{
+	size_t size;
 	void *reply_buf;
+	char *object_prefix;
 	int ret;
 	void *p;
 
-	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
+	/* Response will be an encoded string, which includes a length */
+	size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
+	reply_buf = kzalloc(size, GFP_KERNEL);
 	if (!reply_buf)
 		return -ENOMEM;
 
 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 				  &rbd_dev->header_oloc, "get_object_prefix",
-				  NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
+				  NULL, 0, reply_buf, size);
 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		goto out;
 
 	p = reply_buf;
-	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
-						p + ret, NULL, GFP_NOIO);
+	object_prefix = ceph_extract_encoded_string(&p, p + ret, NULL,
+						    GFP_NOIO);
+	if (IS_ERR(object_prefix)) {
+		ret = PTR_ERR(object_prefix);
+		goto out;
+	}
 	ret = 0;
 
-	if (IS_ERR(rbd_dev->header.object_prefix)) {
-		ret = PTR_ERR(rbd_dev->header.object_prefix);
-		rbd_dev->header.object_prefix = NULL;
-	} else {
-		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
-	}
+	*pobject_prefix = object_prefix;
+	dout("  object_prefix = %s\n", object_prefix);
 out:
 	kfree(reply_buf);
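
ceph_extract_encoded_string() consumes a 32-bit little-endian length followed by that many bytes, which is why the reply buffer above is now sized as sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX.  A rough userspace model of the same framing follows; extract_encoded_string is a hypothetical helper (not the kernel implementation) and le32toh is assumed from glibc's <endian.h>.

#include <endian.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical userspace model of ceph_extract_encoded_string(): a
 * 32-bit little-endian length followed by that many bytes, returned as
 * a NUL-terminated copy (NULL on short or failed input). */
static char *extract_encoded_string(const uint8_t **p, const uint8_t *end)
{
	uint32_t len;
	char *s;

	if (end - *p < 4)
		return NULL;
	memcpy(&len, *p, sizeof(len));
	len = le32toh(len);
	*p += sizeof(len);

	if ((uint32_t)(end - *p) < len)
		return NULL;
	s = malloc(len + 1);
	if (!s)
		return NULL;
	memcpy(s, *p, len);
	s[len] = '\0';
	*p += len;
	return s;
}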
 
@@ -4554,9 +5588,12 @@
 }
 
 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
-		u64 *snap_features)
+				     bool read_only, u64 *snap_features)
 {
-	__le64 snapid = cpu_to_le64(snap_id);
+	struct {
+		__le64 snap_id;
+		u8 read_only;
+	} features_in;
 	struct {
 		__le64 features;
 		__le64 incompat;
@@ -4564,9 +5601,12 @@
 	u64 unsup;
 	int ret;
 
+	features_in.snap_id = cpu_to_le64(snap_id);
+	features_in.read_only = read_only;
+
 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 				  &rbd_dev->header_oloc, "get_features",
-				  &snapid, sizeof(snapid),
+				  &features_in, sizeof(features_in),
 				  &features_buf, sizeof(features_buf));
 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
@@ -4591,10 +5631,30 @@
 	return 0;
 }
 
-static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
+/*
+ * These are generic image flags, but since they are used only for
+ * object map, store them in rbd_dev->object_map_flags.
+ *
+ * For the same reason, this function is called only on object map
+ * (re)load and not on header refresh.
+ */
+static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
 {
-	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
-						&rbd_dev->header.features);
+	__le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
+	__le64 flags;
+	int ret;
+
+	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
+				  &rbd_dev->header_oloc, "get_flags",
+				  &snapid, sizeof(snapid),
+				  &flags, sizeof(flags));
+	if (ret < 0)
+		return ret;
+	if (ret < sizeof(flags))
+		return -EBADMSG;
+
+	rbd_dev->object_map_flags = le64_to_cpu(flags);
+	return 0;
 }
 
 struct parent_image_info {
@@ -4606,6 +5666,14 @@
 	bool		has_overlap;
 	u64		overlap;
 };
+
+static void rbd_parent_info_cleanup(struct parent_image_info *pii)
+{
+	kfree(pii->pool_ns);
+	kfree(pii->image_id);
+
+	memset(pii, 0, sizeof(*pii));
+}
 
 /*
  * The caller is responsible for @pii.
@@ -4654,7 +5722,7 @@
 
 	ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
 			     "rbd", "parent_get", CEPH_OSD_FLAG_READ,
-			     req_page, sizeof(u64), reply_page, &reply_len);
+			     req_page, sizeof(u64), &reply_page, &reply_len);
 	if (ret)
 		return ret == -EOPNOTSUPP ? 1 : ret;
 
@@ -4666,7 +5734,7 @@
 
 	ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
 			     "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
-			     req_page, sizeof(u64), reply_page, &reply_len);
+			     req_page, sizeof(u64), &reply_page, &reply_len);
 	if (ret)
 		return ret;
 
@@ -4676,6 +5744,9 @@
 	if (pii->has_overlap)
 		ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
 
+	dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
+	     __func__, pii->pool_id, pii->pool_ns, pii->image_id, pii->snap_id,
+	     pii->has_overlap, pii->overlap);
 	return 0;
 
 e_inval:
@@ -4697,7 +5768,7 @@
 
 	ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
 			     "rbd", "get_parent", CEPH_OSD_FLAG_READ,
-			     req_page, sizeof(u64), reply_page, &reply_len);
+			     req_page, sizeof(u64), &reply_page, &reply_len);
 	if (ret)
 		return ret;
 
@@ -4714,14 +5785,17 @@
 	pii->has_overlap = true;
 	ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
 
+	dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
+	     __func__, pii->pool_id, pii->pool_ns, pii->image_id, pii->snap_id,
+	     pii->has_overlap, pii->overlap);
 	return 0;
 
 e_inval:
 	return -EINVAL;
 }
 
-static int get_parent_info(struct rbd_device *rbd_dev,
-			   struct parent_image_info *pii)
+static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev,
+				  struct parent_image_info *pii)
 {
 	struct page *req_page, *reply_page;
 	void *p;
@@ -4749,7 +5823,7 @@
 	return ret;
 }
 
-static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
+static int rbd_dev_setup_parent(struct rbd_device *rbd_dev)
 {
 	struct rbd_spec *parent_spec;
 	struct parent_image_info pii = { 0 };
@@ -4759,37 +5833,12 @@
 	if (!parent_spec)
 		return -ENOMEM;
 
-	ret = get_parent_info(rbd_dev, &pii);
+	ret = rbd_dev_v2_parent_info(rbd_dev, &pii);
 	if (ret)
 		goto out_err;
 
-	dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
-	     __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
-	     pii.has_overlap, pii.overlap);
-
-	if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
-		/*
-		 * Either the parent never existed, or we have
-		 * record of it but the image got flattened so it no
-		 * longer has a parent.  When the parent of a
-		 * layered image disappears we immediately set the
-		 * overlap to 0.  The effect of this is that all new
-		 * requests will be treated as if the image had no
-		 * parent.
-		 *
-		 * If !pii.has_overlap, the parent image spec is not
-		 * applicable.  It's there to avoid duplication in each
-		 * snapshot record.
-		 */
-		if (rbd_dev->parent_overlap) {
-			rbd_dev->parent_overlap = 0;
-			rbd_dev_parent_put(rbd_dev);
-			pr_info("%s: clone image has been flattened\n",
-				rbd_dev->disk->disk_name);
-		}
-
+	if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap)
 		goto out;	/* No parent?  No problem. */
-	}
 
 	/* The ceph file layout needs to fit pool id in 32 bits */
 
@@ -4801,58 +5850,46 @@
 	}
 
 	/*
-	 * The parent won't change (except when the clone is
-	 * flattened, already handled that).  So we only need to
-	 * record the parent spec we have not already done so.
+	 * The parent won't change except when the clone is flattened,
+	 * so we only need to record the parent image spec once.
 	 */
-	if (!rbd_dev->parent_spec) {
-		parent_spec->pool_id = pii.pool_id;
-		if (pii.pool_ns && *pii.pool_ns) {
-			parent_spec->pool_ns = pii.pool_ns;
-			pii.pool_ns = NULL;
-		}
-		parent_spec->image_id = pii.image_id;
-		pii.image_id = NULL;
-		parent_spec->snap_id = pii.snap_id;
-
-		rbd_dev->parent_spec = parent_spec;
-		parent_spec = NULL;	/* rbd_dev now owns this */
+	parent_spec->pool_id = pii.pool_id;
+	if (pii.pool_ns && *pii.pool_ns) {
+		parent_spec->pool_ns = pii.pool_ns;
+		pii.pool_ns = NULL;
 	}
+	parent_spec->image_id = pii.image_id;
+	pii.image_id = NULL;
+	parent_spec->snap_id = pii.snap_id;
+
+	rbd_assert(!rbd_dev->parent_spec);
+	rbd_dev->parent_spec = parent_spec;
+	parent_spec = NULL;	/* rbd_dev now owns this */
 
 	/*
-	 * We always update the parent overlap.  If it's zero we issue
-	 * a warning, as we will proceed as if there was no parent.
+	 * Record the parent overlap.  If it's zero, issue a warning as
+	 * we will proceed as if there is no parent.
 	 */
-	if (!pii.overlap) {
-		if (parent_spec) {
-			/* refresh, careful to warn just once */
-			if (rbd_dev->parent_overlap)
-				rbd_warn(rbd_dev,
-				    "clone now standalone (overlap became 0)");
-		} else {
-			/* initial probe */
-			rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
-		}
-	}
+	if (!pii.overlap)
+		rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
 	rbd_dev->parent_overlap = pii.overlap;
 
 out:
 	ret = 0;
 out_err:
-	kfree(pii.pool_ns);
-	kfree(pii.image_id);
+	rbd_parent_info_cleanup(&pii);
 	rbd_spec_put(parent_spec);
 	return ret;
 }
 
-static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
+static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev,
+				    u64 *stripe_unit, u64 *stripe_count)
 {
 	struct {
 		__le64 stripe_unit;
 		__le64 stripe_count;
 	} __attribute__ ((packed)) striping_info_buf = { 0 };
 	size_t size = sizeof (striping_info_buf);
-	void *p;
 	int ret;
 
 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
@@ -4864,27 +5901,33 @@
 	if (ret < size)
 		return -ERANGE;
 
-	p = &striping_info_buf;
-	rbd_dev->header.stripe_unit = ceph_decode_64(&p);
-	rbd_dev->header.stripe_count = ceph_decode_64(&p);
+	*stripe_unit = le64_to_cpu(striping_info_buf.stripe_unit);
+	*stripe_count = le64_to_cpu(striping_info_buf.stripe_count);
+	dout("  stripe_unit = %llu stripe_count = %llu\n", *stripe_unit,
+	     *stripe_count);
+
 	return 0;
 }
 
-static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
+static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev, s64 *data_pool_id)
 {
-	__le64 data_pool_id;
+	__le64 data_pool_buf;
 	int ret;
 
 	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 				  &rbd_dev->header_oloc, "get_data_pool",
-				  NULL, 0, &data_pool_id, sizeof(data_pool_id));
+				  NULL, 0, &data_pool_buf,
+				  sizeof(data_pool_buf));
+	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		return ret;
-	if (ret < sizeof(data_pool_id))
+	if (ret < sizeof(data_pool_buf))
 		return -EBADMSG;
 
-	rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
-	WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
+	*data_pool_id = le64_to_cpu(data_pool_buf);
+	dout("  data_pool_id = %lld\n", *data_pool_id);
+	WARN_ON(*data_pool_id == CEPH_NOPOOL);
+
 	return 0;
 }
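
Both get_stripe_unit_count and get_data_pool return fixed-width little-endian fields, which the reworked code converts with le64_to_cpu() instead of the streaming ceph_decode_64() helper.  A small userspace sketch of the same decode is given below; decode_striping_info is a hypothetical name and le64toh is assumed from glibc's <endian.h>.

#include <endian.h>
#include <stdint.h>
#include <string.h>

/* Sketch only: decode the 16-byte get_stripe_unit_count reply
 * (two little-endian u64s) without relying on struct layout. */
struct striping_info {
	uint64_t stripe_unit;
	uint64_t stripe_count;
};

static int decode_striping_info(const uint8_t *buf, size_t len,
				struct striping_info *si)
{
	uint64_t v;

	if (len < 2 * sizeof(v))
		return -1;
	memcpy(&v, buf, sizeof(v));
	si->stripe_unit = le64toh(v);
	memcpy(&v, buf + sizeof(v), sizeof(v));
	si->stripe_count = le64toh(v);
	return 0;
}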
 
@@ -5076,7 +6119,8 @@
 	return ret;
 }
 
-static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
+static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev,
+				   struct ceph_snap_context **psnapc)
 {
 	size_t size;
 	int ret;
@@ -5137,9 +6181,7 @@
 	for (i = 0; i < snap_count; i++)
 		snapc->snaps[i] = ceph_decode_64(&p);
 
-	ceph_put_snap_context(rbd_dev->header.snapc);
-	rbd_dev->header.snapc = snapc;
-
+	*psnapc = snapc;
 	dout("  snap context seq = %llu, snap_count = %u\n",
 		(unsigned long long)seq, (unsigned int)snap_count);
 out:
@@ -5188,38 +6230,42 @@
 	return snap_name;
 }
 
-static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
+static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev,
+				  struct rbd_image_header *header,
+				  bool first_time)
 {
-	bool first_time = rbd_dev->header.object_prefix == NULL;
 	int ret;
 
-	ret = rbd_dev_v2_image_size(rbd_dev);
+	ret = _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
+				    first_time ? &header->obj_order : NULL,
+				    &header->image_size);
 	if (ret)
 		return ret;
 
 	if (first_time) {
-		ret = rbd_dev_v2_header_onetime(rbd_dev);
+		ret = rbd_dev_v2_header_onetime(rbd_dev, header);
 		if (ret)
 			return ret;
 	}
 
-	ret = rbd_dev_v2_snap_context(rbd_dev);
-	if (ret && first_time) {
-		kfree(rbd_dev->header.object_prefix);
-		rbd_dev->header.object_prefix = NULL;
-	}
+	ret = rbd_dev_v2_snap_context(rbd_dev, &header->snapc);
+	if (ret)
+		return ret;
 
-	return ret;
+	return 0;
 }
 
-static int rbd_dev_header_info(struct rbd_device *rbd_dev)
+static int rbd_dev_header_info(struct rbd_device *rbd_dev,
+			       struct rbd_image_header *header,
+			       bool first_time)
 {
 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+	rbd_assert(!header->object_prefix && !header->snapc);
 
 	if (rbd_dev->image_format == 1)
-		return rbd_dev_v1_header_info(rbd_dev);
+		return rbd_dev_v1_header_info(rbd_dev, header, first_time);
 
-	return rbd_dev_v2_header_info(rbd_dev);
+	return rbd_dev_v2_header_info(rbd_dev, header, first_time);
 }
 
 /*
@@ -5275,6 +6321,141 @@
 	return dup;
 }
 
+static int rbd_parse_param(struct fs_parameter *param,
+			    struct rbd_parse_opts_ctx *pctx)
+{
+	struct rbd_options *opt = pctx->opts;
+	struct fs_parse_result result;
+	struct p_log log = {.prefix = "rbd"};
+	int token, ret;
+
+	ret = ceph_parse_param(param, pctx->copts, NULL);
+	if (ret != -ENOPARAM)
+		return ret;
+
+	token = __fs_parse(&log, rbd_parameters, param, &result);
+	dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
+	if (token < 0) {
+		if (token == -ENOPARAM)
+			return inval_plog(&log, "Unknown parameter '%s'",
+					  param->key);
+		return token;
+	}
+
+	switch (token) {
+	case Opt_queue_depth:
+		if (result.uint_32 < 1)
+			goto out_of_range;
+		opt->queue_depth = result.uint_32;
+		break;
+	case Opt_alloc_size:
+		if (result.uint_32 < SECTOR_SIZE)
+			goto out_of_range;
+		if (!is_power_of_2(result.uint_32))
+			return inval_plog(&log, "alloc_size must be a power of 2");
+		opt->alloc_size = result.uint_32;
+		break;
+	case Opt_lock_timeout:
+		/* 0 is "wait forever" (i.e. infinite timeout) */
+		if (result.uint_32 > INT_MAX / 1000)
+			goto out_of_range;
+		opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000);
+		break;
+	case Opt_pool_ns:
+		kfree(pctx->spec->pool_ns);
+		pctx->spec->pool_ns = param->string;
+		param->string = NULL;
+		break;
+	case Opt_compression_hint:
+		switch (result.uint_32) {
+		case Opt_compression_hint_none:
+			opt->alloc_hint_flags &=
+			    ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
+			      CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
+			break;
+		case Opt_compression_hint_compressible:
+			opt->alloc_hint_flags |=
+			    CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
+			opt->alloc_hint_flags &=
+			    ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
+			break;
+		case Opt_compression_hint_incompressible:
+			opt->alloc_hint_flags |=
+			    CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
+			opt->alloc_hint_flags &=
+			    ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
+			break;
+		default:
+			BUG();
+		}
+		break;
+	case Opt_read_only:
+		opt->read_only = true;
+		break;
+	case Opt_read_write:
+		opt->read_only = false;
+		break;
+	case Opt_lock_on_read:
+		opt->lock_on_read = true;
+		break;
+	case Opt_exclusive:
+		opt->exclusive = true;
+		break;
+	case Opt_notrim:
+		opt->trim = false;
+		break;
+	default:
+		BUG();
+	}
+
+	return 0;
+
+out_of_range:
+	return inval_plog(&log, "%s out of range", param->key);
+}
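
__fs_parse() looks param->key up in a constant fs_parameter_spec table and returns an Opt_* token, with any converted value placed in fs_parse_result.  The actual rbd_parameters table is defined earlier in the patch; the fragment below is only a sketch of what such a table looks like with the fs_parser.h helpers, and the entries shown are assumptions derived from the switch above.

/* Sketch of an fs_parameter_spec table as used with __fs_parse()
 * (illustrative subset; the real rbd_parameters table is defined
 * elsewhere in the patch). */
static const struct fs_parameter_spec example_parameters[] = {
	fsparam_u32	("queue_depth",		Opt_queue_depth),
	fsparam_u32	("alloc_size",		Opt_alloc_size),
	fsparam_string	("pool_ns",		Opt_pool_ns),
	fsparam_flag	("read_only",		Opt_read_only),
	fsparam_flag	("notrim",		Opt_notrim),
	{}
};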
+
+/*
+ * This duplicates most of generic_parse_monolithic(), untying it from
+ * fs_context and skipping standard superblock and security options.
+ */
+static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx)
+{
+	char *key;
+	int ret = 0;
+
+	dout("%s '%s'\n", __func__, options);
+	while ((key = strsep(&options, ",")) != NULL) {
+		if (*key) {
+			struct fs_parameter param = {
+				.key	= key,
+				.type	= fs_value_is_flag,
+			};
+			char *value = strchr(key, '=');
+			size_t v_len = 0;
+
+			if (value) {
+				if (value == key)
+					continue;
+				*value++ = 0;
+				v_len = strlen(value);
+				param.string = kmemdup_nul(value, v_len,
+							   GFP_KERNEL);
+				if (!param.string)
+					return -ENOMEM;
+				param.type = fs_value_is_string;
+			}
+			param.size = v_len;
+
+			ret = rbd_parse_param(&param, pctx);
+			kfree(param.string);
+			if (ret)
+				break;
+		}
+	}
+
+	return ret;
+}
+
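
rbd_parse_options() walks the comma-separated option string with strsep(), turning each "key" or "key=value" token into a struct fs_parameter before handing it to rbd_parse_param().  A stripped-down userspace model of that splitting is sketched below; split_options is a hypothetical function for illustration only, the kernel code builds fs_parameter objects instead of printing.

#define _DEFAULT_SOURCE
#include <stdio.h>
#include <string.h>

/* Hypothetical userspace model of the key/value splitting done by
 * rbd_parse_options(); e.g. "queue_depth=128,alloc_size=65536,read_only". */
static void split_options(char *options)
{
	char *key;

	while ((key = strsep(&options, ",")) != NULL) {
		char *value;

		if (!*key)
			continue;		/* skip empty tokens */
		value = strchr(key, '=');
		if (value)
			*value++ = '\0';	/* terminate the key */
		printf("key='%s' value='%s'\n", key, value ? value : "");
	}
}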
 /*
  * Parse the options provided for an "rbd add" (i.e., rbd image
  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
@@ -5326,8 +6507,7 @@
 	const char *mon_addrs;
 	char *snap_name;
 	size_t mon_addrs_size;
-	struct parse_rbd_opts_ctx pctx = { 0 };
-	struct ceph_options *copts;
+	struct rbd_parse_opts_ctx pctx = { 0 };
 	int ret;
 
 	/* The first four tokens are required */
@@ -5338,7 +6518,7 @@
 		return -EINVAL;
 	}
 	mon_addrs = buf;
-	mon_addrs_size = len + 1;
+	mon_addrs_size = len;
 	buf += len;
 
 	ret = -EINVAL;
@@ -5388,6 +6568,10 @@
 	*(snap_name + len) = '\0';
 	pctx.spec->snap_name = snap_name;
 
+	pctx.copts = ceph_alloc_options();
+	if (!pctx.copts)
+		goto out_mem;
+
 	/* Initialize all rbd options to the defaults */
 
 	pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
@@ -5396,32 +6580,33 @@
 
 	pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
 	pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
+	pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
 	pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
 	pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
 	pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
 	pctx.opts->trim = RBD_TRIM_DEFAULT;
 
-	copts = ceph_parse_options(options, mon_addrs,
-				   mon_addrs + mon_addrs_size - 1,
-				   parse_rbd_opts_token, &pctx);
-	if (IS_ERR(copts)) {
-		ret = PTR_ERR(copts);
+	ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL);
+	if (ret)
 		goto out_err;
-	}
-	kfree(options);
 
-	*ceph_opts = copts;
+	ret = rbd_parse_options(options, &pctx);
+	if (ret)
+		goto out_err;
+
+	*ceph_opts = pctx.copts;
 	*opts = pctx.opts;
 	*rbd_spec = pctx.spec;
-
+	kfree(options);
 	return 0;
+
 out_mem:
 	ret = -ENOMEM;
 out_err:
 	kfree(pctx.opts);
+	ceph_destroy_options(pctx.copts);
 	rbd_spec_put(pctx.spec);
 	kfree(options);
-
 	return ret;
 }
 
@@ -5429,28 +6614,51 @@
 {
 	down_write(&rbd_dev->lock_rwsem);
 	if (__rbd_is_lock_owner(rbd_dev))
-		rbd_unlock(rbd_dev);
+		__rbd_release_lock(rbd_dev);
 	up_write(&rbd_dev->lock_rwsem);
 }
 
+/*
+ * If the wait is interrupted, an error is returned even if the lock
+ * was successfully acquired.  rbd_dev_image_unlock() will release it
+ * if needed.
+ */
 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
 {
-	int ret;
+	long ret;
 
 	if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
+		if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
+			return 0;
+
 		rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
 		return -EINVAL;
 	}
 
-	/* FIXME: "rbd map --exclusive" should be in interruptible */
-	down_read(&rbd_dev->lock_rwsem);
-	ret = rbd_wait_state_locked(rbd_dev, true);
-	up_read(&rbd_dev->lock_rwsem);
-	if (ret) {
-		rbd_warn(rbd_dev, "failed to acquire exclusive lock");
-		return -EROFS;
-	}
+	if (rbd_is_ro(rbd_dev))
+		return 0;
 
+	rbd_assert(!rbd_is_lock_owner(rbd_dev));
+	queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+	ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
+			    ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
+	if (ret > 0) {
+		ret = rbd_dev->acquire_err;
+	} else {
+		cancel_delayed_work_sync(&rbd_dev->lock_dwork);
+		if (!ret)
+			ret = -ETIMEDOUT;
+
+		rbd_warn(rbd_dev, "failed to acquire lock: %ld", ret);
+	}
+	if (ret)
+		return ret;
+
+	/*
+	 * The lock may have been released by now, unless automatic lock
+	 * transitions are disabled.
+	 */
+	rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
 	return 0;
 }
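
The branch structure above follows the return convention of wait_for_completion_killable_timeout(): a positive value means the completion fired (remaining jiffies), 0 means the timeout expired, and a negative value (-ERESTARTSYS) means the task was fatally signalled while waiting.  A condensed sketch of that mapping; wait_result_to_errno is a hypothetical helper, not part of the patch.

/* Sketch: translate the long returned by
 * wait_for_completion_killable_timeout() into an errno-style int.
 * On success the caller still has to pick up the result posted by
 * the worker (acquire_err above). */
static int wait_result_to_errno(long ret)
{
	if (ret > 0)
		return 0;		/* completed, ret = jiffies left */
	if (ret == 0)
		return -ETIMEDOUT;	/* timed out */
	return ret;			/* -ERESTARTSYS: killed while waiting */
}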
 
@@ -5500,7 +6708,6 @@
 	dout("rbd id object name is %s\n", oid.name);
 
 	/* Response will be an encoded string, which includes a length */
-
 	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
 	response = kzalloc(size, GFP_NOIO);
 	if (!response) {
@@ -5512,7 +6719,7 @@
 
 	ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
 				  "get_id", NULL, 0,
-				  response, RBD_IMAGE_ID_LEN_MAX);
+				  response, size);
 	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret == -ENOENT) {
 		image_id = kstrdup("", GFP_KERNEL);
@@ -5545,58 +6752,49 @@
  */
 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
 {
-	struct rbd_image_header	*header;
-
 	rbd_dev_parent_put(rbd_dev);
+	rbd_object_map_free(rbd_dev);
+	rbd_dev_mapping_clear(rbd_dev);
 
 	/* Free dynamic fields from the header, then zero it out */
 
-	header = &rbd_dev->header;
-	ceph_put_snap_context(header->snapc);
-	kfree(header->snap_sizes);
-	kfree(header->snap_names);
-	kfree(header->object_prefix);
-	memset(header, 0, sizeof (*header));
+	rbd_image_header_cleanup(&rbd_dev->header);
 }
 
-static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
+static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev,
+				     struct rbd_image_header *header)
 {
 	int ret;
 
-	ret = rbd_dev_v2_object_prefix(rbd_dev);
+	ret = rbd_dev_v2_object_prefix(rbd_dev, &header->object_prefix);
 	if (ret)
-		goto out_err;
+		return ret;
 
 	/*
 	 * Get and check the features for the image.  Currently the
 	 * features are assumed to never change.
 	 */
-	ret = rbd_dev_v2_features(rbd_dev);
+	ret = _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
+					rbd_is_ro(rbd_dev), &header->features);
 	if (ret)
-		goto out_err;
+		return ret;
 
 	/* If the image supports fancy striping, get its parameters */
 
-	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
-		ret = rbd_dev_v2_striping_info(rbd_dev);
-		if (ret < 0)
-			goto out_err;
-	}
-
-	if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
-		ret = rbd_dev_v2_data_pool(rbd_dev);
+	if (header->features & RBD_FEATURE_STRIPINGV2) {
+		ret = rbd_dev_v2_striping_info(rbd_dev, &header->stripe_unit,
+					       &header->stripe_count);
 		if (ret)
-			goto out_err;
+			return ret;
 	}
 
-	rbd_init_layout(rbd_dev);
-	return 0;
+	if (header->features & RBD_FEATURE_DATA_POOL) {
+		ret = rbd_dev_v2_data_pool(rbd_dev, &header->data_pool_id);
+		if (ret)
+			return ret;
+	}
 
-out_err:
-	rbd_dev->header.features = 0;
-	kfree(rbd_dev->header.object_prefix);
-	rbd_dev->header.object_prefix = NULL;
-	return ret;
+	return 0;
 }
 
 /*
@@ -5618,7 +6816,7 @@
 		goto out_err;
 	}
 
-	parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
+	parent = __rbd_dev_create(rbd_dev->parent_spec);
 	if (!parent) {
 		ret = -ENOMEM;
 		goto out_err;
@@ -5628,8 +6826,10 @@
 	 * Images related by parent/child relationships always share
 	 * rbd_client and spec/parent_spec, so bump their refcounts.
 	 */
-	__rbd_get_client(rbd_dev->rbd_client);
-	rbd_spec_get(rbd_dev->parent_spec);
+	parent->rbd_client = __rbd_get_client(rbd_dev->rbd_client);
+	parent->spec = rbd_spec_get(rbd_dev->parent_spec);
+
+	__set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
 
 	ret = rbd_dev_image_probe(parent, depth);
 	if (ret < 0)
@@ -5648,7 +6848,6 @@
 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
 {
 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
-	rbd_dev_mapping_clear(rbd_dev);
 	rbd_free_disk(rbd_dev);
 	if (!single_major)
 		unregister_blkdev(rbd_dev->major, rbd_dev->name);
@@ -5682,23 +6881,17 @@
 	if (ret)
 		goto err_out_blkdev;
 
-	ret = rbd_dev_mapping_set(rbd_dev);
-	if (ret)
-		goto err_out_disk;
-
 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
-	set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
+	set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
 
 	ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
 	if (ret)
-		goto err_out_mapping;
+		goto err_out_disk;
 
 	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	up_write(&rbd_dev->header_rwsem);
 	return 0;
 
-err_out_mapping:
-	rbd_dev_mapping_clear(rbd_dev);
 err_out_disk:
 	rbd_free_disk(rbd_dev);
 err_out_blkdev:
@@ -5727,9 +6920,27 @@
 	return ret;
 }
 
+static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
+{
+	if (!is_snap) {
+		pr_info("image %s/%s%s%s does not exist\n",
+			rbd_dev->spec->pool_name,
+			rbd_dev->spec->pool_ns ?: "",
+			rbd_dev->spec->pool_ns ? "/" : "",
+			rbd_dev->spec->image_name);
+	} else {
+		pr_info("snap %s/%s%s%s@%s does not exist\n",
+			rbd_dev->spec->pool_name,
+			rbd_dev->spec->pool_ns ?: "",
+			rbd_dev->spec->pool_ns ? "/" : "",
+			rbd_dev->spec->image_name,
+			rbd_dev->spec->snap_name);
+	}
+}
+
 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 {
-	if (rbd_dev->opts)
+	if (!rbd_is_ro(rbd_dev))
 		rbd_unregister_watch(rbd_dev);
 
 	rbd_dev_unprobe(rbd_dev);
@@ -5749,6 +6960,7 @@
  */
 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
 {
+	bool need_watch = !rbd_is_ro(rbd_dev);
 	int ret;
 
 	/*
@@ -5765,15 +6977,11 @@
 	if (ret)
 		goto err_out_format;
 
-	if (!depth) {
+	if (need_watch) {
 		ret = rbd_register_watch(rbd_dev);
 		if (ret) {
 			if (ret == -ENOENT)
-				pr_info("image %s/%s%s%s does not exist\n",
-					rbd_dev->spec->pool_name,
-					rbd_dev->spec->pool_ns ?: "",
-					rbd_dev->spec->pool_ns ? "/" : "",
-					rbd_dev->spec->image_name);
+				rbd_print_dne(rbd_dev, false);
 			goto err_out_format;
 		}
 	}
@@ -5781,9 +6989,14 @@
 	if (!depth)
 		down_write(&rbd_dev->header_rwsem);
 
-	ret = rbd_dev_header_info(rbd_dev);
-	if (ret)
+	ret = rbd_dev_header_info(rbd_dev, &rbd_dev->header, true);
+	if (ret) {
+		if (ret == -ENOENT && !need_watch)
+			rbd_print_dne(rbd_dev, false);
 		goto err_out_probe;
+	}
+
+	rbd_init_layout(rbd_dev);
 
 	/*
 	 * If this image is the one being mapped, we have pool name and
@@ -5797,27 +7010,25 @@
 		ret = rbd_spec_fill_names(rbd_dev);
 	if (ret) {
 		if (ret == -ENOENT)
-			pr_info("snap %s/%s%s%s@%s does not exist\n",
-				rbd_dev->spec->pool_name,
-				rbd_dev->spec->pool_ns ?: "",
-				rbd_dev->spec->pool_ns ? "/" : "",
-				rbd_dev->spec->image_name,
-				rbd_dev->spec->snap_name);
+			rbd_print_dne(rbd_dev, true);
 		goto err_out_probe;
 	}
 
-	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
-		ret = rbd_dev_v2_parent_info(rbd_dev);
+	ret = rbd_dev_mapping_set(rbd_dev);
+	if (ret)
+		goto err_out_probe;
+
+	if (rbd_is_snap(rbd_dev) &&
+	    (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
+		ret = rbd_object_map_load(rbd_dev);
 		if (ret)
 			goto err_out_probe;
+	}
 
-		/*
-		 * Need to warn users if this image is the one being
-		 * mapped and has a parent.
-		 */
-		if (!depth && rbd_dev->parent_spec)
-			rbd_warn(rbd_dev,
-				 "WARNING: kernel layering is EXPERIMENTAL!");
+	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
+		ret = rbd_dev_setup_parent(rbd_dev);
+		if (ret)
+			goto err_out_probe;
 	}
 
 	ret = rbd_dev_probe_parent(rbd_dev, depth);
@@ -5831,13 +7042,114 @@
 err_out_probe:
 	if (!depth)
 		up_write(&rbd_dev->header_rwsem);
-	if (!depth)
+	if (need_watch)
 		rbd_unregister_watch(rbd_dev);
 	rbd_dev_unprobe(rbd_dev);
 err_out_format:
 	rbd_dev->image_format = 0;
 	kfree(rbd_dev->spec->image_id);
 	rbd_dev->spec->image_id = NULL;
+	return ret;
+}
+
+static void rbd_dev_update_header(struct rbd_device *rbd_dev,
+				  struct rbd_image_header *header)
+{
+	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+	rbd_assert(rbd_dev->header.object_prefix); /* !first_time */
+
+	if (rbd_dev->header.image_size != header->image_size) {
+		rbd_dev->header.image_size = header->image_size;
+
+		if (!rbd_is_snap(rbd_dev)) {
+			rbd_dev->mapping.size = header->image_size;
+			rbd_dev_update_size(rbd_dev);
+		}
+	}
+
+	ceph_put_snap_context(rbd_dev->header.snapc);
+	rbd_dev->header.snapc = header->snapc;
+	header->snapc = NULL;
+
+	if (rbd_dev->image_format == 1) {
+		kfree(rbd_dev->header.snap_names);
+		rbd_dev->header.snap_names = header->snap_names;
+		header->snap_names = NULL;
+
+		kfree(rbd_dev->header.snap_sizes);
+		rbd_dev->header.snap_sizes = header->snap_sizes;
+		header->snap_sizes = NULL;
+	}
+}
+
+static void rbd_dev_update_parent(struct rbd_device *rbd_dev,
+				  struct parent_image_info *pii)
+{
+	if (pii->pool_id == CEPH_NOPOOL || !pii->has_overlap) {
+		/*
+		 * Either the parent never existed, or we have
+		 * record of it but the image got flattened so it no
+		 * longer has a parent.  When the parent of a
+		 * layered image disappears we immediately set the
+		 * overlap to 0.  The effect of this is that all new
+		 * requests will be treated as if the image had no
+		 * parent.
+		 *
+		 * If !pii.has_overlap, the parent image spec is not
+		 * applicable.  It's there to avoid duplication in each
+		 * snapshot record.
+		 */
+		if (rbd_dev->parent_overlap) {
+			rbd_dev->parent_overlap = 0;
+			rbd_dev_parent_put(rbd_dev);
+			pr_info("%s: clone has been flattened\n",
+				rbd_dev->disk->disk_name);
+		}
+	} else {
+		rbd_assert(rbd_dev->parent_spec);
+
+		/*
+		 * Update the parent overlap.  If it became zero, issue
+		 * a warning as we will proceed as if there is no parent.
+		 */
+		if (!pii->overlap && rbd_dev->parent_overlap)
+			rbd_warn(rbd_dev,
+				 "clone has become standalone (overlap 0)");
+		rbd_dev->parent_overlap = pii->overlap;
+	}
+}
+
+static int rbd_dev_refresh(struct rbd_device *rbd_dev)
+{
+	struct rbd_image_header	header = { 0 };
+	struct parent_image_info pii = { 0 };
+	int ret;
+
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+	ret = rbd_dev_header_info(rbd_dev, &header, false);
+	if (ret)
+		goto out;
+
+	/*
+	 * If there is a parent, see if it has disappeared due to the
+	 * mapped image getting flattened.
+	 */
+	if (rbd_dev->parent) {
+		ret = rbd_dev_v2_parent_info(rbd_dev, &pii);
+		if (ret)
+			goto out;
+	}
+
+	down_write(&rbd_dev->header_rwsem);
+	rbd_dev_update_header(rbd_dev, &header);
+	if (rbd_dev->parent)
+		rbd_dev_update_parent(rbd_dev, &pii);
+	up_write(&rbd_dev->header_rwsem);
+
+out:
+	rbd_parent_info_cleanup(&pii);
+	rbd_image_header_cleanup(&header);
 	return ret;
 }
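
rbd_dev_refresh() now gathers the new header and parent info into local structures before taking header_rwsem, so the slow OSD round-trips happen outside the lock and readers only ever see a fully built header.  A minimal sketch of the commit step under the write lock; commit_new_snapc is a hypothetical helper showing only the snap-context swap.

/* Sketch only: swap in a freshly fetched snap context under the write
 * lock; the old context is dropped with ceph_put_snap_context(). */
static void commit_new_snapc(struct rbd_device *rbd_dev,
			     struct ceph_snap_context *new_snapc)
{
	down_write(&rbd_dev->header_rwsem);
	ceph_put_snap_context(rbd_dev->header.snapc);
	rbd_dev->header.snapc = new_snapc;
	up_write(&rbd_dev->header_rwsem);
}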
 
@@ -5887,6 +7199,11 @@
 	spec = NULL;		/* rbd_dev now owns this */
 	rbd_opts = NULL;	/* rbd_dev now owns this */
 
+	/* if we are mapping a snapshot it will be a read-only mapping */
+	if (rbd_dev->opts->read_only ||
+	    strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
+		__set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
+
 	rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
 	if (!rbd_dev->config_info) {
 		rc = -ENOMEM;
@@ -5897,19 +7214,19 @@
 	if (rc < 0)
 		goto err_out_rbd_dev;
 
-	/* If we are mapping a snapshot it must be marked read-only */
-	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
-		rbd_dev->opts->read_only = true;
+	if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
+		rbd_warn(rbd_dev, "alloc_size adjusted to %u",
+			 rbd_dev->layout.object_size);
+		rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
+	}
 
 	rc = rbd_dev_device_setup(rbd_dev);
 	if (rc)
 		goto err_out_image_probe;
 
-	if (rbd_dev->opts->exclusive) {
-		rc = rbd_add_acquire_lock(rbd_dev);
-		if (rc)
-			goto err_out_device_setup;
-	}
+	rc = rbd_add_acquire_lock(rbd_dev);
+	if (rc)
+		goto err_out_image_lock;
 
 	/* Everything's ready.  Announce the disk to the world. */
 
@@ -5917,7 +7234,7 @@
 	if (rc)
 		goto err_out_image_lock;
 
-	add_disk(rbd_dev->disk);
+	device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL);
 	/* see rbd_init_disk() */
 	blk_put_queue(rbd_dev->disk->queue);
 
@@ -5935,7 +7252,6 @@
 
 err_out_image_lock:
 	rbd_dev_image_unlock(rbd_dev);
-err_out_device_setup:
 	rbd_dev_device_release(rbd_dev);
 err_out_image_probe:
 	rbd_dev_image_release(rbd_dev);
@@ -5949,9 +7265,7 @@
 	goto out;
 }
 
-static ssize_t rbd_add(struct bus_type *bus,
-		       const char *buf,
-		       size_t count)
+static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
 {
 	if (single_major)
 		return -EINVAL;
@@ -5959,9 +7273,8 @@
 	return do_rbd_add(bus, buf, count);
 }
 
-static ssize_t rbd_add_single_major(struct bus_type *bus,
-				    const char *buf,
-				    size_t count)
+static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
+				      size_t count)
 {
 	return do_rbd_add(bus, buf, count);
 }
@@ -6067,9 +7380,7 @@
 	return count;
 }
 
-static ssize_t rbd_remove(struct bus_type *bus,
-			  const char *buf,
-			  size_t count)
+static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
 {
 	if (single_major)
 		return -EINVAL;
@@ -6077,9 +7388,8 @@
 	return do_rbd_remove(bus, buf, count);
 }
 
-static ssize_t rbd_remove_single_major(struct bus_type *bus,
-				       const char *buf,
-				       size_t count)
+static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
+					 size_t count)
 {
 	return do_rbd_remove(bus, buf, count);
 }
@@ -6088,7 +7398,7 @@
  * create control files in sysfs
  * /sys/bus/rbd/...
  */
-static int rbd_sysfs_init(void)
+static int __init rbd_sysfs_init(void)
 {
 	int ret;
 
@@ -6103,13 +7413,13 @@
 	return ret;
 }
 
-static void rbd_sysfs_cleanup(void)
+static void __exit rbd_sysfs_cleanup(void)
 {
 	bus_unregister(&rbd_bus_type);
 	device_unregister(&rbd_root_dev);
 }
 
-static int rbd_slab_init(void)
+static int __init rbd_slab_init(void)
 {
 	rbd_assert(!rbd_img_request_cache);
 	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
