liyujie
2025-08-28 786ff4f4ca2374bdd9177f2e24b503d43e7a3b93
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import re
import time
 
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
 
# en-US key matrix (from "kb membrane pin matrix.pdf")
KEYMATRIX = {'`': (3, 1), '1': (6, 1), '2': (6, 4), '3': (6, 2), '4': (6, 3),
             '5': (3, 3), '6': (3, 6), '7': (6, 6), '8': (6, 5), '9': (6, 9),
             '0': (6, 8), '-': (3, 8), '=': (0, 8), 'q': (7, 1), 'w': (7, 4),
             'e': (7, 2), 'r': (7, 3), 't': (2, 3), 'y': (2, 6), 'u': (7, 6),
             'i': (7, 5), 'o': (7, 9), 'p': (7, 8), '[': (2, 8), ']': (2, 5),
             '\\': (3, 11), 'a': (4, 1), 's': (4, 4), 'd': (4, 2), 'f': (4, 3),
             'g': (1, 3), 'h': (1, 6), 'j': (4, 6), 'k': (4, 5), 'l': (4, 9),
             ';': (4, 8), '\'': (1, 8), 'z': (5, 1), 'x': (5, 4), 'c': (5, 2),
             'v': (5, 3), 'b': (0, 3), 'n': (0, 6), 'm': (5, 6), ',': (5, 5),
             '.': (5, 9), '/': (5, 8), ' ': (5, 11), '<right>': (6, 12),
             '<alt_r>': (0, 10), '<down>': (6, 11), '<tab>': (2, 1),
             '<f10>': (0, 4), '<shift_r>': (7, 7), '<ctrl_r>': (4, 0),
             '<esc>': (1, 1), '<backspace>': (1, 11), '<f2>': (3, 2),
             '<alt_l>': (6, 10), '<ctrl_l>': (2, 0), '<f1>': (0, 2),
             '<search>': (0, 1), '<f3>': (2, 2), '<f4>': (1, 2), '<f5>': (3, 4),
             '<f6>': (2, 4), '<f7>': (1, 4), '<f8>': (2, 9), '<f9>': (1, 9),
             '<up>': (7, 11), '<shift_l>': (5, 7), '<enter>': (4, 11),
             '<left>': (7, 12)}
 
 
def has_ectool():
    """Determine if ectool shell command is present.
 
    Returns:
        boolean true if avail, false otherwise.
    """
    cmd = 'which ectool'
    return (utils.system(cmd, ignore_status=True) == 0)
 
 
class ECError(Exception):
    """Base class for a failure when communicating with EC."""
    pass
 
 
class EC_Common(object):
    """Class for EC common.
 
    This incredibly brief base class is intended to encapsulate common elements
    across various CrOS MCUs (ec proper, USB-PD, Sensor Hub).  At the moment
    that includes only the use of ectool.
    """
 
    def __init__(self, target='cros_ec'):
        """Constructor.
 
        @param target: target name of ec to communicate with.
        """
        if not has_ectool():
            ec_info = utils.system_output("mosys ec info",
                                          ignore_status=True)
            logging.warning("Ectool absent on this platform ( %s )",
                         ec_info)
            raise error.TestNAError("Platform doesn't support ectool")
        self._target = target
 
    def ec_command(self, cmd, **kwargs):
        """Executes ec command and returns results.
 
        @param cmd: string of command to execute.
        @param kwargs: optional params passed to utils.system_output
 
        @returns: string of results from ec command.
        """
        full_cmd = 'ectool --name=%s %s' % (self._target, cmd)
        logging.debug('Command: %s', full_cmd)
        result = utils.system_output(full_cmd, **kwargs)
        logging.debug('Result: %s', result)
        return result
 
 
class EC(EC_Common):
    """Class for CrOS embedded controller (EC)."""
    HELLO_RE = "EC says hello"
    GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)"
    SET_FANSPEED_RE = "Fan target RPM set."
    TEMP_SENSOR_TEMP_RE = "Reading temperature...([0-9]*)"
    # <sensor idx>: <sensor type> <sensor name>
    TEMP_SENSOR_INFO_RE = "(\d+):\s+(\d+)\s+([a-zA-Z_0-9]+)"
    TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on"
    # For battery, check we can see a non-zero capacity value.
    BATTERY_RE = "Design capacity:\s+[1-9]\d*\s+mAh"
    LIGHTBAR_RE = "^ 05\s+3f\s+3f$"
 
    def __init__(self):
        """Constructor."""
        super(EC, self).__init__()
        self._temperature_dict = None
 
    def hello(self, **kwargs):
        """Test EC hello command.
 
        @param kwargs: optional params passed to utils.system_output
 
        @returns True if success False otherwise.
        """
        response = self.ec_command('hello', **kwargs)
        return (re.search(self.HELLO_RE, response) is not None)
 
    def auto_fan_ctrl(self):
        """Turns auto fan ctrl on.
 
        @returns True if success False otherwise.
        """
        response = self.ec_command('autofanctrl')
        logging.info('Turned on auto fan control.')
        return (re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None)
 
    def get_fanspeed(self):
        """Gets fanspeed.
 
        @raises error.TestError if regexp fails to match.
 
        @returns integer of fan speed RPM.
        """
        response = self.ec_command('pwmgetfanrpm')
        match = re.search(self.GET_FANSPEED_RE, response)
        if not match:
            raise error.TestError('Unable to read fan speed')
 
        rpm = int(match.group(1))
        logging.info('Fan speed: %d', rpm)
        return rpm
 
    def set_fanspeed(self, rpm):
        """Sets fan speed.
 
        @param rpm: integer of fan speed RPM to set
 
        @returns True if success False otherwise.
        """
        response = self.ec_command('pwmsetfanrpm %d' % rpm)
        logging.info('Set fan speed: %d', rpm)
        return (re.search(self.SET_FANSPEED_RE, response) is not None)
 
    def _get_temperature_dict(self):
        """Read EC temperature name and idx into a dict.
 
        @returns dict where key=<sensor name>, value =<sensor idx>
        """
        # The sensor (name, idx) mapping does not change.
        if self._temperature_dict:
            return self._temperature_dict
 
        temperature_dict = {}
        response = self.ec_command('tempsinfo all')
        for rline in response.split('\n'):
            match = re.search(self.TEMP_SENSOR_INFO_RE, rline)
            if match:
                temperature_dict[match.group(3)] = int(match.group(1))
 
        self._temperature_dict = temperature_dict
        return temperature_dict
 
    def get_temperature(self, idx=None, name=None):
        """Gets temperature from idx sensor.
 
        Reads temperature either directly if idx is provided or by discovering
        idx using name.
 
        @param idx:  integer of temp sensor to read.  Default=None
        @param name: string of temp sensor to read.  Default=None.
            For example: Battery, Ambient, Charger, DRAM, eMMC, Gyro
 
        @raises ECError if fails to find idx of name.
        @raises error.TestError if fails to read sensor or fails to identify
        sensor to read from idx & name param.
 
        @returns integer of temperature reading in degrees Kelvin.
        """
        if idx is None:
            temperature_dict = self._get_temperature_dict()
            if name in temperature_dict:
                idx = temperature_dict[name]
            else:
                raise ECError('Finding temp idx for name %s' % name)
 
        response = self.ec_command('temps %d' % idx)
        match = re.search(self.TEMP_SENSOR_TEMP_RE, response)
        if not match:
            raise error.TestError('Reading temperature idx %d' % idx)
 
        return int(match.group(1))
 
    def get_battery(self):
        """Get battery presence (design capacity found).
 
        @returns True if success False otherwise.
        """
        try:
            response = self.ec_command('battery')
        except error.CmdError:
            raise ECError('calling EC battery command')
 
        return (re.search(self.BATTERY_RE, response) is not None)
 
    def get_lightbar(self):
        """Test lightbar.
 
        @returns True if success False otherwise.
        """
        self.ec_command('lightbar on')
        self.ec_command('lightbar init')
        self.ec_command('lightbar 4 255 255 255')
        response = self.ec_command('lightbar')
        self.ec_command('lightbar off')
        return (re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None)
 
    def key_press(self, key):
        """Emit key down and up signal of the keyboard.
 
        @param key: name of a key defined in KEYMATRIX.
        """
        self.key_down(key)
        self.key_up(key)
 
    def _key_action(self, key, action_type):
        if not key in KEYMATRIX:
            raise error.TestError('Unknown key: ' + key)
        row, col = KEYMATRIX[key]
        self.ec_command('kbpress %d %d %d' % (row, col, action_type))
 
    def key_down(self, key):
        """Emit key down signal of the keyboard.
 
        @param key: name of a key defined in KEYMATRIX.
        """
        self._key_action(key, 1)
 
    def key_up(self, key):
        """Emit key up signal of the keyboard.
 
        @param key: name of a key defined in KEYMATRIX.
        """
        self._key_action(key, 0)
 
 
class EC_USBPD_Port(EC_Common):
    """Class for CrOS embedded controller for USB-PD Port.
 
    Public attributes:
        index: integer of USB type-C port index.
 
    Public Methods:
        is_dfp: Determine if data role is Downstream Facing Port (DFP).
        is_amode_supported: Check if alternate mode is supported by port.
        is_amode_entered: Check if alternate mode is entered.
        set_amode: Set an alternate mode.
 
    Private attributes:
        _port: integer of USB type-C port id.
        _port_info: holds usbpd protocol info.
        _amodes: holds alternate mode info.
 
    Private methods:
        _invalidate_port_data: Remove port data to force re-eval.
        _get_port_info: Get USB-PD port info.
        _get_amodes: parse and return port's svid info.
    """
    def __init__(self, index):
        """Constructor.
 
        @param index: integer of USB type-C port index.
        """
        self.index = index
        # TODO(crosbug.com/p/38133) target= only works for samus
        super(EC_USBPD_Port, self).__init__(target='cros_pd')
 
        # Interrogate port at instantiation.  Use invalidate to force re-eval.
        self._port_info = self._get_port_info()
        self._amodes = self._get_amodes()
 
    def _invalidate_port_data(self):
        """Remove port data to force re-eval."""
        self._port_info = None
        self._amodes = None
 
    def _get_port_info(self):
        """Get USB-PD port info.
 
        ectool command usbpd provides the following information about the port:
          - Enabled/Disabled
          - Power & Data Role
          - Polarity
          - Protocol State
 
        At time of authoring it looks like:
          Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY
 
        @raises error.TestError if ...
          port info not parseable.
 
        @returns dictionary for <port> with keyval pairs:
          enabled: True | False | None
          power_role: sink | source | None
          data_role: UFP | DFP | None
          is_reversed: True | False | None
          state: various strings | None
        """
        PORT_INFO_RE = 'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+' + \
                       'Polarity:CC(\d+)\s+State:(\w+)'
 
        match = re.search(PORT_INFO_RE,
                          self.ec_command("usbpd %s" % (self.index)))
        if not match or int(match.group(1)) != self.index:
            error.TestError('Unable to determine port %d info' % self.index)
 
        pinfo = dict(enabled=None, power_role=None, data_role=None,
                    is_reversed=None, state=None)
        pinfo['enabled'] = match.group(2) == 'enabled'
        pinfo['power_role'] = 'sink' if match.group(3) == 'SNK' else 'source'
        pinfo['data_role'] = match.group(4)
        pinfo['is_reversed'] = True if match.group(5) == '2' else False
        pinfo['state'] = match.group(6)
        logging.debug('port_info = %s', pinfo)
        return pinfo
 
    def _get_amodes(self):
        """Parse alternate modes from pdgetmode.
 
        Looks like ...
          *SVID:0xff01 *0x00000485  0x00000000 ...
          SVID:0x18d1   0x00000001  0x00000000 ...
 
        @returns dictionary of format:
          <svid>: {active: True|False, configs: <config_list>, opos:<opos>}
            where:
              <svid>        : USB-IF Standard or vendor id as
                              hex string (i.e. 0xff01)
              <config_list> : list of uint32_t configs
              <opos>        : integer of active object position.
                              Note, this is the config list index + 1
        """
        SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)'
        svids = dict()
        cmd = 'pdgetmode %d' % self.index
        for line in self.ec_command(cmd, ignore_status=True).split('\n'):
            if line.strip() == '':
                continue
            logging.debug('pdgetmode line: %s', line)
            match = re.search(SVID_RE, line)
            if not match:
                logging.warning("Unable to parse SVID line %s", line)
                continue
            active = match.group(1) == '*'
            svid = match.group(2)
            configs_str = match.group(3)
            configs = list()
            opos = None
            for i,config in enumerate(configs_str.split(), 1):
                if config.startswith('*'):
                    opos = i
                    config = config[1:]
                config = int(config, 16)
                # ignore unpopulated configs
                if config == 0:
                    continue
                configs.append(config)
            svids[svid] = dict(active=active, configs=configs, opos=opos)
 
        logging.debug("Port %d svids = %s", self.index, svids)
        return svids
 
    def is_dfp(self):
        """Determine if data role is Downstream Facing Port (DFP).
 
        @returns True if DFP False otherwise.
        """
        if self._port_info is None:
            self._port_info = self._get_port_info()
 
        return self._port_info['data_role'] == 'DFP'
 
    def is_amode_supported(self, svid):
        """Check if alternate mode is supported by port partner.
 
        @param svid: alternate mode SVID hexstring (i.e. 0xff01)
        """
        if self._amodes is None:
            self._amodes = self._get_amodes()
 
        if svid in self._amodes.keys():
            return True
        return False
 
    def is_amode_entered(self, svid, opos):
        """Check if alternate mode is entered.
 
        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
        @param opos: object position of config to act on.
 
        @returns True if entered False otherwise
        """
        if self._amodes is None:
            self._amodes = self._get_amodes()
 
        if not self.is_amode_supported(svid):
            return False
 
        if self._amodes[svid]['active'] and self._amodes[svid]['opos'] == opos:
            return True
 
        return False
 
    def set_amode(self, svid, opos, enter, delay_secs=2):
        """Set alternate mode.
 
        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
        @param opos: object position of config to act on.
        @param enter: Boolean of whether to enter mode.
 
        @raises error.TestError if ...
           mode not supported.
           opos is > number of configs.
 
        @returns True if successful False otherwise
        """
        if self._amodes is None:
            self._amodes = self._get_amodes()
 
        if svid not in self._amodes.keys():
            raise error.TestError("SVID %s not supported", svid)
 
        if opos > len(self._amodes[svid]['configs']):
            raise error.TestError("opos > available configs")
 
        cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos,
                                         1 if enter else 0)
        self.ec_command(cmd, ignore_status=True)
        self._invalidate_port_data()
 
        # allow some time for mode entry/exit
        time.sleep(delay_secs)
        return self.is_amode_entered(svid, opos) == enter
 
    def get_flash_info(self):
        mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*'
        mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*'
        flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major',
                                    'dev_minor', 'rw_hash', 'image_status'])
 
        cmd = 'infopddev %d' % self.index
 
        tries = 3
        while (tries):
            res = self.ec_command(cmd, ignore_status=True)
            if not 'has no discovered device' in res:
                break
 
            tries -= 1
            time.sleep(1)
 
        for ln in res.split('\n'):
            mat1 = re.match(mat1_re, ln)
            if mat1:
                flash_dict['ptype'] = int(mat1.group(1))
                flash_dict['vid'] = mat1.group(2)
                flash_dict['pid'] = mat1.group(3)
                continue
 
            mat2 = re.match(mat2_re, ln)
            if mat2:
                flash_dict['dev_major'] = int(mat2.group(1))
                flash_dict['dev_minor'] = int(mat2.group(2))
                flash_dict['rw_hash'] = mat2.group(3)
                flash_dict['image_status'] = mat2.group(4)
                break
 
        return flash_dict
 
 
class EC_USBPD(EC_Common):
    """Class for CrOS embedded controller for USB-PD.
 
    Public attributes:
        ports: list EC_USBPD_Port instances
 
    Public Methods:
        get_num_ports: get number of USB-PD ports device has.
 
    Private attributes:
        _num_ports: integer number of USB-PD ports device has.
    """
    def __init__(self, num_ports=None):
        """Constructor.
 
        @param num_ports: total number of USB-PD ports on device.  This is an
          override.  If left 'None' will try to determine.
        """
        self._num_ports = num_ports
        self.ports = list()
 
        # TODO(crosbug.com/p/38133) target= only works for samus
        super(EC_USBPD, self).__init__(target='cros_pd')
 
        if (self.get_num_ports() == 0):
            raise error.TestNAError("Device has no USB-PD ports")
 
        for i in xrange(self._num_ports):
            self.ports.append(EC_USBPD_Port(i))
 
    def get_num_ports(self):
        """Determine the number of ports for device.
 
        Uses ectool's usbpdpower command which in turn makes host command call
        to EC_CMD_USB_PD_PORTS to determine the number of ports.
 
        TODO(tbroch) May want to consider adding separate ectool command to
        surface the number of ports directly instead of via usbpdpower
 
        @returns number of ports.
        """
        if (self._num_ports is not None):
            return self._num_ports
 
        self._num_ports = len(self.ec_command("usbpdpower").split(b'\n'))
        return self._num_ports