2024-12-19 9370bb92b2d16684ee45cf24e879c93c509162da
kernel/fs/pipe.c
....@@ -14,6 +14,7 @@
1414 #include <linux/fs.h>
1515 #include <linux/log2.h>
1616 #include <linux/mount.h>
17
+#include <linux/pseudo_fs.h>
1718 #include <linux/magic.h>
1819 #include <linux/pipe_fs_i.h>
1920 #include <linux/uio.h>
....@@ -23,6 +24,7 @@
2324 #include <linux/syscalls.h>
2425 #include <linux/fcntl.h>
2526 #include <linux/memcontrol.h>
27
+#include <linux/watch_queue.h>
2628
2729 #include <linux/uaccess.h>
2830 #include <asm/ioctls.h>
....@@ -57,10 +59,12 @@
5759 unsigned long pipe_user_pages_soft = PIPE_DEF_BUFFERS * INR_OPEN_CUR;
5860
5961 /*
60
- * We use a start+len construction, which provides full use of the
61
- * allocated memory.
62
- * -- Florian Coosmann (FGC)
63
- *
62
+ * We use head and tail indices that aren't masked off, except at the point of
63
+ * dereference, but rather they're allowed to wrap naturally. This means there
64
+ * isn't a dead spot in the buffer, but the ring has to be a power of two and
65
+ * <= 2^31.
66
+ * -- David Howells 2019-09-23.
67
+ *
6468 * Reads with count = 0 should always return 0.
6569 * -- Julian Bradfield 1999-06-07.
6670 *
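
The wrap-naturally scheme works because the indices are unsigned and the ring size is a power of two: occupancy is simply head - tail, which stays correct even after head wraps past zero, and masking happens only when a slot is actually dereferenced. A minimal standalone sketch of the helpers this code relies on (they live in include/linux/pipe_fs_i.h; shown here slightly simplified):

    /* head and tail run freely; mask only when indexing bufs[] */
    static inline bool pipe_empty(unsigned int head, unsigned int tail)
    {
            return head == tail;
    }

    static inline unsigned int pipe_occupancy(unsigned int head, unsigned int tail)
    {
            return head - tail;     /* still right after 'head' wraps around 0 */
    }

    static inline bool pipe_full(unsigned int head, unsigned int tail,
                                 unsigned int limit)
    {
            return pipe_occupancy(head, tail) >= limit;
    }

    /* dereference point: buf = &pipe->bufs[tail & (pipe->ring_size - 1)]; */
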
....@@ -117,22 +121,6 @@
117121 }
118122 }
119123
120
-/* Drop the inode semaphore and wait for a pipe event, atomically */
121
-void pipe_wait(struct pipe_inode_info *pipe)
122
-{
123
- DEFINE_WAIT(wait);
124
-
125
- /*
126
- * Pipes are system-local resources, so sleeping on them
127
- * is considered a noninteractive wait:
128
- */
129
- prepare_to_wait(&pipe->wait, &wait, TASK_INTERRUPTIBLE);
130
- pipe_unlock(pipe);
131
- schedule();
132
- finish_wait(&pipe->wait, &wait);
133
- pipe_lock(pipe);
134
-}
135
-
136124 static void anon_pipe_buf_release(struct pipe_inode_info *pipe,
137125 struct pipe_buffer *buf)
138126 {
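
With pipe_wait() removed, the read and write paths below open-code their sleeps against the new split wait queues: drop the pipe lock, do any pending wakeups, sleep on a lockless predicate, then retake the lock. A condensed view of the idiom as it appears in the pipe_read() hunk further down (not a complete function, just the shape of the wait):

    __pipe_unlock(pipe);
    /* wake the other side first if this reader freed up space */
    if (wait_event_interruptible_exclusive(pipe->rd_wait,
                                           pipe_readable(pipe)) < 0)
            return -ERESTARTSYS;    /* nothing was read, safe to bail out */
    __pipe_lock(pipe);

FIFO open still needs the old "unlock, sleep, relock" behaviour; that reappears later in this patch as the open-coded wait in wait_for_partner() and as pipe_wait_readable()/pipe_wait_writable().
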
....@@ -149,22 +137,20 @@
149137 put_page(page);
150138 }
151139
152
-static int anon_pipe_buf_steal(struct pipe_inode_info *pipe,
153
- struct pipe_buffer *buf)
140
+static bool anon_pipe_buf_try_steal(struct pipe_inode_info *pipe,
141
+ struct pipe_buffer *buf)
154142 {
155143 struct page *page = buf->page;
156144
157
- if (page_count(page) == 1) {
158
- if (memcg_kmem_enabled())
159
- memcg_kmem_uncharge(page, 0);
160
- __SetPageLocked(page);
161
- return 0;
162
- }
163
- return 1;
145
+ if (page_count(page) != 1)
146
+ return false;
147
+ memcg_kmem_uncharge_page(page, 0);
148
+ __SetPageLocked(page);
149
+ return true;
164150 }
165151
166152 /**
167
- * generic_pipe_buf_steal - attempt to take ownership of a &pipe_buffer
153
+ * generic_pipe_buf_try_steal - attempt to take ownership of a &pipe_buffer
168154 * @pipe: the pipe that the buffer belongs to
169155 * @buf: the buffer to attempt to steal
170156 *
....@@ -175,8 +161,8 @@
175161 * he wishes; the typical use is insertion into a different file
176162 * page cache.
177163 */
178
-int generic_pipe_buf_steal(struct pipe_inode_info *pipe,
179
- struct pipe_buffer *buf)
164
+bool generic_pipe_buf_try_steal(struct pipe_inode_info *pipe,
165
+ struct pipe_buffer *buf)
180166 {
181167 struct page *page = buf->page;
182168
....@@ -187,12 +173,11 @@
187173 */
188174 if (page_count(page) == 1) {
189175 lock_page(page);
190
- return 0;
176
+ return true;
191177 }
192
-
193
- return 1;
178
+ return false;
194179 }
195
-EXPORT_SYMBOL(generic_pipe_buf_steal);
180
+EXPORT_SYMBOL(generic_pipe_buf_try_steal);
196181
197182 /**
198183 * generic_pipe_buf_get - get a reference to a &struct pipe_buffer
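
The ->steal to ->try_steal conversion also flips the return convention from 0-on-success to a plain bool. Callers are expected to go through a pipe_buf_try_steal() wrapper rather than invoke the op directly; a rough sketch of what that wrapper looks like in include/linux/pipe_fs_i.h after this change (an assumption based on the renamed op, not part of these hunks):

    static inline bool pipe_buf_try_steal(struct pipe_inode_info *pipe,
                                          struct pipe_buffer *buf)
    {
            /* buffers that can never be stolen simply leave the op unset */
            if (!buf->ops->try_steal)
                    return false;
            return buf->ops->try_steal(pipe, buf);
    }
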
....@@ -211,22 +196,6 @@
211196 EXPORT_SYMBOL(generic_pipe_buf_get);
212197
213198 /**
214
- * generic_pipe_buf_confirm - verify contents of the pipe buffer
215
- * @info: the pipe that the buffer belongs to
216
- * @buf: the buffer to confirm
217
- *
218
- * Description:
219
- * This function does nothing, because the generic pipe code uses
220
- * pages that are always good when inserted into the pipe.
221
- */
222
-int generic_pipe_buf_confirm(struct pipe_inode_info *info,
223
- struct pipe_buffer *buf)
224
-{
225
- return 0;
226
-}
227
-EXPORT_SYMBOL(generic_pipe_buf_confirm);
228
-
229
-/**
230199 * generic_pipe_buf_release - put a reference to a &struct pipe_buffer
231200 * @pipe: the pipe that the buffer belongs to
232201 * @buf: the buffer to put a reference to
....@@ -242,33 +211,19 @@
242211 EXPORT_SYMBOL(generic_pipe_buf_release);
243212
244213 static const struct pipe_buf_operations anon_pipe_buf_ops = {
245
- .can_merge = 1,
246
- .confirm = generic_pipe_buf_confirm,
247
- .release = anon_pipe_buf_release,
248
- .steal = anon_pipe_buf_steal,
249
- .get = generic_pipe_buf_get,
214
+ .release = anon_pipe_buf_release,
215
+ .try_steal = anon_pipe_buf_try_steal,
216
+ .get = generic_pipe_buf_get,
250217 };
251218
252
-static const struct pipe_buf_operations anon_pipe_buf_nomerge_ops = {
253
- .can_merge = 0,
254
- .confirm = generic_pipe_buf_confirm,
255
- .release = anon_pipe_buf_release,
256
- .steal = anon_pipe_buf_steal,
257
- .get = generic_pipe_buf_get,
258
-};
259
-
260
-static const struct pipe_buf_operations packet_pipe_buf_ops = {
261
- .can_merge = 0,
262
- .confirm = generic_pipe_buf_confirm,
263
- .release = anon_pipe_buf_release,
264
- .steal = anon_pipe_buf_steal,
265
- .get = generic_pipe_buf_get,
266
-};
267
-
268
-void pipe_buf_mark_unmergeable(struct pipe_buffer *buf)
219
+/* Done while waiting without holding the pipe lock - thus the READ_ONCE() */
220
+static inline bool pipe_readable(const struct pipe_inode_info *pipe)
269221 {
270
- if (buf->ops == &anon_pipe_buf_ops)
271
- buf->ops = &anon_pipe_buf_nomerge_ops;
222
+ unsigned int head = READ_ONCE(pipe->head);
223
+ unsigned int tail = READ_ONCE(pipe->tail);
224
+ unsigned int writers = READ_ONCE(pipe->writers);
225
+
226
+ return !pipe_empty(head, tail) || !writers;
272227 }
273228
274229 static ssize_t
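
Collapsing the three ops tables into one works because merge eligibility stops being a property of the ops table (the old can_merge field) and becomes a per-buffer flag: ordinary anon buffers get PIPE_BUF_FLAG_CAN_MERGE, packetized (O_DIRECT) writes get PIPE_BUF_FLAG_PACKET instead. The merge fast path later in this patch then tests, in essence:

    struct pipe_buffer *buf = &pipe->bufs[(head - 1) & mask];

    /* old test: buf->ops->can_merge; new test: a flag on the buffer itself */
    if ((buf->flags & PIPE_BUF_FLAG_CAN_MERGE) &&
        buf->offset + buf->len + chars <= PAGE_SIZE) {
            /* append to the existing page instead of consuming a new slot */
    }
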
....@@ -277,27 +232,69 @@
277232 size_t total_len = iov_iter_count(to);
278233 struct file *filp = iocb->ki_filp;
279234 struct pipe_inode_info *pipe = filp->private_data;
280
- int do_wakeup;
235
+ bool was_full, wake_next_reader = false;
281236 ssize_t ret;
282237
283238 /* Null read succeeds. */
284239 if (unlikely(total_len == 0))
285240 return 0;
286241
287
- do_wakeup = 0;
288242 ret = 0;
289243 __pipe_lock(pipe);
244
+
245
+ /*
246
+ * We only wake up writers if the pipe was full when we started
247
+ * reading in order to avoid unnecessary wakeups.
248
+ *
249
+ * But when we do wake up writers, we do so using a sync wakeup
250
+ * (WF_SYNC), because we want them to get going and generate more
251
+ * data for us.
252
+ */
253
+ was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
290254 for (;;) {
291
- int bufs = pipe->nrbufs;
292
- if (bufs) {
293
- int curbuf = pipe->curbuf;
294
- struct pipe_buffer *buf = pipe->bufs + curbuf;
255
+ /* Read ->head with a barrier vs post_one_notification() */
256
+ unsigned int head = smp_load_acquire(&pipe->head);
257
+ unsigned int tail = pipe->tail;
258
+ unsigned int mask = pipe->ring_size - 1;
259
+
260
+#ifdef CONFIG_WATCH_QUEUE
261
+ if (pipe->note_loss) {
262
+ struct watch_notification n;
263
+
264
+ if (total_len < 8) {
265
+ if (ret == 0)
266
+ ret = -ENOBUFS;
267
+ break;
268
+ }
269
+
270
+ n.type = WATCH_TYPE_META;
271
+ n.subtype = WATCH_META_LOSS_NOTIFICATION;
272
+ n.info = watch_sizeof(n);
273
+ if (copy_to_iter(&n, sizeof(n), to) != sizeof(n)) {
274
+ if (ret == 0)
275
+ ret = -EFAULT;
276
+ break;
277
+ }
278
+ ret += sizeof(n);
279
+ total_len -= sizeof(n);
280
+ pipe->note_loss = false;
281
+ }
282
+#endif
283
+
284
+ if (!pipe_empty(head, tail)) {
285
+ struct pipe_buffer *buf = &pipe->bufs[tail & mask];
295286 size_t chars = buf->len;
296287 size_t written;
297288 int error;
298289
299
- if (chars > total_len)
290
+ if (chars > total_len) {
291
+ if (buf->flags & PIPE_BUF_FLAG_WHOLE) {
292
+ if (ret == 0)
293
+ ret = -ENOBUFS;
294
+ break;
295
+ }
300296 chars = total_len;
297
+ }
301298
302299 error = pipe_buf_confirm(pipe, buf);
303300 if (error) {
....@@ -324,50 +321,77 @@
324321
325322 if (!buf->len) {
326323 pipe_buf_release(pipe, buf);
327
- curbuf = (curbuf + 1) & (pipe->buffers - 1);
328
- pipe->curbuf = curbuf;
329
- pipe->nrbufs = --bufs;
330
- do_wakeup = 1;
324
+ spin_lock_irq(&pipe->rd_wait.lock);
325
+#ifdef CONFIG_WATCH_QUEUE
326
+ if (buf->flags & PIPE_BUF_FLAG_LOSS)
327
+ pipe->note_loss = true;
328
+#endif
329
+ tail++;
330
+ pipe->tail = tail;
331
+ spin_unlock_irq(&pipe->rd_wait.lock);
331332 }
332333 total_len -= chars;
333334 if (!total_len)
334335 break; /* common path: read succeeded */
336
+ if (!pipe_empty(head, tail)) /* More to do? */
337
+ continue;
335338 }
336
- if (bufs) /* More to do? */
337
- continue;
339
+
338340 if (!pipe->writers)
339341 break;
340
- if (!pipe->waiting_writers) {
341
- /* syscall merging: Usually we must not sleep
342
- * if O_NONBLOCK is set, or if we got some data.
343
- * But if a writer sleeps in kernel space, then
344
- * we can wait for that data without violating POSIX.
345
- */
346
- if (ret)
347
- break;
348
- if (filp->f_flags & O_NONBLOCK) {
349
- ret = -EAGAIN;
350
- break;
351
- }
352
- }
353
- if (signal_pending(current)) {
354
- if (!ret)
355
- ret = -ERESTARTSYS;
342
+ if (ret)
343
+ break;
344
+ if (filp->f_flags & O_NONBLOCK) {
345
+ ret = -EAGAIN;
356346 break;
357347 }
358
- if (do_wakeup) {
359
- wake_up_interruptible_sync_poll(&pipe->wait, EPOLLOUT | EPOLLWRNORM);
360
- kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
348
+ __pipe_unlock(pipe);
349
+
350
+ /*
351
+ * We only get here if we didn't actually read anything.
352
+ *
353
+ * However, we could have seen (and removed) a zero-sized
354
+ * pipe buffer, and might have made space in the buffers
355
+ * that way.
356
+ *
357
+ * You can't make zero-sized pipe buffers by doing an empty
358
+ * write (not even in packet mode), but they can happen if
359
+ * the writer gets an EFAULT when trying to fill a buffer
360
+ * that already got allocated and inserted in the buffer
361
+ * array.
362
+ *
363
+ * So we still need to wake up any pending writers in the
364
+ * _very_ unlikely case that the pipe was full, but we got
365
+ * no data.
366
+ */
367
+ if (unlikely(was_full)) {
368
+ wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
369
+ kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
361370 }
362
- pipe_wait(pipe);
371
+
372
+ /*
373
+ * But because we didn't read anything, at this point we can
374
+ * just return directly with -ERESTARTSYS if we're interrupted,
375
+ * since we've done any required wakeups and there's no need
376
+ * to mark anything accessed. And we've dropped the lock.
377
+ */
378
+ if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0)
379
+ return -ERESTARTSYS;
380
+
381
+ __pipe_lock(pipe);
382
+ was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
383
+ wake_next_reader = true;
363384 }
385
+ if (pipe_empty(pipe->head, pipe->tail))
386
+ wake_next_reader = false;
364387 __pipe_unlock(pipe);
365388
366
- /* Signal writers asynchronously that there is more room. */
367
- if (do_wakeup) {
368
- wake_up_interruptible_sync_poll(&pipe->wait, EPOLLOUT | EPOLLWRNORM);
389
+ if (was_full) {
390
+ wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
369391 kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
370392 }
393
+ if (wake_next_reader)
394
+ wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
371395 if (ret > 0)
372396 file_accessed(filp);
373397 return ret;
....@@ -378,15 +402,28 @@
378402 return (file->f_flags & O_DIRECT) != 0;
379403 }
380404
405
+/* Done while waiting without holding the pipe lock - thus the READ_ONCE() */
406
+static inline bool pipe_writable(const struct pipe_inode_info *pipe)
407
+{
408
+ unsigned int head = READ_ONCE(pipe->head);
409
+ unsigned int tail = READ_ONCE(pipe->tail);
410
+ unsigned int max_usage = READ_ONCE(pipe->max_usage);
411
+
412
+ return !pipe_full(head, tail, max_usage) ||
413
+ !READ_ONCE(pipe->readers);
414
+}
415
+
381416 static ssize_t
382417 pipe_write(struct kiocb *iocb, struct iov_iter *from)
383418 {
384419 struct file *filp = iocb->ki_filp;
385420 struct pipe_inode_info *pipe = filp->private_data;
421
+ unsigned int head;
386422 ssize_t ret = 0;
387
- int do_wakeup = 0;
388423 size_t total_len = iov_iter_count(from);
389424 ssize_t chars;
425
+ bool was_empty = false;
426
+ bool wake_next_writer = false;
390427
391428 /* Null write succeeds. */
392429 if (unlikely(total_len == 0))
....@@ -400,15 +437,34 @@
400437 goto out;
401438 }
402439
403
- /* We try to merge small writes */
404
- chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */
405
- if (pipe->nrbufs && chars != 0) {
406
- int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) &
407
- (pipe->buffers - 1);
408
- struct pipe_buffer *buf = pipe->bufs + lastbuf;
440
+#ifdef CONFIG_WATCH_QUEUE
441
+ if (pipe->watch_queue) {
442
+ ret = -EXDEV;
443
+ goto out;
444
+ }
445
+#endif
446
+
447
+ /*
448
+ * Epoll nonsensically wants a wakeup whether the pipe
449
+ * was already empty or not.
450
+ *
451
+ * If it wasn't empty we try to merge new data into
452
+ * the last buffer.
453
+ *
454
+ * That naturally merges small writes, but it also
455
+ * page-aligns the rest of the writes for large writes
456
+ * spanning multiple pages.
457
+ */
458
+ head = pipe->head;
459
+ was_empty = true;
460
+ chars = total_len & (PAGE_SIZE-1);
461
+ if (chars && !pipe_empty(head, pipe->tail)) {
462
+ unsigned int mask = pipe->ring_size - 1;
463
+ struct pipe_buffer *buf = &pipe->bufs[(head - 1) & mask];
409464 int offset = buf->offset + buf->len;
410465
411
- if (buf->ops->can_merge && offset + chars <= PAGE_SIZE) {
466
+ if ((buf->flags & PIPE_BUF_FLAG_CAN_MERGE) &&
467
+ offset + chars <= PAGE_SIZE) {
412468 ret = pipe_buf_confirm(pipe, buf);
413469 if (ret)
414470 goto out;
....@@ -418,7 +474,7 @@
418474 ret = -EFAULT;
419475 goto out;
420476 }
421
- do_wakeup = 1;
477
+
422478 buf->len += ret;
423479 if (!iov_iter_count(from))
424480 goto out;
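
A quick worked example of that merge test, assuming PAGE_SIZE is 4096 and two back-to-back 100-byte writes: the first write takes a fresh slot (offset 0, len 100, CAN_MERGE set); the second computes chars = 100 & (PAGE_SIZE - 1) = 100, sees that 200 bytes still fit in one page, and is copied into the same page, leaving a single slot holding 200 bytes. The arithmetic, as a tiny standalone check:

    #include <assert.h>

    #define PAGE_SIZE 4096u

    int main(void)
    {
            unsigned int len = 100, offset = 0;             /* state after write #1 */
            unsigned int chars = 100 & (PAGE_SIZE - 1);     /* size of write #2 */

            /* write #2 merges because the data still fits in the same page */
            assert(offset + len + chars <= PAGE_SIZE);
            len += chars;                                   /* buf->len is now 200 */
            assert(len == 200);
            return 0;
    }
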
....@@ -426,18 +482,17 @@
426482 }
427483
428484 for (;;) {
429
- int bufs;
430
-
431485 if (!pipe->readers) {
432486 send_sig(SIGPIPE, current, 0);
433487 if (!ret)
434488 ret = -EPIPE;
435489 break;
436490 }
437
- bufs = pipe->nrbufs;
438
- if (bufs < pipe->buffers) {
439
- int newbuf = (pipe->curbuf + bufs) & (pipe->buffers-1);
440
- struct pipe_buffer *buf = pipe->bufs + newbuf;
491
+
492
+ head = pipe->head;
493
+ if (!pipe_full(head, pipe->tail, pipe->max_usage)) {
494
+ unsigned int mask = pipe->ring_size - 1;
495
+ struct pipe_buffer *buf = &pipe->bufs[head & mask];
441496 struct page *page = pipe->tmp_page;
442497 int copied;
443498
....@@ -449,12 +504,35 @@
449504 }
450505 pipe->tmp_page = page;
451506 }
452
- /* Always wake up, even if the copy fails. Otherwise
453
- * we lock up (O_NONBLOCK-)readers that sleep due to
454
- * syscall merging.
455
- * FIXME! Is this really true?
507
+
508
+ /* Allocate a slot in the ring in advance and attach an
509
+ * empty buffer. If we fault or otherwise fail to use
510
+ * it, either the reader will consume it or it'll still
511
+ * be there for the next write.
456512 */
457
- do_wakeup = 1;
513
+ spin_lock_irq(&pipe->rd_wait.lock);
514
+
515
+ head = pipe->head;
516
+ if (pipe_full(head, pipe->tail, pipe->max_usage)) {
517
+ spin_unlock_irq(&pipe->rd_wait.lock);
518
+ continue;
519
+ }
520
+
521
+ pipe->head = head + 1;
522
+ spin_unlock_irq(&pipe->rd_wait.lock);
523
+
524
+ /* Insert it into the buffer array */
525
+ buf = &pipe->bufs[head & mask];
526
+ buf->page = page;
527
+ buf->ops = &anon_pipe_buf_ops;
528
+ buf->offset = 0;
529
+ buf->len = 0;
530
+ if (is_packetized(filp))
531
+ buf->flags = PIPE_BUF_FLAG_PACKET;
532
+ else
533
+ buf->flags = PIPE_BUF_FLAG_CAN_MERGE;
534
+ pipe->tmp_page = NULL;
535
+
458536 copied = copy_page_from_iter(page, 0, PAGE_SIZE, from);
459537 if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) {
460538 if (!ret)
....@@ -462,25 +540,17 @@
462540 break;
463541 }
464542 ret += copied;
465
-
466
- /* Insert it into the buffer array */
467
- buf->page = page;
468
- buf->ops = &anon_pipe_buf_ops;
469543 buf->offset = 0;
470544 buf->len = copied;
471
- buf->flags = 0;
472
- if (is_packetized(filp)) {
473
- buf->ops = &packet_pipe_buf_ops;
474
- buf->flags = PIPE_BUF_FLAG_PACKET;
475
- }
476
- pipe->nrbufs = ++bufs;
477
- pipe->tmp_page = NULL;
478545
479546 if (!iov_iter_count(from))
480547 break;
481548 }
482
- if (bufs < pipe->buffers)
549
+
550
+ if (!pipe_full(head, pipe->tail, pipe->max_usage))
483551 continue;
552
+
553
+ /* Wait for buffer space to become available. */
484554 if (filp->f_flags & O_NONBLOCK) {
485555 if (!ret)
486556 ret = -EAGAIN;
....@@ -491,21 +561,43 @@
491561 ret = -ERESTARTSYS;
492562 break;
493563 }
494
- if (do_wakeup) {
495
- wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
564
+
565
+ /*
566
+ * We're going to release the pipe lock and wait for more
567
+ * space. We wake up any readers if necessary, and then
568
+ * after waiting we need to re-check whether the pipe
569
+ * become empty while we dropped the lock.
570
+ */
571
+ __pipe_unlock(pipe);
572
+ if (was_empty) {
573
+ wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
496574 kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
497
- do_wakeup = 0;
498575 }
499
- pipe->waiting_writers++;
500
- pipe_wait(pipe);
501
- pipe->waiting_writers--;
576
+ wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable(pipe));
577
+ __pipe_lock(pipe);
578
+ was_empty = pipe_empty(pipe->head, pipe->tail);
579
+ wake_next_writer = true;
502580 }
503581 out:
582
+ if (pipe_full(pipe->head, pipe->tail, pipe->max_usage))
583
+ wake_next_writer = false;
504584 __pipe_unlock(pipe);
505
- if (do_wakeup) {
506
- wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
585
+
586
+ /*
587
+ * If we do do a wakeup event, we do a 'sync' wakeup, because we
588
+ * want the reader to start processing things asap, rather than
589
+ * leave the data pending.
590
+ *
591
+ * This is particularly important for small writes, because of
592
+ * how (for example) the GNU make jobserver uses small writes to
593
+ * wake up pending jobs
594
+ */
595
+ if (was_empty) {
596
+ wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
507597 kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
508598 }
599
+ if (wake_next_writer)
600
+ wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
509601 if (ret > 0 && sb_start_write_trylock(file_inode(filp)->i_sb)) {
510602 int err = file_update_time(filp);
511603 if (err)
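
The jobserver mention is the motivating workload for the sync wakeup: GNU make hands out job slots as single bytes in a shared pipe, so every transfer is tiny and wakeup latency dominates throughput. A simplified userspace sketch of that pattern (not make's actual code, just the shape of it):

    #include <unistd.h>

    static int jobfd[2];    /* token pipe shared by all jobs */

    static void acquire_slot(void)
    {
            char token;
            /* Blocks until another job returns a byte; the sync wakeup in
             * pipe_write() gets this reader running as soon as possible. */
            while (read(jobfd[0], &token, 1) != 1)
                    ;
    }

    static void release_slot(void)
    {
            char token = '+';
            write(jobfd[1], &token, 1);
    }

    int main(void)
    {
            char tokens[3] = { '+', '+', '+' };

            pipe(jobfd);
            write(jobfd[1], tokens, sizeof(tokens));  /* allow 3 parallel jobs */
            acquire_slot();
            /* ... run one job ... */
            release_slot();
            return 0;
    }
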
....@@ -518,23 +610,40 @@
518610 static long pipe_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
519611 {
520612 struct pipe_inode_info *pipe = filp->private_data;
521
- int count, buf, nrbufs;
613
+ int count, head, tail, mask;
522614
523615 switch (cmd) {
524
- case FIONREAD:
525
- __pipe_lock(pipe);
526
- count = 0;
527
- buf = pipe->curbuf;
528
- nrbufs = pipe->nrbufs;
529
- while (--nrbufs >= 0) {
530
- count += pipe->bufs[buf].len;
531
- buf = (buf+1) & (pipe->buffers - 1);
532
- }
533
- __pipe_unlock(pipe);
616
+ case FIONREAD:
617
+ __pipe_lock(pipe);
618
+ count = 0;
619
+ head = pipe->head;
620
+ tail = pipe->tail;
621
+ mask = pipe->ring_size - 1;
534622
535
- return put_user(count, (int __user *)arg);
536
- default:
537
- return -ENOIOCTLCMD;
623
+ while (tail != head) {
624
+ count += pipe->bufs[tail & mask].len;
625
+ tail++;
626
+ }
627
+ __pipe_unlock(pipe);
628
+
629
+ return put_user(count, (int __user *)arg);
630
+
631
+#ifdef CONFIG_WATCH_QUEUE
632
+ case IOC_WATCH_QUEUE_SET_SIZE: {
633
+ int ret;
634
+ __pipe_lock(pipe);
635
+ ret = watch_queue_set_size(pipe, arg);
636
+ __pipe_unlock(pipe);
637
+ return ret;
638
+ }
639
+
640
+ case IOC_WATCH_QUEUE_SET_FILTER:
641
+ return watch_queue_set_filter(
642
+ pipe, (struct watch_notification_filter __user *)arg);
643
+#endif
644
+
645
+ default:
646
+ return -ENOIOCTLCMD;
538647 }
539648 }
540649
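
The FIONREAD branch is reachable from ordinary userspace and is a handy way to see the head/tail walk in action; the IOC_WATCH_QUEUE_* cases only exist when CONFIG_WATCH_QUEUE is built in. A minimal usage example:

    #include <stdio.h>
    #include <sys/ioctl.h>
    #include <unistd.h>

    int main(void)
    {
            int fd[2], pending = 0;

            pipe(fd);
            write(fd[1], "hello", 5);

            /* pipe_ioctl() sums bufs[tail..head).len under the pipe lock */
            if (ioctl(fd[0], FIONREAD, &pending) == 0)
                    printf("%d bytes buffered\n", pending);   /* prints 5 */
            return 0;
    }
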
....@@ -544,21 +653,38 @@
544653 {
545654 __poll_t mask;
546655 struct pipe_inode_info *pipe = filp->private_data;
547
- int nrbufs;
656
+ unsigned int head, tail;
548657
549
- poll_wait(filp, &pipe->wait, wait);
658
+ /*
659
+ * Reading pipe state only -- no need for acquiring the semaphore.
660
+ *
661
+ * But because this is racy, the code has to add the
662
+ * entry to the poll table _first_ ..
663
+ */
664
+ if (filp->f_mode & FMODE_READ)
665
+ poll_wait(filp, &pipe->rd_wait, wait);
666
+ if (filp->f_mode & FMODE_WRITE)
667
+ poll_wait(filp, &pipe->wr_wait, wait);
550668
551
- /* Reading only -- no need for acquiring the semaphore. */
552
- nrbufs = pipe->nrbufs;
669
+ /*
670
+ * .. and only then can you do the racy tests. That way,
671
+ * if something changes and you got it wrong, the poll
672
+ * table entry will wake you up and fix it.
673
+ */
674
+ head = READ_ONCE(pipe->head);
675
+ tail = READ_ONCE(pipe->tail);
676
+
553677 mask = 0;
554678 if (filp->f_mode & FMODE_READ) {
555
- mask = (nrbufs > 0) ? EPOLLIN | EPOLLRDNORM : 0;
679
+ if (!pipe_empty(head, tail))
680
+ mask |= EPOLLIN | EPOLLRDNORM;
556681 if (!pipe->writers && filp->f_version != pipe->w_counter)
557682 mask |= EPOLLHUP;
558683 }
559684
560685 if (filp->f_mode & FMODE_WRITE) {
561
- mask |= (nrbufs < pipe->buffers) ? EPOLLOUT | EPOLLWRNORM : 0;
686
+ if (!pipe_full(head, tail, pipe->max_usage))
687
+ mask |= EPOLLOUT | EPOLLWRNORM;
562688 /*
563689 * Most Unices do not set EPOLLERR for FIFOs but on Linux they
564690 * behave exactly like pipes for poll().
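
The "add the poll table entry first, test second" ordering the comment describes is the standard poll pattern: if the state changes between the racy test and the caller going to sleep, the waitqueue entry registered by poll_wait() still produces a wakeup and the caller re-polls. The same skeleton for a hypothetical character device (example_dev, data_available() and space_available() are made up for illustration):

    static __poll_t example_poll(struct file *filp, poll_table *wait)
    {
            struct example_dev *dev = filp->private_data;
            __poll_t mask = 0;

            /* 1) hook into the waitqueue(s) first ... */
            poll_wait(filp, &dev->wait, wait);

            /* 2) ... then do the racy, lockless state tests; a stale answer
             *    is corrected by the wakeup delivered through the entry
             *    registered above. */
            if (data_available(dev))
                    mask |= EPOLLIN | EPOLLRDNORM;
            if (space_available(dev))
                    mask |= EPOLLOUT | EPOLLWRNORM;
            return mask;
    }
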
....@@ -596,8 +722,10 @@
596722 if (file->f_mode & FMODE_WRITE)
597723 pipe->writers--;
598724
599
- if (pipe->readers || pipe->writers) {
600
- wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLOUT | EPOLLRDNORM | EPOLLWRNORM | EPOLLERR | EPOLLHUP);
725
+ /* Was that the last reader or writer, but not the other side? */
726
+ if (!pipe->readers != !pipe->writers) {
727
+ wake_up_interruptible_all(&pipe->rd_wait);
728
+ wake_up_interruptible_all(&pipe->wr_wait);
601729 kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
602730 kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
603731 }
....@@ -626,27 +754,27 @@
626754 return retval;
627755 }
628756
629
-static unsigned long account_pipe_buffers(struct user_struct *user,
630
- unsigned long old, unsigned long new)
757
+unsigned long account_pipe_buffers(struct user_struct *user,
758
+ unsigned long old, unsigned long new)
631759 {
632760 return atomic_long_add_return(new - old, &user->pipe_bufs);
633761 }
634762
635
-static bool too_many_pipe_buffers_soft(unsigned long user_bufs)
763
+bool too_many_pipe_buffers_soft(unsigned long user_bufs)
636764 {
637765 unsigned long soft_limit = READ_ONCE(pipe_user_pages_soft);
638766
639767 return soft_limit && user_bufs > soft_limit;
640768 }
641769
642
-static bool too_many_pipe_buffers_hard(unsigned long user_bufs)
770
+bool too_many_pipe_buffers_hard(unsigned long user_bufs)
643771 {
644772 unsigned long hard_limit = READ_ONCE(pipe_user_pages_hard);
645773
646774 return hard_limit && user_bufs > hard_limit;
647775 }
648776
649
-static bool is_unprivileged_user(void)
777
+bool pipe_is_unprivileged_user(void)
650778 {
651779 return !capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN);
652780 }
....@@ -668,21 +796,24 @@
668796
669797 user_bufs = account_pipe_buffers(user, 0, pipe_bufs);
670798
671
- if (too_many_pipe_buffers_soft(user_bufs) && is_unprivileged_user()) {
799
+ if (too_many_pipe_buffers_soft(user_bufs) && pipe_is_unprivileged_user()) {
672800 user_bufs = account_pipe_buffers(user, pipe_bufs, PIPE_MIN_DEF_BUFFERS);
673801 pipe_bufs = PIPE_MIN_DEF_BUFFERS;
674802 }
675803
676
- if (too_many_pipe_buffers_hard(user_bufs) && is_unprivileged_user())
804
+ if (too_many_pipe_buffers_hard(user_bufs) && pipe_is_unprivileged_user())
677805 goto out_revert_acct;
678806
679807 pipe->bufs = kcalloc(pipe_bufs, sizeof(struct pipe_buffer),
680808 GFP_KERNEL_ACCOUNT);
681809
682810 if (pipe->bufs) {
683
- init_waitqueue_head(&pipe->wait);
811
+ init_waitqueue_head(&pipe->rd_wait);
812
+ init_waitqueue_head(&pipe->wr_wait);
684813 pipe->r_counter = pipe->w_counter = 1;
685
- pipe->buffers = pipe_bufs;
814
+ pipe->max_usage = pipe_bufs;
815
+ pipe->ring_size = pipe_bufs;
816
+ pipe->nr_accounted = pipe_bufs;
686817 pipe->user = user;
687818 mutex_init(&pipe->mutex);
688819 return pipe;
....@@ -700,13 +831,22 @@
700831 {
701832 int i;
702833
703
- (void) account_pipe_buffers(pipe->user, pipe->buffers, 0);
834
+#ifdef CONFIG_WATCH_QUEUE
835
+ if (pipe->watch_queue)
836
+ watch_queue_clear(pipe->watch_queue);
837
+#endif
838
+
839
+ (void) account_pipe_buffers(pipe->user, pipe->nr_accounted, 0);
704840 free_uid(pipe->user);
705
- for (i = 0; i < pipe->buffers; i++) {
841
+ for (i = 0; i < pipe->ring_size; i++) {
706842 struct pipe_buffer *buf = pipe->bufs + i;
707843 if (buf->ops)
708844 pipe_buf_release(pipe, buf);
709845 }
846
+#ifdef CONFIG_WATCH_QUEUE
847
+ if (pipe->watch_queue)
848
+ put_watch_queue(pipe->watch_queue);
849
+#endif
710850 if (pipe->tmp_page)
711851 __free_page(pipe->tmp_page);
712852 kfree(pipe->bufs);
....@@ -772,9 +912,19 @@
772912 {
773913 struct inode *inode = get_pipe_inode();
774914 struct file *f;
915
+ int error;
775916
776917 if (!inode)
777918 return -ENFILE;
919
+
920
+ if (flags & O_NOTIFICATION_PIPE) {
921
+ error = watch_queue_init(inode->i_pipe);
922
+ if (error) {
923
+ free_pipe_info(inode->i_pipe);
924
+ iput(inode);
925
+ return error;
926
+ }
927
+ }
778928
779929 f = alloc_file_pseudo(inode, pipe_mnt, "",
780930 O_WRONLY | (flags & (O_NONBLOCK | O_DIRECT)),
....@@ -796,6 +946,8 @@
796946 }
797947 res[0]->private_data = inode->i_pipe;
798948 res[1] = f;
949
+ stream_open(inode, res[0]);
950
+ stream_open(inode, res[1]);
799951 return 0;
800952 }
801953
....@@ -804,7 +956,7 @@
804956 int error;
805957 int fdw, fdr;
806958
807
- if (flags & ~(O_CLOEXEC | O_NONBLOCK | O_DIRECT))
959
+ if (flags & ~(O_CLOEXEC | O_NONBLOCK | O_DIRECT | O_NOTIFICATION_PIPE))
808960 return -EINVAL;
809961
810962 error = create_pipe_files(files, flags);
....@@ -881,12 +1033,52 @@
8811033 return do_pipe2(fildes, 0);
8821034 }
8831035
1036
+/*
1037
+ * This is the stupid "wait for pipe to be readable or writable"
1038
+ * model.
1039
+ *
1040
+ * See pipe_read/write() for the proper kind of exclusive wait,
1041
+ * but that requires that we wake up any other readers/writers
1042
+ * if we then do not end up reading everything (ie the whole
1043
+ * "wake_next_reader/writer" logic in pipe_read/write()).
1044
+ */
1045
+void pipe_wait_readable(struct pipe_inode_info *pipe)
1046
+{
1047
+ pipe_unlock(pipe);
1048
+ wait_event_interruptible(pipe->rd_wait, pipe_readable(pipe));
1049
+ pipe_lock(pipe);
1050
+}
1051
+
1052
+void pipe_wait_writable(struct pipe_inode_info *pipe)
1053
+{
1054
+ pipe_unlock(pipe);
1055
+ wait_event_interruptible(pipe->wr_wait, pipe_writable(pipe));
1056
+ pipe_lock(pipe);
1057
+}
1058
+
1059
+/*
1060
+ * This depends on both the wait (here) and the wakeup (wake_up_partner)
1061
+ * holding the pipe lock, so "*cnt" is stable and we know a wakeup cannot
1062
+ * race with the count check and waitqueue prep.
1063
+ *
1064
+ * Normally in order to avoid races, you'd do the prepare_to_wait() first,
1065
+ * then check the condition you're waiting for, and only then sleep. But
1066
+ * because of the pipe lock, we can check the condition before being on
1067
+ * the wait queue.
1068
+ *
1069
+ * We use the 'rd_wait' waitqueue for pipe partner waiting.
1070
+ */
8841071 static int wait_for_partner(struct pipe_inode_info *pipe, unsigned int *cnt)
8851072 {
886
- int cur = *cnt;
1073
+ DEFINE_WAIT(rdwait);
1074
+ int cur = *cnt;
8871075
8881076 while (cur == *cnt) {
889
- pipe_wait(pipe);
1077
+ prepare_to_wait(&pipe->rd_wait, &rdwait, TASK_INTERRUPTIBLE);
1078
+ pipe_unlock(pipe);
1079
+ schedule();
1080
+ finish_wait(&pipe->rd_wait, &rdwait);
1081
+ pipe_lock(pipe);
8901082 if (signal_pending(current))
8911083 break;
8921084 }
....@@ -895,7 +1087,7 @@
8951087
8961088 static void wake_up_partner(struct pipe_inode_info *pipe)
8971089 {
898
- wake_up_interruptible(&pipe->wait);
1090
+ wake_up_interruptible_all(&pipe->rd_wait);
8991091 }
9001092
9011093 static int fifo_open(struct inode *inode, struct file *filp)
....@@ -934,9 +1126,9 @@
9341126 __pipe_lock(pipe);
9351127
9361128 /* We can only do regular read/write on fifos */
937
- filp->f_mode &= (FMODE_READ | FMODE_WRITE);
1129
+ stream_open(inode, filp);
9381130
939
- switch (filp->f_mode) {
1131
+ switch (filp->f_mode & (FMODE_READ | FMODE_WRITE)) {
9401132 case FMODE_READ:
9411133 /*
9421134 * O_RDONLY
....@@ -958,7 +1150,7 @@
9581150 }
9591151 }
9601152 break;
961
-
1153
+
9621154 case FMODE_WRITE:
9631155 /*
9641156 * O_WRONLY
....@@ -978,7 +1170,7 @@
9781170 goto err_wr;
9791171 }
9801172 break;
981
-
1173
+
9821174 case FMODE_READ | FMODE_WRITE:
9831175 /*
9841176 * O_RDWR
....@@ -1006,13 +1198,13 @@
10061198
10071199 err_rd:
10081200 if (!--pipe->readers)
1009
- wake_up_interruptible(&pipe->wait);
1201
+ wake_up_interruptible(&pipe->wr_wait);
10101202 ret = -ERESTARTSYS;
10111203 goto err;
10121204
10131205 err_wr:
10141206 if (!--pipe->writers)
1015
- wake_up_interruptible(&pipe->wait);
1207
+ wake_up_interruptible_all(&pipe->rd_wait);
10161208 ret = -ERESTARTSYS;
10171209 goto err;
10181210
....@@ -1032,6 +1224,7 @@
10321224 .unlocked_ioctl = pipe_ioctl,
10331225 .release = pipe_release,
10341226 .fasync = pipe_fasync,
1227
+ .splice_write = iter_file_splice_write,
10351228 };
10361229
10371230 /*
....@@ -1051,20 +1244,91 @@
10511244 }
10521245
10531246 /*
1247
+ * Resize the pipe ring to a number of slots.
1248
+ *
1249
+ * Note the pipe can be reduced in capacity, but only if the current
1250
+ * occupancy doesn't exceed nr_slots; if it does, EBUSY will be
1251
+ * returned instead.
1252
+ */
1253
+int pipe_resize_ring(struct pipe_inode_info *pipe, unsigned int nr_slots)
1254
+{
1255
+ struct pipe_buffer *bufs;
1256
+ unsigned int head, tail, mask, n;
1257
+
1258
+ bufs = kcalloc(nr_slots, sizeof(*bufs),
1259
+ GFP_KERNEL_ACCOUNT | __GFP_NOWARN);
1260
+ if (unlikely(!bufs))
1261
+ return -ENOMEM;
1262
+
1263
+ spin_lock_irq(&pipe->rd_wait.lock);
1264
+ mask = pipe->ring_size - 1;
1265
+ head = pipe->head;
1266
+ tail = pipe->tail;
1267
+
1268
+ n = pipe_occupancy(head, tail);
1269
+ if (nr_slots < n) {
1270
+ spin_unlock_irq(&pipe->rd_wait.lock);
1271
+ kfree(bufs);
1272
+ return -EBUSY;
1273
+ }
1274
+
1275
+ /*
1276
+ * The pipe array wraps around, so just start the new one at zero
1277
+ * and adjust the indices.
1278
+ */
1279
+ if (n > 0) {
1280
+ unsigned int h = head & mask;
1281
+ unsigned int t = tail & mask;
1282
+ if (h > t) {
1283
+ memcpy(bufs, pipe->bufs + t,
1284
+ n * sizeof(struct pipe_buffer));
1285
+ } else {
1286
+ unsigned int tsize = pipe->ring_size - t;
1287
+ if (h > 0)
1288
+ memcpy(bufs + tsize, pipe->bufs,
1289
+ h * sizeof(struct pipe_buffer));
1290
+ memcpy(bufs, pipe->bufs + t,
1291
+ tsize * sizeof(struct pipe_buffer));
1292
+ }
1293
+ }
1294
+
1295
+ head = n;
1296
+ tail = 0;
1297
+
1298
+ kfree(pipe->bufs);
1299
+ pipe->bufs = bufs;
1300
+ pipe->ring_size = nr_slots;
1301
+ if (pipe->max_usage > nr_slots)
1302
+ pipe->max_usage = nr_slots;
1303
+ pipe->tail = tail;
1304
+ pipe->head = head;
1305
+
1306
+ spin_unlock_irq(&pipe->rd_wait.lock);
1307
+
1308
+ /* This might have made more room for writers */
1309
+ wake_up_interruptible(&pipe->wr_wait);
1310
+ return 0;
1311
+}
1312
+
1313
+/*
10541314 * Allocate a new array of pipe buffers and copy the info over. Returns the
10551315 * pipe size if successful, or return -ERROR on error.
10561316 */
10571317 static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)
10581318 {
1059
- struct pipe_buffer *bufs;
1060
- unsigned int size, nr_pages;
10611319 unsigned long user_bufs;
1320
+ unsigned int nr_slots, size;
10621321 long ret = 0;
10631322
1064
- size = round_pipe_size(arg);
1065
- nr_pages = size >> PAGE_SHIFT;
1323
+#ifdef CONFIG_WATCH_QUEUE
1324
+ if (pipe->watch_queue)
1325
+ return -EBUSY;
1326
+#endif
10661327
1067
- if (!nr_pages)
1328
+ size = round_pipe_size(arg);
1329
+ nr_slots = size >> PAGE_SHIFT;
1330
+
1331
+ if (!nr_slots)
10681332 return -EINVAL;
10691333
10701334 /*
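
The "start the new one at zero" copy in pipe_resize_ring() can be sanity-checked with concrete numbers. Take ring_size = 8, tail = 14, head = 18 (both free-running), so n = 4 and the live slots are indices 6, 7, 0, 1 in arrival order. A standalone sketch of the same copy, using plain ints in place of struct pipe_buffer:

    #include <assert.h>
    #include <string.h>

    int main(void)
    {
            int src[8] = { [6] = 60, [7] = 70, [0] = 80, [1] = 90 };  /* live slots */
            int dst[16] = { 0 };
            unsigned int ring_size = 8, mask = ring_size - 1;
            unsigned int head = 18, tail = 14, n = head - tail;       /* n == 4 */
            unsigned int h = head & mask, t = tail & mask;            /* 2 and 6 */

            if (h > t) {
                    memcpy(dst, src + t, n * sizeof(int));
            } else {        /* wrapped: copy the tail run, then the head run */
                    unsigned int tsize = ring_size - t;               /* slots 6, 7 */
                    if (h > 0)
                            memcpy(dst + tsize, src, h * sizeof(int));/* slots 0, 1 */
                    memcpy(dst, src + t, tsize * sizeof(int));
            }

            /* the new ring starts at zero: tail = 0, head = n */
            assert(dst[0] == 60 && dst[1] == 70 && dst[2] == 80 && dst[3] == 90);
            return 0;
    }
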
....@@ -1074,67 +1338,30 @@
10741338 * Decreasing the pipe capacity is always permitted, even
10751339 * if the user is currently over a limit.
10761340 */
1077
- if (nr_pages > pipe->buffers &&
1341
+ if (nr_slots > pipe->max_usage &&
10781342 size > pipe_max_size && !capable(CAP_SYS_RESOURCE))
10791343 return -EPERM;
10801344
1081
- user_bufs = account_pipe_buffers(pipe->user, pipe->buffers, nr_pages);
1345
+ user_bufs = account_pipe_buffers(pipe->user, pipe->nr_accounted, nr_slots);
10821346
1083
- if (nr_pages > pipe->buffers &&
1347
+ if (nr_slots > pipe->max_usage &&
10841348 (too_many_pipe_buffers_hard(user_bufs) ||
10851349 too_many_pipe_buffers_soft(user_bufs)) &&
1086
- is_unprivileged_user()) {
1350
+ pipe_is_unprivileged_user()) {
10871351 ret = -EPERM;
10881352 goto out_revert_acct;
10891353 }
10901354
1091
- /*
1092
- * We can shrink the pipe, if arg >= pipe->nrbufs. Since we don't
1093
- * expect a lot of shrink+grow operations, just free and allocate
1094
- * again like we would do for growing. If the pipe currently
1095
- * contains more buffers than arg, then return busy.
1096
- */
1097
- if (nr_pages < pipe->nrbufs) {
1098
- ret = -EBUSY;
1355
+ ret = pipe_resize_ring(pipe, nr_slots);
1356
+ if (ret < 0)
10991357 goto out_revert_acct;
1100
- }
11011358
1102
- bufs = kcalloc(nr_pages, sizeof(*bufs),
1103
- GFP_KERNEL_ACCOUNT | __GFP_NOWARN);
1104
- if (unlikely(!bufs)) {
1105
- ret = -ENOMEM;
1106
- goto out_revert_acct;
1107
- }
1108
-
1109
- /*
1110
- * The pipe array wraps around, so just start the new one at zero
1111
- * and adjust the indexes.
1112
- */
1113
- if (pipe->nrbufs) {
1114
- unsigned int tail;
1115
- unsigned int head;
1116
-
1117
- tail = pipe->curbuf + pipe->nrbufs;
1118
- if (tail < pipe->buffers)
1119
- tail = 0;
1120
- else
1121
- tail &= (pipe->buffers - 1);
1122
-
1123
- head = pipe->nrbufs - tail;
1124
- if (head)
1125
- memcpy(bufs, pipe->bufs + pipe->curbuf, head * sizeof(struct pipe_buffer));
1126
- if (tail)
1127
- memcpy(bufs + head, pipe->bufs, tail * sizeof(struct pipe_buffer));
1128
- }
1129
-
1130
- pipe->curbuf = 0;
1131
- kfree(pipe->bufs);
1132
- pipe->bufs = bufs;
1133
- pipe->buffers = nr_pages;
1134
- return nr_pages * PAGE_SIZE;
1359
+ pipe->max_usage = nr_slots;
1360
+ pipe->nr_accounted = nr_slots;
1361
+ return pipe->max_usage * PAGE_SIZE;
11351362
11361363 out_revert_acct:
1137
- (void) account_pipe_buffers(pipe->user, nr_pages, pipe->buffers);
1364
+ (void) account_pipe_buffers(pipe->user, nr_slots, pipe->nr_accounted);
11381365 return ret;
11391366 }
11401367
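
pipe_set_size() is what sits behind fcntl(F_SETPIPE_SZ): the request is rounded up to a power-of-two number of pages and the return value is the resulting capacity in bytes, so callers should use the return value (or F_GETPIPE_SZ) rather than assume they got exactly what they asked for. A small userspace example:

    #define _GNU_SOURCE
    #include <fcntl.h>
    #include <stdio.h>
    #include <unistd.h>

    int main(void)
    {
            int fd[2];
            long asked = 100 * 1024, got;

            pipe(fd);
            /* Rounded up to a power-of-two page count; may fail with EPERM if
             * it exceeds /proc/sys/fs/pipe-max-size for an unprivileged caller. */
            got = fcntl(fd[1], F_SETPIPE_SZ, asked);
            if (got < 0)
                    perror("F_SETPIPE_SZ");
            else
                    printf("asked %ld, got %ld (F_GETPIPE_SZ: %ld)\n",
                           asked, got, (long)fcntl(fd[1], F_GETPIPE_SZ));
            return 0;
    }
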
....@@ -1143,9 +1370,17 @@
11431370 * location, so checking ->i_pipe is not enough to verify that this is a
11441371 * pipe.
11451372 */
1146
-struct pipe_inode_info *get_pipe_info(struct file *file)
1373
+struct pipe_inode_info *get_pipe_info(struct file *file, bool for_splice)
11471374 {
1148
- return file->f_op == &pipefifo_fops ? file->private_data : NULL;
1375
+ struct pipe_inode_info *pipe = file->private_data;
1376
+
1377
+ if (file->f_op != &pipefifo_fops || !pipe)
1378
+ return NULL;
1379
+#ifdef CONFIG_WATCH_QUEUE
1380
+ if (for_splice && pipe->watch_queue)
1381
+ return NULL;
1382
+#endif
1383
+ return pipe;
11491384 }
11501385
11511386 long pipe_fcntl(struct file *file, unsigned int cmd, unsigned long arg)
....@@ -1153,7 +1388,7 @@
11531388 struct pipe_inode_info *pipe;
11541389 long ret;
11551390
1156
- pipe = get_pipe_info(file);
1391
+ pipe = get_pipe_info(file, false);
11571392 if (!pipe)
11581393 return -EBADF;
11591394
....@@ -1164,7 +1399,7 @@
11641399 ret = pipe_set_size(pipe, arg);
11651400 break;
11661401 case F_GETPIPE_SZ:
1167
- ret = pipe->buffers * PAGE_SIZE;
1402
+ ret = pipe->max_usage * PAGE_SIZE;
11681403 break;
11691404 default:
11701405 ret = -EINVAL;
....@@ -1186,16 +1421,20 @@
11861421 * any operations on the root directory. However, we need a non-trivial
11871422 * d_name - pipe: will go nicely and kill the special-casing in procfs.
11881423 */
1189
-static struct dentry *pipefs_mount(struct file_system_type *fs_type,
1190
- int flags, const char *dev_name, void *data)
1424
+
1425
+static int pipefs_init_fs_context(struct fs_context *fc)
11911426 {
1192
- return mount_pseudo(fs_type, "pipe:", &pipefs_ops,
1193
- &pipefs_dentry_operations, PIPEFS_MAGIC);
1427
+ struct pseudo_fs_context *ctx = init_pseudo(fc, PIPEFS_MAGIC);
1428
+ if (!ctx)
1429
+ return -ENOMEM;
1430
+ ctx->ops = &pipefs_ops;
1431
+ ctx->dops = &pipefs_dentry_operations;
1432
+ return 0;
11941433 }
11951434
11961435 static struct file_system_type pipe_fs_type = {
11971436 .name = "pipefs",
1198
- .mount = pipefs_mount,
1437
+ .init_fs_context = pipefs_init_fs_context,
11991438 .kill_sb = kill_anon_super,
12001439 };
12011440