liyujie
2025-08-28 786ff4f4ca2374bdd9177f2e24b503d43e7a3b93
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
 
#include "base/task_scheduler/scheduler_worker_pool.h"
 
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
#include "base/task_scheduler/delayed_task_manager.h"
#include "base/task_scheduler/task_tracker.h"
#include "base/threading/thread_local.h"
 
namespace base {
namespace internal {
 
namespace {
 
// The number of SchedulerWorkerPool that are alive in this process. This
// variable should only be incremented when the SchedulerWorkerPool instances
// are brought up (on the main thread; before any tasks are posted) and
// decremented when the same instances are brought down (i.e., only when unit
// tests tear down the task environment and never in production). This makes the
// variable const while worker threads are up and as such it doesn't need to be
// atomic. It is used to tell when a task is posted from the main thread after
// the task environment was brought down in unit tests so that
// SchedulerWorkerPool bound TaskRunners can return false on PostTask, letting
// such callers know they should complete necessary work synchronously. Note:
// |!g_active_pools_count| is generally equivalent to
// |!TaskScheduler::GetInstance()| but has the advantage of being valid in
// task_scheduler unit tests that don't instantiate a full TaskScheduler.
//
// Within this file the counter is incremented by the SchedulerWorkerPool
// constructor, decremented by the SchedulerWorkerPool destructor, and read by
// the PostDelayedTask() implementations of the TaskRunners defined below.
int g_active_pools_count = 0;
 
// SchedulerWorkerPool that owns the current thread, if any. The per-thread
// value is set by SchedulerWorkerPool::BindToCurrentThread(), reset to nullptr
// by SchedulerWorkerPool::UnbindFromCurrentThread(), and read through
// GetCurrentWorkerPool().
LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky
    tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER;
 
const SchedulerWorkerPool* GetCurrentWorkerPool() {
  return tls_current_worker_pool.Get().Get();
}
 
}  // namespace
 
// A task runner that runs tasks in parallel.
class SchedulerParallelTaskRunner : public TaskRunner {
 public:
  // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
  // long as |worker_pool| is alive.
  // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
  SchedulerParallelTaskRunner(const TaskTraits& traits,
                              SchedulerWorkerPool* worker_pool)
      : traits_(traits), worker_pool_(worker_pool) {
    DCHECK(worker_pool_);
  }
 
  // TaskRunner:
  bool PostDelayedTask(const Location& from_here,
                       OnceClosure closure,
                       TimeDelta delay) override {
    if (!g_active_pools_count)
      return false;
 
    // Post the task as part of a one-off single-task Sequence.
    return worker_pool_->PostTaskWithSequence(
        Task(from_here, std::move(closure), traits_, delay),
        MakeRefCounted<Sequence>());
  }
 
  bool RunsTasksInCurrentSequence() const override {
    return GetCurrentWorkerPool() == worker_pool_;
  }
 
 private:
  ~SchedulerParallelTaskRunner() override = default;
 
  const TaskTraits traits_;
  SchedulerWorkerPool* const worker_pool_;
 
  DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
};
 
// A task runner that runs tasks in sequence.
class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
 public:
  // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks
  // so long as |worker_pool| is alive.
  // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
  SchedulerSequencedTaskRunner(const TaskTraits& traits,
                               SchedulerWorkerPool* worker_pool)
      : traits_(traits), worker_pool_(worker_pool) {
    DCHECK(worker_pool_);
  }
 
  // SequencedTaskRunner:
  bool PostDelayedTask(const Location& from_here,
                       OnceClosure closure,
                       TimeDelta delay) override {
    if (!g_active_pools_count)
      return false;
 
    Task task(from_here, std::move(closure), traits_, delay);
    task.sequenced_task_runner_ref = this;
 
    // Post the task as part of |sequence_|.
    return worker_pool_->PostTaskWithSequence(std::move(task), sequence_);
  }
 
  bool PostNonNestableDelayedTask(const Location& from_here,
                                  OnceClosure closure,
                                  base::TimeDelta delay) override {
    // Tasks are never nested within the task scheduler.
    return PostDelayedTask(from_here, std::move(closure), delay);
  }
 
  bool RunsTasksInCurrentSequence() const override {
    return sequence_->token() == SequenceToken::GetForCurrentThread();
  }
 
 private:
  ~SchedulerSequencedTaskRunner() override = default;
 
  // Sequence for all Tasks posted through this TaskRunner.
  const scoped_refptr<Sequence> sequence_ = MakeRefCounted<Sequence>();
 
  const TaskTraits traits_;
  SchedulerWorkerPool* const worker_pool_;
 
  DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
};
 
// Returns a new SchedulerParallelTaskRunner that posts tasks with |traits| to
// this pool.
scoped_refptr<TaskRunner> SchedulerWorkerPool::CreateTaskRunnerWithTraits(
    const TaskTraits& traits) {
  scoped_refptr<SchedulerParallelTaskRunner> parallel_runner =
      MakeRefCounted<SchedulerParallelTaskRunner>(traits, this);
  return parallel_runner;
}
 
// Returns a new SchedulerSequencedTaskRunner that posts tasks with |traits| to
// this pool.
scoped_refptr<SequencedTaskRunner>
SchedulerWorkerPool::CreateSequencedTaskRunnerWithTraits(
    const TaskTraits& traits) {
  scoped_refptr<SchedulerSequencedTaskRunner> sequenced_runner =
      MakeRefCounted<SchedulerSequencedTaskRunner>(traits, this);
  return sequenced_runner;
}
 
bool SchedulerWorkerPool::PostTaskWithSequence(
    Task task,
    scoped_refptr<Sequence> sequence) {
  DCHECK(task.task);
  DCHECK(sequence);
 
  if (!task_tracker_->WillPostTask(&task))
    return false;
 
  if (task.delayed_run_time.is_null()) {
    PostTaskWithSequenceNow(std::move(task), std::move(sequence));
  } else {
    // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
    // for details.
    CHECK(task.task);
    delayed_task_manager_->AddDelayedTask(
        std::move(task), BindOnce(
                             [](scoped_refptr<Sequence> sequence,
                                SchedulerWorkerPool* worker_pool, Task task) {
                               worker_pool->PostTaskWithSequenceNow(
                                   std::move(task), std::move(sequence));
                             },
                             std::move(sequence), Unretained(this)));
  }
 
  return true;
}
 
// Stores |task_tracker| and |delayed_task_manager| (both must be non-null)
// and counts this instance in |g_active_pools_count|.
SchedulerWorkerPool::SchedulerWorkerPool(
    TrackedRef<TaskTracker> task_tracker,
    DelayedTaskManager* delayed_task_manager)
    : task_tracker_(std::move(task_tracker)),
      delayed_task_manager_(delayed_task_manager) {
  DCHECK(task_tracker_);
  DCHECK(delayed_task_manager_);

  // One more pool is alive.
  g_active_pools_count += 1;
}
 
// Removes this instance from |g_active_pools_count|; the count must never
// become negative.
SchedulerWorkerPool::~SchedulerWorkerPool() {
  g_active_pools_count -= 1;
  DCHECK_GE(g_active_pools_count, 0);
}
 
void SchedulerWorkerPool::BindToCurrentThread() {
  DCHECK(!GetCurrentWorkerPool());
  tls_current_worker_pool.Get().Set(this);
}
 
void SchedulerWorkerPool::UnbindFromCurrentThread() {
  DCHECK(GetCurrentWorkerPool());
  tls_current_worker_pool.Get().Set(nullptr);
}
 
void SchedulerWorkerPool::PostTaskWithSequenceNow(
    Task task,
    scoped_refptr<Sequence> sequence) {
  DCHECK(task.task);
  DCHECK(sequence);
 
  // Confirm that |task| is ready to run (its delayed run time is either null or
  // in the past).
  DCHECK_LE(task.delayed_run_time, TimeTicks::Now());
 
  const bool sequence_was_empty = sequence->PushTask(std::move(task));
  if (sequence_was_empty) {
    // Try to schedule |sequence| if it was empty before |task| was inserted
    // into it. Otherwise, one of these must be true:
    // - |sequence| is already scheduled, or,
    // - The pool is running a Task from |sequence|. The pool is expected to
    //   reschedule |sequence| once it's done running the Task.
    sequence = task_tracker_->WillScheduleSequence(std::move(sequence), this);
    if (sequence)
      OnCanScheduleSequence(std::move(sequence));
  }
}
 
}  // namespace internal
}  // namespace base