2024-05-13 9d77db3c730780c8ef5ccd4b66403ff5675cfe4e
kernel/net/ceph/osdmap.c
@@ -138,6 +138,79 @@
 	return -EINVAL;
 }
 
+struct crush_name_node {
+	struct rb_node cn_node;
+	int cn_id;
+	char cn_name[];
+};
+
+static struct crush_name_node *alloc_crush_name(size_t name_len)
+{
+	struct crush_name_node *cn;
+
+	cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO);
+	if (!cn)
+		return NULL;
+
+	RB_CLEAR_NODE(&cn->cn_node);
+	return cn;
+}
+
+static void free_crush_name(struct crush_name_node *cn)
+{
+	WARN_ON(!RB_EMPTY_NODE(&cn->cn_node));
+
+	kfree(cn);
+}
+
+DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node)
+
+static int decode_crush_names(void **p, void *end, struct rb_root *root)
+{
+	u32 n;
+
+	ceph_decode_32_safe(p, end, n, e_inval);
+	while (n--) {
+		struct crush_name_node *cn;
+		int id;
+		u32 name_len;
+
+		ceph_decode_32_safe(p, end, id, e_inval);
+		ceph_decode_32_safe(p, end, name_len, e_inval);
+		ceph_decode_need(p, end, name_len, e_inval);
+
+		cn = alloc_crush_name(name_len);
+		if (!cn)
+			return -ENOMEM;
+
+		cn->cn_id = id;
+		memcpy(cn->cn_name, *p, name_len);
+		cn->cn_name[name_len] = '\0';
+		*p += name_len;
+
+		if (!__insert_crush_name(root, cn)) {
+			free_crush_name(cn);
+			return -EEXIST;
+		}
+	}
+
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+void clear_crush_names(struct rb_root *root)
+{
+	while (!RB_EMPTY_ROOT(root)) {
+		struct crush_name_node *cn =
+		    rb_entry(rb_first(root), struct crush_name_node, cn_node);
+
+		erase_crush_name(root, cn);
+		free_crush_name(cn);
+	}
+}
+
 static struct crush_choose_arg_map *alloc_choose_arg_map(void)
 {
 	struct crush_choose_arg_map *arg_map;
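
DEFINE_RB_FUNCS comes from include/linux/ceph/libceph.h and stamps out
id-keyed rbtree helpers.  As a rough sketch (not the exact macro body),
the invocation above generates functions of this shape:

	static bool __insert_crush_name(struct rb_root *root,
					struct crush_name_node *cn)
	{
		struct rb_node **n = &root->rb_node, *parent = NULL;

		while (*n) {
			struct crush_name_node *cur =
			    rb_entry(*n, struct crush_name_node, cn_node);

			parent = *n;
			if (cn->cn_id < cur->cn_id)
				n = &(*n)->rb_left;
			else if (cn->cn_id > cur->cn_id)
				n = &(*n)->rb_right;
			else
				return false;	/* duplicate id */
		}

		rb_link_node(&cn->cn_node, parent, n);
		rb_insert_color(&cn->cn_node, root);
		return true;
	}

	/* lookup_crush_name(root, id) does the same walk and returns the
	 * matching node or NULL */

	static void erase_crush_name(struct rb_root *root,
				     struct crush_name_node *cn)
	{
		rb_erase(&cn->cn_node, root);
		RB_CLEAR_NODE(&cn->cn_node);
	}

This is why decode_crush_names() can treat a false return from
__insert_crush_name() as -EEXIST: duplicate ids are rejected at insert
time rather than silently overwritten.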
@@ -354,6 +427,8 @@
 	if (c == NULL)
 		return ERR_PTR(-ENOMEM);
 
+	c->type_names = RB_ROOT;
+	c->names = RB_ROOT;
 	c->choose_args = RB_ROOT;
 
 	/* set tunables to default values */
@@ -495,9 +570,8 @@
 		    / sizeof(struct crush_rule_step))
 			goto bad;
 #endif
-		r = c->rules[i] = kmalloc(sizeof(*r) +
-					  yes*sizeof(struct crush_rule_step),
-					  GFP_NOFS);
+		r = kmalloc(struct_size(r, steps, yes), GFP_NOFS);
+		c->rules[i] = r;
 		if (r == NULL)
 			goto badmem;
 		dout(" rule %d is at %p\n", i, r);
@@ -511,8 +585,14 @@
 		}
 	}
 
-	ceph_decode_skip_map(p, end, 32, string, bad); /* type_map */
-	ceph_decode_skip_map(p, end, 32, string, bad); /* name_map */
+	err = decode_crush_names(p, end, &c->type_names);
+	if (err)
+		goto fail;
+
+	err = decode_crush_names(p, end, &c->names);
+	if (err)
+		goto fail;
+
 	ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */
 
 	/* tunables */
@@ -637,48 +717,11 @@
 /*
  * rbtree of pg pool info
  */
-static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
-{
-	struct rb_node **p = &root->rb_node;
-	struct rb_node *parent = NULL;
-	struct ceph_pg_pool_info *pi = NULL;
-
-	while (*p) {
-		parent = *p;
-		pi = rb_entry(parent, struct ceph_pg_pool_info, node);
-		if (new->id < pi->id)
-			p = &(*p)->rb_left;
-		else if (new->id > pi->id)
-			p = &(*p)->rb_right;
-		else
-			return -EEXIST;
-	}
-
-	rb_link_node(&new->node, parent, p);
-	rb_insert_color(&new->node, root);
-	return 0;
-}
-
-static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
-{
-	struct ceph_pg_pool_info *pi;
-	struct rb_node *n = root->rb_node;
-
-	while (n) {
-		pi = rb_entry(n, struct ceph_pg_pool_info, node);
-		if (id < pi->id)
-			n = n->rb_left;
-		else if (id > pi->id)
-			n = n->rb_right;
-		else
-			return pi;
-	}
-	return NULL;
-}
+DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node)
 
 struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id)
 {
-	return __lookup_pg_pool(&map->pg_pools, id);
+	return lookup_pg_pool(&map->pg_pools, id);
 }
 
 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
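
Note the error-handling change that comes with the macro conversion: the
open-coded __insert_pg_pool() returned 0 or -EEXIST, while the generated
one returns bool.  Callers now translate a false return themselves, as
in the pool-decode hunk further down:

	if (!__insert_pg_pool(&map->pg_pools, pi)) {
		kfree(pi);
		return -EEXIST;
	}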
@@ -691,8 +734,7 @@
 	if (WARN_ON_ONCE(id > (u64) INT_MAX))
 		return NULL;
 
-	pi = __lookup_pg_pool(&map->pg_pools, (int) id);
-
+	pi = lookup_pg_pool(&map->pg_pools, id);
 	return pi ? pi->name : NULL;
 }
 EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
@@ -715,14 +757,14 @@
 {
 	struct ceph_pg_pool_info *pi;
 
-	pi = __lookup_pg_pool(&map->pg_pools, id);
+	pi = lookup_pg_pool(&map->pg_pools, id);
 	return pi ? pi->flags : 0;
 }
 EXPORT_SYMBOL(ceph_pg_pool_flags);
 
 static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
 {
-	rb_erase(&pi->node, root);
+	erase_pg_pool(root, pi);
 	kfree(pi->name);
 	kfree(pi);
 }
@@ -904,7 +946,7 @@
 	ceph_decode_32_safe(p, end, len, bad);
 	dout(" pool %llu len %d\n", pool, len);
 	ceph_decode_need(p, end, len, bad);
-	pi = __lookup_pg_pool(&map->pg_pools, pool);
+	pi = lookup_pg_pool(&map->pg_pools, pool);
 	if (pi) {
 		char *name = kstrndup(*p, len, GFP_NOFS);
 
@@ -920,6 +962,143 @@
 
 bad:
 	return -EINVAL;
+}
+
+/*
+ * CRUSH workspaces
+ *
+ * workspace_manager framework borrowed from fs/btrfs/compression.c.
+ * Two simplifications: there is only one type of workspace and there
+ * is always at least one workspace.
+ */
+static struct crush_work *alloc_workspace(const struct crush_map *c)
+{
+	struct crush_work *work;
+	size_t work_size;
+
+	WARN_ON(!c->working_size);
+	work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
+	dout("%s work_size %zu bytes\n", __func__, work_size);
+
+	work = ceph_kvmalloc(work_size, GFP_NOIO);
+	if (!work)
+		return NULL;
+
+	INIT_LIST_HEAD(&work->item);
+	crush_init_workspace(c, work);
+	return work;
+}
+
+static void free_workspace(struct crush_work *work)
+{
+	WARN_ON(!list_empty(&work->item));
+	kvfree(work);
+}
+
+static void init_workspace_manager(struct workspace_manager *wsm)
+{
+	INIT_LIST_HEAD(&wsm->idle_ws);
+	spin_lock_init(&wsm->ws_lock);
+	atomic_set(&wsm->total_ws, 0);
+	wsm->free_ws = 0;
+	init_waitqueue_head(&wsm->ws_wait);
+}
+
+static void add_initial_workspace(struct workspace_manager *wsm,
+				  struct crush_work *work)
+{
+	WARN_ON(!list_empty(&wsm->idle_ws));
+
+	list_add(&work->item, &wsm->idle_ws);
+	atomic_set(&wsm->total_ws, 1);
+	wsm->free_ws = 1;
+}
+
+static void cleanup_workspace_manager(struct workspace_manager *wsm)
+{
+	struct crush_work *work;
+
+	while (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);
+		free_workspace(work);
+	}
+	atomic_set(&wsm->total_ws, 0);
+	wsm->free_ws = 0;
+}
+
+/*
+ * Finds an available workspace or allocates a new one.  If it's not
+ * possible to allocate a new one, waits until there is one.
+ */
+static struct crush_work *get_workspace(struct workspace_manager *wsm,
+					const struct crush_map *c)
+{
+	struct crush_work *work;
+	int cpus = num_online_cpus();
+
+again:
+	spin_lock(&wsm->ws_lock);
+	if (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);
+		wsm->free_ws--;
+		spin_unlock(&wsm->ws_lock);
+		return work;
+	}
+	if (atomic_read(&wsm->total_ws) > cpus) {
+		DEFINE_WAIT(wait);
+
+		spin_unlock(&wsm->ws_lock);
+		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
+		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
+			schedule();
+		finish_wait(&wsm->ws_wait, &wait);
+		goto again;
+	}
+	atomic_inc(&wsm->total_ws);
+	spin_unlock(&wsm->ws_lock);
+
+	work = alloc_workspace(c);
+	if (!work) {
+		atomic_dec(&wsm->total_ws);
+		wake_up(&wsm->ws_wait);
+
+		/*
+		 * Do not return the error but go back to waiting.  We
+		 * have the initial workspace and the CRUSH computation
+		 * time is bounded so we will get it eventually.
+		 */
+		WARN_ON(atomic_read(&wsm->total_ws) < 1);
+		goto again;
+	}
+	return work;
+}
+
+/*
+ * Puts a workspace back on the list or frees it if we have enough
+ * idle ones sitting around.
+ */
+static void put_workspace(struct workspace_manager *wsm,
+			  struct crush_work *work)
+{
+	spin_lock(&wsm->ws_lock);
+	if (wsm->free_ws <= num_online_cpus()) {
+		list_add(&work->item, &wsm->idle_ws);
+		wsm->free_ws++;
+		spin_unlock(&wsm->ws_lock);
+		goto wake;
+	}
+	spin_unlock(&wsm->ws_lock);
+
+	free_workspace(work);
+	atomic_dec(&wsm->total_ws);
+wake:
+	if (wq_has_sleeper(&wsm->ws_wait))
+		wake_up(&wsm->ws_wait);
 }
 
 /*
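
The workspace_manager struct itself is declared next to struct
ceph_osdmap (the crush_wsm member below).  Inferring from the accesses
above, it looks roughly like this -- a sketch, not the header's exact
definition:

	struct workspace_manager {
		struct list_head idle_ws;	/* idle crush_work items */
		spinlock_t ws_lock;		/* protects idle_ws, free_ws */
		/* workspaces allocated in total (idle + handed out) */
		atomic_t total_ws;
		/* workspaces currently sitting on idle_ws */
		int free_ws;
		/* threads waiting for a workspace to be put back */
		wait_queue_head_t ws_wait;
	};

The invariant maintained by get_workspace()/put_workspace() is that at
most num_online_cpus() + 1 workspaces ever exist, and the one added by
add_initial_workspace() is never freed until osdmap destruction, so a
mapping computation can always make progress even if allocations fail.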
@@ -939,7 +1118,8 @@
 	map->primary_temp = RB_ROOT;
 	map->pg_upmap = RB_ROOT;
 	map->pg_upmap_items = RB_ROOT;
-	mutex_init(&map->crush_workspace_mutex);
+
+	init_workspace_manager(&map->crush_wsm);
 
 	return map;
 }
@@ -947,8 +1127,11 @@
 void ceph_osdmap_destroy(struct ceph_osdmap *map)
 {
 	dout("osdmap_destroy %p\n", map);
+
 	if (map->crush)
 		crush_destroy(map->crush);
+	cleanup_workspace_manager(&map->crush_wsm);
+
 	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
 		struct ceph_pg_mapping *pg =
 			rb_entry(rb_first(&map->pg_temp),
@@ -983,11 +1166,10 @@
 			struct ceph_pg_pool_info, node);
 		__remove_pg_pool(&map->pg_pools, pi);
 	}
-	kfree(map->osd_state);
-	kfree(map->osd_weight);
-	kfree(map->osd_addr);
-	kfree(map->osd_primary_affinity);
-	kfree(map->crush_workspace);
+	kvfree(map->osd_state);
+	kvfree(map->osd_weight);
+	kvfree(map->osd_addr);
+	kvfree(map->osd_primary_affinity);
 	kfree(map);
 }
 
@@ -996,28 +1178,41 @@
  *
  * The new elements are properly initialized.
  */
-static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
+static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
 {
 	u32 *state;
 	u32 *weight;
 	struct ceph_entity_addr *addr;
+	u32 to_copy;
 	int i;
 
-	state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS);
-	if (!state)
+	dout("%s old %u new %u\n", __func__, map->max_osd, max);
+	if (max == map->max_osd)
+		return 0;
+
+	state = ceph_kvmalloc(array_size(max, sizeof(*state)), GFP_NOFS);
+	weight = ceph_kvmalloc(array_size(max, sizeof(*weight)), GFP_NOFS);
+	addr = ceph_kvmalloc(array_size(max, sizeof(*addr)), GFP_NOFS);
+	if (!state || !weight || !addr) {
+		kvfree(state);
+		kvfree(weight);
+		kvfree(addr);
 		return -ENOMEM;
+	}
+
+	to_copy = min(map->max_osd, max);
+	if (map->osd_state) {
+		memcpy(state, map->osd_state, to_copy * sizeof(*state));
+		memcpy(weight, map->osd_weight, to_copy * sizeof(*weight));
+		memcpy(addr, map->osd_addr, to_copy * sizeof(*addr));
+		kvfree(map->osd_state);
+		kvfree(map->osd_weight);
+		kvfree(map->osd_addr);
+	}
+
 	map->osd_state = state;
-
-	weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS);
-	if (!weight)
-		return -ENOMEM;
 	map->osd_weight = weight;
-
-	addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
-	if (!addr)
-		return -ENOMEM;
 	map->osd_addr = addr;
-
 	for (i = map->max_osd; i < max; i++) {
 		map->osd_state[i] = 0;
 		map->osd_weight[i] = CEPH_OSD_OUT;
@@ -1027,12 +1222,16 @@
 	if (map->osd_primary_affinity) {
 		u32 *affinity;
 
-		affinity = krealloc(map->osd_primary_affinity,
-				    max*sizeof(*affinity), GFP_NOFS);
+		affinity = ceph_kvmalloc(array_size(max, sizeof(*affinity)),
+					 GFP_NOFS);
 		if (!affinity)
 			return -ENOMEM;
-		map->osd_primary_affinity = affinity;
 
+		memcpy(affinity, map->osd_primary_affinity,
+		       to_copy * sizeof(*affinity));
+		kvfree(map->osd_primary_affinity);
+
+		map->osd_primary_affinity = affinity;
 		for (i = map->max_osd; i < max; i++)
 			map->osd_primary_affinity[i] =
 				CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
10451244
10461245 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
10471246 {
1048
- void *workspace;
1049
- size_t work_size;
1247
+ struct crush_work *work;
10501248
10511249 if (IS_ERR(crush))
10521250 return PTR_ERR(crush);
10531251
1054
- work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
1055
- dout("%s work_size %zu bytes\n", __func__, work_size);
1056
- workspace = kmalloc(work_size, GFP_NOIO);
1057
- if (!workspace) {
1252
+ work = alloc_workspace(crush);
1253
+ if (!work) {
10581254 crush_destroy(crush);
10591255 return -ENOMEM;
10601256 }
1061
- crush_init_workspace(crush, workspace);
10621257
10631258 if (map->crush)
10641259 crush_destroy(map->crush);
1065
- kfree(map->crush_workspace);
1260
+ cleanup_workspace_manager(&map->crush_wsm);
10661261 map->crush = crush;
1067
- map->crush_workspace = workspace;
1262
+ add_initial_workspace(&map->crush_wsm, work);
10681263 return 0;
10691264 }
10701265
....@@ -1138,18 +1333,18 @@
11381333
11391334 ceph_decode_64_safe(p, end, pool, e_inval);
11401335
1141
- pi = __lookup_pg_pool(&map->pg_pools, pool);
1336
+ pi = lookup_pg_pool(&map->pg_pools, pool);
11421337 if (!incremental || !pi) {
11431338 pi = kzalloc(sizeof(*pi), GFP_NOFS);
11441339 if (!pi)
11451340 return -ENOMEM;
11461341
1342
+ RB_CLEAR_NODE(&pi->node);
11471343 pi->id = pool;
11481344
1149
- ret = __insert_pg_pool(&map->pg_pools, pi);
1150
- if (ret) {
1345
+ if (!__insert_pg_pool(&map->pg_pools, pi)) {
11511346 kfree(pi);
1152
- return ret;
1347
+ return -EEXIST;
11531348 }
11541349 }
11551350
....@@ -1308,9 +1503,9 @@
13081503 if (!map->osd_primary_affinity) {
13091504 int i;
13101505
1311
- map->osd_primary_affinity = kmalloc_array(map->max_osd,
1312
- sizeof(u32),
1313
- GFP_NOFS);
1506
+ map->osd_primary_affinity = ceph_kvmalloc(
1507
+ array_size(map->max_osd, sizeof(*map->osd_primary_affinity)),
1508
+ GFP_NOFS);
13141509 if (!map->osd_primary_affinity)
13151510 return -ENOMEM;
13161511
....@@ -1331,7 +1526,7 @@
13311526
13321527 ceph_decode_32_safe(p, end, len, e_inval);
13331528 if (len == 0) {
1334
- kfree(map->osd_primary_affinity);
1529
+ kvfree(map->osd_primary_affinity);
13351530 map->osd_primary_affinity = NULL;
13361531 return 0;
13371532 }
....@@ -1499,11 +1694,9 @@
14991694
15001695 /* osd_state, osd_weight, osd_addrs->client_addr */
15011696 ceph_decode_need(p, end, 3*sizeof(u32) +
1502
- map->max_osd*((struct_v >= 5 ? sizeof(u32) :
1503
- sizeof(u8)) +
1504
- sizeof(*map->osd_weight) +
1505
- sizeof(*map->osd_addr)), e_inval);
1506
-
1697
+ map->max_osd*(struct_v >= 5 ? sizeof(u32) :
1698
+ sizeof(u8)) +
1699
+ sizeof(*map->osd_weight), e_inval);
15071700 if (ceph_decode_32(p) != map->max_osd)
15081701 goto e_inval;
15091702
....@@ -1524,9 +1717,11 @@
15241717 if (ceph_decode_32(p) != map->max_osd)
15251718 goto e_inval;
15261719
1527
- ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
1528
- for (i = 0; i < map->max_osd; i++)
1529
- ceph_decode_addr(&map->osd_addr[i]);
1720
+ for (i = 0; i < map->max_osd; i++) {
1721
+ err = ceph_decode_entity_addr(p, end, &map->osd_addr[i]);
1722
+ if (err)
1723
+ goto bad;
1724
+ }
15301725
15311726 /* pg_temp */
15321727 err = decode_pg_temp(p, end, map);
....@@ -1628,12 +1823,17 @@
16281823 void *new_state;
16291824 void *new_weight_end;
16301825 u32 len;
1826
+ int i;
16311827
16321828 new_up_client = *p;
16331829 ceph_decode_32_safe(p, end, len, e_inval);
1634
- len *= sizeof(u32) + sizeof(struct ceph_entity_addr);
1635
- ceph_decode_need(p, end, len, e_inval);
1636
- *p += len;
1830
+ for (i = 0; i < len; ++i) {
1831
+ struct ceph_entity_addr addr;
1832
+
1833
+ ceph_decode_skip_32(p, end, e_inval);
1834
+ if (ceph_decode_entity_addr(p, end, &addr))
1835
+ goto e_inval;
1836
+ }
16371837
16381838 new_state = *p;
16391839 ceph_decode_32_safe(p, end, len, e_inval);
....@@ -1709,9 +1909,9 @@
17091909 struct ceph_entity_addr addr;
17101910
17111911 osd = ceph_decode_32(p);
1712
- ceph_decode_copy(p, &addr, sizeof(addr));
1713
- ceph_decode_addr(&addr);
17141912 BUG_ON(osd >= map->max_osd);
1913
+ if (ceph_decode_entity_addr(p, end, &addr))
1914
+ goto e_inval;
17151915 pr_info("osd%d up\n", osd);
17161916 map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
17171917 map->osd_addr[osd] = addr;
....@@ -1808,7 +2008,7 @@
18082008 struct ceph_pg_pool_info *pi;
18092009
18102010 ceph_decode_64_safe(p, end, pool, e_inval);
1811
- pi = __lookup_pg_pool(&map->pg_pools, pool);
2011
+ pi = lookup_pg_pool(&map->pg_pools, pool);
18122012 if (pi)
18132013 __remove_pg_pool(&map->pg_pools, pi);
18142014 }
....@@ -2258,6 +2458,7 @@
22582458 s64 choose_args_index)
22592459 {
22602460 struct crush_choose_arg_map *arg_map;
2461
+ struct crush_work *work;
22612462 int r;
22622463
22632464 BUG_ON(result_max > CEPH_PG_MAX_SIZE);
....@@ -2268,12 +2469,11 @@
22682469 arg_map = lookup_choose_arg_map(&map->crush->choose_args,
22692470 CEPH_DEFAULT_CHOOSE_ARGS);
22702471
2271
- mutex_lock(&map->crush_workspace_mutex);
2472
+ work = get_workspace(&map->crush_wsm, map->crush);
22722473 r = crush_do_rule(map->crush, ruleno, x, result, result_max,
2273
- weight, weight_max, map->crush_workspace,
2474
+ weight, weight_max, work,
22742475 arg_map ? arg_map->args : NULL);
2275
- mutex_unlock(&map->crush_workspace_mutex);
2276
-
2476
+ put_workspace(&map->crush_wsm, work);
22772477 return r;
22782478 }
22792479
....@@ -2651,3 +2851,221 @@
26512851 return acting.primary;
26522852 }
26532853 EXPORT_SYMBOL(ceph_pg_to_acting_primary);
2854
+
2855
+static struct crush_loc_node *alloc_crush_loc(size_t type_name_len,
2856
+ size_t name_len)
2857
+{
2858
+ struct crush_loc_node *loc;
2859
+
2860
+ loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO);
2861
+ if (!loc)
2862
+ return NULL;
2863
+
2864
+ RB_CLEAR_NODE(&loc->cl_node);
2865
+ return loc;
2866
+}
2867
+
2868
+static void free_crush_loc(struct crush_loc_node *loc)
2869
+{
2870
+ WARN_ON(!RB_EMPTY_NODE(&loc->cl_node));
2871
+
2872
+ kfree(loc);
2873
+}
2874
+
2875
+static int crush_loc_compare(const struct crush_loc *loc1,
2876
+ const struct crush_loc *loc2)
2877
+{
2878
+ return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?:
2879
+ strcmp(loc1->cl_name, loc2->cl_name);
2880
+}
2881
+
2882
+DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare,
2883
+ RB_BYPTR, const struct crush_loc *, cl_node)
2884
+
2885
+/*
2886
+ * Parses a set of <bucket type name>':'<bucket name> pairs separated
2887
+ * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar".
2888
+ *
2889
+ * Note that @crush_location is modified by strsep().
2890
+ */
2891
+int ceph_parse_crush_location(char *crush_location, struct rb_root *locs)
2892
+{
2893
+ struct crush_loc_node *loc;
2894
+ const char *type_name, *name, *colon;
2895
+ size_t type_name_len, name_len;
2896
+
2897
+ dout("%s '%s'\n", __func__, crush_location);
2898
+ while ((type_name = strsep(&crush_location, "|"))) {
2899
+ colon = strchr(type_name, ':');
2900
+ if (!colon)
2901
+ return -EINVAL;
2902
+
2903
+ type_name_len = colon - type_name;
2904
+ if (type_name_len == 0)
2905
+ return -EINVAL;
2906
+
2907
+ name = colon + 1;
2908
+ name_len = strlen(name);
2909
+ if (name_len == 0)
2910
+ return -EINVAL;
2911
+
2912
+ loc = alloc_crush_loc(type_name_len, name_len);
2913
+ if (!loc)
2914
+ return -ENOMEM;
2915
+
2916
+ loc->cl_loc.cl_type_name = loc->cl_data;
2917
+ memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len);
2918
+ loc->cl_loc.cl_type_name[type_name_len] = '\0';
2919
+
2920
+ loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1;
2921
+ memcpy(loc->cl_loc.cl_name, name, name_len);
2922
+ loc->cl_loc.cl_name[name_len] = '\0';
2923
+
2924
+ if (!__insert_crush_loc(locs, loc)) {
2925
+ free_crush_loc(loc);
2926
+ return -EEXIST;
2927
+ }
2928
+
2929
+ dout("%s type_name '%s' name '%s'\n", __func__,
2930
+ loc->cl_loc.cl_type_name, loc->cl_loc.cl_name);
2931
+ }
2932
+
2933
+ return 0;
2934
+}
2935
+
2936
+int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2)
2937
+{
2938
+ struct rb_node *n1 = rb_first(locs1);
2939
+ struct rb_node *n2 = rb_first(locs2);
2940
+ int ret;
2941
+
2942
+ for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) {
2943
+ struct crush_loc_node *loc1 =
2944
+ rb_entry(n1, struct crush_loc_node, cl_node);
2945
+ struct crush_loc_node *loc2 =
2946
+ rb_entry(n2, struct crush_loc_node, cl_node);
2947
+
2948
+ ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc);
2949
+ if (ret)
2950
+ return ret;
2951
+ }
2952
+
2953
+ if (!n1 && n2)
2954
+ return -1;
2955
+ if (n1 && !n2)
2956
+ return 1;
2957
+ return 0;
2958
+}
2959
+
2960
+void ceph_clear_crush_locs(struct rb_root *locs)
2961
+{
2962
+ while (!RB_EMPTY_ROOT(locs)) {
2963
+ struct crush_loc_node *loc =
2964
+ rb_entry(rb_first(locs), struct crush_loc_node, cl_node);
2965
+
2966
+ erase_crush_loc(locs, loc);
2967
+ free_crush_loc(loc);
2968
+ }
2969
+}
2970
+
2971
+/*
2972
+ * [a-zA-Z0-9-_.]+
2973
+ */
2974
+static bool is_valid_crush_name(const char *name)
2975
+{
2976
+ do {
2977
+ if (!('a' <= *name && *name <= 'z') &&
2978
+ !('A' <= *name && *name <= 'Z') &&
2979
+ !('0' <= *name && *name <= '9') &&
2980
+ *name != '-' && *name != '_' && *name != '.')
2981
+ return false;
2982
+ } while (*++name != '\0');
2983
+
2984
+ return true;
2985
+}
2986
+
2987
+/*
2988
+ * Gets the parent of an item. Returns its id (<0 because the
2989
+ * parent is always a bucket), type id (>0 for the same reason,
2990
+ * via @parent_type_id) and location (via @parent_loc). If no
2991
+ * parent, returns 0.
2992
+ *
2993
+ * Does a linear search, as there are no parent pointers of any
2994
+ * kind. Note that the result is ambigous for items that occur
2995
+ * multiple times in the map.
2996
+ */
2997
+static int get_immediate_parent(struct crush_map *c, int id,
2998
+ u16 *parent_type_id,
2999
+ struct crush_loc *parent_loc)
3000
+{
3001
+ struct crush_bucket *b;
3002
+ struct crush_name_node *type_cn, *cn;
3003
+ int i, j;
3004
+
3005
+ for (i = 0; i < c->max_buckets; i++) {
3006
+ b = c->buckets[i];
3007
+ if (!b)
3008
+ continue;
3009
+
3010
+ /* ignore per-class shadow hierarchy */
3011
+ cn = lookup_crush_name(&c->names, b->id);
3012
+ if (!cn || !is_valid_crush_name(cn->cn_name))
3013
+ continue;
3014
+
3015
+ for (j = 0; j < b->size; j++) {
3016
+ if (b->items[j] != id)
3017
+ continue;
3018
+
3019
+ *parent_type_id = b->type;
3020
+ type_cn = lookup_crush_name(&c->type_names, b->type);
3021
+ parent_loc->cl_type_name = type_cn->cn_name;
3022
+ parent_loc->cl_name = cn->cn_name;
3023
+ return b->id;
3024
+ }
3025
+ }
3026
+
3027
+ return 0; /* no parent */
3028
+}
3029
+
3030
+/*
3031
+ * Calculates the locality/distance from an item to a client
3032
+ * location expressed in terms of CRUSH hierarchy as a set of
3033
+ * (bucket type name, bucket name) pairs. Specifically, looks
3034
+ * for the lowest-valued bucket type for which the location of
3035
+ * @id matches one of the locations in @locs, so for standard
3036
+ * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9)
3037
+ * a matching host is closer than a matching rack and a matching
3038
+ * data center is closer than a matching zone.
3039
+ *
3040
+ * Specifying multiple locations (a "multipath" location) such
3041
+ * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs
3042
+ * is a multimap. The locality will be:
3043
+ *
3044
+ * - 3 for OSDs in racks foo1 and foo2
3045
+ * - 8 for OSDs in data center bar
3046
+ * - -1 for all other OSDs
3047
+ *
3048
+ * The lowest possible bucket type is 1, so the best locality
3049
+ * for an OSD is 1 (i.e. a matching host). Locality 0 would be
3050
+ * the OSD itself.
3051
+ */
3052
+int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
3053
+ struct rb_root *locs)
3054
+{
3055
+ struct crush_loc loc;
3056
+ u16 type_id;
3057
+
3058
+ /*
3059
+ * Instead of repeated get_immediate_parent() calls,
3060
+ * the location of @id could be obtained with a single
3061
+ * depth-first traversal.
3062
+ */
3063
+ for (;;) {
3064
+ id = get_immediate_parent(osdmap->crush, id, &type_id, &loc);
3065
+ if (id >= 0)
3066
+ return -1; /* not local */
3067
+
3068
+ if (lookup_crush_loc(locs, &loc))
3069
+ return type_id;
3070
+ }
3071
+}
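
To tie the new API together, a hedged usage sketch (the osdmap pointer
and the OSD id are placeholders; error handling is trimmed):

	/* how close is osd0 to a client in host "foo", rack "bar"? */
	struct rb_root locs = RB_ROOT;
	char desc[] = "host:foo|rack:bar";	/* modified by strsep() */
	int locality;

	if (ceph_parse_crush_location(desc, &locs))
		return;		/* -EINVAL, -ENOMEM or -EEXIST */

	locality = ceph_get_crush_locality(osdmap, 0, &locs);
	/* 1 if osd0 sits under host foo, 3 if only under rack bar,
	 * -1 if neither location matches */

	ceph_clear_crush_locs(&locs);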