hc
2024-08-12 233ab1bd4c5697f5cdec94e60206e8c6ac609b4c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
// SPDX-License-Identifier: GPL-2.0+
/*
 * Copyright (C) 2020 Google, Inc
 * Copyright (C) 2020 Palmer Dabbelt <palmerdabbelt@google.com>
 */
 
#include <linux/device-mapper.h>
#include <uapi/linux/dm-user.h>
 
#include <linux/bio.h>
#include <linux/init.h>
#include <linux/mempool.h>
#include <linux/miscdevice.h>
#include <linux/module.h>
#include <linux/poll.h>
#include <linux/uio.h>
#include <linux/wait.h>
#include <linux/workqueue.h>
 
#define DM_MSG_PREFIX "user"
 
#define MAX_OUTSTANDING_MESSAGES 128
 
static unsigned int daemon_timeout_msec = 4000;
module_param_named(dm_user_daemon_timeout_msec, daemon_timeout_msec, uint,
          0644);
MODULE_PARM_DESC(dm_user_daemon_timeout_msec,
        "IO Timeout in msec if daemon does not process");
 
/*
 * dm-user uses four structures:
 *
 *  - "struct target", the outermost structure, corresponds to a single device
 *    mapper target.  This contains the set of outstanding BIOs that have been
 *    provided by DM and are not actively being processed by the user, along
 *    with a misc device that userspace can open to communicate with the
 *    kernel.  Each time userspaces opens the misc device a new channel is
 *    created.
 *  - "struct channel", which represents a single active communication channel
 *    with userspace.  Userspace may choose arbitrary read/write sizes to use
 *    when processing messages, channels form these into logical accesses.
 *    When userspace responds to a full message the channel completes the BIO
 *    and obtains a new message to process from the target.
 *  - "struct message", which wraps a BIO with the additional information
 *    required by the kernel to sort out what to do with BIOs when they return
 *    from userspace.
 *  - "struct dm_user_message", which is the exact message format that
 *    userspace sees.
 *
 * The hot path contains three distinct operations:
 *
 *  - user_map(), which is provided a BIO from device mapper that is queued
 *    into the target.  This allocates and enqueues a new message.
 *  - dev_read(), which dequeues a message, copies it to userspace.
 *  - dev_write(), which looks up a message (keyed by sequence number) and
 *    completes the corresponding BIO.
 *
 * Lock ordering (outer to inner)
 *
 * 1) miscdevice's global lock.  This is held around dev_open, so it has to be
 *    the outermost lock.
 * 2) target->lock
 * 3) channel->lock
 */
 
struct message {
   /*
    * Messages themselves do not need a lock, they're protected by either
    * the target or channel's lock, depending on which can reference them
    * directly.
    */
   struct dm_user_message msg;
   struct bio *bio;
   size_t posn_to_user;
   size_t total_to_user;
   size_t posn_from_user;
   size_t total_from_user;
 
   struct list_head from_user;
   struct list_head to_user;
 
   /*
    * These are written back from the user.  They live in the same spot in
    * the message, but we need to either keep the old values around or
    * call a bunch more BIO helpers.  These are only valid after write has
    * adopted the message.
    */
   u64 return_type;
   u64 return_flags;
 
   struct delayed_work work;
   bool delayed;
   struct target *t;
};
 
struct target {
   /*
    * A target has a single lock, which protects everything in the target
    * (but does not protect the channels associated with a target).
    */
   struct mutex lock;
 
   /*
    * There is only one point at which anything blocks: userspace blocks
    * reading a new message, which is woken up by device mapper providing
    * a new BIO to process (or tearing down the target).  The
    * corresponding write side doesn't block, instead we treat userspace's
    * response containing a message that has yet to be mapped as an
    * invalid operation.
    */
   struct wait_queue_head wq;
 
   /*
    * Messages are delivered to userspace in order, but may be returned
    * out of order.  This allows userspace to schedule IO if it wants to.
    */
   mempool_t message_pool;
   u64 next_seq_to_map;
   u64 next_seq_to_user;
   struct list_head to_user;
 
   /*
    * There is a misc device per target.  The name is selected by
    * userspace (via a DM create ioctl argument), and each ends up in
    * /dev/dm-user/.  It looks like a better way to do this may be to have
    * a filesystem to manage these, but this was more expedient.  The
    * current mechanism is functional, but does result in an arbitrary
    * number of dynamically created misc devices.
    */
   struct miscdevice miscdev;
 
   /*
    * Device mapper's target destructor triggers tearing this all down,
    * but we can't actually free until every channel associated with this
    * target has been destroyed.  Channels each have a reference to their
    * target, and there is an additional single reference that corresponds
    * to both DM and the misc device (both of which are destroyed by DM).
    *
    * In the common case userspace will be asleep waiting for a new
    * message when device mapper decides to destroy the target, which
    * means no new messages will appear.  The destroyed flag triggers a
    * wakeup, which will end up removing the reference.
    */
   struct kref references;
   int dm_destroyed;
   bool daemon_terminated;
};
 
struct channel {
   struct target *target;
 
   /*
    * A channel has a single lock, which prevents multiple reads (or
    * multiple writes) from conflicting with each other.
    */
   struct mutex lock;
 
   struct message *cur_to_user;
   struct message *cur_from_user;
   ssize_t to_user_error;
   ssize_t from_user_error;
 
   /*
    * Once a message has been forwarded to userspace on a channel it must
    * be responded to on the same channel.  This allows us to error out
    * the messages that have not yet been responded to by a channel when
    * that channel closes, which makes handling errors more reasonable for
    * fault-tolerant userspace daemons.  It also happens to make avoiding
    * shared locks between user_map() and dev_read() a lot easier.
    *
    * This does preclude a multi-threaded work stealing userspace
    * implementation (or at least, force a degree of head-of-line blocking
    * on the response path).
    */
   struct list_head from_user;
 
   /*
    * Responses from userspace can arrive in arbitrarily small chunks.
    * We need some place to buffer one up until we can find the
    * corresponding kernel-side message to continue processing, so instead
    * of allocating them we just keep one off to the side here.  This can
    * only ever be pointer to by from_user_cur, and will never have a BIO.
    */
   struct message scratch_message_from_user;
};
 
static void message_kill(struct message *m, mempool_t *pool)
{
   m->bio->bi_status = BLK_STS_IOERR;
   bio_endio(m->bio);
   mempool_free(m, pool);
}
 
static inline bool is_user_space_thread_present(struct target *t)
{
   lockdep_assert_held(&t->lock);
   return (kref_read(&t->references) > 1);
}
 
static void process_delayed_work(struct work_struct *work)
{
   struct delayed_work *del_work = to_delayed_work(work);
   struct message *msg = container_of(del_work, struct message, work);
 
   struct target *t = msg->t;
 
   mutex_lock(&t->lock);
 
   /*
    * There is a atleast one thread to process the IO.
    */
   if (is_user_space_thread_present(t)) {
       mutex_unlock(&t->lock);
       return;
   }
 
   /*
    * Terminate the IO with an error
    */
   list_del(&msg->to_user);
   pr_err("I/O error: sector %llu: no user-space daemon for %s target\n",
          msg->bio->bi_iter.bi_sector,
          t->miscdev.name);
   message_kill(msg, &t->message_pool);
   mutex_unlock(&t->lock);
}
 
static void enqueue_delayed_work(struct message *m, bool is_delay)
{
   unsigned long delay = 0;
 
   m->delayed = true;
   INIT_DELAYED_WORK(&m->work, process_delayed_work);
 
   /*
    * Snapuserd daemon is the user-space process
    * which processes IO request from dm-user
    * when OTA is applied. Per the current design,
    * when a dm-user target is created, daemon
    * attaches to target and starts processing
    * the IO's. Daemon is terminated only when
    * dm-user target is destroyed.
    *
    * If for some reason, daemon crashes or terminates early,
    * without destroying the dm-user target; then
    * there is no mechanism to restart the daemon
    * and start processing the IO's from the same target.
    * Theoretically, it is possible but that infrastructure
    * doesn't exist in the android ecosystem.
    *
    * Thus, when the daemon terminates, there is no way the IO's
    * issued on that target will be processed. Hence,
    * we set the delay to 0 and fail the IO's immediately.
    *
    * On the other hand, when a new dm-user target is created,
    * we wait for the daemon to get attached for the first time.
    * This primarily happens when init first stage spins up
    * the daemon. At this point, since the snapshot device is mounted
    * of a root filesystem, dm-user target may receive IO request
    * even though daemon is not fully launched. We don't want
    * to fail those IO requests immediately. Thus, we queue these
    * requests with a timeout so that daemon is ready to process
    * those IO requests. Again, if the daemon fails to launch within
    * the timeout period, then IO's will be failed.
    */
   if (is_delay)
       delay = msecs_to_jiffies(daemon_timeout_msec);
 
   queue_delayed_work(system_wq, &m->work, delay);
}
 
static inline struct target *target_from_target(struct dm_target *target)
{
   WARN_ON(target->private == NULL);
   return target->private;
}
 
static inline struct target *target_from_miscdev(struct miscdevice *miscdev)
{
   return container_of(miscdev, struct target, miscdev);
}
 
static inline struct channel *channel_from_file(struct file *file)
{
   WARN_ON(file->private_data == NULL);
   return file->private_data;
}
 
static inline struct target *target_from_channel(struct channel *c)
{
   WARN_ON(c->target == NULL);
   return c->target;
}
 
static inline size_t bio_size(struct bio *bio)
{
   struct bio_vec bvec;
   struct bvec_iter iter;
   size_t out = 0;
 
   bio_for_each_segment (bvec, bio, iter)
       out += bio_iter_len(bio, iter);
   return out;
}
 
static inline size_t bio_bytes_needed_to_user(struct bio *bio)
{
   switch (bio_op(bio)) {
   case REQ_OP_WRITE:
       return sizeof(struct dm_user_message) + bio_size(bio);
   case REQ_OP_READ:
   case REQ_OP_FLUSH:
   case REQ_OP_DISCARD:
   case REQ_OP_SECURE_ERASE:
   case REQ_OP_WRITE_SAME:
   case REQ_OP_WRITE_ZEROES:
       return sizeof(struct dm_user_message);
 
   /*
    * These ops are not passed to userspace under the assumption that
    * they're not going to be particularly useful in that context.
    */
   default:
       return -EOPNOTSUPP;
   }
}
 
static inline size_t bio_bytes_needed_from_user(struct bio *bio)
{
   switch (bio_op(bio)) {
   case REQ_OP_READ:
       return sizeof(struct dm_user_message) + bio_size(bio);
   case REQ_OP_WRITE:
   case REQ_OP_FLUSH:
   case REQ_OP_DISCARD:
   case REQ_OP_SECURE_ERASE:
   case REQ_OP_WRITE_SAME:
   case REQ_OP_WRITE_ZEROES:
       return sizeof(struct dm_user_message);
 
   /*
    * These ops are not passed to userspace under the assumption that
    * they're not going to be particularly useful in that context.
    */
   default:
       return -EOPNOTSUPP;
   }
}
 
static inline long bio_type_to_user_type(struct bio *bio)
{
   switch (bio_op(bio)) {
   case REQ_OP_READ:
       return DM_USER_REQ_MAP_READ;
   case REQ_OP_WRITE:
       return DM_USER_REQ_MAP_WRITE;
   case REQ_OP_FLUSH:
       return DM_USER_REQ_MAP_FLUSH;
   case REQ_OP_DISCARD:
       return DM_USER_REQ_MAP_DISCARD;
   case REQ_OP_SECURE_ERASE:
       return DM_USER_REQ_MAP_SECURE_ERASE;
   case REQ_OP_WRITE_SAME:
       return DM_USER_REQ_MAP_WRITE_SAME;
   case REQ_OP_WRITE_ZEROES:
       return DM_USER_REQ_MAP_WRITE_ZEROES;
 
   /*
    * These ops are not passed to userspace under the assumption that
    * they're not going to be particularly useful in that context.
    */
   default:
       return -EOPNOTSUPP;
   }
}
 
static inline long bio_flags_to_user_flags(struct bio *bio)
{
   u64 out = 0;
   typeof(bio->bi_opf) opf = bio->bi_opf & ~REQ_OP_MASK;
 
   if (opf & REQ_FAILFAST_DEV) {
       opf &= ~REQ_FAILFAST_DEV;
       out |= DM_USER_REQ_MAP_FLAG_FAILFAST_DEV;
   }
 
   if (opf & REQ_FAILFAST_TRANSPORT) {
       opf &= ~REQ_FAILFAST_TRANSPORT;
       out |= DM_USER_REQ_MAP_FLAG_FAILFAST_TRANSPORT;
   }
 
   if (opf & REQ_FAILFAST_DRIVER) {
       opf &= ~REQ_FAILFAST_DRIVER;
       out |= DM_USER_REQ_MAP_FLAG_FAILFAST_DRIVER;
   }
 
   if (opf & REQ_SYNC) {
       opf &= ~REQ_SYNC;
       out |= DM_USER_REQ_MAP_FLAG_SYNC;
   }
 
   if (opf & REQ_META) {
       opf &= ~REQ_META;
       out |= DM_USER_REQ_MAP_FLAG_META;
   }
 
   if (opf & REQ_PRIO) {
       opf &= ~REQ_PRIO;
       out |= DM_USER_REQ_MAP_FLAG_PRIO;
   }
 
   if (opf & REQ_NOMERGE) {
       opf &= ~REQ_NOMERGE;
       out |= DM_USER_REQ_MAP_FLAG_NOMERGE;
   }
 
   if (opf & REQ_IDLE) {
       opf &= ~REQ_IDLE;
       out |= DM_USER_REQ_MAP_FLAG_IDLE;
   }
 
   if (opf & REQ_INTEGRITY) {
       opf &= ~REQ_INTEGRITY;
       out |= DM_USER_REQ_MAP_FLAG_INTEGRITY;
   }
 
   if (opf & REQ_FUA) {
       opf &= ~REQ_FUA;
       out |= DM_USER_REQ_MAP_FLAG_FUA;
   }
 
   if (opf & REQ_PREFLUSH) {
       opf &= ~REQ_PREFLUSH;
       out |= DM_USER_REQ_MAP_FLAG_PREFLUSH;
   }
 
   if (opf & REQ_RAHEAD) {
       opf &= ~REQ_RAHEAD;
       out |= DM_USER_REQ_MAP_FLAG_RAHEAD;
   }
 
   if (opf & REQ_BACKGROUND) {
       opf &= ~REQ_BACKGROUND;
       out |= DM_USER_REQ_MAP_FLAG_BACKGROUND;
   }
 
   if (opf & REQ_NOWAIT) {
       opf &= ~REQ_NOWAIT;
       out |= DM_USER_REQ_MAP_FLAG_NOWAIT;
   }
 
   if (opf & REQ_NOUNMAP) {
       opf &= ~REQ_NOUNMAP;
       out |= DM_USER_REQ_MAP_FLAG_NOUNMAP;
   }
 
   if (unlikely(opf)) {
       pr_warn("unsupported BIO type %x\n", opf);
       return -EOPNOTSUPP;
   }
   WARN_ON(out < 0);
   return out;
}
 
/*
 * Not quite what's in blk-map.c, but instead what I thought the functions in
 * blk-map did.  This one seems more generally useful and I think we could
 * write the blk-map version in terms of this one.  The differences are that
 * this has a return value that counts, and blk-map uses the BIO _all iters.
 * Neither  advance the BIO iter but don't advance the IOV iter, which is a bit
 * odd here.
 */
static ssize_t bio_copy_from_iter(struct bio *bio, struct iov_iter *iter)
{
   struct bio_vec bvec;
   struct bvec_iter biter;
   ssize_t out = 0;
 
   bio_for_each_segment (bvec, bio, biter) {
       ssize_t ret;
 
       ret = copy_page_from_iter(bvec.bv_page, bvec.bv_offset,
                     bvec.bv_len, iter);
 
       /*
        * FIXME: I thought that IOV copies had a mechanism for
        * terminating early, if for example a signal came in while
        * sleeping waiting for a page to be mapped, but I don't see
        * where that would happen.
        */
       WARN_ON(ret < 0);
       out += ret;
 
       if (!iov_iter_count(iter))
           break;
 
       if (ret < bvec.bv_len)
           return ret;
   }
 
   return out;
}
 
static ssize_t bio_copy_to_iter(struct bio *bio, struct iov_iter *iter)
{
   struct bio_vec bvec;
   struct bvec_iter biter;
   ssize_t out = 0;
 
   bio_for_each_segment (bvec, bio, biter) {
       ssize_t ret;
 
       ret = copy_page_to_iter(bvec.bv_page, bvec.bv_offset,
                   bvec.bv_len, iter);
 
       /* as above */
       WARN_ON(ret < 0);
       out += ret;
 
       if (!iov_iter_count(iter))
           break;
 
       if (ret < bvec.bv_len)
           return ret;
   }
 
   return out;
}
 
static ssize_t msg_copy_to_iov(struct message *msg, struct iov_iter *to)
{
   ssize_t copied = 0;
 
   if (!iov_iter_count(to))
       return 0;
 
   if (msg->posn_to_user < sizeof(msg->msg)) {
       copied = copy_to_iter((char *)(&msg->msg) + msg->posn_to_user,
                     sizeof(msg->msg) - msg->posn_to_user, to);
   } else {
       copied = bio_copy_to_iter(msg->bio, to);
       if (copied > 0)
           bio_advance(msg->bio, copied);
   }
 
   if (copied < 0)
       return copied;
 
   msg->posn_to_user += copied;
   return copied;
}
 
static ssize_t msg_copy_from_iov(struct message *msg, struct iov_iter *from)
{
   ssize_t copied = 0;
 
   if (!iov_iter_count(from))
       return 0;
 
   if (msg->posn_from_user < sizeof(msg->msg)) {
       copied = copy_from_iter(
           (char *)(&msg->msg) + msg->posn_from_user,
           sizeof(msg->msg) - msg->posn_from_user, from);
   } else {
       copied = bio_copy_from_iter(msg->bio, from);
       if (copied > 0)
           bio_advance(msg->bio, copied);
   }
 
   if (copied < 0)
       return copied;
 
   msg->posn_from_user += copied;
   return copied;
}
 
static struct message *msg_get_map(struct target *t)
{
   struct message *m;
 
   lockdep_assert_held(&t->lock);
 
   m = mempool_alloc(&t->message_pool, GFP_NOIO);
   m->msg.seq = t->next_seq_to_map++;
   INIT_LIST_HEAD(&m->to_user);
   INIT_LIST_HEAD(&m->from_user);
   return m;
}
 
static struct message *msg_get_to_user(struct target *t)
{
   struct message *m;
 
   lockdep_assert_held(&t->lock);
 
   if (list_empty(&t->to_user))
       return NULL;
 
   m = list_first_entry(&t->to_user, struct message, to_user);
 
   list_del(&m->to_user);
 
   /*
    * If the IO was queued to workqueue since there
    * was no daemon to service the IO, then we
    * will have to cancel the delayed work as the
    * IO will be processed by this user-space thread.
    *
    * If the delayed work was already picked up for
    * processing, then wait for it to complete. Note
    * that the IO will not be terminated by the work
    * queue thread.
    */
   if (unlikely(m->delayed)) {
       mutex_unlock(&t->lock);
       cancel_delayed_work_sync(&m->work);
       mutex_lock(&t->lock);
   }
   return m;
}
 
static struct message *msg_get_from_user(struct channel *c, u64 seq)
{
   struct message *m;
   struct list_head *cur, *tmp;
 
   lockdep_assert_held(&c->lock);
 
   list_for_each_safe (cur, tmp, &c->from_user) {
       m = list_entry(cur, struct message, from_user);
       if (m->msg.seq == seq) {
           list_del(&m->from_user);
           return m;
       }
   }
 
   return NULL;
}
 
/*
 * Returns 0 when there is no work left to do.  This must be callable without
 * holding the target lock, as it is part of the waitqueue's check expression.
 * When called without the lock it may spuriously indicate there is remaining
 * work, but when called with the lock it must be accurate.
 */
int target_poll(struct target *t)
{
   return !list_empty(&t->to_user) || t->dm_destroyed;
}
 
void target_release(struct kref *ref)
{
   struct target *t = container_of(ref, struct target, references);
   struct list_head *cur, *tmp;
 
   /*
    * There may be outstanding BIOs that have not yet been given to
    * userspace.  At this point there's nothing we can do about them, as
    * there are and will never be any channels.
    */
   list_for_each_safe (cur, tmp, &t->to_user) {
       struct message *m = list_entry(cur, struct message, to_user);
 
       if (unlikely(m->delayed)) {
           bool ret;
 
           mutex_unlock(&t->lock);
           ret = cancel_delayed_work_sync(&m->work);
           mutex_lock(&t->lock);
           if (!ret)
               continue;
       }
       message_kill(m, &t->message_pool);
   }
 
   mempool_exit(&t->message_pool);
   mutex_unlock(&t->lock);
   mutex_destroy(&t->lock);
   kfree(t);
}
 
void target_put(struct target *t)
{
   /*
    * This both releases a reference to the target and the lock.  We leave
    * it up to the caller to hold the lock, as they probably needed it for
    * something else.
    */
   lockdep_assert_held(&t->lock);
 
   if (!kref_put(&t->references, target_release)) {
       /*
        * User-space thread is getting terminated.
        * We need to scan the list for all those
        * pending IO's which were not processed yet
        * and put them back to work-queue for delayed
        * processing.
        */
       if (!is_user_space_thread_present(t)) {
           struct list_head *cur, *tmp;
 
           list_for_each_safe(cur, tmp, &t->to_user) {
               struct message *m = list_entry(cur,
                                  struct message,
                                  to_user);
               if (!m->delayed)
                   enqueue_delayed_work(m, false);
           }
           /*
            * Daemon attached to this target is terminated.
            */
           t->daemon_terminated = true;
       }
       mutex_unlock(&t->lock);
   }
}
 
static struct channel *channel_alloc(struct target *t)
{
   struct channel *c;
 
   lockdep_assert_held(&t->lock);
 
   c = kzalloc(sizeof(*c), GFP_KERNEL);
   if (c == NULL)
       return NULL;
 
   kref_get(&t->references);
   c->target = t;
   c->cur_from_user = &c->scratch_message_from_user;
   mutex_init(&c->lock);
   INIT_LIST_HEAD(&c->from_user);
   return c;
}
 
void channel_free(struct channel *c)
{
   struct list_head *cur, *tmp;
 
   lockdep_assert_held(&c->lock);
 
   /*
    * There may be outstanding BIOs that have been given to userspace but
    * have not yet been completed.  The channel has been shut down so
    * there's no way to process the rest of those messages, so we just go
    * ahead and error out the BIOs.  Hopefully whatever's on the other end
    * can handle the errors.  One could imagine splitting the BIOs and
    * completing as much as we got, but that seems like overkill here.
    *
    * Our only other options would be to let the BIO hang around (which
    * seems way worse) or to resubmit it to userspace in the hope there's
    * another channel.  I don't really like the idea of submitting a
    * message twice.
    */
   if (c->cur_to_user != NULL)
       message_kill(c->cur_to_user, &c->target->message_pool);
   if (c->cur_from_user != &c->scratch_message_from_user)
       message_kill(c->cur_from_user, &c->target->message_pool);
   list_for_each_safe (cur, tmp, &c->from_user)
       message_kill(list_entry(cur, struct message, from_user),
                &c->target->message_pool);
 
   mutex_lock(&c->target->lock);
   target_put(c->target);
   mutex_unlock(&c->lock);
   mutex_destroy(&c->lock);
   kfree(c);
}
 
static int dev_open(struct inode *inode, struct file *file)
{
   struct channel *c;
   struct target *t;
 
   /*
    * This is called by miscdev, which sets private_data to point to the
    * struct miscdevice that was opened.  The rest of our file operations
    * want to refer to the channel that's been opened, so we swap that
    * pointer out with a fresh channel.
    *
    * This is called with the miscdev lock held, which is also held while
    * registering/unregistering the miscdev.  The miscdev must be
    * registered for this to get called, which means there must be an
    * outstanding reference to the target, which means it cannot be freed
    * out from under us despite us not holding a reference yet.
    */
   t = container_of(file->private_data, struct target, miscdev);
   mutex_lock(&t->lock);
   file->private_data = c = channel_alloc(t);
 
   if (c == NULL) {
       mutex_unlock(&t->lock);
       return -ENOMEM;
   }
 
   mutex_unlock(&t->lock);
   return 0;
}
 
static ssize_t dev_read(struct kiocb *iocb, struct iov_iter *to)
{
   struct channel *c = channel_from_file(iocb->ki_filp);
   ssize_t total_processed = 0;
   ssize_t processed;
 
   mutex_lock(&c->lock);
 
   if (unlikely(c->to_user_error)) {
       total_processed = c->to_user_error;
       goto cleanup_unlock;
   }
 
   if (c->cur_to_user == NULL) {
       struct target *t = target_from_channel(c);
 
       mutex_lock(&t->lock);
 
       while (!target_poll(t)) {
           int e;
 
           mutex_unlock(&t->lock);
           mutex_unlock(&c->lock);
           e = wait_event_interruptible(t->wq, target_poll(t));
           mutex_lock(&c->lock);
           mutex_lock(&t->lock);
 
           if (unlikely(e != 0)) {
               /*
                * We haven't processed any bytes in either the
                * BIO or the IOV, so we can just terminate
                * right now.  Elsewhere in the kernel handles
                * restarting the syscall when appropriate.
                */
               total_processed = e;
               mutex_unlock(&t->lock);
               goto cleanup_unlock;
           }
       }
 
       if (unlikely(t->dm_destroyed)) {
           /*
            * DM has destroyed this target, so just lock
            * the user out.  There's really nothing else
            * we can do here.  Note that we don't actually
            * tear any thing down until userspace has
            * closed the FD, as there may still be
            * outstanding BIOs.
            *
            * This is kind of a wacky error code to
            * return.  My goal was really just to try and
            * find something that wasn't likely to be
            * returned by anything else in the miscdev
            * path.  The message "block device required"
            * seems like a somewhat reasonable thing to
            * say when the target has disappeared out from
            * under us, but "not block" isn't sensible.
            */
           c->to_user_error = total_processed = -ENOTBLK;
           mutex_unlock(&t->lock);
           goto cleanup_unlock;
       }
 
       /*
        * Ensures that accesses to the message data are not ordered
        * before the remote accesses that produce that message data.
        *
        * This pairs with the barrier in user_map(), via the
        * conditional within the while loop above. Also see the lack
        * of barrier in user_dtr(), which is why this can be after the
        * destroyed check.
        */
       smp_rmb();
 
       c->cur_to_user = msg_get_to_user(t);
       WARN_ON(c->cur_to_user == NULL);
       mutex_unlock(&t->lock);
   }
 
   processed = msg_copy_to_iov(c->cur_to_user, to);
   total_processed += processed;
 
   WARN_ON(c->cur_to_user->posn_to_user > c->cur_to_user->total_to_user);
   if (c->cur_to_user->posn_to_user == c->cur_to_user->total_to_user) {
       struct message *m = c->cur_to_user;
 
       c->cur_to_user = NULL;
       list_add_tail(&m->from_user, &c->from_user);
   }
 
cleanup_unlock:
   mutex_unlock(&c->lock);
   return total_processed;
}
 
static ssize_t dev_write(struct kiocb *iocb, struct iov_iter *from)
{
   struct channel *c = channel_from_file(iocb->ki_filp);
   ssize_t total_processed = 0;
   ssize_t processed;
 
   mutex_lock(&c->lock);
 
   if (unlikely(c->from_user_error)) {
       total_processed = c->from_user_error;
       goto cleanup_unlock;
   }
 
   /*
    * cur_from_user can never be NULL.  If there's no real message it must
    * point to the scratch space.
    */
   WARN_ON(c->cur_from_user == NULL);
   if (c->cur_from_user->posn_from_user < sizeof(struct dm_user_message)) {
       struct message *msg, *old;
 
       processed = msg_copy_from_iov(c->cur_from_user, from);
       if (processed <= 0) {
           pr_warn("msg_copy_from_iov() returned %zu\n",
               processed);
           c->from_user_error = -EINVAL;
           goto cleanup_unlock;
       }
       total_processed += processed;
 
       /*
        * In the unlikely event the user has provided us a very short
        * write, not even big enough to fill a message, just succeed.
        * We'll eventually build up enough bytes to do something.
        */
       if (unlikely(c->cur_from_user->posn_from_user <
                sizeof(struct dm_user_message)))
           goto cleanup_unlock;
 
       old = c->cur_from_user;
       mutex_lock(&c->target->lock);
       msg = msg_get_from_user(c, c->cur_from_user->msg.seq);
       if (msg == NULL) {
           pr_info("user provided an invalid messag seq of %llx\n",
               old->msg.seq);
           mutex_unlock(&c->target->lock);
           c->from_user_error = -EINVAL;
           goto cleanup_unlock;
       }
       mutex_unlock(&c->target->lock);
 
       WARN_ON(old->posn_from_user != sizeof(struct dm_user_message));
       msg->posn_from_user = sizeof(struct dm_user_message);
       msg->return_type = old->msg.type;
       msg->return_flags = old->msg.flags;
       WARN_ON(msg->posn_from_user > msg->total_from_user);
       c->cur_from_user = msg;
       WARN_ON(old != &c->scratch_message_from_user);
   }
 
   /*
    * Userspace can signal an error for single requests by overwriting the
    * seq field.
    */
   switch (c->cur_from_user->return_type) {
   case DM_USER_RESP_SUCCESS:
       c->cur_from_user->bio->bi_status = BLK_STS_OK;
       break;
   case DM_USER_RESP_ERROR:
   case DM_USER_RESP_UNSUPPORTED:
   default:
       c->cur_from_user->bio->bi_status = BLK_STS_IOERR;
       goto finish_bio;
   }
 
   /*
    * The op was a success as far as userspace is concerned, so process
    * whatever data may come along with it.  The user may provide the BIO
    * data in multiple chunks, in which case we don't need to finish the
    * BIO.
    */
   processed = msg_copy_from_iov(c->cur_from_user, from);
   total_processed += processed;
 
   if (c->cur_from_user->posn_from_user <
       c->cur_from_user->total_from_user)
       goto cleanup_unlock;
 
finish_bio:
   /*
    * When we set up this message the BIO's size matched the
    * message size, if that's not still the case then something
    * has gone off the rails.
    */
   WARN_ON(bio_size(c->cur_from_user->bio) != 0);
   bio_endio(c->cur_from_user->bio);
 
   /*
    * We don't actually need to take the target lock here, as all
    * we're doing is freeing the message and mempools have their
    * own lock.  Each channel has its ows scratch message.
    */
   WARN_ON(c->cur_from_user == &c->scratch_message_from_user);
   mempool_free(c->cur_from_user, &c->target->message_pool);
   c->scratch_message_from_user.posn_from_user = 0;
   c->cur_from_user = &c->scratch_message_from_user;
 
cleanup_unlock:
   mutex_unlock(&c->lock);
   return total_processed;
}
 
static int dev_release(struct inode *inode, struct file *file)
{
   struct channel *c;
 
   c = channel_from_file(file);
   mutex_lock(&c->lock);
   channel_free(c);
 
   return 0;
}
 
static const struct file_operations file_operations = {
   .owner = THIS_MODULE,
   .open = dev_open,
   .llseek = no_llseek,
   .read_iter = dev_read,
   .write_iter = dev_write,
   .release = dev_release,
};
 
static int user_ctr(struct dm_target *ti, unsigned int argc, char **argv)
{
   struct target *t;
   int r;
 
   if (argc != 3) {
       ti->error = "Invalid argument count";
       r = -EINVAL;
       goto cleanup_none;
   }
 
   t = kzalloc(sizeof(*t), GFP_KERNEL);
   if (t == NULL) {
       r = -ENOMEM;
       goto cleanup_none;
   }
   ti->private = t;
 
   /* Enable more BIO types. */
   ti->num_discard_bios = 1;
   ti->discards_supported = true;
   ti->num_flush_bios = 1;
   ti->flush_supported = true;
 
   /*
    * We begin with a single reference to the target, which is miscdev's
    * reference.  This ensures that the target won't be freed
    * until after the miscdev has been unregistered and all extant
    * channels have been closed.
    */
   kref_init(&t->references);
 
   t->daemon_terminated = false;
   mutex_init(&t->lock);
   init_waitqueue_head(&t->wq);
   INIT_LIST_HEAD(&t->to_user);
   mempool_init_kmalloc_pool(&t->message_pool, MAX_OUTSTANDING_MESSAGES,
                 sizeof(struct message));
 
   t->miscdev.minor = MISC_DYNAMIC_MINOR;
   t->miscdev.fops = &file_operations;
   t->miscdev.name = kasprintf(GFP_KERNEL, "dm-user/%s", argv[2]);
   if (t->miscdev.name == NULL) {
       r = -ENOMEM;
       goto cleanup_message_pool;
   }
 
   /*
    * Once the miscdev is registered it can be opened and therefor
    * concurrent references to the channel can happen.  Holding the target
    * lock during misc_register() could deadlock.  If registration
    * succeeds then we will not access the target again so we just stick a
    * barrier here, which pairs with taking the target lock everywhere
    * else the target is accessed.
    *
    * I forgot where we ended up on the RCpc/RCsc locks.  IIU RCsc locks
    * would mean that we could take the target lock earlier and release it
    * here instead of the memory barrier.  I'm not sure that's any better,
    * though, and this isn't on a hot path so it probably doesn't matter
    * either way.
    */
   smp_mb();
 
   r = misc_register(&t->miscdev);
   if (r) {
       DMERR("Unable to register miscdev %s for dm-user",
             t->miscdev.name);
       r = -ENOMEM;
       goto cleanup_misc_name;
   }
 
   return 0;
 
cleanup_misc_name:
   kfree(t->miscdev.name);
cleanup_message_pool:
   mempool_exit(&t->message_pool);
   kfree(t);
cleanup_none:
   return r;
}
 
static void user_dtr(struct dm_target *ti)
{
   struct target *t = target_from_target(ti);
 
   /*
    * Removes the miscdev.  This must be called without the target lock
    * held to avoid a possible deadlock because our open implementation is
    * called holding the miscdev lock and must later take the target lock.
    *
    * There is no race here because only DM can register/unregister the
    * miscdev, and DM ensures that doesn't happen twice.  The internal
    * miscdev lock is sufficient to ensure there are no races between
    * deregistering the miscdev and open.
    */
   misc_deregister(&t->miscdev);
 
   /*
    * We are now free to take the target's lock and drop our reference to
    * the target.  There are almost certainly tasks sleeping in read on at
    * least one of the channels associated with this target, this
    * explicitly wakes them up and terminates the read.
    */
   mutex_lock(&t->lock);
   /*
    * No barrier here, as wait/wake ensures that the flag visibility is
    * correct WRT the wake/sleep state of the target tasks.
    */
   t->dm_destroyed = true;
   wake_up_all(&t->wq);
   target_put(t);
}
 
/*
 * Consumes a BIO from device mapper, queueing it up for userspace.
 */
static int user_map(struct dm_target *ti, struct bio *bio)
{
   struct target *t;
   struct message *entry;
 
   t = target_from_target(ti);
   /*
    * FIXME
    *
    * This seems like a bad idea.  Specifically, here we're
    * directly on the IO path when we take the target lock, which may also
    * be taken from a user context.  The user context doesn't actively
    * trigger anything that may sleep while holding the lock, but this
    * still seems like a bad idea.
    *
    * The obvious way to fix this would be to use a proper queue, which
    * would result in no shared locks between the direct IO path and user
    * tasks.  I had a version that did this, but the head-of-line blocking
    * from the circular buffer resulted in us needing a fairly large
    * allocation in order to avoid situations in which the queue fills up
    * and everything goes off the rails.
    *
    * I could jump through a some hoops to avoid a shared lock while still
    * allowing for a large queue, but I'm not actually sure that allowing
    * for very large queues is the right thing to do here.  Intuitively it
    * seems better to keep the queues small in here (essentially sized to
    * the user latency for performance reasons only) and rely on returning
    * DM_MAPIO_REQUEUE regularly, as that would give the rest of the
    * kernel more information.
    *
    * I'll spend some time trying to figure out what's going on with
    * DM_MAPIO_REQUEUE, but if someone has a better idea of how to fix
    * this I'm all ears.
    */
   mutex_lock(&t->lock);
 
   /*
    * FIXME
    *
    * The assumption here is that there's no benefit to returning
    * DM_MAPIO_KILL as opposed to just erroring out the BIO, but I'm not
    * sure that's actually true -- for example, I could imagine users
    * expecting that submitted BIOs are unlikely to fail and therefor
    * relying on submission failure to indicate an unsupported type.
    *
    * There's two ways I can think of to fix this:
    *   - Add DM arguments that are parsed during the constructor that
    *     allow various dm_target flags to be set that indicate the op
    *     types supported by this target.  This may make sense for things
    *     like discard, where DM can already transform the BIOs to a form
    *     that's likely to be supported.
    *   - Some sort of pre-filter that allows userspace to hook in here
    *     and kill BIOs before marking them as submitted.  My guess would
    *     be that a userspace round trip is a bad idea here, but a BPF
    *     call seems resonable.
    *
    * My guess is that we'd likely want to do both.  The first one is easy
    * and gives DM the proper info, so it seems better.  The BPF call
    * seems overly complex for just this, but one could imagine wanting to
    * sometimes return _MAPPED and a BPF filter would be the way to do
    * that.
    *
    * For example, in Android we have an in-kernel DM device called
    * "dm-bow" that takes advange of some portion of the space that has
    * been discarded on a device to provide opportunistic block-level
    * backups.  While one could imagine just implementing this entirely in
    * userspace, that would come with an appreciable performance penalty.
    * Instead one could keep a BPF program that forwards most accesses
    * directly to the backing block device while informing a userspace
    * daemon of any discarded space and on writes to blocks that are to be
    * backed up.
    */
   if (unlikely((bio_type_to_user_type(bio) < 0) ||
            (bio_flags_to_user_flags(bio) < 0))) {
       mutex_unlock(&t->lock);
       return DM_MAPIO_KILL;
   }
 
   entry = msg_get_map(t);
   if (unlikely(entry == NULL)) {
       mutex_unlock(&t->lock);
       return DM_MAPIO_REQUEUE;
   }
 
   entry->msg.type = bio_type_to_user_type(bio);
   entry->msg.flags = bio_flags_to_user_flags(bio);
   entry->msg.sector = bio->bi_iter.bi_sector;
   entry->msg.len = bio_size(bio);
   entry->bio = bio;
   entry->posn_to_user = 0;
   entry->total_to_user = bio_bytes_needed_to_user(bio);
   entry->posn_from_user = 0;
   entry->total_from_user = bio_bytes_needed_from_user(bio);
   entry->delayed = false;
   entry->t = t;
   /* Pairs with the barrier in dev_read() */
   smp_wmb();
   list_add_tail(&entry->to_user, &t->to_user);
 
   /*
    * If there is no daemon to process the IO's,
    * queue these messages into a workqueue with
    * a timeout.
    */
   if (!is_user_space_thread_present(t))
       enqueue_delayed_work(entry, !t->daemon_terminated);
 
   wake_up_interruptible(&t->wq);
   mutex_unlock(&t->lock);
   return DM_MAPIO_SUBMITTED;
}
 
static struct target_type user_target = {
   .name = "user",
   .version = { 1, 0, 0 },
   .module = THIS_MODULE,
   .ctr = user_ctr,
   .dtr = user_dtr,
   .map = user_map,
};
 
static int __init dm_user_init(void)
{
   int r;
 
   r = dm_register_target(&user_target);
   if (r) {
       DMERR("register failed %d", r);
       goto error;
   }
 
   return 0;
 
error:
   return r;
}
 
static void __exit dm_user_exit(void)
{
   dm_unregister_target(&user_target);
}
 
module_init(dm_user_init);
module_exit(dm_user_exit);
MODULE_AUTHOR("Palmer Dabbelt <palmerdabbelt@google.com>");
MODULE_DESCRIPTION(DM_NAME " target returning blocks from userspace");
MODULE_LICENSE("GPL");