ronnie
2022-10-14 1504bb53e29d3d46222c0b3ea994fc494b48e153
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
import error, logging, os, serial, shutil, threading, time
 
_power_play_data_file = '/tmp/power_play_data'
 
class PowerPlay(object):
    """Class to record serial over USB data from Power Play (go/powerplay).
 
    It detects if powerplay is connected to the DUT over USB and opens the
    serial port to start receiving powerplay data. It also opens a text file to
    save this data after some formatting.
    """
 
    version = 1
 
    def __init__(self, test_obj, record_interval=0):
        """Initialize PowerPlay.
 
        @param test_obj: test object.
        @param record_interval: Power play data recording interval in seconds.
        """
        self.test = test_obj
        self.ser = None
        self.recording_interval = record_interval
        self.momentary_curr_list = list()
        self.record_thread = None
 
    def extract_current(self, pp_data):
        """Extract momentary current value from each line of powerplay data.
 
        @param pp_data: Single line of powerplay data with eight comma separated
                        values.
        @return list containing momentary current values.
        """
        if pp_data[0].isdigit():
            self.momentary_curr_list.append(float(pp_data[pp_data.index(',')+1:]
                    [:pp_data[pp_data.index(',')+1:].index(',')]))
        return self.momentary_curr_list
 
    def start_recording_power_play_data(self):
        """Starts a new thread to record power play data."""
        self.record_thread = threading.Thread(target=self.start_record_thread)
        self.record_thread.daemon = True
        self.record_thread.start()
 
    def start_record_thread(self):
        """Start recording power play data.
 
        Get a list of connected USB devices and try to establish a serial
        connection. Once the connection is established, open a text file and
        start reading serial data and write it to the text file after some
        formatting.
        """
        devices = [x for x in os.listdir('/dev/') if x.startswith('ttyUSB')]
 
        for device in devices:
            device_link = '/dev/' + device
            try:
                if self.ser == None:
                    logging.info('Trying ... %s', device_link)
                    self.ser = serial.Serial(device_link, 115200)
                    logging.info('Successfully connected to %s', device_link)
                    break
            except serial.SerialException, e:
                raise error.TestError('Failed to connect to %s becuase of %s' %
                                     (device_link, str(e)))
 
        self.text_file = open(_power_play_data_file, 'w')
 
        if self.ser != None:
            title_row = ('time,powerplay_timestamp,momentary_current (A),' +
                    'momentary_charge (AH),average_current (A),' +
                    'total_standby_time,total_wake_time,num_wakes,is_awake?\n')
            self.text_file.write(title_row)
            start_time = time.time()
            while self.ser.readline():
                current_timestamp = (('{:>10.3f}'.
                        format(time.time() - start_time)).replace(' ', ''))
                pp_data = (self.ser.readline().replace('\00', '').
                        replace(' ', ',').replace('\r', ''))
                if (not pp_data.startswith('#') and (len(pp_data) > 30) and
                        not self.text_file.closed):
                    self.text_file.write(current_timestamp + ',' + pp_data)
                    self.momentary_curr_list = self.extract_current(pp_data)
                time.sleep(self.recording_interval)
                self.ser.flushInput()
        else:
            self.text_file.write('No data from powerplay. Check connection.')
 
    def stop_recording_power_play_data(self):
        """Stop recording power play data.
 
        Close the text file and copy it to the test log results directory. Also
        report current data to the performance dashboard.
        """
        if not self.text_file.closed:
            self.text_file.close()
        shutil.copy(_power_play_data_file, self.test.resultsdir)
        self.test.output_perf_value(description='momentary_current_draw',
                               value=self.momentary_curr_list,
                               units='Amps', higher_is_better=False)