huangcm
2025-08-14 5d6606c55520a76d5bb8297d83fd9bbf967e5244
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# Copyright 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 
import os
import os.path
import re
import subprocess
import sys
import tempfile
import time
 
import its.device
import numpy
 
SCENE_NAME = 'sensor_fusion'
SKIP_RET_CODE = 101
TEST_NAME = 'test_sensor_fusion'
TEST_DIR = os.path.join(os.getcwd(), 'tests', SCENE_NAME)
W, H = 640, 480
 
# For finding best correlation shifts from test output logs.
SHIFT_RE = re.compile('^Best correlation of [0-9.]+ at shift of [-0-9.]+ms$')
# For finding lines that indicate socket issues in failed test runs.
SOCKET_FAIL_RE = re.compile(
        r'.*((socket\.(error|timeout))|(Problem with socket)).*')
 
FPS = 30
TEST_LENGTH = 7  # seconds
 
 
def main():
    """Run test_sensor_fusion NUM_RUNS times.
 
    Save intermediate files and produce a summary/report of the results.
 
    Script should be run from the top-level CameraITS directory.
 
    Command line arguments:
        camera:      Camera(s) to be tested. Use comma to separate multiple
                     camera Ids. Ex: 'camera=0,1' or 'camera=1'
        device:      Device id for adb
        fps:         FPS to capture with during the test
        img_size:    Comma-separated dimensions of captured images (defaults to
                     640x480). Ex: 'img_size=<width>,<height>'
        num_runs:    Number of times to repeat the test
        rotator:     String for rotator id in for vid:pid:ch
        test_length: How long the test should run for (in seconds)
        tmp_dir:     Location of temp directory for output files
    """
 
    camera_id = '0'
    fps = str(FPS)
    img_size = '%s,%s' % (W, H)
    num_runs = 1
    rotator_ids = 'default'
    test_length = str(TEST_LENGTH)
    tmp_dir = None
    for s in sys.argv[1:]:
        if s[:7] == 'camera=' and len(s) > 7:
            camera_id = s[7:]
        if s[:4] == 'fps=' and len(s) > 4:
            fps = s[4:]
        elif s[:9] == 'img_size=' and len(s) > 9:
            img_size = s[9:]
        elif s[:9] == 'num_runs=' and len(s) > 9:
            num_runs = int(s[9:])
        elif s[:8] == 'rotator=' and len(s) > 8:
            rotator_ids = s[8:]
        elif s[:12] == 'test_length=' and len(s) > 12:
            test_length = s[12:]
        elif s[:8] == 'tmp_dir=' and len(s) > 8:
            tmp_dir = s[8:]
 
    # Make output directories to hold the generated files.
    tmpdir = tempfile.mkdtemp(dir=tmp_dir)
    print 'Saving output files to:', tmpdir, '\n'
 
    device_id = its.device.get_device_id()
    device_id_arg = 'device=' + device_id
    print 'Testing device ' + device_id
 
    # ensure camera_id is valid
    avail_camera_ids = find_avail_camera_ids()
    if camera_id not in avail_camera_ids:
        print 'Need to specify valid camera_id in ', avail_camera_ids
        sys.exit()
 
    camera_id_arg = 'camera=' + camera_id
    if rotator_ids:
        rotator_id_arg = 'rotator=' + rotator_ids
    print 'Preparing to run sensor_fusion on camera', camera_id
 
    img_size_arg = 'img_size=' + img_size
    print 'Image dimensions are ' + 'x'.join(img_size.split(','))
 
    fps_arg = 'fps=' + fps
    test_length_arg = 'test_length=' + test_length
    print 'Capturing at %sfps' % fps
 
    os.mkdir(os.path.join(tmpdir, camera_id))
 
    # Run test "num_runs" times, capturing stdout and stderr.
    num_pass = 0
    num_fail = 0
    num_skip = 0
    num_socket_fails = 0
    num_non_socket_fails = 0
    shift_list = []
    for i in range(num_runs):
        os.mkdir(os.path.join(tmpdir, camera_id, SCENE_NAME+'_'+str(i)))
        cmd = 'python tools/rotation_rig.py rotator=%s' % rotator_ids
        subprocess.Popen(cmd.split())
        cmd = ['python', os.path.join(TEST_DIR, TEST_NAME+'.py'),
               device_id_arg, camera_id_arg, rotator_id_arg, img_size_arg,
               fps_arg, test_length_arg]
        outdir = os.path.join(tmpdir, camera_id, SCENE_NAME+'_'+str(i))
        outpath = os.path.join(outdir, TEST_NAME+'_stdout.txt')
        errpath = os.path.join(outdir, TEST_NAME+'_stderr.txt')
        t0 = time.time()
        with open(outpath, 'w') as fout, open(errpath, 'w') as ferr:
            retcode = subprocess.call(
                    cmd, stderr=ferr, stdout=fout, cwd=outdir)
        t1 = time.time()
 
        if retcode == 0:
            retstr = 'PASS '
            time_shift = find_time_shift(outpath)
            shift_list.append(time_shift)
            num_pass += 1
        elif retcode == SKIP_RET_CODE:
            retstr = 'SKIP '
            num_skip += 1
        else:
            retstr = 'FAIL '
            time_shift = find_time_shift(outpath)
            if time_shift is None:
                if is_socket_fail(errpath):
                    num_socket_fails += 1
                else:
                    num_non_socket_fails += 1
            else:
                shift_list.append(time_shift)
            num_fail += 1
        msg = '%s %s/%s [%.1fs]' % (retstr, SCENE_NAME, TEST_NAME, t1-t0)
        print msg
 
    if num_pass == 1:
        print 'Best shift is %sms' % shift_list[0]
    elif num_pass > 1:
        shift_arr = numpy.array(shift_list)
        mean, std = numpy.mean(shift_arr), numpy.std(shift_arr)
        print 'Best shift mean is %sms with std. dev. of %sms' % (mean, std)
 
    pass_percentage = 100*float(num_pass+num_skip)/num_runs
    print '%d / %d tests passed (%.1f%%)' % (num_pass+num_skip,
                                             num_runs,
                                             pass_percentage)
 
    if num_socket_fails != 0:
        print '%s failure(s) due to socket issues' % num_socket_fails
    if num_non_socket_fails != 0:
        print '%s non-socket failure(s)' % num_non_socket_fails
 
 
def is_socket_fail(err_file_path):
    """Search through a test run's stderr log for any mention of socket issues.
 
    Args:
        err_file_path: File path for stderr logs to search through
 
    Returns:
        True if the test run failed and it was due to socket issues. Otherwise,
        False.
    """
    return find_matching_line(err_file_path, SOCKET_FAIL_RE) is not None
 
 
def find_time_shift(out_file_path):
    """Search through a test run's stdout log for the best time shift.
 
    Args:
        out_file_path: File path for stdout logs to search through
 
    Returns:
        The best time shift, if one is found. Otherwise, returns None.
    """
    line = find_matching_line(out_file_path, SHIFT_RE)
    if line is None:
        return None
    else:
        words = line.split(' ')
        # Get last word and strip off 'ms\n' before converting to a float.
        return float(words[-1][:-3])
 
 
def find_matching_line(file_path, regex):
    """Search each line in the file at 'file_path' for a line matching 'regex'.
 
    Args:
        file_path: File path for file being searched
        regex:     Regex used to match against lines
 
    Returns:
        The first matching line. If none exists, returns None.
    """
    with open(file_path) as f:
        for line in f:
            if regex.match(line):
                return line
    return None
 
def find_avail_camera_ids():
    """Find the available camera IDs.
 
    Returns:
        list of available cameras
    """
    with its.device.ItsSession() as cam:
        avail_camera_ids = cam.get_camera_ids()
    return avail_camera_ids
 
 
if __name__ == '__main__':
    main()