liyujie
2025-08-28 786ff4f4ca2374bdd9177f2e24b503d43e7a3b93
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
 
#ifndef BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
#define BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
 
#include "base/base_export.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/pending_task.h"
#include "base/sequence_checker.h"
#include "base/synchronization/lock.h"
#include "base/time/time.h"
 
namespace base {
 
class BasicPostTaskPerfTest;
 
namespace internal {
 
// Implements a queue of tasks posted to the message loop running on the current
// thread. This class takes care of synchronizing posting tasks from different
// threads and together with MessageLoop ensures clean shutdown.
class BASE_EXPORT IncomingTaskQueue
    : public RefCountedThreadSafe<IncomingTaskQueue> {
 public:
  // TODO(gab): Move this to SequencedTaskSource::Observer in
  // https://chromium-review.googlesource.com/c/chromium/src/+/1088762.
  class Observer {
   public:
    virtual ~Observer() = default;
 
    // Notifies this Observer that it is about to enqueue |task|. The Observer
    // may alter |task| as a result (e.g. add metadata to the PendingTask
    // struct). This may be called while holding a lock and shouldn't perform
    // logic requiring synchronization (override DidQueueTask() for that).
    virtual void WillQueueTask(PendingTask* task) = 0;
 
    // Notifies this Observer that a task was queued in the IncomingTaskQueue it
    // observes. |was_empty| is true if the task source was empty (i.e.
    // |!HasTasks()|) before this task was posted. DidQueueTask() can be invoked
    // from any thread.
    virtual void DidQueueTask(bool was_empty) = 0;
  };
 
  // Provides a read and remove only view into a task queue.
  class ReadAndRemoveOnlyQueue {
   public:
    ReadAndRemoveOnlyQueue() = default;
    virtual ~ReadAndRemoveOnlyQueue() = default;
 
    // Returns the next task. HasTasks() is assumed to be true.
    virtual const PendingTask& Peek() = 0;
 
    // Removes and returns the next task. HasTasks() is assumed to be true.
    virtual PendingTask Pop() = 0;
 
    // Whether this queue has tasks.
    virtual bool HasTasks() = 0;
 
    // Removes all tasks.
    virtual void Clear() = 0;
 
   private:
    DISALLOW_COPY_AND_ASSIGN(ReadAndRemoveOnlyQueue);
  };
 
  // Provides a read-write task queue.
  class Queue : public ReadAndRemoveOnlyQueue {
   public:
    Queue() = default;
    ~Queue() override = default;
 
    // Adds the task to the end of the queue.
    virtual void Push(PendingTask pending_task) = 0;
 
   private:
    DISALLOW_COPY_AND_ASSIGN(Queue);
  };
 
  // Constructs an IncomingTaskQueue which will invoke |task_queue_observer|
  // when tasks are queued. |task_queue_observer| will be bound to this
  // IncomingTaskQueue's lifetime. Ownership is required as opposed to a raw
  // pointer since IncomingTaskQueue is ref-counted. For the same reasons,
  // |task_queue_observer| needs to support being invoked racily during
  // shutdown).
  explicit IncomingTaskQueue(std::unique_ptr<Observer> task_queue_observer);
 
  // Appends a task to the incoming queue. Posting of all tasks is routed though
  // AddToIncomingQueue() or TryAddToIncomingQueue() to make sure that posting
  // task is properly synchronized between different threads.
  //
  // Returns true if the task was successfully added to the queue, otherwise
  // returns false. In all cases, the ownership of |task| is transferred to the
  // called method.
  bool AddToIncomingQueue(const Location& from_here,
                          OnceClosure task,
                          TimeDelta delay,
                          Nestable nestable);
 
  // Instructs this IncomingTaskQueue to stop accepting tasks, this cannot be
  // undone. Note that the registered IncomingTaskQueue::Observer may still
  // racily receive a few DidQueueTask() calls while the Shutdown() signal
  // propagates to other threads and it needs to support that.
  void Shutdown();
 
  ReadAndRemoveOnlyQueue& triage_tasks() { return triage_tasks_; }
 
  Queue& delayed_tasks() { return delayed_tasks_; }
 
  Queue& deferred_tasks() { return deferred_tasks_; }
 
  bool HasPendingHighResolutionTasks() const {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    return delayed_tasks_.HasPendingHighResolutionTasks();
  }
 
  // Reports UMA metrics about its queues before the MessageLoop goes to sleep
  // per being idle.
  void ReportMetricsOnIdle() const;
 
 private:
  friend class base::BasicPostTaskPerfTest;
  friend class RefCountedThreadSafe<IncomingTaskQueue>;
 
  // These queues below support the previous MessageLoop behavior of
  // maintaining three queue queues to process tasks:
  //
  // TriageQueue
  // The first queue to receive all tasks for the processing sequence (when
  // reloading from the thread-safe |incoming_queue_|). Tasks are generally
  // either dispatched immediately or sent to the queues below.
  //
  // DelayedQueue
  // The queue for holding tasks that should be run later and sorted by expected
  // run time.
  //
  // DeferredQueue
  // The queue for holding tasks that couldn't be run while the MessageLoop was
  // nested. These are generally processed during the idle stage.
  //
  // Many of these do not share implementations even though they look like they
  // could because of small quirks (reloading semantics) or differing underlying
  // data strucutre (TaskQueue vs DelayedTaskQueue).
 
  // The starting point for all tasks on the sequence processing the tasks.
  class TriageQueue : public ReadAndRemoveOnlyQueue {
   public:
    TriageQueue(IncomingTaskQueue* outer);
    ~TriageQueue() override;
 
    // ReadAndRemoveOnlyQueue:
    // The methods below will attempt to reload from the incoming queue if the
    // queue itself is empty (Clear() has special logic to reload only once
    // should destructors post more tasks).
    const PendingTask& Peek() override;
    PendingTask Pop() override;
    // Whether this queue has tasks after reloading from the incoming queue.
    bool HasTasks() override;
    void Clear() override;
 
   private:
    void ReloadFromIncomingQueueIfEmpty();
 
    IncomingTaskQueue* const outer_;
    TaskQueue queue_;
 
    DISALLOW_COPY_AND_ASSIGN(TriageQueue);
  };
 
  class DelayedQueue : public Queue {
   public:
    DelayedQueue();
    ~DelayedQueue() override;
 
    // Queue:
    const PendingTask& Peek() override;
    PendingTask Pop() override;
    // Whether this queue has tasks after sweeping the cancelled ones in front.
    bool HasTasks() override;
    void Clear() override;
    void Push(PendingTask pending_task) override;
 
    size_t Size() const;
    bool HasPendingHighResolutionTasks() const {
      return pending_high_res_tasks_ > 0;
    }
 
   private:
    DelayedTaskQueue queue_;
 
    // Number of high resolution tasks in |queue_|.
    int pending_high_res_tasks_ = 0;
 
    SEQUENCE_CHECKER(sequence_checker_);
 
    DISALLOW_COPY_AND_ASSIGN(DelayedQueue);
  };
 
  class DeferredQueue : public Queue {
   public:
    DeferredQueue();
    ~DeferredQueue() override;
 
    // Queue:
    const PendingTask& Peek() override;
    PendingTask Pop() override;
    bool HasTasks() override;
    void Clear() override;
    void Push(PendingTask pending_task) override;
 
   private:
    TaskQueue queue_;
 
    SEQUENCE_CHECKER(sequence_checker_);
 
    DISALLOW_COPY_AND_ASSIGN(DeferredQueue);
  };
 
  virtual ~IncomingTaskQueue();
 
  // Adds a task to |incoming_queue_|. The caller retains ownership of
  // |pending_task|, but this function will reset the value of
  // |pending_task->task|. This is needed to ensure that the posting call stack
  // does not retain |pending_task->task| beyond this function call.
  bool PostPendingTask(PendingTask* pending_task);
 
  // Does the real work of posting a pending task. Returns true if
  // |incoming_queue_| was empty before |pending_task| was posted.
  bool PostPendingTaskLockRequired(PendingTask* pending_task);
 
  // Loads tasks from the |incoming_queue_| into |*work_queue|. Must be called
  // from the sequence processing the tasks.
  void ReloadWorkQueue(TaskQueue* work_queue);
 
  // Checks calls made only on the MessageLoop thread.
  SEQUENCE_CHECKER(sequence_checker_);
 
  const std::unique_ptr<Observer> task_queue_observer_;
 
  // Queue for initial triaging of tasks on the |sequence_checker_| sequence.
  TriageQueue triage_tasks_;
 
  // Queue for delayed tasks on the |sequence_checker_| sequence.
  DelayedQueue delayed_tasks_;
 
  // Queue for non-nestable deferred tasks on the |sequence_checker_| sequence.
  DeferredQueue deferred_tasks_;
 
  // Synchronizes access to all members below this line.
  base::Lock incoming_queue_lock_;
 
  // An incoming queue of tasks that are acquired under a mutex for processing
  // on this instance's thread. These tasks have not yet been been pushed to
  // |triage_tasks_|.
  TaskQueue incoming_queue_;
 
  // True if new tasks should be accepted.
  bool accept_new_tasks_ = true;
 
  // The next sequence number to use for delayed tasks.
  int next_sequence_num_ = 0;
 
  // True if the outgoing queue (|triage_tasks_|) is empty. Toggled under
  // |incoming_queue_lock_| in ReloadWorkQueue() so that
  // PostPendingTaskLockRequired() can tell, without accessing the thread unsafe
  // |triage_tasks_|, if the IncomingTaskQueue has been made non-empty by a
  // PostTask() (and needs to inform its Observer).
  bool triage_queue_empty_ = true;
 
  DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue);
};
 
}  // namespace internal
}  // namespace base
 
#endif  // BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_