ronnie
2022-10-14 1504bb53e29d3d46222c0b3ea994fc494b48e153
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
import logging, os, re
 
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
 
class KernelTrace(object):
    """Allows access and control to Kernel tracing facilities.
 
    Example code snippet:
        trace = KernelTrace(events=['mali_dvfs:mali_dvfs_set_clock'])
        results = trace.read(regexp=r'frequency=(\d+)')
 
    Public methods:
        on          : Enables tracing
        off         : Disables tracing
        is_tracing  : Returns Boolean of tracing status.
        event_on    : Turns event on.  Returns boolean of success
        event_off   : Turns event off.  Returns boolean of success
        flush       : Flushes trace buffer
        read        : Reads trace buffer returns list of
                      - tuples if regexp provided
                      - else matching string
        uptime_secs : Returns float of current uptime.
 
    Private functions:
        _onoff       : Disable/enable tracing
        _onoff_event : Disable/enable events
 
    Private attributes:
        _buffer      : list to hold parsed results from trace buffer
        _buffer_ptr  : integer pointing to last byte read
 
    TODO(tbroch):  List of potential enhancements
       - currently only supports trace events.  Add other tracers.
    """
    _TRACE_ROOT = '/sys/kernel/debug/tracing'
    _TRACE_EN_PATH = os.path.join(_TRACE_ROOT, 'tracing_enabled')
 
    def __init__(self, flush=True, events=None, on=True):
        """Constructor for KernelTrace class"""
        self._buffer = []
        self._buffer_ptr = 0
        self._events = []
        self._on = on
 
        if flush:
            self.flush()
        for event in events:
            if self.event_on(event):
                self._events.append(event)
        if on:
            self.on()
 
 
    def __del__(self, flush=True, events=None, on=True):
        """Deconstructor for KernelTrace class"""
        for event in self._events:
            self.event_off(event)
        if self._on:
            self.off()
 
 
    def _onoff(self, val):
        """Enable/Disable tracing.
 
        Arguments:
            val: integer, 1 for on, 0 for off
 
        Raises:
            error.TestFail: If unable to enable/disable tracing
              boolean of tracing on/off status
        """
        utils.write_one_line(self._TRACE_EN_PATH, val)
        fname = os.path.join(self._TRACE_ROOT, 'tracing_on')
        result = int(utils.read_one_line(fname).strip())
        if not result == val:
            raise error.TestFail("Unable to %sable tracing" %
                                 'en' if val == 1 else 'dis')
 
 
    def on(self):
        """Enable tracing."""
        return self._onoff(1)
 
 
    def off(self):
        """Disable tracing."""
        self._onoff(0)
 
 
    def is_tracing(self):
        """Is tracing on?
 
        Returns:
            True if tracing enabled and at least one event is enabled.
        """
        fname = os.path.join(self._TRACE_ROOT, 'tracing_on')
        result = int(utils.read_one_line(fname).strip())
        if result == 1 and len(self._events) > 0:
            return True
        return False
 
 
    def _event_onoff(self, event, val):
        """Enable/Disable tracing event.
 
        TODO(tbroch) Consider allowing wild card enabling of trace events via
            /sys/kernel/debug/tracing/set_event although it makes filling buffer
            really easy
 
        Arguments:
            event: list of events.
                   See kernel(Documentation/trace/events.txt) for formatting.
            val: integer, 1 for on, 0 for off
 
         Returns:
            True if success, false otherwise
        """
        logging.debug("event_onoff: event:%s val:%d", event, val)
        event_path = event.replace(':', '/')
        fname = os.path.join(self._TRACE_ROOT, 'events', event_path, 'enable')
 
        if not os.path.exists(fname):
            logging.warning("Unable to locate tracing event %s", fname)
            return False
        utils.write_one_line(fname, val)
 
        fname = os.path.join(self._TRACE_ROOT, "set_event")
        found = False
        with open(fname) as fd:
            for ln in fd.readlines():
                logging.debug("set_event ln:%s", ln)
                if re.findall(event, ln):
                    found = True
                    break
 
        if val == 1 and not found:
            logging.warning("Event %s not enabled", event)
            return False
 
        if val == 0 and found:
            logging.warning("Event %s not disabled", event)
            return False
 
        return True
 
 
    def event_on(self, event):
        return self._event_onoff(event, 1)
 
 
    def event_off(self, event):
        return self._event_onoff(event, 0)
 
 
    def flush(self):
        """Flush trace buffer.
 
        Raises:
            error.TestFail: If unable to flush
        """
        self.off()
        fname = os.path.join(self._TRACE_ROOT, 'free_buffer')
        utils.write_one_line(fname, 1)
        self._buffer_ptr = 0
 
        fname = os.path.join(self._TRACE_ROOT, 'buffer_size_kb')
        result = utils.read_one_line(fname).strip()
        if result is '0':
            return True
        return False
 
 
    def read(self, regexp=None):
        fname = os.path.join(self._TRACE_ROOT, 'trace')
        fd = open(fname)
        fd.seek(self._buffer_ptr)
        for ln in fd.readlines():
            if regexp is None:
                self._buffer.append(ln)
                continue
            results = re.findall(regexp, ln)
            if results:
                logging.debug(ln)
                self._buffer.append(results[0])
        self._buffer_ptr = fd.tell()
        fd.close()
        return self._buffer
 
 
    @staticmethod
    def uptime_secs():
        results = utils.read_one_line("/proc/uptime")
        return float(results.split()[0])