hc
2024-12-19 9370bb92b2d16684ee45cf24e879c93c509162da
kernel/net/ceph/osd_client.c
....@@ -126,6 +126,9 @@
126126 osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
127127 }
128128
129
+/*
130
+ * Consumes @pages if @own_pages is true.
131
+ */
129132 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
130133 struct page **pages, u64 length, u32 alignment,
131134 bool pages_from_pool, bool own_pages)
....@@ -138,6 +141,9 @@
138141 osd_data->own_pages = own_pages;
139142 }
140143
144
+/*
145
+ * Consumes a ref on @pagelist.
146
+ */
141147 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
142148 struct ceph_pagelist *pagelist)
143149 {
....@@ -164,14 +170,6 @@
164170 osd_data->bvec_pos = *bvec_pos;
165171 osd_data->num_bvecs = num_bvecs;
166172 }
167
-
168
-#define osd_req_op_data(oreq, whch, typ, fld) \
169
-({ \
170
- struct ceph_osd_request *__oreq = (oreq); \
171
- unsigned int __whch = (whch); \
172
- BUG_ON(__whch >= __oreq->r_num_ops); \
173
- &__oreq->r_ops[__whch].typ.fld; \
174
-})
175173
176174 static struct ceph_osd_data *
177175 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
....@@ -362,6 +360,8 @@
362360 num_pages = calc_pages_for((u64)osd_data->alignment,
363361 (u64)osd_data->length);
364362 ceph_release_page_vector(osd_data->pages, num_pages);
363
+ } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
364
+ ceph_pagelist_release(osd_data->pagelist);
365365 }
366366 ceph_osd_data_init(osd_data);
367367 }
....@@ -401,6 +401,9 @@
401401 break;
402402 case CEPH_OSD_OP_LIST_WATCHERS:
403403 ceph_osd_data_release(&op->list_watchers.response_data);
404
+ break;
405
+ case CEPH_OSD_OP_COPY_FROM2:
406
+ ceph_osd_data_release(&op->copy_from.osd_data);
404407 break;
405408 default:
406409 break;
....@@ -445,6 +448,7 @@
445448 dest->recovery_deletes = src->recovery_deletes;
446449
447450 dest->flags = src->flags;
451
+ dest->used_replica = src->used_replica;
448452 dest->paused = src->paused;
449453
450454 dest->epoch = src->epoch;
....@@ -468,7 +472,7 @@
468472 {
469473 WARN_ON(!RB_EMPTY_NODE(&req->r_node));
470474 WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
471
- WARN_ON(!list_empty(&req->r_unsafe_item));
475
+ WARN_ON(!list_empty(&req->r_private_item));
472476 WARN_ON(req->r_osd);
473477 }
474478
....@@ -521,53 +525,16 @@
521525
522526 static void request_init(struct ceph_osd_request *req)
523527 {
524
- /* req only, each op is zeroed in _osd_req_op_init() */
528
+ /* req only, each op is zeroed in osd_req_op_init() */
525529 memset(req, 0, sizeof(*req));
526530
527531 kref_init(&req->r_kref);
528532 init_completion(&req->r_completion);
529533 RB_CLEAR_NODE(&req->r_node);
530534 RB_CLEAR_NODE(&req->r_mc_node);
531
- INIT_LIST_HEAD(&req->r_unsafe_item);
535
+ INIT_LIST_HEAD(&req->r_private_item);
532536
533537 target_init(&req->r_t);
534
-}
535
-
536
-/*
537
- * This is ugly, but it allows us to reuse linger registration and ping
538
- * requests, keeping the structure of the code around send_linger{_ping}()
539
- * reasonable. Setting up a min_nr=2 mempool for each linger request
540
- * and dealing with copying ops (this blasts req only, watch op remains
541
- * intact) isn't any better.
542
- */
543
-static void request_reinit(struct ceph_osd_request *req)
544
-{
545
- struct ceph_osd_client *osdc = req->r_osdc;
546
- bool mempool = req->r_mempool;
547
- unsigned int num_ops = req->r_num_ops;
548
- u64 snapid = req->r_snapid;
549
- struct ceph_snap_context *snapc = req->r_snapc;
550
- bool linger = req->r_linger;
551
- struct ceph_msg *request_msg = req->r_request;
552
- struct ceph_msg *reply_msg = req->r_reply;
553
-
554
- dout("%s req %p\n", __func__, req);
555
- WARN_ON(kref_read(&req->r_kref) != 1);
556
- request_release_checks(req);
557
-
558
- WARN_ON(kref_read(&request_msg->kref) != 1);
559
- WARN_ON(kref_read(&reply_msg->kref) != 1);
560
- target_destroy(&req->r_t);
561
-
562
- request_init(req);
563
- req->r_osdc = osdc;
564
- req->r_mempool = mempool;
565
- req->r_num_ops = num_ops;
566
- req->r_snapid = snapid;
567
- req->r_snapc = snapc;
568
- req->r_linger = linger;
569
- req->r_request = request_msg;
570
- req->r_reply = reply_msg;
571538 }
572539
573540 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
....@@ -607,12 +574,15 @@
607574 return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
608575 }
609576
610
-int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
577
+static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
578
+ int num_request_data_items,
579
+ int num_reply_data_items)
611580 {
612581 struct ceph_osd_client *osdc = req->r_osdc;
613582 struct ceph_msg *msg;
614583 int msg_size;
615584
585
+ WARN_ON(req->r_request || req->r_reply);
616586 WARN_ON(ceph_oid_empty(&req->r_base_oid));
617587 WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
618588
....@@ -634,9 +604,11 @@
634604 msg_size += 4 + 8; /* retry_attempt, features */
635605
636606 if (req->r_mempool)
637
- msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
607
+ msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
608
+ num_request_data_items);
638609 else
639
- msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
610
+ msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
611
+ num_request_data_items, gfp, true);
640612 if (!msg)
641613 return -ENOMEM;
642614
....@@ -649,9 +621,11 @@
649621 msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
650622
651623 if (req->r_mempool)
652
- msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
624
+ msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
625
+ num_reply_data_items);
653626 else
654
- msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
627
+ msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
628
+ num_reply_data_items, gfp, true);
655629 if (!msg)
656630 return -ENOMEM;
657631
....@@ -659,7 +633,6 @@
659633
660634 return 0;
661635 }
662
-EXPORT_SYMBOL(ceph_osdc_alloc_messages);
663636
664637 static bool osd_req_opcode_valid(u16 opcode)
665638 {
....@@ -672,13 +645,72 @@
672645 }
673646 }
674647
648
+static void get_num_data_items(struct ceph_osd_request *req,
649
+ int *num_request_data_items,
650
+ int *num_reply_data_items)
651
+{
652
+ struct ceph_osd_req_op *op;
653
+
654
+ *num_request_data_items = 0;
655
+ *num_reply_data_items = 0;
656
+
657
+ for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
658
+ switch (op->op) {
659
+ /* request */
660
+ case CEPH_OSD_OP_WRITE:
661
+ case CEPH_OSD_OP_WRITEFULL:
662
+ case CEPH_OSD_OP_SETXATTR:
663
+ case CEPH_OSD_OP_CMPXATTR:
664
+ case CEPH_OSD_OP_NOTIFY_ACK:
665
+ case CEPH_OSD_OP_COPY_FROM2:
666
+ *num_request_data_items += 1;
667
+ break;
668
+
669
+ /* reply */
670
+ case CEPH_OSD_OP_STAT:
671
+ case CEPH_OSD_OP_READ:
672
+ case CEPH_OSD_OP_LIST_WATCHERS:
673
+ *num_reply_data_items += 1;
674
+ break;
675
+
676
+ /* both */
677
+ case CEPH_OSD_OP_NOTIFY:
678
+ *num_request_data_items += 1;
679
+ *num_reply_data_items += 1;
680
+ break;
681
+ case CEPH_OSD_OP_CALL:
682
+ *num_request_data_items += 2;
683
+ *num_reply_data_items += 1;
684
+ break;
685
+
686
+ default:
687
+ WARN_ON(!osd_req_opcode_valid(op->op));
688
+ break;
689
+ }
690
+ }
691
+}
692
+
693
+/*
694
+ * oid, oloc and OSD op opcode(s) must be filled in before this function
695
+ * is called.
696
+ */
697
+int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
698
+{
699
+ int num_request_data_items, num_reply_data_items;
700
+
701
+ get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
702
+ return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
703
+ num_reply_data_items);
704
+}
705
+EXPORT_SYMBOL(ceph_osdc_alloc_messages);
706
+
675707 /*
676708 * This is an osd op init function for opcodes that have no data or
677709 * other information associated with them. It also serves as a
678710 * common init routine for all the other init functions, below.
679711 */
680
-static struct ceph_osd_req_op *
681
-_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
712
+struct ceph_osd_req_op *
713
+osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
682714 u16 opcode, u32 flags)
683715 {
684716 struct ceph_osd_req_op *op;
....@@ -693,12 +725,6 @@
693725
694726 return op;
695727 }
696
-
697
-void osd_req_op_init(struct ceph_osd_request *osd_req,
698
- unsigned int which, u16 opcode, u32 flags)
699
-{
700
- (void)_osd_req_op_init(osd_req, which, opcode, flags);
701
-}
702728 EXPORT_SYMBOL(osd_req_op_init);
703729
704730 void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
....@@ -706,8 +732,8 @@
706732 u64 offset, u64 length,
707733 u64 truncate_size, u32 truncate_seq)
708734 {
709
- struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
710
- opcode, 0);
735
+ struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
736
+ opcode, 0);
711737 size_t payload_len = 0;
712738
713739 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
....@@ -753,7 +779,7 @@
753779 BUG_ON(which + 1 >= osd_req->r_num_ops);
754780
755781 prev_op = &osd_req->r_ops[which];
756
- op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
782
+ op = osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
757783 /* dup previous one */
758784 op->indata_len = prev_op->indata_len;
759785 op->outdata_len = prev_op->outdata_len;
....@@ -768,40 +794,45 @@
768794 EXPORT_SYMBOL(osd_req_op_extent_dup_last);
769795
770796 int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
771
- u16 opcode, const char *class, const char *method)
797
+ const char *class, const char *method)
772798 {
773
- struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
774
- opcode, 0);
799
+ struct ceph_osd_req_op *op;
775800 struct ceph_pagelist *pagelist;
776801 size_t payload_len = 0;
777802 size_t size;
803
+ int ret;
778804
779
- BUG_ON(opcode != CEPH_OSD_OP_CALL);
805
+ op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
780806
781
- pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
807
+ pagelist = ceph_pagelist_alloc(GFP_NOFS);
782808 if (!pagelist)
783809 return -ENOMEM;
784
-
785
- ceph_pagelist_init(pagelist);
786810
787811 op->cls.class_name = class;
788812 size = strlen(class);
789813 BUG_ON(size > (size_t) U8_MAX);
790814 op->cls.class_len = size;
791
- ceph_pagelist_append(pagelist, class, size);
815
+ ret = ceph_pagelist_append(pagelist, class, size);
816
+ if (ret)
817
+ goto err_pagelist_free;
792818 payload_len += size;
793819
794820 op->cls.method_name = method;
795821 size = strlen(method);
796822 BUG_ON(size > (size_t) U8_MAX);
797823 op->cls.method_len = size;
798
- ceph_pagelist_append(pagelist, method, size);
824
+ ret = ceph_pagelist_append(pagelist, method, size);
825
+ if (ret)
826
+ goto err_pagelist_free;
799827 payload_len += size;
800828
801829 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
802
-
803830 op->indata_len = payload_len;
804831 return 0;
832
+
833
+err_pagelist_free:
834
+ ceph_pagelist_release(pagelist);
835
+ return ret;
805836 }
806837 EXPORT_SYMBOL(osd_req_op_cls_init);
807838
....@@ -809,25 +840,28 @@
809840 u16 opcode, const char *name, const void *value,
810841 size_t size, u8 cmp_op, u8 cmp_mode)
811842 {
812
- struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
813
- opcode, 0);
843
+ struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
844
+ opcode, 0);
814845 struct ceph_pagelist *pagelist;
815846 size_t payload_len;
847
+ int ret;
816848
817849 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
818850
819
- pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
851
+ pagelist = ceph_pagelist_alloc(GFP_NOFS);
820852 if (!pagelist)
821853 return -ENOMEM;
822854
823
- ceph_pagelist_init(pagelist);
824
-
825855 payload_len = strlen(name);
826856 op->xattr.name_len = payload_len;
827
- ceph_pagelist_append(pagelist, name, payload_len);
857
+ ret = ceph_pagelist_append(pagelist, name, payload_len);
858
+ if (ret)
859
+ goto err_pagelist_free;
828860
829861 op->xattr.value_len = size;
830
- ceph_pagelist_append(pagelist, value, size);
862
+ ret = ceph_pagelist_append(pagelist, value, size);
863
+ if (ret)
864
+ goto err_pagelist_free;
831865 payload_len += size;
832866
833867 op->xattr.cmp_op = cmp_op;
....@@ -836,6 +870,10 @@
836870 ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
837871 op->indata_len = payload_len;
838872 return 0;
873
+
874
+err_pagelist_free:
875
+ ceph_pagelist_release(pagelist);
876
+ return ret;
839877 }
840878 EXPORT_SYMBOL(osd_req_op_xattr_init);
841879
....@@ -843,27 +881,47 @@
843881 * @watch_opcode: CEPH_OSD_WATCH_OP_*
844882 */
845883 static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
846
- u64 cookie, u8 watch_opcode)
884
+ u8 watch_opcode, u64 cookie, u32 gen)
847885 {
848886 struct ceph_osd_req_op *op;
849887
850
- op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
888
+ op = osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
851889 op->watch.cookie = cookie;
852890 op->watch.op = watch_opcode;
853
- op->watch.gen = 0;
891
+ op->watch.gen = gen;
854892 }
855893
894
+/*
895
+ * prot_ver, timeout and notify payload (may be empty) should already be
896
+ * encoded in @request_pl
897
+ */
898
+static void osd_req_op_notify_init(struct ceph_osd_request *req, int which,
899
+ u64 cookie, struct ceph_pagelist *request_pl)
900
+{
901
+ struct ceph_osd_req_op *op;
902
+
903
+ op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
904
+ op->notify.cookie = cookie;
905
+
906
+ ceph_osd_data_pagelist_init(&op->notify.request_data, request_pl);
907
+ op->indata_len = request_pl->length;
908
+}
909
+
910
+/*
911
+ * @flags: CEPH_OSD_OP_ALLOC_HINT_FLAG_*
912
+ */
856913 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
857914 unsigned int which,
858915 u64 expected_object_size,
859
- u64 expected_write_size)
916
+ u64 expected_write_size,
917
+ u32 flags)
860918 {
861
- struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
862
- CEPH_OSD_OP_SETALLOCHINT,
863
- 0);
919
+ struct ceph_osd_req_op *op;
864920
921
+ op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_SETALLOCHINT, 0);
865922 op->alloc_hint.expected_object_size = expected_object_size;
866923 op->alloc_hint.expected_write_size = expected_write_size;
924
+ op->alloc_hint.flags = flags;
867925
868926 /*
869927 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
....@@ -883,7 +941,7 @@
883941 BUG_ON(length > (u64) SIZE_MAX);
884942 if (length)
885943 ceph_msg_data_add_pages(msg, osd_data->pages,
886
- length, osd_data->alignment);
944
+ length, osd_data->alignment, false);
887945 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
888946 BUG_ON(!length);
889947 ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
....@@ -901,12 +959,6 @@
901959 static u32 osd_req_encode_op(struct ceph_osd_op *dst,
902960 const struct ceph_osd_req_op *src)
903961 {
904
- if (WARN_ON(!osd_req_opcode_valid(src->op))) {
905
- pr_err("unrecognized osd opcode %d\n", src->op);
906
-
907
- return 0;
908
- }
909
-
910962 switch (src->op) {
911963 case CEPH_OSD_OP_STAT:
912964 break;
....@@ -945,6 +997,7 @@
945997 cpu_to_le64(src->alloc_hint.expected_object_size);
946998 dst->alloc_hint.expected_write_size =
947999 cpu_to_le64(src->alloc_hint.expected_write_size);
1000
+ dst->alloc_hint.flags = cpu_to_le32(src->alloc_hint.flags);
9481001 break;
9491002 case CEPH_OSD_OP_SETXATTR:
9501003 case CEPH_OSD_OP_CMPXATTR:
....@@ -955,6 +1008,14 @@
9551008 break;
9561009 case CEPH_OSD_OP_CREATE:
9571010 case CEPH_OSD_OP_DELETE:
1011
+ break;
1012
+ case CEPH_OSD_OP_COPY_FROM2:
1013
+ dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
1014
+ dst->copy_from.src_version =
1015
+ cpu_to_le64(src->copy_from.src_version);
1016
+ dst->copy_from.flags = src->copy_from.flags;
1017
+ dst->copy_from.src_fadvise_flags =
1018
+ cpu_to_le32(src->copy_from.src_fadvise_flags);
9581019 break;
9591020 default:
9601021 pr_err("unsupported osd opcode %s\n",
....@@ -1030,16 +1091,24 @@
10301091 truncate_size, truncate_seq);
10311092 }
10321093
1033
- req->r_flags = flags;
10341094 req->r_base_oloc.pool = layout->pool_id;
10351095 req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
10361096 ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
1097
+ req->r_flags = flags | osdc->client->options->read_from_replica;
10371098
10381099 req->r_snapid = vino.snap;
10391100 if (flags & CEPH_OSD_FLAG_WRITE)
10401101 req->r_data_offset = off;
10411102
1042
- r = ceph_osdc_alloc_messages(req, GFP_NOFS);
1103
+ if (num_ops > 1)
1104
+ /*
1105
+ * This is a special case for ceph_writepages_start(), but it
1106
+ * also covers ceph_uninline_data(). If more multi-op request
1107
+ * use cases emerge, we will need a separate helper.
1108
+ */
1109
+ r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0);
1110
+ else
1111
+ r = ceph_osdc_alloc_messages(req, GFP_NOFS);
10431112 if (r)
10441113 goto fail;
10451114
....@@ -1408,6 +1477,45 @@
14081477 (osdc->osdmap->epoch < osdc->epoch_barrier);
14091478 }
14101479
1480
+static int pick_random_replica(const struct ceph_osds *acting)
1481
+{
1482
+ int i = prandom_u32() % acting->size;
1483
+
1484
+ dout("%s picked osd%d, primary osd%d\n", __func__,
1485
+ acting->osds[i], acting->primary);
1486
+ return i;
1487
+}
1488
+
1489
+/*
1490
+ * Picks the closest replica based on client's location given by
1491
+ * crush_location option. Prefers the primary if the locality is
1492
+ * the same.
1493
+ */
1494
+static int pick_closest_replica(struct ceph_osd_client *osdc,
1495
+ const struct ceph_osds *acting)
1496
+{
1497
+ struct ceph_options *opt = osdc->client->options;
1498
+ int best_i, best_locality;
1499
+ int i = 0, locality;
1500
+
1501
+ do {
1502
+ locality = ceph_get_crush_locality(osdc->osdmap,
1503
+ acting->osds[i],
1504
+ &opt->crush_locs);
1505
+ if (i == 0 ||
1506
+ (locality >= 0 && best_locality < 0) ||
1507
+ (locality >= 0 && best_locality >= 0 &&
1508
+ locality < best_locality)) {
1509
+ best_i = i;
1510
+ best_locality = locality;
1511
+ }
1512
+ } while (++i < acting->size);
1513
+
1514
+ dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
1515
+ acting->osds[best_i], best_locality, acting->primary);
1516
+ return best_i;
1517
+}
1518
+
14111519 enum calc_target_result {
14121520 CALC_TARGET_NO_ACTION = 0,
14131521 CALC_TARGET_NEED_RESEND,
....@@ -1416,12 +1524,13 @@
14161524
14171525 static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
14181526 struct ceph_osd_request_target *t,
1419
- struct ceph_connection *con,
14201527 bool any_change)
14211528 {
14221529 struct ceph_pg_pool_info *pi;
14231530 struct ceph_pg pgid, last_pgid;
14241531 struct ceph_osds up, acting;
1532
+ bool is_read = t->flags & CEPH_OSD_FLAG_READ;
1533
+ bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
14251534 bool force_resend = false;
14261535 bool unpaused = false;
14271536 bool legacy_change = false;
....@@ -1452,9 +1561,9 @@
14521561 ceph_oid_copy(&t->target_oid, &t->base_oid);
14531562 ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
14541563 if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1455
- if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
1564
+ if (is_read && pi->read_tier >= 0)
14561565 t->target_oloc.pool = pi->read_tier;
1457
- if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
1566
+ if (is_write && pi->write_tier >= 0)
14581567 t->target_oloc.pool = pi->write_tier;
14591568
14601569 pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
....@@ -1493,7 +1602,8 @@
14931602 unpaused = true;
14941603 }
14951604 legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
1496
- ceph_osds_changed(&t->acting, &acting, any_change);
1605
+ ceph_osds_changed(&t->acting, &acting,
1606
+ t->used_replica || any_change);
14971607 if (t->pg_num)
14981608 split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
14991609
....@@ -1509,7 +1619,24 @@
15091619 t->sort_bitwise = sort_bitwise;
15101620 t->recovery_deletes = recovery_deletes;
15111621
1512
- t->osd = acting.primary;
1622
+ if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
1623
+ CEPH_OSD_FLAG_LOCALIZE_READS)) &&
1624
+ !is_write && pi->type == CEPH_POOL_TYPE_REP &&
1625
+ acting.size > 1) {
1626
+ int pos;
1627
+
1628
+ WARN_ON(!is_read || acting.osds[0] != acting.primary);
1629
+ if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
1630
+ pos = pick_random_replica(&acting);
1631
+ } else {
1632
+ pos = pick_closest_replica(osdc, &acting);
1633
+ }
1634
+ t->osd = acting.osds[pos];
1635
+ t->used_replica = pos > 0;
1636
+ } else {
1637
+ t->osd = acting.primary;
1638
+ t->used_replica = false;
1639
+ }
15131640 }
15141641
15151642 if (unpaused || legacy_change || force_resend || split)
....@@ -1845,48 +1972,55 @@
18451972 return true;
18461973 }
18471974
1848
-static void setup_request_data(struct ceph_osd_request *req,
1849
- struct ceph_msg *msg)
1975
+/*
1976
+ * Keep get_num_data_items() in sync with this function.
1977
+ */
1978
+static void setup_request_data(struct ceph_osd_request *req)
18501979 {
1851
- u32 data_len = 0;
1852
- int i;
1980
+ struct ceph_msg *request_msg = req->r_request;
1981
+ struct ceph_msg *reply_msg = req->r_reply;
1982
+ struct ceph_osd_req_op *op;
18531983
1854
- if (!list_empty(&msg->data))
1984
+ if (req->r_request->num_data_items || req->r_reply->num_data_items)
18551985 return;
18561986
1857
- WARN_ON(msg->data_length);
1858
- for (i = 0; i < req->r_num_ops; i++) {
1859
- struct ceph_osd_req_op *op = &req->r_ops[i];
1860
-
1987
+ WARN_ON(request_msg->data_length || reply_msg->data_length);
1988
+ for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
18611989 switch (op->op) {
18621990 /* request */
18631991 case CEPH_OSD_OP_WRITE:
18641992 case CEPH_OSD_OP_WRITEFULL:
18651993 WARN_ON(op->indata_len != op->extent.length);
1866
- ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
1994
+ ceph_osdc_msg_data_add(request_msg,
1995
+ &op->extent.osd_data);
18671996 break;
18681997 case CEPH_OSD_OP_SETXATTR:
18691998 case CEPH_OSD_OP_CMPXATTR:
18701999 WARN_ON(op->indata_len != op->xattr.name_len +
18712000 op->xattr.value_len);
1872
- ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
2001
+ ceph_osdc_msg_data_add(request_msg,
2002
+ &op->xattr.osd_data);
18732003 break;
18742004 case CEPH_OSD_OP_NOTIFY_ACK:
1875
- ceph_osdc_msg_data_add(msg,
2005
+ ceph_osdc_msg_data_add(request_msg,
18762006 &op->notify_ack.request_data);
2007
+ break;
2008
+ case CEPH_OSD_OP_COPY_FROM2:
2009
+ ceph_osdc_msg_data_add(request_msg,
2010
+ &op->copy_from.osd_data);
18772011 break;
18782012
18792013 /* reply */
18802014 case CEPH_OSD_OP_STAT:
1881
- ceph_osdc_msg_data_add(req->r_reply,
2015
+ ceph_osdc_msg_data_add(reply_msg,
18822016 &op->raw_data_in);
18832017 break;
18842018 case CEPH_OSD_OP_READ:
1885
- ceph_osdc_msg_data_add(req->r_reply,
2019
+ ceph_osdc_msg_data_add(reply_msg,
18862020 &op->extent.osd_data);
18872021 break;
18882022 case CEPH_OSD_OP_LIST_WATCHERS:
1889
- ceph_osdc_msg_data_add(req->r_reply,
2023
+ ceph_osdc_msg_data_add(reply_msg,
18902024 &op->list_watchers.response_data);
18912025 break;
18922026
....@@ -1895,25 +2029,23 @@
18952029 WARN_ON(op->indata_len != op->cls.class_len +
18962030 op->cls.method_len +
18972031 op->cls.indata_len);
1898
- ceph_osdc_msg_data_add(msg, &op->cls.request_info);
2032
+ ceph_osdc_msg_data_add(request_msg,
2033
+ &op->cls.request_info);
18992034 /* optional, can be NONE */
1900
- ceph_osdc_msg_data_add(msg, &op->cls.request_data);
2035
+ ceph_osdc_msg_data_add(request_msg,
2036
+ &op->cls.request_data);
19012037 /* optional, can be NONE */
1902
- ceph_osdc_msg_data_add(req->r_reply,
2038
+ ceph_osdc_msg_data_add(reply_msg,
19032039 &op->cls.response_data);
19042040 break;
19052041 case CEPH_OSD_OP_NOTIFY:
1906
- ceph_osdc_msg_data_add(msg,
2042
+ ceph_osdc_msg_data_add(request_msg,
19072043 &op->notify.request_data);
1908
- ceph_osdc_msg_data_add(req->r_reply,
2044
+ ceph_osdc_msg_data_add(reply_msg,
19092045 &op->notify.response_data);
19102046 break;
19112047 }
1912
-
1913
- data_len += op->indata_len;
19142048 }
1915
-
1916
- WARN_ON(data_len != msg->data_length);
19172049 }
19182050
19192051 static void encode_pgid(void **p, const struct ceph_pg *pgid)
....@@ -1961,7 +2093,7 @@
19612093 req->r_data_offset || req->r_snapc);
19622094 }
19632095
1964
- setup_request_data(req, msg);
2096
+ setup_request_data(req);
19652097
19662098 encode_spgid(&p, &req->r_t.spgid); /* actual spg */
19672099 ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
....@@ -2195,7 +2327,7 @@
21952327 dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
21962328
21972329 again:
2198
- ct_res = calc_target(osdc, &req->r_t, NULL, false);
2330
+ ct_res = calc_target(osdc, &req->r_t, false);
21992331 if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
22002332 goto promote;
22012333
....@@ -2229,7 +2361,7 @@
22292361 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
22302362 pool_full(osdc, req->r_t.base_oloc.pool))) {
22312363 dout("req %p full/pool_full\n", req);
2232
- if (osdc->abort_on_full) {
2364
+ if (ceph_test_opt(osdc->client, ABORT_ON_FULL)) {
22332365 err = -ENOSPC;
22342366 } else {
22352367 pr_warn_ratelimited("FULL or reached pool quota\n");
....@@ -2280,6 +2412,7 @@
22802412 atomic_inc(&req->r_osdc->num_requests);
22812413
22822414 req->r_start_stamp = jiffies;
2415
+ req->r_start_latency = ktime_get();
22832416 }
22842417
22852418 static void submit_request(struct ceph_osd_request *req, bool wrlocked)
....@@ -2295,6 +2428,8 @@
22952428
22962429 WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
22972430 dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2431
+
2432
+ req->r_end_latency = ktime_get();
22982433
22992434 if (req->r_osd)
23002435 unlink_request(req->r_osd, req);
....@@ -2312,7 +2447,7 @@
23122447
23132448 static void __complete_request(struct ceph_osd_request *req)
23142449 {
2315
- dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
2450
+ dout("%s req %p tid %llu cb %ps result %d\n", __func__, req,
23162451 req->r_tid, req->r_callback, req->r_result);
23172452
23182453 if (req->r_callback)
....@@ -2399,6 +2534,14 @@
23992534 }
24002535 EXPORT_SYMBOL(ceph_osdc_abort_requests);
24012536
2537
+void ceph_osdc_clear_abort_err(struct ceph_osd_client *osdc)
2538
+{
2539
+ down_write(&osdc->lock);
2540
+ osdc->abort_err = 0;
2541
+ up_write(&osdc->lock);
2542
+}
2543
+EXPORT_SYMBOL(ceph_osdc_clear_abort_err);
2544
+
24022545 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
24032546 {
24042547 if (likely(eb > osdc->epoch_barrier)) {
....@@ -2459,7 +2602,7 @@
24592602 {
24602603 bool victims = false;
24612604
2462
- if (osdc->abort_on_full &&
2605
+ if (ceph_test_opt(osdc->client, ABORT_ON_FULL) &&
24632606 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc)))
24642607 for_each_request(osdc, abort_on_full_fn, &victims);
24652608 }
....@@ -2563,10 +2706,13 @@
25632706 WARN_ON(!list_empty(&lreq->pending_lworks));
25642707 WARN_ON(lreq->osd);
25652708
2566
- if (lreq->reg_req)
2567
- ceph_osdc_put_request(lreq->reg_req);
2568
- if (lreq->ping_req)
2569
- ceph_osdc_put_request(lreq->ping_req);
2709
+ if (lreq->request_pl)
2710
+ ceph_pagelist_release(lreq->request_pl);
2711
+ if (lreq->notify_id_pages)
2712
+ ceph_release_page_vector(lreq->notify_id_pages, 1);
2713
+
2714
+ ceph_osdc_put_request(lreq->reg_req);
2715
+ ceph_osdc_put_request(lreq->ping_req);
25702716 target_destroy(&lreq->t);
25712717 kfree(lreq);
25722718 }
....@@ -2835,6 +2981,12 @@
28352981 struct ceph_osd_linger_request *lreq = req->r_priv;
28362982
28372983 mutex_lock(&lreq->lock);
2984
+ if (req != lreq->reg_req) {
2985
+ dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
2986
+ __func__, lreq, lreq->linger_id, req, lreq->reg_req);
2987
+ goto out;
2988
+ }
2989
+
28382990 dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
28392991 lreq->linger_id, req->r_result);
28402992 linger_reg_commit_complete(lreq, req->r_result);
....@@ -2858,6 +3010,7 @@
28583010 }
28593011 }
28603012
3013
+out:
28613014 mutex_unlock(&lreq->lock);
28623015 linger_put(lreq);
28633016 }
....@@ -2880,6 +3033,12 @@
28803033 struct ceph_osd_linger_request *lreq = req->r_priv;
28813034
28823035 mutex_lock(&lreq->lock);
3036
+ if (req != lreq->reg_req) {
3037
+ dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3038
+ __func__, lreq, lreq->linger_id, req, lreq->reg_req);
3039
+ goto out;
3040
+ }
3041
+
28833042 dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
28843043 lreq, lreq->linger_id, req->r_result, lreq->last_error);
28853044 if (req->r_result < 0) {
....@@ -2889,48 +3048,64 @@
28893048 }
28903049 }
28913050
3051
+out:
28923052 mutex_unlock(&lreq->lock);
28933053 linger_put(lreq);
28943054 }
28953055
28963056 static void send_linger(struct ceph_osd_linger_request *lreq)
28973057 {
2898
- struct ceph_osd_request *req = lreq->reg_req;
2899
- struct ceph_osd_req_op *op = &req->r_ops[0];
3058
+ struct ceph_osd_client *osdc = lreq->osdc;
3059
+ struct ceph_osd_request *req;
3060
+ int ret;
29003061
2901
- verify_osdc_wrlocked(req->r_osdc);
3062
+ verify_osdc_wrlocked(osdc);
3063
+ mutex_lock(&lreq->lock);
29023064 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
29033065
2904
- if (req->r_osd)
2905
- cancel_linger_request(req);
3066
+ if (lreq->reg_req) {
3067
+ if (lreq->reg_req->r_osd)
3068
+ cancel_linger_request(lreq->reg_req);
3069
+ ceph_osdc_put_request(lreq->reg_req);
3070
+ }
29063071
2907
- request_reinit(req);
2908
- ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
2909
- ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
2910
- req->r_flags = lreq->t.flags;
3072
+ req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
3073
+ BUG_ON(!req);
3074
+
3075
+ target_copy(&req->r_t, &lreq->t);
29113076 req->r_mtime = lreq->mtime;
29123077
2913
- mutex_lock(&lreq->lock);
29143078 if (lreq->is_watch && lreq->committed) {
2915
- WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2916
- op->watch.cookie != lreq->linger_id);
2917
- op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
2918
- op->watch.gen = ++lreq->register_gen;
3079
+ osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_RECONNECT,
3080
+ lreq->linger_id, ++lreq->register_gen);
29193081 dout("lreq %p reconnect register_gen %u\n", lreq,
2920
- op->watch.gen);
3082
+ req->r_ops[0].watch.gen);
29213083 req->r_callback = linger_reconnect_cb;
29223084 } else {
2923
- if (!lreq->is_watch)
3085
+ if (lreq->is_watch) {
3086
+ osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_WATCH,
3087
+ lreq->linger_id, 0);
3088
+ } else {
29243089 lreq->notify_id = 0;
2925
- else
2926
- WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
3090
+
3091
+ refcount_inc(&lreq->request_pl->refcnt);
3092
+ osd_req_op_notify_init(req, 0, lreq->linger_id,
3093
+ lreq->request_pl);
3094
+ ceph_osd_data_pages_init(
3095
+ osd_req_op_data(req, 0, notify, response_data),
3096
+ lreq->notify_id_pages, PAGE_SIZE, 0, false, false);
3097
+ }
29273098 dout("lreq %p register\n", lreq);
29283099 req->r_callback = linger_commit_cb;
29293100 }
2930
- mutex_unlock(&lreq->lock);
3101
+
3102
+ ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3103
+ BUG_ON(ret);
29313104
29323105 req->r_priv = linger_get(lreq);
29333106 req->r_linger = true;
3107
+ lreq->reg_req = req;
3108
+ mutex_unlock(&lreq->lock);
29343109
29353110 submit_request(req, true);
29363111 }
....@@ -2940,6 +3115,12 @@
29403115 struct ceph_osd_linger_request *lreq = req->r_priv;
29413116
29423117 mutex_lock(&lreq->lock);
3118
+ if (req != lreq->ping_req) {
3119
+ dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3120
+ __func__, lreq, lreq->linger_id, req, lreq->ping_req);
3121
+ goto out;
3122
+ }
3123
+
29433124 dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
29443125 __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
29453126 lreq->last_error);
....@@ -2955,6 +3136,7 @@
29553136 lreq->register_gen, req->r_ops[0].watch.gen);
29563137 }
29573138
3139
+out:
29583140 mutex_unlock(&lreq->lock);
29593141 linger_put(lreq);
29603142 }
....@@ -2962,8 +3144,8 @@
29623144 static void send_linger_ping(struct ceph_osd_linger_request *lreq)
29633145 {
29643146 struct ceph_osd_client *osdc = lreq->osdc;
2965
- struct ceph_osd_request *req = lreq->ping_req;
2966
- struct ceph_osd_req_op *op = &req->r_ops[0];
3147
+ struct ceph_osd_request *req;
3148
+ int ret;
29673149
29683150 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
29693151 dout("%s PAUSERD\n", __func__);
....@@ -2975,19 +3157,26 @@
29753157 __func__, lreq, lreq->linger_id, lreq->ping_sent,
29763158 lreq->register_gen);
29773159
2978
- if (req->r_osd)
2979
- cancel_linger_request(req);
3160
+ if (lreq->ping_req) {
3161
+ if (lreq->ping_req->r_osd)
3162
+ cancel_linger_request(lreq->ping_req);
3163
+ ceph_osdc_put_request(lreq->ping_req);
3164
+ }
29803165
2981
- request_reinit(req);
3166
+ req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
3167
+ BUG_ON(!req);
3168
+
29823169 target_copy(&req->r_t, &lreq->t);
2983
-
2984
- WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2985
- op->watch.cookie != lreq->linger_id ||
2986
- op->watch.op != CEPH_OSD_WATCH_OP_PING);
2987
- op->watch.gen = lreq->register_gen;
3170
+ osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_PING, lreq->linger_id,
3171
+ lreq->register_gen);
29883172 req->r_callback = linger_ping_cb;
3173
+
3174
+ ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3175
+ BUG_ON(ret);
3176
+
29893177 req->r_priv = linger_get(lreq);
29903178 req->r_linger = true;
3179
+ lreq->ping_req = req;
29913180
29923181 ceph_osdc_get_request(req);
29933182 account_request(req);
....@@ -3001,11 +3190,15 @@
30013190 struct ceph_osd_client *osdc = lreq->osdc;
30023191 struct ceph_osd *osd;
30033192
3004
- calc_target(osdc, &lreq->t, NULL, false);
3193
+ down_write(&osdc->lock);
3194
+ linger_register(lreq);
3195
+
3196
+ calc_target(osdc, &lreq->t, false);
30053197 osd = lookup_create_osd(osdc, lreq->t.osd, true);
30063198 link_linger(osd, lreq);
30073199
30083200 send_linger(lreq);
3201
+ up_write(&osdc->lock);
30093202 }
30103203
30113204 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
....@@ -3030,9 +3223,9 @@
30303223 */
30313224 static void __linger_cancel(struct ceph_osd_linger_request *lreq)
30323225 {
3033
- if (lreq->is_watch && lreq->ping_req->r_osd)
3226
+ if (lreq->ping_req && lreq->ping_req->r_osd)
30343227 cancel_linger_request(lreq->ping_req);
3035
- if (lreq->reg_req->r_osd)
3228
+ if (lreq->reg_req && lreq->reg_req->r_osd)
30363229 cancel_linger_request(lreq->reg_req);
30373230 cancel_linger_map_check(lreq);
30383231 unlink_linger(lreq->osd, lreq);
....@@ -3137,17 +3330,24 @@
31373330 int ret;
31383331
31393332 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3140
- ret = wait_for_completion_interruptible(&lreq->reg_commit_wait);
3333
+ ret = wait_for_completion_killable(&lreq->reg_commit_wait);
31413334 return ret ?: lreq->reg_commit_error;
31423335 }
31433336
3144
-static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq)
3337
+static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq,
3338
+ unsigned long timeout)
31453339 {
3146
- int ret;
3340
+ long left;
31473341
31483342 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3149
- ret = wait_for_completion_interruptible(&lreq->notify_finish_wait);
3150
- return ret ?: lreq->notify_finish_error;
3343
+ left = wait_for_completion_killable_timeout(&lreq->notify_finish_wait,
3344
+ ceph_timeout_jiffies(timeout));
3345
+ if (left <= 0)
3346
+ left = left ?: -ETIMEDOUT;
3347
+ else
3348
+ left = lreq->notify_finish_error; /* completed */
3349
+
3350
+ return left;
31513351 }
31523352
31533353 /*
....@@ -3372,9 +3572,6 @@
33723572 goto e_inval;
33733573 }
33743574
3375
- len = ceph_decode_32(p);
3376
- *p += len; /* skip osd_instructions */
3377
-
33783575 /* skip the rest */
33793576 *p = struct_end;
33803577 out:
....@@ -3549,6 +3746,26 @@
35493746 goto out_unlock_osdc;
35503747 }
35513748
3749
+ if (m.result == -EAGAIN) {
3750
+ dout("req %p tid %llu EAGAIN\n", req, req->r_tid);
3751
+ unlink_request(osd, req);
3752
+ mutex_unlock(&osd->lock);
3753
+
3754
+ /*
3755
+ * The object is missing on the replica or not (yet)
3756
+ * readable. Clear pgid to force a resend to the primary
3757
+ * via legacy_change.
3758
+ */
3759
+ req->r_t.pgid.pool = 0;
3760
+ req->r_t.pgid.seed = 0;
3761
+ WARN_ON(!req->r_t.used_replica);
3762
+ req->r_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3763
+ CEPH_OSD_FLAG_LOCALIZE_READS);
3764
+ req->r_tid = 0;
3765
+ __submit_request(req, false);
3766
+ goto out_unlock_osdc;
3767
+ }
3768
+
35523769 if (m.num_ops != req->r_num_ops) {
35533770 pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
35543771 req->r_num_ops, req->r_tid);
....@@ -3619,7 +3836,7 @@
36193836 struct ceph_osd_client *osdc = lreq->osdc;
36203837 enum calc_target_result ct_res;
36213838
3622
- ct_res = calc_target(osdc, &lreq->t, NULL, true);
3839
+ ct_res = calc_target(osdc, &lreq->t, true);
36233840 if (ct_res == CALC_TARGET_NEED_RESEND) {
36243841 struct ceph_osd *osd;
36253842
....@@ -3665,7 +3882,7 @@
36653882 if (!force_resend && !force_resend_writes)
36663883 break;
36673884
3668
- /* fall through */
3885
+ fallthrough;
36693886 case CALC_TARGET_NEED_RESEND:
36703887 cancel_linger_map_check(lreq);
36713888 /*
....@@ -3691,8 +3908,7 @@
36913908 n = rb_next(n); /* unlink_request(), check_pool_dne() */
36923909
36933910 dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
3694
- ct_res = calc_target(osdc, &req->r_t, &req->r_osd->o_con,
3695
- false);
3911
+ ct_res = calc_target(osdc, &req->r_t, false);
36963912 switch (ct_res) {
36973913 case CALC_TARGET_NO_ACTION:
36983914 force_resend_writes = cleared_full ||
....@@ -3703,7 +3919,7 @@
37033919 !force_resend_writes))
37043920 break;
37053921
3706
- /* fall through */
3922
+ fallthrough;
37073923 case CALC_TARGET_NEED_RESEND:
37083924 cancel_map_check(req);
37093925 unlink_request(osd, req);
....@@ -3801,7 +4017,7 @@
38014017 n = rb_next(n);
38024018
38034019 if (req->r_t.epoch < osdc->osdmap->epoch) {
3804
- ct_res = calc_target(osdc, &req->r_t, NULL, false);
4020
+ ct_res = calc_target(osdc, &req->r_t, false);
38054021 if (ct_res == CALC_TARGET_POOL_DNE) {
38064022 erase_request(need_resend, req);
38074023 check_pool_dne(req);
....@@ -4320,9 +4536,7 @@
43204536 lreq->notify_id, notify_id);
43214537 } else if (!completion_done(&lreq->notify_finish_wait)) {
43224538 struct ceph_msg_data *data =
4323
- list_first_entry_or_null(&msg->data,
4324
- struct ceph_msg_data,
4325
- links);
4539
+ msg->num_data_items ? &msg->data[0] : NULL;
43264540
43274541 if (data) {
43284542 if (lreq->preply_pages) {
....@@ -4330,9 +4544,7 @@
43304544 CEPH_MSG_DATA_PAGES);
43314545 *lreq->preply_pages = data->pages;
43324546 *lreq->preply_len = data->length;
4333
- } else {
4334
- ceph_release_page_vector(data->pages,
4335
- calc_pages_for(0, data->length));
4547
+ data->own_pages = false;
43364548 }
43374549 }
43384550 lreq->notify_finish_error = return_code;
....@@ -4467,26 +4679,6 @@
44674679 }
44684680 EXPORT_SYMBOL(ceph_osdc_sync);
44694681
4470
-static struct ceph_osd_request *
4471
-alloc_linger_request(struct ceph_osd_linger_request *lreq)
4472
-{
4473
- struct ceph_osd_request *req;
4474
-
4475
- req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO);
4476
- if (!req)
4477
- return NULL;
4478
-
4479
- ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4480
- ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4481
-
4482
- if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
4483
- ceph_osdc_put_request(req);
4484
- return NULL;
4485
- }
4486
-
4487
- return req;
4488
-}
4489
-
44904682 /*
44914683 * Returns a handle, caller owns a ref.
44924684 */
....@@ -4516,27 +4708,7 @@
45164708 lreq->t.flags = CEPH_OSD_FLAG_WRITE;
45174709 ktime_get_real_ts64(&lreq->mtime);
45184710
4519
- lreq->reg_req = alloc_linger_request(lreq);
4520
- if (!lreq->reg_req) {
4521
- ret = -ENOMEM;
4522
- goto err_put_lreq;
4523
- }
4524
-
4525
- lreq->ping_req = alloc_linger_request(lreq);
4526
- if (!lreq->ping_req) {
4527
- ret = -ENOMEM;
4528
- goto err_put_lreq;
4529
- }
4530
-
4531
- down_write(&osdc->lock);
4532
- linger_register(lreq); /* before osd_req_op_* */
4533
- osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
4534
- CEPH_OSD_WATCH_OP_WATCH);
4535
- osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
4536
- CEPH_OSD_WATCH_OP_PING);
45374711 linger_submit(lreq);
4538
- up_write(&osdc->lock);
4539
-
45404712 ret = linger_reg_commit_wait(lreq);
45414713 if (ret) {
45424714 linger_cancel(lreq);
....@@ -4573,8 +4745,8 @@
45734745 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
45744746 req->r_flags = CEPH_OSD_FLAG_WRITE;
45754747 ktime_get_real_ts64(&req->r_mtime);
4576
- osd_req_op_watch_init(req, 0, lreq->linger_id,
4577
- CEPH_OSD_WATCH_OP_UNWATCH);
4748
+ osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_UNWATCH,
4749
+ lreq->linger_id, 0);
45784750
45794751 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
45804752 if (ret)
....@@ -4599,13 +4771,12 @@
45994771 struct ceph_pagelist *pl;
46004772 int ret;
46014773
4602
- op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
4774
+ op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
46034775
4604
- pl = kmalloc(sizeof(*pl), GFP_NOIO);
4776
+ pl = ceph_pagelist_alloc(GFP_NOIO);
46054777 if (!pl)
46064778 return -ENOMEM;
46074779
4608
- ceph_pagelist_init(pl);
46094780 ret = ceph_pagelist_encode_64(pl, notify_id);
46104781 ret |= ceph_pagelist_encode_64(pl, cookie);
46114782 if (payload) {
....@@ -4643,12 +4814,12 @@
46434814 ceph_oloc_copy(&req->r_base_oloc, oloc);
46444815 req->r_flags = CEPH_OSD_FLAG_READ;
46454816
4646
- ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4817
+ ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
4818
+ payload_len);
46474819 if (ret)
46484820 goto out_put_req;
46494821
4650
- ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
4651
- payload_len);
4822
+ ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
46524823 if (ret)
46534824 goto out_put_req;
46544825
....@@ -4660,36 +4831,6 @@
46604831 return ret;
46614832 }
46624833 EXPORT_SYMBOL(ceph_osdc_notify_ack);
4663
-
4664
-static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
4665
- u64 cookie, u32 prot_ver, u32 timeout,
4666
- void *payload, u32 payload_len)
4667
-{
4668
- struct ceph_osd_req_op *op;
4669
- struct ceph_pagelist *pl;
4670
- int ret;
4671
-
4672
- op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
4673
- op->notify.cookie = cookie;
4674
-
4675
- pl = kmalloc(sizeof(*pl), GFP_NOIO);
4676
- if (!pl)
4677
- return -ENOMEM;
4678
-
4679
- ceph_pagelist_init(pl);
4680
- ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
4681
- ret |= ceph_pagelist_encode_32(pl, timeout);
4682
- ret |= ceph_pagelist_encode_32(pl, payload_len);
4683
- ret |= ceph_pagelist_append(pl, payload, payload_len);
4684
- if (ret) {
4685
- ceph_pagelist_release(pl);
4686
- return -ENOMEM;
4687
- }
4688
-
4689
- ceph_osd_data_pagelist_init(&op->notify.request_data, pl);
4690
- op->indata_len = pl->length;
4691
- return 0;
4692
-}
46934834
46944835 /*
46954836 * @timeout: in seconds
....@@ -4709,7 +4850,6 @@
47094850 size_t *preply_len)
47104851 {
47114852 struct ceph_osd_linger_request *lreq;
4712
- struct page **pages;
47134853 int ret;
47144854
47154855 WARN_ON(!timeout);
....@@ -4722,6 +4862,29 @@
47224862 if (!lreq)
47234863 return -ENOMEM;
47244864
4865
+ lreq->request_pl = ceph_pagelist_alloc(GFP_NOIO);
4866
+ if (!lreq->request_pl) {
4867
+ ret = -ENOMEM;
4868
+ goto out_put_lreq;
4869
+ }
4870
+
4871
+ ret = ceph_pagelist_encode_32(lreq->request_pl, 1); /* prot_ver */
4872
+ ret |= ceph_pagelist_encode_32(lreq->request_pl, timeout);
4873
+ ret |= ceph_pagelist_encode_32(lreq->request_pl, payload_len);
4874
+ ret |= ceph_pagelist_append(lreq->request_pl, payload, payload_len);
4875
+ if (ret) {
4876
+ ret = -ENOMEM;
4877
+ goto out_put_lreq;
4878
+ }
4879
+
4880
+ /* for notify_id */
4881
+ lreq->notify_id_pages = ceph_alloc_page_vector(1, GFP_NOIO);
4882
+ if (IS_ERR(lreq->notify_id_pages)) {
4883
+ ret = PTR_ERR(lreq->notify_id_pages);
4884
+ lreq->notify_id_pages = NULL;
4885
+ goto out_put_lreq;
4886
+ }
4887
+
47254888 lreq->preply_pages = preply_pages;
47264889 lreq->preply_len = preply_len;
47274890
....@@ -4729,38 +4892,11 @@
47294892 ceph_oloc_copy(&lreq->t.base_oloc, oloc);
47304893 lreq->t.flags = CEPH_OSD_FLAG_READ;
47314894
4732
- lreq->reg_req = alloc_linger_request(lreq);
4733
- if (!lreq->reg_req) {
4734
- ret = -ENOMEM;
4735
- goto out_put_lreq;
4736
- }
4737
-
4738
- /* for notify_id */
4739
- pages = ceph_alloc_page_vector(1, GFP_NOIO);
4740
- if (IS_ERR(pages)) {
4741
- ret = PTR_ERR(pages);
4742
- goto out_put_lreq;
4743
- }
4744
-
4745
- down_write(&osdc->lock);
4746
- linger_register(lreq); /* before osd_req_op_* */
4747
- ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
4748
- timeout, payload, payload_len);
4749
- if (ret) {
4750
- linger_unregister(lreq);
4751
- up_write(&osdc->lock);
4752
- ceph_release_page_vector(pages, 1);
4753
- goto out_put_lreq;
4754
- }
4755
- ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
4756
- response_data),
4757
- pages, PAGE_SIZE, 0, false, true);
47584895 linger_submit(lreq);
4759
- up_write(&osdc->lock);
4760
-
47614896 ret = linger_reg_commit_wait(lreq);
47624897 if (!ret)
4763
- ret = linger_notify_finish_wait(lreq);
4898
+ ret = linger_notify_finish_wait(lreq,
4899
+ msecs_to_jiffies(2 * timeout * MSEC_PER_SEC));
47644900 else
47654901 dout("lreq %p failed to initiate notify %d\n", lreq, ret);
47664902
....@@ -4814,20 +4950,26 @@
48144950 ret = ceph_start_decoding(p, end, 2, "watch_item_t",
48154951 &struct_v, &struct_len);
48164952 if (ret)
4817
- return ret;
4953
+ goto bad;
48184954
4819
- ceph_decode_copy(p, &item->name, sizeof(item->name));
4820
- item->cookie = ceph_decode_64(p);
4821
- *p += 4; /* skip timeout_seconds */
4955
+ ret = -EINVAL;
4956
+ ceph_decode_copy_safe(p, end, &item->name, sizeof(item->name), bad);
4957
+ ceph_decode_64_safe(p, end, item->cookie, bad);
4958
+ ceph_decode_skip_32(p, end, bad); /* skip timeout seconds */
4959
+
48224960 if (struct_v >= 2) {
4823
- ceph_decode_copy(p, &item->addr, sizeof(item->addr));
4824
- ceph_decode_addr(&item->addr);
4961
+ ret = ceph_decode_entity_addr(p, end, &item->addr);
4962
+ if (ret)
4963
+ goto bad;
4964
+ } else {
4965
+ ret = 0;
48254966 }
48264967
48274968 dout("%s %s%llu cookie %llu addr %s\n", __func__,
48284969 ENTITY_NAME(item->name), item->cookie,
4829
- ceph_pr_addr(&item->addr.in_addr));
4830
- return 0;
4970
+ ceph_pr_addr(&item->addr));
4971
+bad:
4972
+ return ret;
48314973 }
48324974
48334975 static int decode_watchers(void **p, void *end,
....@@ -4883,10 +5025,6 @@
48835025 ceph_oloc_copy(&req->r_base_oloc, oloc);
48845026 req->r_flags = CEPH_OSD_FLAG_READ;
48855027
4886
- ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4887
- if (ret)
4888
- goto out_put_req;
4889
-
48905028 pages = ceph_alloc_page_vector(1, GFP_NOIO);
48915029 if (IS_ERR(pages)) {
48925030 ret = PTR_ERR(pages);
....@@ -4897,6 +5035,10 @@
48975035 ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
48985036 response_data),
48995037 pages, PAGE_SIZE, 0, false, true);
5038
+
5039
+ ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5040
+ if (ret)
5041
+ goto out_put_req;
49005042
49015043 ceph_osdc_start_request(osdc, req, false);
49025044 ret = ceph_osdc_wait_request(osdc, req);
....@@ -4944,12 +5086,12 @@
49445086 const char *class, const char *method,
49455087 unsigned int flags,
49465088 struct page *req_page, size_t req_len,
4947
- struct page *resp_page, size_t *resp_len)
5089
+ struct page **resp_pages, size_t *resp_len)
49485090 {
49495091 struct ceph_osd_request *req;
49505092 int ret;
49515093
4952
- if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE))
5094
+ if (req_len > PAGE_SIZE)
49535095 return -E2BIG;
49545096
49555097 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
....@@ -4960,26 +5102,26 @@
49605102 ceph_oloc_copy(&req->r_base_oloc, oloc);
49615103 req->r_flags = flags;
49625104
4963
- ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4964
- if (ret)
4965
- goto out_put_req;
4966
-
4967
- ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
5105
+ ret = osd_req_op_cls_init(req, 0, class, method);
49685106 if (ret)
49695107 goto out_put_req;
49705108
49715109 if (req_page)
49725110 osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
49735111 0, false, false);
4974
- if (resp_page)
4975
- osd_req_op_cls_response_data_pages(req, 0, &resp_page,
5112
+ if (resp_pages)
5113
+ osd_req_op_cls_response_data_pages(req, 0, resp_pages,
49765114 *resp_len, 0, false, false);
5115
+
5116
+ ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5117
+ if (ret)
5118
+ goto out_put_req;
49775119
49785120 ceph_osdc_start_request(osdc, req, false);
49795121 ret = ceph_osdc_wait_request(osdc, req);
49805122 if (ret >= 0) {
49815123 ret = req->r_ops[0].rval;
4982
- if (resp_page)
5124
+ if (resp_pages)
49835125 *resp_len = req->r_ops[0].outdata_len;
49845126 }
49855127
....@@ -4988,6 +5130,24 @@
49885130 return ret;
49895131 }
49905132 EXPORT_SYMBOL(ceph_osdc_call);
5133
+
5134
+/*
5135
+ * reset all osd connections
5136
+ */
5137
+void ceph_osdc_reopen_osds(struct ceph_osd_client *osdc)
5138
+{
5139
+ struct rb_node *n;
5140
+
5141
+ down_write(&osdc->lock);
5142
+ for (n = rb_first(&osdc->osds); n; ) {
5143
+ struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
5144
+
5145
+ n = rb_next(n);
5146
+ if (!reopen_osd(osd))
5147
+ kick_osd_requests(osd);
5148
+ }
5149
+ up_write(&osdc->lock);
5150
+}
49915151
49925152 /*
49935153 * init, shutdown
....@@ -5023,11 +5183,12 @@
50235183 goto out_map;
50245184
50255185 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
5026
- PAGE_SIZE, 10, true, "osd_op");
5186
+ PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
50275187 if (err < 0)
50285188 goto out_mempool;
50295189 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
5030
- PAGE_SIZE, 10, true, "osd_op_reply");
5190
+ PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
5191
+ "osd_op_reply");
50315192 if (err < 0)
50325193 goto out_msgpool;
50335194
....@@ -5091,84 +5252,85 @@
50915252 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
50925253 }
50935254
5094
-/*
5095
- * Read some contiguous pages. If we cross a stripe boundary, shorten
5096
- * *plen. Return number of bytes read, or error.
5097
- */
5098
-int ceph_osdc_readpages(struct ceph_osd_client *osdc,
5099
- struct ceph_vino vino, struct ceph_file_layout *layout,
5100
- u64 off, u64 *plen,
5255
+static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
5256
+ u64 src_snapid, u64 src_version,
5257
+ struct ceph_object_id *src_oid,
5258
+ struct ceph_object_locator *src_oloc,
5259
+ u32 src_fadvise_flags,
5260
+ u32 dst_fadvise_flags,
5261
+ u32 truncate_seq, u64 truncate_size,
5262
+ u8 copy_from_flags)
5263
+{
5264
+ struct ceph_osd_req_op *op;
5265
+ struct page **pages;
5266
+ void *p, *end;
5267
+
5268
+ pages = ceph_alloc_page_vector(1, GFP_KERNEL);
5269
+ if (IS_ERR(pages))
5270
+ return PTR_ERR(pages);
5271
+
5272
+ op = osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM2,
5273
+ dst_fadvise_flags);
5274
+ op->copy_from.snapid = src_snapid;
5275
+ op->copy_from.src_version = src_version;
5276
+ op->copy_from.flags = copy_from_flags;
5277
+ op->copy_from.src_fadvise_flags = src_fadvise_flags;
5278
+
5279
+ p = page_address(pages[0]);
5280
+ end = p + PAGE_SIZE;
5281
+ ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
5282
+ encode_oloc(&p, end, src_oloc);
5283
+ ceph_encode_32(&p, truncate_seq);
5284
+ ceph_encode_64(&p, truncate_size);
5285
+ op->indata_len = PAGE_SIZE - (end - p);
5286
+
5287
+ ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
5288
+ op->indata_len, 0, false, true);
5289
+ return 0;
5290
+}
5291
+
5292
+int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
5293
+ u64 src_snapid, u64 src_version,
5294
+ struct ceph_object_id *src_oid,
5295
+ struct ceph_object_locator *src_oloc,
5296
+ u32 src_fadvise_flags,
5297
+ struct ceph_object_id *dst_oid,
5298
+ struct ceph_object_locator *dst_oloc,
5299
+ u32 dst_fadvise_flags,
51015300 u32 truncate_seq, u64 truncate_size,
5102
- struct page **pages, int num_pages, int page_align)
5301
+ u8 copy_from_flags)
51035302 {
51045303 struct ceph_osd_request *req;
5105
- int rc = 0;
5304
+ int ret;
51065305
5107
- dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
5108
- vino.snap, off, *plen);
5109
- req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
5110
- CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
5111
- NULL, truncate_seq, truncate_size,
5112
- false);
5113
- if (IS_ERR(req))
5114
- return PTR_ERR(req);
5306
+ req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
5307
+ if (!req)
5308
+ return -ENOMEM;
51155309
5116
- /* it may be a short read due to an object boundary */
5117
- osd_req_op_extent_osd_data_pages(req, 0,
5118
- pages, *plen, page_align, false, false);
5310
+ req->r_flags = CEPH_OSD_FLAG_WRITE;
51195311
5120
- dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
5121
- off, *plen, *plen, page_align);
5312
+ ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc);
5313
+ ceph_oid_copy(&req->r_t.base_oid, dst_oid);
51225314
5123
- rc = ceph_osdc_start_request(osdc, req, false);
5124
- if (!rc)
5125
- rc = ceph_osdc_wait_request(osdc, req);
5315
+ ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid,
5316
+ src_oloc, src_fadvise_flags,
5317
+ dst_fadvise_flags, truncate_seq,
5318
+ truncate_size, copy_from_flags);
5319
+ if (ret)
5320
+ goto out;
51265321
5322
+ ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
5323
+ if (ret)
5324
+ goto out;
5325
+
5326
+ ceph_osdc_start_request(osdc, req, false);
5327
+ ret = ceph_osdc_wait_request(osdc, req);
5328
+
5329
+out:
51275330 ceph_osdc_put_request(req);
5128
- dout("readpages result %d\n", rc);
5129
- return rc;
5331
+ return ret;
51305332 }
5131
-EXPORT_SYMBOL(ceph_osdc_readpages);
5132
-
5133
-/*
5134
- * do a synchronous write on N pages
5135
- */
5136
-int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
5137
- struct ceph_file_layout *layout,
5138
- struct ceph_snap_context *snapc,
5139
- u64 off, u64 len,
5140
- u32 truncate_seq, u64 truncate_size,
5141
- struct timespec64 *mtime,
5142
- struct page **pages, int num_pages)
5143
-{
5144
- struct ceph_osd_request *req;
5145
- int rc = 0;
5146
- int page_align = off & ~PAGE_MASK;
5147
-
5148
- req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
5149
- CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
5150
- snapc, truncate_seq, truncate_size,
5151
- true);
5152
- if (IS_ERR(req))
5153
- return PTR_ERR(req);
5154
-
5155
- /* it may be a short write due to an object boundary */
5156
- osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
5157
- false, false);
5158
- dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
5159
-
5160
- req->r_mtime = *mtime;
5161
- rc = ceph_osdc_start_request(osdc, req, true);
5162
- if (!rc)
5163
- rc = ceph_osdc_wait_request(osdc, req);
5164
-
5165
- ceph_osdc_put_request(req);
5166
- if (rc == 0)
5167
- rc = len;
5168
- dout("writepages result %d\n", rc);
5169
- return rc;
5170
-}
5171
-EXPORT_SYMBOL(ceph_osdc_writepages);
5333
+EXPORT_SYMBOL(ceph_osdc_copy_from);
51725334
51735335 int __init ceph_osdc_setup(void)
51745336 {
....@@ -5287,9 +5449,6 @@
52875449 return m;
52885450 }
52895451
5290
-/*
5291
- * TODO: switch to a msg-owned pagelist
5292
- */
52935452 static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
52945453 {
52955454 struct ceph_msg *m;
....@@ -5297,13 +5456,12 @@
52975456 u32 front_len = le32_to_cpu(hdr->front_len);
52985457 u32 data_len = le32_to_cpu(hdr->data_len);
52995458
5300
- m = ceph_msg_new(type, front_len, GFP_NOIO, false);
5459
+ m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
53015460 if (!m)
53025461 return NULL;
53035462
53045463 if (data_len) {
53055464 struct page **pages;
5306
- struct ceph_osd_data osd_data;
53075465
53085466 pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
53095467 GFP_NOIO);
....@@ -5312,9 +5470,7 @@
53125470 return NULL;
53135471 }
53145472
5315
- ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false,
5316
- false);
5317
- ceph_osdc_msg_data_add(m, &osd_data);
5473
+ ceph_msg_data_add_pages(m, pages, data_len, 0, true);
53185474 }
53195475
53205476 return m;