liyujie
2025-08-28 786ff4f4ca2374bdd9177f2e24b503d43e7a3b93
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
#include <algorithm>
#include <utility>
 
#include "src/trace_processor/proto_trace_parser.h"
#include "src/trace_processor/trace_sorter.h"
 
namespace perfetto {
namespace trace_processor {
 
TraceSorter::TraceSorter(TraceProcessorContext* context, int64_t window_size_ns)
    : context_(context), window_size_ns_(window_size_ns) {
  const char* env = getenv("TRACE_PROCESSOR_SORT_ONLY");
  bypass_next_stage_for_testing_ = env && !strcmp(env, "1");
  if (bypass_next_stage_for_testing_)
    PERFETTO_ELOG("TEST MODE: bypassing protobuf parsing stage");
}
 
void TraceSorter::Queue::Sort() {
  PERFETTO_DCHECK(needs_sorting());
  PERFETTO_DCHECK(sort_start_idx_ < events_.size());
 
  // If sort_min_ts_ has been set, it will no long be max_int, and so will be
  // smaller than max_ts_.
  PERFETTO_DCHECK(sort_min_ts_ < max_ts_);
 
  // We know that all events between [0, sort_start_idx_] are sorted. Witin
  // this range, perform a bound search and find the iterator for the min
  // timestamp that broke the monotonicity. Re-sort from there to the end.
  auto sort_end = events_.begin() + static_cast<ssize_t>(sort_start_idx_);
  PERFETTO_DCHECK(std::is_sorted(events_.begin(), sort_end));
  auto sort_begin = std::lower_bound(events_.begin(), sort_end, sort_min_ts_,
                                     &TimestampedTracePiece::Compare);
  std::sort(sort_begin, events_.end());
  sort_start_idx_ = 0;
  sort_min_ts_ = 0;
 
  // At this point |events_| must be fully sorted.
  PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end()));
}
 
// Removes all the events in |queues_| that are earlier than the given window
// size and moves them to the next parser stages, respecting global timestamp
// order. This function is a "extract min from N sorted queues", with some
// little cleverness: we know that events tend to be bursty, so events are
// not going to be randomly distributed on the N |queues_|.
// Upon each iteration this function finds the first two queues (if any) that
// have the oldest events, and extracts events from the 1st until hitting the
// min_ts of the 2nd. Imagine the queues are as follows:
//
//  q0           {min_ts: 10  max_ts: 30}
//  q1    {min_ts:5              max_ts: 35}
//  q2              {min_ts: 12    max_ts: 40}
//
// We know that we can extract all events from q1 until we hit ts=10 without
// looking at any other queue. After hitting ts=10, we need to re-look to all of
// them to figure out the next min-event.
// There are more suitable data structures to do this (e.g. keeping a min-heap
// to avoid re-scanning all the queues all the times) but doesn't seem worth it.
// With Android traces (that have 8 CPUs) this function accounts for ~1-3% cpu
// time in a profiler.
void TraceSorter::SortAndExtractEventsBeyondWindow(int64_t window_size_ns) {
  DCHECK_ftrace_batch_cpu(kNoBatch);
  constexpr int64_t kTsMax = std::numeric_limits<int64_t>::max();
  const bool was_empty = global_min_ts_ == kTsMax && global_max_ts_ == 0;
  int64_t extract_end_ts = global_max_ts_ - window_size_ns;
  auto* next_stage = context_->parser.get();
  size_t iterations = 0;
  for (;; iterations++) {
    size_t min_queue_idx = 0;  // The index of the queue with the min(ts).
 
    // The top-2 min(ts) among all queues.
    // queues_[min_queue_idx].events.timestamp == min_queue_ts[0].
    int64_t min_queue_ts[2]{kTsMax, kTsMax};
 
    // This loop identifies the queue which starts with the earliest event and
    // also remembers the earliest event of the 2nd queue (in min_queue_ts[1]).
    bool has_queues_with_expired_events = false;
    for (size_t i = 0; i < queues_.size(); i++) {
      auto& queue = queues_[i];
      if (queue.events_.empty())
        continue;
      PERFETTO_DCHECK(queue.min_ts_ >= global_min_ts_);
      PERFETTO_DCHECK(queue.max_ts_ <= global_max_ts_);
      if (queue.min_ts_ < min_queue_ts[0]) {
        min_queue_ts[1] = min_queue_ts[0];
        min_queue_ts[0] = queue.min_ts_;
        min_queue_idx = i;
        has_queues_with_expired_events = true;
      } else if (queue.min_ts_ < min_queue_ts[1]) {
        min_queue_ts[1] = queue.min_ts_;
      }
    }
    if (!has_queues_with_expired_events) {
      // All the queues have events that start after the window (i.e. they are
      // too recent and not eligible to be extracted given the current window).
      break;
    }
 
    Queue& queue = queues_[min_queue_idx];
    auto& events = queue.events_;
    if (queue.needs_sorting())
      queue.Sort();
    PERFETTO_DCHECK(queue.min_ts_ == events.front().timestamp);
    PERFETTO_DCHECK(queue.min_ts_ == global_min_ts_);
 
    // Now that we identified the min-queue, extract all events from it until
    // we hit either: (1) the min-ts of the 2nd queue or (2) the window limit,
    // whichever comes first.
    int64_t extract_until_ts = std::min(extract_end_ts, min_queue_ts[1]);
    size_t num_extracted = 0;
    for (auto& event : events) {
      int64_t timestamp = event.timestamp;
      if (timestamp > extract_until_ts)
        break;
 
      ++num_extracted;
      if (bypass_next_stage_for_testing_)
        continue;
 
      if (min_queue_idx == 0) {
        // queues_[0] is for non-ftrace packets.
        next_stage->ParseTracePacket(timestamp, std::move(event));
      } else {
        // Ftrace queues start at offset 1. So queues_[1] = cpu[0] and so on.
        uint32_t cpu = static_cast<uint32_t>(min_queue_idx - 1);
        next_stage->ParseFtracePacket(cpu, timestamp, std::move(event));
      }
    }  // for (event: events)
 
    if (!num_extracted) {
      // No events can be extracted from any of the queues. This means that
      // either we hit the window or all queues are empty.
      break;
    }
 
    // Now remove the entries from the event buffer and update the queue-local
    // and global time bounds.
    events.erase_front(num_extracted);
 
    // Update the global_{min,max}_ts to reflect the bounds after extraction.
    if (events.empty()) {
      queue.min_ts_ = kTsMax;
      queue.max_ts_ = 0;
      global_min_ts_ = min_queue_ts[1];
 
      // If we extraced the max entry from a queue (i.e. we emptied the queue)
      // we need to recompute the global max, because it might have been the one
      // just extracted.
      global_max_ts_ = 0;
      for (auto& q : queues_)
        global_max_ts_ = std::max(global_max_ts_, q.max_ts_);
    } else {
      queue.min_ts_ = queue.events_.front().timestamp;
      global_min_ts_ = std::min(queue.min_ts_, min_queue_ts[1]);
    }
  }  // for(;;)
 
  // We decide to extract events only when we know (using the global_{min,max}
  // bounds) that there are eligible events. We should never end up in a
  // situation where we call this function but then realize that there was
  // nothing to extract.
  PERFETTO_DCHECK(iterations > 0 || was_empty);
 
#if PERFETTO_DCHECK_IS_ON()
  // Check that the global min/max are consistent.
  int64_t dbg_min_ts = kTsMax;
  int64_t dbg_max_ts = 0;
  for (auto& q : queues_) {
    dbg_min_ts = std::min(dbg_min_ts, q.min_ts_);
    dbg_max_ts = std::max(dbg_max_ts, q.max_ts_);
  }
  PERFETTO_DCHECK(global_min_ts_ == dbg_min_ts);
  PERFETTO_DCHECK(global_max_ts_ == dbg_max_ts);
#endif
}
 
}  // namespace trace_processor
}  // namespace perfetto