tzh
2024-08-22 c7d0944258c7d0943aa7b2211498fd612971ce27
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
import contextlib
import json
import logging
from lxml import etree
import os
import StringIO
 
from autotest_lib.client.common_lib import utils
 
 
class ChartFixture:
    """Sets up chart tablet to display dummy scene image.

    initialize() downloads the scene file onto the chart host and starts
    DISPLAY_SCRIPT in the background to show it; cleanup() interrupts that
    script again.
    """
    # Script on the chart host which displays the scene file passed to it.
    DISPLAY_SCRIPT = '/usr/local/autotest/bin/display_chart.py'

    def __init__(self, chart_host, scene_uri):
        """
        @param chart_host: host object of the chart tablet; it is used to run
                commands and to allocate a temporary directory.
        @param scene_uri: URI of the scene file, downloaded with wget.
        """
        self.host = chart_host
        self.scene_uri = scene_uri
        # Value returned by run_background() for the display script, or None
        # when no display script has been started by this fixture.
        self.display_pid = None

    def initialize(self):
        """Prepare scene file and display it on chart host."""
        logging.info('Prepare scene file')
        chart_dir = self.host.get_tmp_dir()
        # The local file name is everything after the last '/' of the URI.
        scene_name = self.scene_uri[self.scene_uri.rfind('/') + 1:]
        scene_path = os.path.join(chart_dir, scene_name)
        self.host.run('wget', args=('-O', scene_path, self.scene_uri))
        self.host.run('chmod', args=('-R', '755', chart_dir))

        logging.info('Display scene file')
        self.display_pid = self.host.run_background(
                'python %s %s' % (self.DISPLAY_SCRIPT, scene_path))
        # TODO(inker): This assumes the chart is displayed almost immediately.
        # Consider waiting until the chart is actually displayed.

    def cleanup(self):
        """Cleanup display script.

        Sends signal 2 (SIGINT) to the display script started by initialize().
        Does nothing when no display script is recorded, so calling it more
        than once is safe.
        """
        if self.display_pid is None:
            return
        self.host.run('kill', args=('-2', str(self.display_pid)))
        # Forget the pid so that a repeated cleanup() does not signal a
        # process which is already gone (or another one reusing the pid).
        self.display_pid = None
 
 
def get_chart_address(host_address, args):
    """Resolve the chart tablet address(es) for the given DUT host(s).

    An explicit 'chart' entry in the commandline args takes precedence.
    Without it, the test lab mapping is consulted, but only when running
    inside a container.

    @param host_address: a list of hostname strings.
    @param args: commandline args; converted by utils.args_to_dict() before
            the 'chart' entry is looked up.
    @return:
        A list of strings for chart tablet addresses: the comma separated
        values of the 'chart' arg if given, otherwise one lab chart address
        per entry of host_address when running in a container. None when
        neither source is available.
    """
    chart_arg = utils.args_to_dict(args).get('chart')
    if chart_arg is not None:
        # Explicitly specified on the commandline, possibly several tablets.
        return chart_arg.split(',')

    if not utils.is_in_container():
        return None

    # Fall back to the mapping logic of the test lab.
    return [utils.get_lab_chart_address(dut) for dut in host_address]
 
 
class DUTFixture:
    """Sets up camera filter for target camera facing on DUT.

    initialize() restricts the camera service and the ARC++ camera profile to
    the camera of the requested facing; cleanup() removes the test config and
    restores the original profile generator.
    """
    # Config file written on the DUT before restarting cros-camera.
    TEST_CONFIG_PATH = '/var/cache/camera/test_config.json'
    # Profile generator executable on the DUT, and where it is parked while
    # the fixture is active.
    GENERATE_CAMERA_PROFILE = os.path.join('/usr', 'bin',
                                           'generate_camera_profile')
    GENERATE_CAMERA_PROFILE_BACKUP = GENERATE_CAMERA_PROFILE + '.bak'
    # Camera profile on the DUT which gets filtered by this fixture.
    CAMERA_PROFILE_PATH = ('/mnt/stateful_partition/encrypted/var/cache/camera'
                           '/media_profiles.xml')

    def __init__(self, test, host, facing):
        """
        @param test: test object; its tmpdir, _login_chrome() and
                _get_board_name() are used by this fixture.
        @param host: host object of the DUT.
        @param facing: target camera facing, 'back' or 'front'.
        """
        self.test = test
        self.host = host
        self.facing = facing

    @contextlib.contextmanager
    def _set_selinux_permissive(self):
        """Context manager running its body with SELinux set to permissive.

        The mode reported by getenforce on entry is restored on exit, also
        when the body raises.
        """
        selinux_mode = self.host.run_output('getenforce')
        self.host.run('setenforce 0')
        try:
            yield
        finally:
            # Restore even on failure, otherwise the DUT would be left in
            # permissive mode.
            self.host.run('setenforce', args=(selinux_mode, ))

    def _filter_camera_profile(self, content, facing):
        """Filter camera profile of target facing from content of camera
        profile.

        With two profiles, camera id 0 is taken as the back camera and 1 as
        the front camera. With a single profile, the DUT is queried for the
        front camera feature to decide whether that profile matches facing.

        @param content: content of the camera profile xml.
        @param facing: target camera facing, 'back' or 'front'.
        @return:
            New camera profile with only target facing, camera ids are
            renumbered from 0.
        """
        tree = etree.parse(
                StringIO.StringIO(content),
                parser=etree.XMLParser(compact=False))
        root = tree.getroot()
        profiles = root.findall('CamcorderProfiles')
        logging.debug('%d number of camera(s) found in camera profile',
                      len(profiles))
        assert 1 <= len(profiles) <= 2
        if len(profiles) == 2:
            cam_id = 0 if facing == 'back' else 1
            # profiles is a separate list, so removing children from root
            # while looping over it is safe.
            for p in profiles:
                if cam_id == int(p.attrib['cameraId']):
                    p.attrib['cameraId'] = '0'
                else:
                    root.remove(p)
        else:
            # Only one camera: ask Android whether it is the front one.
            with self.test._login_chrome(
                    board=self.test._get_board_name(),
                    reboot=False), self._set_selinux_permissive():
                has_front_camera = (
                        'feature:android.hardware.camera.front' in self.host.
                        run_output('android-sh -c "pm list features"'))
                logging.debug('has_front_camera=%s', has_front_camera)
            # Drop the profile when the only camera is not the target facing.
            if (facing == 'front') != has_front_camera:
                root.remove(profiles[0])
        return etree.tostring(
                tree, xml_declaration=True, encoding=tree.docinfo.encoding)

    def _read_file(self, filepath):
        """Read content of filepath from host.

        The file is first copied into the tmpdir of the test.

        @param filepath: path of the file on the host.
        @return: content of the file.
        """
        tmp_path = os.path.join(self.test.tmpdir, os.path.basename(filepath))
        self.host.get_file(filepath, tmp_path, delete_dest=True)
        with open(tmp_path) as f:
            return f.read()

    def _write_file(self, filepath, content, permission=None, owner=None):
        """Write content to filepath on remote host.

        The content is staged in the tmpdir of the test and then sent to the
        host.

        @param filepath: destination path on the remote host.
        @param content: content to write.
        @param permission: set permission to 0xxx octal number of remote file.
        @param owner: set owner of remote file.
        """
        tmp_path = os.path.join(self.test.tmpdir, os.path.basename(filepath))
        with open(tmp_path, 'w') as f:
            f.write(content)
        if permission is not None:
            # Applied to the staged local copy before it is sent.
            os.chmod(tmp_path, permission)
        self.host.send_file(tmp_path, filepath, delete_dest=True)
        if owner is not None:
            self.host.run('chown', args=(owner, filepath))

    def initialize(self):
        """Filter out camera other than target facing on DUT."""
        logging.info('Restart camera service with filter option')
        self._write_file(
                self.TEST_CONFIG_PATH,
                json.dumps({
                        'enable_back_camera': self.facing == 'back',
                        'enable_front_camera': self.facing == 'front',
                        'enable_external_camera': False
                }),
                owner='arc-camera')
        self.host.run('restart cros-camera')

        # To replace camera profile in ARC++ container, arc_setup will run
        # GENERATE_CAMERA_PROFILE and mount its generated profile under
        # CAMERA_PROFILE_PATH into container.
        logging.info('Replace camera profile in ARC++ container')
        self.host.run(
                'mv',
                args=(self.GENERATE_CAMERA_PROFILE,
                      self.GENERATE_CAMERA_PROFILE_BACKUP))
        # The replacement script consists of comments only, so running it
        # leaves the filtered profile written below untouched. 0o755 is the
        # octal literal form accepted by both Python 2.6+ and Python 3.
        self._write_file(
                self.GENERATE_CAMERA_PROFILE,
                '''\
#!/bin/bash
# Put this executable file to %r to make sure arc-setup knows
# we're using dynamic media_profiles.xml copy from host path
# %r''' % (self.GENERATE_CAMERA_PROFILE, self.CAMERA_PROFILE_PATH),
                permission=0o755)
        profile = self._read_file(self.CAMERA_PROFILE_PATH)
        new_profile = self._filter_camera_profile(profile, self.facing)
        self._write_file(self.CAMERA_PROFILE_PATH, new_profile)
        self.host.run('restart ui')

    def cleanup(self):
        """Cleanup camera filter."""
        logging.info('Remove filter option and restore camera service')
        self.host.run('rm', args=('-f', self.TEST_CONFIG_PATH))
        self.host.run('restart cros-camera')

        # Restore GENERATE_CAMERA_PROFILE to regenerate camera profile on DUT.
        # A failing mv is ignored, e.g. when there is no backup to restore.
        logging.info('Restore camera profile in ARC++ container')
        self.host.run(
                'mv',
                args=(self.GENERATE_CAMERA_PROFILE_BACKUP,
                      self.GENERATE_CAMERA_PROFILE),
                ignore_status=True)
        self.host.run('restart ui')