From 102a0743326a03cd1a1202ceda21e175b7d3575c Mon Sep 17 00:00:00 2001
From: hc <hc@nodka.com>
Date: Tue, 20 Feb 2024 01:20:52 +0000
Subject: [PATCH] add new system file

---
 kernel/net/ceph/osdmap.c |  628 +++++++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 523 insertions(+), 105 deletions(-)

diff --git a/kernel/net/ceph/osdmap.c b/kernel/net/ceph/osdmap.c
index 7cb6025..fa08c15 100644
--- a/kernel/net/ceph/osdmap.c
+++ b/kernel/net/ceph/osdmap.c
@@ -138,6 +138,79 @@
 	return -EINVAL;
 }
 
+struct crush_name_node {
+	struct rb_node cn_node;
+	int cn_id;
+	char cn_name[];
+};
+
+static struct crush_name_node *alloc_crush_name(size_t name_len)
+{
+	struct crush_name_node *cn;
+
+	cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO);
+	if (!cn)
+		return NULL;
+
+	RB_CLEAR_NODE(&cn->cn_node);
+	return cn;
+}
+
+static void free_crush_name(struct crush_name_node *cn)
+{
+	WARN_ON(!RB_EMPTY_NODE(&cn->cn_node));
+
+	kfree(cn);
+}
+
+DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node)
+
+static int decode_crush_names(void **p, void *end, struct rb_root *root)
+{
+	u32 n;
+
+	ceph_decode_32_safe(p, end, n, e_inval);
+	while (n--) {
+		struct crush_name_node *cn;
+		int id;
+		u32 name_len;
+
+		ceph_decode_32_safe(p, end, id, e_inval);
+		ceph_decode_32_safe(p, end, name_len, e_inval);
+		ceph_decode_need(p, end, name_len, e_inval);
+
+		cn = alloc_crush_name(name_len);
+		if (!cn)
+			return -ENOMEM;
+
+		cn->cn_id = id;
+		memcpy(cn->cn_name, *p, name_len);
+		cn->cn_name[name_len] = '\0';
+		*p += name_len;
+
+		if (!__insert_crush_name(root, cn)) {
+			free_crush_name(cn);
+			return -EEXIST;
+		}
+	}
+
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+void clear_crush_names(struct rb_root *root)
+{
+	while (!RB_EMPTY_ROOT(root)) {
+		struct crush_name_node *cn =
+		    rb_entry(rb_first(root), struct crush_name_node, cn_node);
+
+		erase_crush_name(root, cn);
+		free_crush_name(cn);
+	}
+}
+
 static struct crush_choose_arg_map *alloc_choose_arg_map(void)
 {
 	struct crush_choose_arg_map *arg_map;
@@ -354,6 +427,8 @@
 	if (c == NULL)
 		return ERR_PTR(-ENOMEM);
 
+	c->type_names = RB_ROOT;
+	c->names = RB_ROOT;
 	c->choose_args = RB_ROOT;
 
         /* set tunables to default values */
@@ -495,9 +570,8 @@
 			  / sizeof(struct crush_rule_step))
 			goto bad;
 #endif
-		r = c->rules[i] = kmalloc(sizeof(*r) +
-					  yes*sizeof(struct crush_rule_step),
-					  GFP_NOFS);
+		r = kmalloc(struct_size(r, steps, yes), GFP_NOFS);
+		c->rules[i] = r;
 		if (r == NULL)
 			goto badmem;
 		dout(" rule %d is at %p\n", i, r);
@@ -511,8 +585,14 @@
 		}
 	}
 
-	ceph_decode_skip_map(p, end, 32, string, bad); /* type_map */
-	ceph_decode_skip_map(p, end, 32, string, bad); /* name_map */
+	err = decode_crush_names(p, end, &c->type_names);
+	if (err)
+		goto fail;
+
+	err = decode_crush_names(p, end, &c->names);
+	if (err)
+		goto fail;
+
 	ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */
 
         /* tunables */
@@ -637,48 +717,11 @@
 /*
  * rbtree of pg pool info
  */
-static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
-{
-	struct rb_node **p = &root->rb_node;
-	struct rb_node *parent = NULL;
-	struct ceph_pg_pool_info *pi = NULL;
-
-	while (*p) {
-		parent = *p;
-		pi = rb_entry(parent, struct ceph_pg_pool_info, node);
-		if (new->id < pi->id)
-			p = &(*p)->rb_left;
-		else if (new->id > pi->id)
-			p = &(*p)->rb_right;
-		else
-			return -EEXIST;
-	}
-
-	rb_link_node(&new->node, parent, p);
-	rb_insert_color(&new->node, root);
-	return 0;
-}
-
-static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
-{
-	struct ceph_pg_pool_info *pi;
-	struct rb_node *n = root->rb_node;
-
-	while (n) {
-		pi = rb_entry(n, struct ceph_pg_pool_info, node);
-		if (id < pi->id)
-			n = n->rb_left;
-		else if (id > pi->id)
-			n = n->rb_right;
-		else
-			return pi;
-	}
-	return NULL;
-}
+DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node)
 
 struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
 {
-	return __lookup_pg_pool(&map->pg_pools, id);
+	return lookup_pg_pool(&map->pg_pools, id);
 }
 
 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
@@ -691,8 +734,7 @@
 	if (WARN_ON_ONCE(id > (u64) INT_MAX))
 		return NULL;
 
-	pi = __lookup_pg_pool(&map->pg_pools, (int) id);
-
+	pi = lookup_pg_pool(&map->pg_pools, id);
 	return pi ? pi->name : NULL;
 }
 EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
@@ -715,14 +757,14 @@
 {
 	struct ceph_pg_pool_info *pi;
 
-	pi = __lookup_pg_pool(&map->pg_pools, id);
+	pi = lookup_pg_pool(&map->pg_pools, id);
 	return pi ? pi->flags : 0;
 }
 EXPORT_SYMBOL(ceph_pg_pool_flags);
 
 static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
 {
-	rb_erase(&pi->node, root);
+	erase_pg_pool(root, pi);
 	kfree(pi->name);
 	kfree(pi);
 }
@@ -904,7 +946,7 @@
 		ceph_decode_32_safe(p, end, len, bad);
 		dout("  pool %llu len %d\n", pool, len);
 		ceph_decode_need(p, end, len, bad);
-		pi = __lookup_pg_pool(&map->pg_pools, pool);
+		pi = lookup_pg_pool(&map->pg_pools, pool);
 		if (pi) {
 			char *name = kstrndup(*p, len, GFP_NOFS);
 
@@ -920,6 +962,143 @@
 
 bad:
 	return -EINVAL;
+}
+
+/*
+ * CRUSH workspaces
+ *
+ * workspace_manager framework borrowed from fs/btrfs/compression.c.
+ * Two simplifications: there is only one type of workspace and there
+ * is always at least one workspace.
+ */
+static struct crush_work *alloc_workspace(const struct crush_map *c)
+{
+	struct crush_work *work;
+	size_t work_size;
+
+	WARN_ON(!c->working_size);
+	work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
+	dout("%s work_size %zu bytes\n", __func__, work_size);
+
+	work = ceph_kvmalloc(work_size, GFP_NOIO);
+	if (!work)
+		return NULL;
+
+	INIT_LIST_HEAD(&work->item);
+	crush_init_workspace(c, work);
+	return work;
+}
+
+static void free_workspace(struct crush_work *work)
+{
+	WARN_ON(!list_empty(&work->item));
+	kvfree(work);
+}
+
+static void init_workspace_manager(struct workspace_manager *wsm)
+{
+	INIT_LIST_HEAD(&wsm->idle_ws);
+	spin_lock_init(&wsm->ws_lock);
+	atomic_set(&wsm->total_ws, 0);
+	wsm->free_ws = 0;
+	init_waitqueue_head(&wsm->ws_wait);
+}
+
+static void add_initial_workspace(struct workspace_manager *wsm,
+				  struct crush_work *work)
+{
+	WARN_ON(!list_empty(&wsm->idle_ws));
+
+	list_add(&work->item, &wsm->idle_ws);
+	atomic_set(&wsm->total_ws, 1);
+	wsm->free_ws = 1;
+}
+
+static void cleanup_workspace_manager(struct workspace_manager *wsm)
+{
+	struct crush_work *work;
+
+	while (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);
+		free_workspace(work);
+	}
+	atomic_set(&wsm->total_ws, 0);
+	wsm->free_ws = 0;
+}
+
+/*
+ * Finds an available workspace or allocates a new one.  If it's not
+ * possible to allocate a new one, waits until there is one.
+ */
+static struct crush_work *get_workspace(struct workspace_manager *wsm,
+					const struct crush_map *c)
+{
+	struct crush_work *work;
+	int cpus = num_online_cpus();
+
+again:
+	spin_lock(&wsm->ws_lock);
+	if (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);
+		wsm->free_ws--;
+		spin_unlock(&wsm->ws_lock);
+		return work;
+
+	}
+	if (atomic_read(&wsm->total_ws) > cpus) {
+		DEFINE_WAIT(wait);
+
+		spin_unlock(&wsm->ws_lock);
+		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
+		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
+			schedule();
+		finish_wait(&wsm->ws_wait, &wait);
+		goto again;
+	}
+	atomic_inc(&wsm->total_ws);
+	spin_unlock(&wsm->ws_lock);
+
+	work = alloc_workspace(c);
+	if (!work) {
+		atomic_dec(&wsm->total_ws);
+		wake_up(&wsm->ws_wait);
+
+		/*
+		 * Do not return the error but go back to waiting.  We
+		 * have the inital workspace and the CRUSH computation
+		 * time is bounded so we will get it eventually.
+		 */
+		WARN_ON(atomic_read(&wsm->total_ws) < 1);
+		goto again;
+	}
+	return work;
+}
+
+/*
+ * Puts a workspace back on the list or frees it if we have enough
+ * idle ones sitting around.
+ */
+static void put_workspace(struct workspace_manager *wsm,
+			  struct crush_work *work)
+{
+	spin_lock(&wsm->ws_lock);
+	if (wsm->free_ws <= num_online_cpus()) {
+		list_add(&work->item, &wsm->idle_ws);
+		wsm->free_ws++;
+		spin_unlock(&wsm->ws_lock);
+		goto wake;
+	}
+	spin_unlock(&wsm->ws_lock);
+
+	free_workspace(work);
+	atomic_dec(&wsm->total_ws);
+wake:
+	if (wq_has_sleeper(&wsm->ws_wait))
+		wake_up(&wsm->ws_wait);
 }
 
 /*
@@ -939,7 +1118,8 @@
 	map->primary_temp = RB_ROOT;
 	map->pg_upmap = RB_ROOT;
 	map->pg_upmap_items = RB_ROOT;
-	mutex_init(&map->crush_workspace_mutex);
+
+	init_workspace_manager(&map->crush_wsm);
 
 	return map;
 }
@@ -947,8 +1127,11 @@
 void ceph_osdmap_destroy(struct ceph_osdmap *map)
 {
 	dout("osdmap_destroy %p\n", map);
+
 	if (map->crush)
 		crush_destroy(map->crush);
+	cleanup_workspace_manager(&map->crush_wsm);
+
 	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
 		struct ceph_pg_mapping *pg =
 			rb_entry(rb_first(&map->pg_temp),
@@ -983,11 +1166,10 @@
 				 struct ceph_pg_pool_info, node);
 		__remove_pg_pool(&map->pg_pools, pi);
 	}
-	kfree(map->osd_state);
-	kfree(map->osd_weight);
-	kfree(map->osd_addr);
-	kfree(map->osd_primary_affinity);
-	kfree(map->crush_workspace);
+	kvfree(map->osd_state);
+	kvfree(map->osd_weight);
+	kvfree(map->osd_addr);
+	kvfree(map->osd_primary_affinity);
 	kfree(map);
 }
 
@@ -996,28 +1178,41 @@
  *
  * The new elements are properly initialized.
  */
-static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
+static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
 {
 	u32 *state;
 	u32 *weight;
 	struct ceph_entity_addr *addr;
+	u32 to_copy;
 	int i;
 
-	state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS);
-	if (!state)
+	dout("%s old %u new %u\n", __func__, map->max_osd, max);
+	if (max == map->max_osd)
+		return 0;
+
+	state = ceph_kvmalloc(array_size(max, sizeof(*state)), GFP_NOFS);
+	weight = ceph_kvmalloc(array_size(max, sizeof(*weight)), GFP_NOFS);
+	addr = ceph_kvmalloc(array_size(max, sizeof(*addr)), GFP_NOFS);
+	if (!state || !weight || !addr) {
+		kvfree(state);
+		kvfree(weight);
+		kvfree(addr);
 		return -ENOMEM;
+	}
+
+	to_copy = min(map->max_osd, max);
+	if (map->osd_state) {
+		memcpy(state, map->osd_state, to_copy * sizeof(*state));
+		memcpy(weight, map->osd_weight, to_copy * sizeof(*weight));
+		memcpy(addr, map->osd_addr, to_copy * sizeof(*addr));
+		kvfree(map->osd_state);
+		kvfree(map->osd_weight);
+		kvfree(map->osd_addr);
+	}
+
 	map->osd_state = state;
-
-	weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS);
-	if (!weight)
-		return -ENOMEM;
 	map->osd_weight = weight;
-
-	addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
-	if (!addr)
-		return -ENOMEM;
 	map->osd_addr = addr;
-
 	for (i = map->max_osd; i < max; i++) {
 		map->osd_state[i] = 0;
 		map->osd_weight[i] = CEPH_OSD_OUT;
@@ -1027,12 +1222,16 @@
 	if (map->osd_primary_affinity) {
 		u32 *affinity;
 
-		affinity = krealloc(map->osd_primary_affinity,
-				    max*sizeof(*affinity), GFP_NOFS);
+		affinity = ceph_kvmalloc(array_size(max, sizeof(*affinity)),
+					 GFP_NOFS);
 		if (!affinity)
 			return -ENOMEM;
-		map->osd_primary_affinity = affinity;
 
+		memcpy(affinity, map->osd_primary_affinity,
+		       to_copy * sizeof(*affinity));
+		kvfree(map->osd_primary_affinity);
+
+		map->osd_primary_affinity = affinity;
 		for (i = map->max_osd; i < max; i++)
 			map->osd_primary_affinity[i] =
 			    CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
@@ -1045,26 +1244,22 @@
 
 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
 {
-	void *workspace;
-	size_t work_size;
+	struct crush_work *work;
 
 	if (IS_ERR(crush))
 		return PTR_ERR(crush);
 
-	work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
-	dout("%s work_size %zu bytes\n", __func__, work_size);
-	workspace = kmalloc(work_size, GFP_NOIO);
-	if (!workspace) {
+	work = alloc_workspace(crush);
+	if (!work) {
 		crush_destroy(crush);
 		return -ENOMEM;
 	}
-	crush_init_workspace(crush, workspace);
 
 	if (map->crush)
 		crush_destroy(map->crush);
-	kfree(map->crush_workspace);
+	cleanup_workspace_manager(&map->crush_wsm);
 	map->crush = crush;
-	map->crush_workspace = workspace;
+	add_initial_workspace(&map->crush_wsm, work);
 	return 0;
 }
 
@@ -1138,18 +1333,18 @@
 
 		ceph_decode_64_safe(p, end, pool, e_inval);
 
-		pi = __lookup_pg_pool(&map->pg_pools, pool);
+		pi = lookup_pg_pool(&map->pg_pools, pool);
 		if (!incremental || !pi) {
 			pi = kzalloc(sizeof(*pi), GFP_NOFS);
 			if (!pi)
 				return -ENOMEM;
 
+			RB_CLEAR_NODE(&pi->node);
 			pi->id = pool;
 
-			ret = __insert_pg_pool(&map->pg_pools, pi);
-			if (ret) {
+			if (!__insert_pg_pool(&map->pg_pools, pi)) {
 				kfree(pi);
-				return ret;
+				return -EEXIST;
 			}
 		}
 
@@ -1308,9 +1503,9 @@
 	if (!map->osd_primary_affinity) {
 		int i;
 
-		map->osd_primary_affinity = kmalloc_array(map->max_osd,
-							  sizeof(u32),
-							  GFP_NOFS);
+		map->osd_primary_affinity = ceph_kvmalloc(
+		    array_size(map->max_osd, sizeof(*map->osd_primary_affinity)),
+		    GFP_NOFS);
 		if (!map->osd_primary_affinity)
 			return -ENOMEM;
 
@@ -1331,7 +1526,7 @@
 
 	ceph_decode_32_safe(p, end, len, e_inval);
 	if (len == 0) {
-		kfree(map->osd_primary_affinity);
+		kvfree(map->osd_primary_affinity);
 		map->osd_primary_affinity = NULL;
 		return 0;
 	}
@@ -1499,11 +1694,9 @@
 
 	/* osd_state, osd_weight, osd_addrs->client_addr */
 	ceph_decode_need(p, end, 3*sizeof(u32) +
-			 map->max_osd*((struct_v >= 5 ? sizeof(u32) :
-							sizeof(u8)) +
-				       sizeof(*map->osd_weight) +
-				       sizeof(*map->osd_addr)), e_inval);
-
+			 map->max_osd*(struct_v >= 5 ? sizeof(u32) :
+						       sizeof(u8)) +
+				       sizeof(*map->osd_weight), e_inval);
 	if (ceph_decode_32(p) != map->max_osd)
 		goto e_inval;
 
@@ -1524,9 +1717,11 @@
 	if (ceph_decode_32(p) != map->max_osd)
 		goto e_inval;
 
-	ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
-	for (i = 0; i < map->max_osd; i++)
-		ceph_decode_addr(&map->osd_addr[i]);
+	for (i = 0; i < map->max_osd; i++) {
+		err = ceph_decode_entity_addr(p, end, &map->osd_addr[i]);
+		if (err)
+			goto bad;
+	}
 
 	/* pg_temp */
 	err = decode_pg_temp(p, end, map);
@@ -1628,12 +1823,17 @@
 	void *new_state;
 	void *new_weight_end;
 	u32 len;
+	int i;
 
 	new_up_client = *p;
 	ceph_decode_32_safe(p, end, len, e_inval);
-	len *= sizeof(u32) + sizeof(struct ceph_entity_addr);
-	ceph_decode_need(p, end, len, e_inval);
-	*p += len;
+	for (i = 0; i < len; ++i) {
+		struct ceph_entity_addr addr;
+
+		ceph_decode_skip_32(p, end, e_inval);
+		if (ceph_decode_entity_addr(p, end, &addr))
+			goto e_inval;
+	}
 
 	new_state = *p;
 	ceph_decode_32_safe(p, end, len, e_inval);
@@ -1709,9 +1909,9 @@
 		struct ceph_entity_addr addr;
 
 		osd = ceph_decode_32(p);
-		ceph_decode_copy(p, &addr, sizeof(addr));
-		ceph_decode_addr(&addr);
 		BUG_ON(osd >= map->max_osd);
+		if (ceph_decode_entity_addr(p, end, &addr))
+			goto e_inval;
 		pr_info("osd%d up\n", osd);
 		map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
 		map->osd_addr[osd] = addr;
@@ -1808,7 +2008,7 @@
 		struct ceph_pg_pool_info *pi;
 
 		ceph_decode_64_safe(p, end, pool, e_inval);
-		pi = __lookup_pg_pool(&map->pg_pools, pool);
+		pi = lookup_pg_pool(&map->pg_pools, pool);
 		if (pi)
 			__remove_pg_pool(&map->pg_pools, pi);
 	}
@@ -2258,6 +2458,7 @@
 		    s64 choose_args_index)
 {
 	struct crush_choose_arg_map *arg_map;
+	struct crush_work *work;
 	int r;
 
 	BUG_ON(result_max > CEPH_PG_MAX_SIZE);
@@ -2268,12 +2469,11 @@
 		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
 						CEPH_DEFAULT_CHOOSE_ARGS);
 
-	mutex_lock(&map->crush_workspace_mutex);
+	work = get_workspace(&map->crush_wsm, map->crush);
 	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
-			  weight, weight_max, map->crush_workspace,
+			  weight, weight_max, work,
 			  arg_map ? arg_map->args : NULL);
-	mutex_unlock(&map->crush_workspace_mutex);
-
+	put_workspace(&map->crush_wsm, work);
 	return r;
 }
 
@@ -2651,3 +2851,221 @@
 	return acting.primary;
 }
 EXPORT_SYMBOL(ceph_pg_to_acting_primary);
+
+static struct crush_loc_node *alloc_crush_loc(size_t type_name_len,
+					      size_t name_len)
+{
+	struct crush_loc_node *loc;
+
+	loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO);
+	if (!loc)
+		return NULL;
+
+	RB_CLEAR_NODE(&loc->cl_node);
+	return loc;
+}
+
+static void free_crush_loc(struct crush_loc_node *loc)
+{
+	WARN_ON(!RB_EMPTY_NODE(&loc->cl_node));
+
+	kfree(loc);
+}
+
+static int crush_loc_compare(const struct crush_loc *loc1,
+			     const struct crush_loc *loc2)
+{
+	return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?:
+	       strcmp(loc1->cl_name, loc2->cl_name);
+}
+
+DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare,
+		 RB_BYPTR, const struct crush_loc *, cl_node)
+
+/*
+ * Parses a set of <bucket type name>':'<bucket name> pairs separated
+ * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar".
+ *
+ * Note that @crush_location is modified by strsep().
+ */
+int ceph_parse_crush_location(char *crush_location, struct rb_root *locs)
+{
+	struct crush_loc_node *loc;
+	const char *type_name, *name, *colon;
+	size_t type_name_len, name_len;
+
+	dout("%s '%s'\n", __func__, crush_location);
+	while ((type_name = strsep(&crush_location, "|"))) {
+		colon = strchr(type_name, ':');
+		if (!colon)
+			return -EINVAL;
+
+		type_name_len = colon - type_name;
+		if (type_name_len == 0)
+			return -EINVAL;
+
+		name = colon + 1;
+		name_len = strlen(name);
+		if (name_len == 0)
+			return -EINVAL;
+
+		loc = alloc_crush_loc(type_name_len, name_len);
+		if (!loc)
+			return -ENOMEM;
+
+		loc->cl_loc.cl_type_name = loc->cl_data;
+		memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len);
+		loc->cl_loc.cl_type_name[type_name_len] = '\0';
+
+		loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1;
+		memcpy(loc->cl_loc.cl_name, name, name_len);
+		loc->cl_loc.cl_name[name_len] = '\0';
+
+		if (!__insert_crush_loc(locs, loc)) {
+			free_crush_loc(loc);
+			return -EEXIST;
+		}
+
+		dout("%s type_name '%s' name '%s'\n", __func__,
+		     loc->cl_loc.cl_type_name, loc->cl_loc.cl_name);
+	}
+
+	return 0;
+}
+
+int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2)
+{
+	struct rb_node *n1 = rb_first(locs1);
+	struct rb_node *n2 = rb_first(locs2);
+	int ret;
+
+	for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) {
+		struct crush_loc_node *loc1 =
+		    rb_entry(n1, struct crush_loc_node, cl_node);
+		struct crush_loc_node *loc2 =
+		    rb_entry(n2, struct crush_loc_node, cl_node);
+
+		ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc);
+		if (ret)
+			return ret;
+	}
+
+	if (!n1 && n2)
+		return -1;
+	if (n1 && !n2)
+		return 1;
+	return 0;
+}
+
+void ceph_clear_crush_locs(struct rb_root *locs)
+{
+	while (!RB_EMPTY_ROOT(locs)) {
+		struct crush_loc_node *loc =
+		    rb_entry(rb_first(locs), struct crush_loc_node, cl_node);
+
+		erase_crush_loc(locs, loc);
+		free_crush_loc(loc);
+	}
+}
+
+/*
+ * [a-zA-Z0-9-_.]+
+ */
+static bool is_valid_crush_name(const char *name)
+{
+	do {
+		if (!('a' <= *name && *name <= 'z') &&
+		    !('A' <= *name && *name <= 'Z') &&
+		    !('0' <= *name && *name <= '9') &&
+		    *name != '-' && *name != '_' && *name != '.')
+			return false;
+	} while (*++name != '\0');
+
+	return true;
+}
+
+/*
+ * Gets the parent of an item.  Returns its id (<0 because the
+ * parent is always a bucket), type id (>0 for the same reason,
+ * via @parent_type_id) and location (via @parent_loc).  If no
+ * parent, returns 0.
+ *
+ * Does a linear search, as there are no parent pointers of any
+ * kind.  Note that the result is ambigous for items that occur
+ * multiple times in the map.
+ */
+static int get_immediate_parent(struct crush_map *c, int id,
+				u16 *parent_type_id,
+				struct crush_loc *parent_loc)
+{
+	struct crush_bucket *b;
+	struct crush_name_node *type_cn, *cn;
+	int i, j;
+
+	for (i = 0; i < c->max_buckets; i++) {
+		b = c->buckets[i];
+		if (!b)
+			continue;
+
+		/* ignore per-class shadow hierarchy */
+		cn = lookup_crush_name(&c->names, b->id);
+		if (!cn || !is_valid_crush_name(cn->cn_name))
+			continue;
+
+		for (j = 0; j < b->size; j++) {
+			if (b->items[j] != id)
+				continue;
+
+			*parent_type_id = b->type;
+			type_cn = lookup_crush_name(&c->type_names, b->type);
+			parent_loc->cl_type_name = type_cn->cn_name;
+			parent_loc->cl_name = cn->cn_name;
+			return b->id;
+		}
+	}
+
+	return 0;  /* no parent */
+}
+
+/*
+ * Calculates the locality/distance from an item to a client
+ * location expressed in terms of CRUSH hierarchy as a set of
+ * (bucket type name, bucket name) pairs.  Specifically, looks
+ * for the lowest-valued bucket type for which the location of
+ * @id matches one of the locations in @locs, so for standard
+ * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9)
+ * a matching host is closer than a matching rack and a matching
+ * data center is closer than a matching zone.
+ *
+ * Specifying multiple locations (a "multipath" location) such
+ * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs
+ * is a multimap.  The locality will be:
+ *
+ * - 3 for OSDs in racks foo1 and foo2
+ * - 8 for OSDs in data center bar
+ * - -1 for all other OSDs
+ *
+ * The lowest possible bucket type is 1, so the best locality
+ * for an OSD is 1 (i.e. a matching host).  Locality 0 would be
+ * the OSD itself.
+ */
+int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
+			    struct rb_root *locs)
+{
+	struct crush_loc loc;
+	u16 type_id;
+
+	/*
+	 * Instead of repeated get_immediate_parent() calls,
+	 * the location of @id could be obtained with a single
+	 * depth-first traversal.
+	 */
+	for (;;) {
+		id = get_immediate_parent(osdmap->crush, id, &type_id, &loc);
+		if (id >= 0)
+			return -1;  /* not local */
+
+		if (lookup_crush_loc(locs, &loc))
+			return type_id;
+	}
+}

--
Gitblit v1.6.2