/*
|
* Copyright (C) 2018 The Android Open Source Project
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*/
|
|
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <limits>
#include <utility>

#include "src/trace_processor/proto_trace_parser.h"
#include "src/trace_processor/trace_sorter.h"
|
|
namespace perfetto {
|
namespace trace_processor {
|
|
// Builds a sorter bound to |context| that holds events for |window_size_ns|
// before forwarding them to |context|'s parser.
//
// When the environment variable TRACE_PROCESSOR_SORT_ONLY is exactly "1", the
// sorter still sorts and drains its queues but never hands events to the next
// stage (see |bypass_next_stage_for_testing_|). An error-level log line is
// emitted so this test-only mode is never enabled silently.
TraceSorter::TraceSorter(TraceProcessorContext* context, int64_t window_size_ns)
    : context_(context), window_size_ns_(window_size_ns) {
  const char* const sort_only_value = getenv("TRACE_PROCESSOR_SORT_ONLY");
  bypass_next_stage_for_testing_ =
      sort_only_value != nullptr && strcmp(sort_only_value, "1") == 0;
  if (!bypass_next_stage_for_testing_)
    return;
  PERFETTO_ELOG("TEST MODE: bypassing protobuf parsing stage");
}
|
|
void TraceSorter::Queue::Sort() {
|
PERFETTO_DCHECK(needs_sorting());
|
PERFETTO_DCHECK(sort_start_idx_ < events_.size());
|
|
// If sort_min_ts_ has been set, it will no long be max_int, and so will be
|
// smaller than max_ts_.
|
PERFETTO_DCHECK(sort_min_ts_ < max_ts_);
|
|
// We know that all events between [0, sort_start_idx_] are sorted. Witin
|
// this range, perform a bound search and find the iterator for the min
|
// timestamp that broke the monotonicity. Re-sort from there to the end.
|
auto sort_end = events_.begin() + static_cast<ssize_t>(sort_start_idx_);
|
PERFETTO_DCHECK(std::is_sorted(events_.begin(), sort_end));
|
auto sort_begin = std::lower_bound(events_.begin(), sort_end, sort_min_ts_,
|
&TimestampedTracePiece::Compare);
|
std::sort(sort_begin, events_.end());
|
sort_start_idx_ = 0;
|
sort_min_ts_ = 0;
|
|
// At this point |events_| must be fully sorted.
|
PERFETTO_DCHECK(std::is_sorted(events_.begin(), events_.end()));
|
}
|
|
// Removes all the events in |queues_| that are earlier than the given window
|
// size and moves them to the next parser stages, respecting global timestamp
|
// order. This function is a "extract min from N sorted queues", with some
|
// little cleverness: we know that events tend to be bursty, so events are
|
// not going to be randomly distributed on the N |queues_|.
|
// Upon each iteration this function finds the first two queues (if any) that
|
// have the oldest events, and extracts events from the 1st until hitting the
|
// min_ts of the 2nd. Imagine the queues are as follows:
|
//
|
// q0 {min_ts: 10 max_ts: 30}
|
// q1 {min_ts:5 max_ts: 35}
|
// q2 {min_ts: 12 max_ts: 40}
|
//
|
// We know that we can extract all events from q1 until we hit ts=10 without
|
// looking at any other queue. After hitting ts=10, we need to re-look to all of
|
// them to figure out the next min-event.
|
// There are more suitable data structures to do this (e.g. keeping a min-heap
|
// to avoid re-scanning all the queues all the times) but doesn't seem worth it.
|
// With Android traces (that have 8 CPUs) this function accounts for ~1-3% cpu
|
// time in a profiler.
|
void TraceSorter::SortAndExtractEventsBeyondWindow(int64_t window_size_ns) {
|
DCHECK_ftrace_batch_cpu(kNoBatch);
|
constexpr int64_t kTsMax = std::numeric_limits<int64_t>::max();
|
const bool was_empty = global_min_ts_ == kTsMax && global_max_ts_ == 0;
|
int64_t extract_end_ts = global_max_ts_ - window_size_ns;
|
auto* next_stage = context_->parser.get();
|
size_t iterations = 0;
|
for (;; iterations++) {
|
size_t min_queue_idx = 0; // The index of the queue with the min(ts).
|
|
// The top-2 min(ts) among all queues.
|
// queues_[min_queue_idx].events.timestamp == min_queue_ts[0].
|
int64_t min_queue_ts[2]{kTsMax, kTsMax};
|
|
// This loop identifies the queue which starts with the earliest event and
|
// also remembers the earliest event of the 2nd queue (in min_queue_ts[1]).
|
bool has_queues_with_expired_events = false;
|
for (size_t i = 0; i < queues_.size(); i++) {
|
auto& queue = queues_[i];
|
if (queue.events_.empty())
|
continue;
|
PERFETTO_DCHECK(queue.min_ts_ >= global_min_ts_);
|
PERFETTO_DCHECK(queue.max_ts_ <= global_max_ts_);
|
if (queue.min_ts_ < min_queue_ts[0]) {
|
min_queue_ts[1] = min_queue_ts[0];
|
min_queue_ts[0] = queue.min_ts_;
|
min_queue_idx = i;
|
has_queues_with_expired_events = true;
|
} else if (queue.min_ts_ < min_queue_ts[1]) {
|
min_queue_ts[1] = queue.min_ts_;
|
}
|
}
|
if (!has_queues_with_expired_events) {
|
// All the queues have events that start after the window (i.e. they are
|
// too recent and not eligible to be extracted given the current window).
|
break;
|
}
|
|
Queue& queue = queues_[min_queue_idx];
|
auto& events = queue.events_;
|
if (queue.needs_sorting())
|
queue.Sort();
|
PERFETTO_DCHECK(queue.min_ts_ == events.front().timestamp);
|
PERFETTO_DCHECK(queue.min_ts_ == global_min_ts_);
|
|
// Now that we identified the min-queue, extract all events from it until
|
// we hit either: (1) the min-ts of the 2nd queue or (2) the window limit,
|
// whichever comes first.
|
int64_t extract_until_ts = std::min(extract_end_ts, min_queue_ts[1]);
|
size_t num_extracted = 0;
|
for (auto& event : events) {
|
int64_t timestamp = event.timestamp;
|
if (timestamp > extract_until_ts)
|
break;
|
|
++num_extracted;
|
if (bypass_next_stage_for_testing_)
|
continue;
|
|
if (min_queue_idx == 0) {
|
// queues_[0] is for non-ftrace packets.
|
next_stage->ParseTracePacket(timestamp, std::move(event));
|
} else {
|
// Ftrace queues start at offset 1. So queues_[1] = cpu[0] and so on.
|
uint32_t cpu = static_cast<uint32_t>(min_queue_idx - 1);
|
next_stage->ParseFtracePacket(cpu, timestamp, std::move(event));
|
}
|
} // for (event: events)
|
|
if (!num_extracted) {
|
// No events can be extracted from any of the queues. This means that
|
// either we hit the window or all queues are empty.
|
break;
|
}
|
|
// Now remove the entries from the event buffer and update the queue-local
|
// and global time bounds.
|
events.erase_front(num_extracted);
|
|
// Update the global_{min,max}_ts to reflect the bounds after extraction.
|
if (events.empty()) {
|
queue.min_ts_ = kTsMax;
|
queue.max_ts_ = 0;
|
global_min_ts_ = min_queue_ts[1];
|
|
// If we extraced the max entry from a queue (i.e. we emptied the queue)
|
// we need to recompute the global max, because it might have been the one
|
// just extracted.
|
global_max_ts_ = 0;
|
for (auto& q : queues_)
|
global_max_ts_ = std::max(global_max_ts_, q.max_ts_);
|
} else {
|
queue.min_ts_ = queue.events_.front().timestamp;
|
global_min_ts_ = std::min(queue.min_ts_, min_queue_ts[1]);
|
}
|
} // for(;;)
|
|
// We decide to extract events only when we know (using the global_{min,max}
|
// bounds) that there are eligible events. We should never end up in a
|
// situation where we call this function but then realize that there was
|
// nothing to extract.
|
PERFETTO_DCHECK(iterations > 0 || was_empty);
|
|
#if PERFETTO_DCHECK_IS_ON()
|
// Check that the global min/max are consistent.
|
int64_t dbg_min_ts = kTsMax;
|
int64_t dbg_max_ts = 0;
|
for (auto& q : queues_) {
|
dbg_min_ts = std::min(dbg_min_ts, q.min_ts_);
|
dbg_max_ts = std::max(dbg_max_ts, q.max_ts_);
|
}
|
PERFETTO_DCHECK(global_min_ts_ == dbg_min_ts);
|
PERFETTO_DCHECK(global_max_ts_ == dbg_max_ts);
|
#endif
|
}
|
|
} // namespace trace_processor
|
} // namespace perfetto
|