lin
2025-08-21 57113df3a0e2be01232281fad9a5f2c060567981
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
# Copyright 2017, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 
#pylint: disable=too-many-lines
"""
Command Line Translator for atest.
"""
 
from __future__ import print_function
 
import fnmatch
import json
import logging
import os
import sys
import time
 
import atest_error
import atest_utils
import constants
import test_finder_handler
import test_mapping
 
from metrics import metrics
from metrics import metrics_utils
from test_finders import module_finder
 
TEST_MAPPING = 'TEST_MAPPING'
FUZZY_FINDER = 'FUZZY'
 
 
#pylint: disable=no-self-use
class CLITranslator(object):
    """
    CLITranslator class contains public method translate() and some private
    helper methods. The atest tool can call the translate() method with a list
    of strings, each string referencing a test to run. Translate() will
    "translate" this list of test strings into a list of build targets and a
    list of TradeFederation run commands.
 
    Translation steps for a test string reference:
        1. Narrow down the type of reference the test string could be, i.e.
           whether it could be referencing a Module, Class, Package, etc.
        2. Try to find the test files assuming the test string is one of these
           types of reference.
        3. If test files found, generate Build Targets and the Run Command.
    """
 
    def __init__(self, module_info=None):
        """CLITranslator constructor
 
        Args:
            module_info: ModuleInfo class that has cached module-info.json.
        """
        self.mod_info = module_info
 
    def _find_test_infos(self, test, tm_test_detail):
        """Return set of TestInfos based on a given test.
 
        Args:
            test: A string representing test references.
            tm_test_detail: The TestDetail of test configured in TEST_MAPPING
                files.
 
        Returns:
            Set of TestInfos based on the given test.
        """
        test_infos = set()
        test_find_starts = time.time()
        test_found = False
        test_finders = []
        test_info_str = ''
        find_test_err_msg = None
        for finder in test_finder_handler.get_find_methods_for_test(
                self.mod_info, test):
            # For tests in TEST_MAPPING, find method is only related to
            # test name, so the details can be set after test_info object
            # is created.
            try:
                test_info = finder.find_method(finder.test_finder_instance,
                                               test)
            except atest_error.TestDiscoveryException as e:
                find_test_err_msg = e
            if test_info:
                if tm_test_detail:
                    test_info.data[constants.TI_MODULE_ARG] = (
                        tm_test_detail.options)
                    test_info.from_test_mapping = True
                    test_info.host = tm_test_detail.host
                test_infos.add(test_info)
                test_found = True
                finder_info = finder.finder_info
                print("Found '%s' as %s" % (
                    atest_utils.colorize(test, constants.GREEN),
                    finder_info))
                test_finders.append(finder_info)
                test_info_str = str(test_info)
                break
        if not test_found:
            f_results = self._fuzzy_search_and_msg(test, find_test_err_msg)
            if f_results:
                test_infos.add(f_results)
                test_found = True
                test_finders.append(FUZZY_FINDER)
        metrics.FindTestFinishEvent(
            duration=metrics_utils.convert_duration(
                time.time() - test_find_starts),
            success=test_found,
            test_reference=test,
            test_finders=test_finders,
            test_info=test_info_str)
        return test_infos
 
    def _fuzzy_search_and_msg(self, test, find_test_err_msg):
        """ Fuzzy search and print message.
 
        Args:
            test: A string representing test references
            find_test_err_msg: A string of find test error message.
 
        Returns:
            A TestInfos if found, otherwise None.
        """
        print('No test found for: %s' %
              atest_utils.colorize(test, constants.RED))
        # Currently we focus on guessing module names. Append names on
        # results if more finders support fuzzy searching.
        mod_finder = module_finder.ModuleFinder(self.mod_info)
        results = mod_finder.get_fuzzy_searching_results(test)
        if len(results) == 1 and self._confirm_running(results):
            test_info = mod_finder.find_test_by_module_name(results[0])
            if test_info:
                return test_info
        elif len(results) > 1:
            self._print_fuzzy_searching_results(results)
        else:
            print('No matching result for {0}.'.format(test))
        if find_test_err_msg:
            print('%s\n' % (atest_utils.colorize(
                find_test_err_msg, constants.MAGENTA)))
        else:
            print('(This can happen after a repo sync or if the test'
                  ' is new. Running: with "%s" may resolve the issue.)'
                  '\n' % (atest_utils.colorize(
                      constants.REBUILD_MODULE_INFO_FLAG,
                      constants.RED)))
        return None
 
    def _get_test_infos(self, tests, test_mapping_test_details=None):
        """Return set of TestInfos based on passed in tests.
 
        Args:
            tests: List of strings representing test references.
            test_mapping_test_details: List of TestDetail for tests configured
                in TEST_MAPPING files.
 
        Returns:
            Set of TestInfos based on the passed in tests.
        """
        test_infos = set()
        if not test_mapping_test_details:
            test_mapping_test_details = [None] * len(tests)
        for test, tm_test_detail in zip(tests, test_mapping_test_details):
            found_test_infos = self._find_test_infos(test, tm_test_detail)
            test_infos.update(found_test_infos)
        return test_infos
 
    def _confirm_running(self, results):
        """Listen to an answer from raw input.
 
        Args:
            results: A list of results.
 
        Returns:
            True is the answer is affirmative.
        """
        decision = raw_input('Did you mean {0}? [Y/n] '.format(
            atest_utils.colorize(results[0], constants.GREEN)))
        return decision in constants.AFFIRMATIVES
 
    def _print_fuzzy_searching_results(self, results):
        """Print modules when fuzzy searching gives multiple results.
 
        If the result is lengthy, just print the first 10 items only since we
        have already given enough-accurate result.
 
        Args:
            results: A list of guessed testable module names.
 
        """
        atest_utils.colorful_print('Did you mean the following modules?',
                                   constants.WHITE)
        for mod in results[:10]:
            atest_utils.colorful_print(mod, constants.GREEN)
 
    def _read_tests_in_test_mapping(self, test_mapping_file):
        """Read tests from a TEST_MAPPING file.
 
        Args:
            test_mapping_file: Path to a TEST_MAPPING file.
 
        Returns:
            A tuple of (all_tests, imports), where
            all_tests is a dictionary of all tests in the TEST_MAPPING file,
                grouped by test group.
            imports is a list of test_mapping.Import to include other test
                mapping files.
        """
        all_tests = {}
        imports = []
        test_mapping_dict = None
        with open(test_mapping_file) as json_file:
            test_mapping_dict = json.load(json_file)
        for test_group_name, test_list in test_mapping_dict.items():
            if test_group_name == constants.TEST_MAPPING_IMPORTS:
                for import_detail in test_list:
                    imports.append(
                        test_mapping.Import(test_mapping_file, import_detail))
            else:
                grouped_tests = all_tests.setdefault(test_group_name, set())
                tests = []
                for test in test_list:
                    test_mod_info = self.mod_info.name_to_module_info.get(
                        test['name'])
                    if not test_mod_info:
                        print('WARNING: %s is not a valid build target and '
                              'may not be discoverable by TreeHugger. If you '
                              'want to specify a class or test-package, '
                              'please set \'name\' to the test module and use '
                              '\'options\' to specify the right tests via '
                              '\'include-filter\'.\nNote: this can also occur '
                              'if the test module is not built for your '
                              'current lunch target.\n' %
                              atest_utils.colorize(test['name'], constants.RED))
                    elif not any(x in test_mod_info['compatibility_suites'] for
                                 x in constants.TEST_MAPPING_SUITES):
                        print('WARNING: Please add %s to either suite: %s for '
                              'this TEST_MAPPING file to work with TreeHugger.' %
                              (atest_utils.colorize(test['name'],
                                                    constants.RED),
                               atest_utils.colorize(constants.TEST_MAPPING_SUITES,
                                                    constants.GREEN)))
                    tests.append(test_mapping.TestDetail(test))
                grouped_tests.update(tests)
        return all_tests, imports
 
    def _find_files(self, path, file_name=TEST_MAPPING):
        """Find all files with given name under the given path.
 
        Args:
            path: A string of path in source.
 
        Returns:
            A list of paths of the files with the matching name under the given
            path.
        """
        test_mapping_files = []
        for root, _, filenames in os.walk(path):
            for filename in fnmatch.filter(filenames, file_name):
                test_mapping_files.append(os.path.join(root, filename))
        return test_mapping_files
 
    def _get_tests_from_test_mapping_files(
            self, test_group, test_mapping_files):
        """Get tests in the given test mapping files with the match group.
 
        Args:
            test_group: Group of tests to run. Default is set to `presubmit`.
            test_mapping_files: A list of path of TEST_MAPPING files.
 
        Returns:
            A tuple of (tests, all_tests, imports), where,
            tests is a set of tests (test_mapping.TestDetail) defined in
            TEST_MAPPING file of the given path, and its parent directories,
            with matching test_group.
            all_tests is a dictionary of all tests in TEST_MAPPING files,
            grouped by test group.
            imports is a list of test_mapping.Import objects that contains the
            details of where to import a TEST_MAPPING file.
        """
        all_imports = []
        # Read and merge the tests in all TEST_MAPPING files.
        merged_all_tests = {}
        for test_mapping_file in test_mapping_files:
            all_tests, imports = self._read_tests_in_test_mapping(
                test_mapping_file)
            all_imports.extend(imports)
            for test_group_name, test_list in all_tests.items():
                grouped_tests = merged_all_tests.setdefault(
                    test_group_name, set())
                grouped_tests.update(test_list)
 
        tests = set(merged_all_tests.get(test_group, []))
        # Postsubmit tests shall include all presubmit tests as well.
        if test_group == constants.TEST_GROUP_POSTSUBMIT:
            tests.update(merged_all_tests.get(
                constants.TEST_GROUP_PRESUBMIT, set()))
        elif test_group == constants.TEST_GROUP_ALL:
            for grouped_tests in merged_all_tests.values():
                tests.update(grouped_tests)
        return tests, merged_all_tests, all_imports
 
    # pylint: disable=too-many-arguments
    # pylint: disable=too-many-locals
    def _find_tests_by_test_mapping(
            self, path='', test_group=constants.TEST_GROUP_PRESUBMIT,
            file_name=TEST_MAPPING, include_subdirs=False, checked_files=None):
        """Find tests defined in TEST_MAPPING in the given path.
 
        Args:
            path: A string of path in source. Default is set to '', i.e., CWD.
            test_group: Group of tests to run. Default is set to `presubmit`.
            file_name: Name of TEST_MAPPING file. Default is set to
                `TEST_MAPPING`. The argument is added for testing purpose.
            include_subdirs: True to include tests in TEST_MAPPING files in sub
                directories.
            checked_files: Paths of TEST_MAPPING files that have been checked.
 
        Returns:
            A tuple of (tests, all_tests), where,
            tests is a set of tests (test_mapping.TestDetail) defined in
            TEST_MAPPING file of the given path, and its parent directories,
            with matching test_group.
            all_tests is a dictionary of all tests in TEST_MAPPING files,
            grouped by test group.
        """
        path = os.path.realpath(path)
        test_mapping_files = set()
        all_tests = {}
        test_mapping_file = os.path.join(path, file_name)
        if os.path.exists(test_mapping_file):
            test_mapping_files.add(test_mapping_file)
        # Include all TEST_MAPPING files in sub-directories if `include_subdirs`
        # is set to True.
        if include_subdirs:
            test_mapping_files.update(self._find_files(path, file_name))
        # Include all possible TEST_MAPPING files in parent directories.
        root_dir = os.environ.get(constants.ANDROID_BUILD_TOP, os.sep)
        while path != root_dir and path != os.sep:
            path = os.path.dirname(path)
            test_mapping_file = os.path.join(path, file_name)
            if os.path.exists(test_mapping_file):
                test_mapping_files.add(test_mapping_file)
 
        if checked_files is None:
            checked_files = set()
        test_mapping_files.difference_update(checked_files)
        checked_files.update(test_mapping_files)
        if not test_mapping_files:
            return test_mapping_files, all_tests
 
        tests, all_tests, imports = self._get_tests_from_test_mapping_files(
            test_group, test_mapping_files)
 
        # Load TEST_MAPPING files from imports recursively.
        if imports:
            for import_detail in imports:
                path = import_detail.get_path()
                # (b/110166535 #19) Import path might not exist if a project is
                # located in different directory in different branches.
                if path is None:
                    logging.warn(
                        'Failed to import TEST_MAPPING at %s', import_detail)
                    continue
                # Search for tests based on the imported search path.
                import_tests, import_all_tests = (
                    self._find_tests_by_test_mapping(
                        path, test_group, file_name, include_subdirs,
                        checked_files))
                # Merge the collections
                tests.update(import_tests)
                for group, grouped_tests in import_all_tests.items():
                    all_tests.setdefault(group, set()).update(grouped_tests)
 
        return tests, all_tests
 
    def _gather_build_targets(self, test_infos):
        targets = set()
        for test_info in test_infos:
            targets |= test_info.build_targets
        return targets
 
    def _get_test_mapping_tests(self, args):
        """Find the tests in TEST_MAPPING files.
 
        Args:
            args: arg parsed object.
 
        Returns:
            A tuple of (test_names, test_details_list), where
            test_names: a list of test name
            test_details_list: a list of test_mapping.TestDetail objects for
                the tests in TEST_MAPPING files with matching test group.
        """
        # Pull out tests from test mapping
        src_path = ''
        test_group = constants.TEST_GROUP_PRESUBMIT
        if args.tests:
            if ':' in args.tests[0]:
                src_path, test_group = args.tests[0].split(':')
            else:
                src_path = args.tests[0]
 
        test_details, all_test_details = self._find_tests_by_test_mapping(
            path=src_path, test_group=test_group,
            include_subdirs=args.include_subdirs, checked_files=set())
        test_details_list = list(test_details)
        if not test_details_list:
            logging.warn(
                'No tests of group `%s` found in TEST_MAPPING at %s or its '
                'parent directories.\nYou might be missing atest arguments,'
                ' try `atest --help` for more information',
                test_group, os.path.realpath(''))
            if all_test_details:
                tests = ''
                for test_group, test_list in all_test_details.items():
                    tests += '%s:\n' % test_group
                    for test_detail in sorted(test_list):
                        tests += '\t%s\n' % test_detail
                logging.warn(
                    'All available tests in TEST_MAPPING files are:\n%s',
                    tests)
            metrics_utils.send_exit_event(constants.EXIT_CODE_TEST_NOT_FOUND)
            sys.exit(constants.EXIT_CODE_TEST_NOT_FOUND)
 
        logging.debug(
            'Test details:\n%s',
            '\n'.join([str(detail) for detail in test_details_list]))
        test_names = [detail.name for detail in test_details_list]
        return test_names, test_details_list
 
 
    def translate(self, args):
        """Translate atest command line into build targets and run commands.
 
        Args:
            args: arg parsed object.
 
        Returns:
            A tuple with set of build_target strings and list of TestInfos.
        """
        tests = args.tests
        # Test details from TEST_MAPPING files
        test_details_list = None
        if atest_utils.is_test_mapping(args):
            tests, test_details_list = self._get_test_mapping_tests(args)
        atest_utils.colorful_print("\nFinding Tests...", constants.CYAN)
        logging.debug('Finding Tests: %s', tests)
        start = time.time()
        test_infos = self._get_test_infos(tests, test_details_list)
        logging.debug('Found tests in %ss', time.time() - start)
        for test_info in test_infos:
            logging.debug('%s\n', test_info)
        build_targets = self._gather_build_targets(test_infos)
        return build_targets, test_infos