lin
2025-07-30 fcd736bf35fd93b563e9bbf594f2aa7b62028cc9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
#!/usr/bin/python
#
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
"""Tool to validate code in prod branch before pushing to lab.
 
The script runs push_to_prod suite to verify code in prod branch is ready to be
pushed. Link to design document:
https://docs.google.com/a/google.com/document/d/1JMz0xS3fZRSHMpFkkKAL_rxsdbNZomhHbC3B8L71uuI/edit
 
To verify if prod branch can be pushed to lab, run following command in
chromeos-staging-master2.hot server:
/usr/local/autotest/site_utils/test_push.py -e someone@company.com
 
The script uses latest gandof stable build as test build by default.
 
"""
 
import argparse
import ast
import datetime
import getpass
import multiprocessing
import os
import re
import subprocess
import sys
import time
import traceback
import urllib2
 
import common
try:
    from autotest_lib.frontend import setup_django_environment
    from autotest_lib.frontend.afe import models
    from autotest_lib.frontend.afe import rpc_utils
except ImportError:
    # Unittest may not have Django database configured and will fail to import.
    pass
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.frontend.afe import rpc_client_lib
from autotest_lib.server import constants
from autotest_lib.server import site_utils
from autotest_lib.server import utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.site_utils import test_push_common
 
AUTOTEST_DIR=common.autotest_dir
CONFIG = global_config.global_config
 
AFE = frontend_wrappers.RetryingAFE(timeout_min=0.5, delay_sec=2)
TKO = frontend_wrappers.RetryingTKO(timeout_min=0.1, delay_sec=10)
 
MAIL_FROM = 'chromeos-test@google.com'
BUILD_REGEX = 'R[\d]+-[\d]+\.[\d]+\.[\d]+'
RUN_SUITE_COMMAND = 'run_suite.py'
PUSH_TO_PROD_SUITE = 'push_to_prod'
DUMMY_SUITE = 'dummy'
DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB = 30
IMAGE_BUCKET = CONFIG.get_config_value('CROS', 'image_storage_server')
DEFAULT_NUM_DUTS = (
        ('gandof', 4),
        ('quawks', 2),
)
 
SUITE_JOB_START_INFO_REGEX = ('^.*Created suite job:.*'
                              'tab_id=view_job&object_id=(\d+)$')
 
URL_HOST = CONFIG.get_config_value('SERVER', 'hostname', type=str)
URL_PATTERN = CONFIG.get_config_value('CROS', 'log_url_pattern', type=str)
 
# Some test could be extra / missing or have mismatched results for various
# reasons. Add such test in this list and explain the reason.
_IGNORED_TESTS = [
    # test_push uses a stable image build to test, which is quite behind ToT.
    # The following expectations are correct at ToT, but need to be ignored
    # until stable image is recent enough.
 
    # TODO(pprabhu): Remove once R70 is stable.
    'dummy_Fail.RetrySuccess',
    'dummy_Fail.RetryFail',
]
 
# Multiprocessing proxy objects that are used to share data between background
# suite-running processes and main process. The multiprocessing-compatible
# versions are initialized in _main.
_run_suite_output = []
_all_suite_ids = []
 
DEFAULT_SERVICE_RESPAWN_LIMIT = 2
 
 
class TestPushException(Exception):
    """Exception to be raised when the test to push to prod failed."""
    pass
 
@retry.retry(TestPushException, timeout_min=5, delay_sec=30)
def check_dut_inventory(required_num_duts, pool):
    """Check DUT inventory for each board in the pool specified..
 
    @param required_num_duts: a dict specifying the number of DUT each platform
                              requires in order to finish push tests.
    @param pool: the pool used by test_push.
    @raise TestPushException: if number of DUTs are less than the requirement.
    """
    print 'Checking DUT inventory...'
    pool_label = constants.Labels.POOL_PREFIX + pool
    hosts = AFE.run('get_hosts', status='Ready', locked=False)
    hosts = [h for h in hosts if pool_label in h.get('labels', [])]
    platforms = [host['platform'] for host in hosts]
    current_inventory = {p : platforms.count(p) for p in platforms}
    error_msg = ''
    for platform, req_num in required_num_duts.items():
        curr_num = current_inventory.get(platform, 0)
        if curr_num < req_num:
            error_msg += ('\nRequire %d %s DUTs in pool: %s, only %d are Ready'
                          ' now' % (req_num, platform, pool, curr_num))
    if error_msg:
        raise TestPushException('Not enough DUTs to run push tests. %s' %
                                error_msg)
 
 
def powerwash_dut_to_test_repair(hostname, timeout):
    """Powerwash dut to test repair workflow.
 
    @param hostname: hostname of the dut.
    @param timeout: seconds of the powerwash test to hit timeout.
    @raise TestPushException: if DUT fail to run the test.
    """
    t = models.Test.objects.get(name='platform_Powerwash')
    c = utils.read_file(os.path.join(AUTOTEST_DIR, t.path))
    job_id = rpc_utils.create_job_common(
             'powerwash', priority=priorities.Priority.SUPER,
             control_type='Server', control_file=c, hosts=[hostname])
 
    end = time.time() + timeout
    while not TKO.get_job_test_statuses_from_db(job_id):
        if time.time() >= end:
            AFE.run('abort_host_queue_entries', job=job_id)
            raise TestPushException(
                'Powerwash test on %s timeout after %ds, abort it.' %
                (hostname, timeout))
        time.sleep(10)
    verify_test_results(job_id,
                        test_push_common.EXPECTED_TEST_RESULTS_POWERWASH)
    # Kick off verify, verify will fail and a repair should be triggered.
    AFE.reverify_hosts(hostnames=[hostname])
 
 
def reverify_all_push_duts():
    """Reverify all the push DUTs."""
    print 'Reverifying all DUTs.'
    hosts = [h.hostname for h in AFE.get_hosts()]
    AFE.reverify_hosts(hostnames=hosts)
 
 
def parse_arguments(argv):
    """Parse arguments for test_push tool.
 
    @param argv   Argument vector, as for `sys.argv`, including the
                  command name in `argv[0]`.
    @return: Parsed arguments.
 
    """
    parser = argparse.ArgumentParser(prog=argv[0])
    parser.add_argument('-b', '--board', dest='board', default='gandof',
                        help='Default is gandof.')
    parser.add_argument('-sb', '--shard_board', dest='shard_board',
                        default='quawks',
                        help='Default is quawks.')
    parser.add_argument('-i', '--build', dest='build', default=None,
                        help='Default is the latest stale build of given '
                             'board. Must be a stable build, otherwise AU test '
                             'will fail. (ex: gandolf-release/R54-8743.25.0)')
    parser.add_argument('-si', '--shard_build', dest='shard_build', default=None,
                        help='Default is the latest stable build of given '
                             'board. Must be a stable build, otherwise AU test '
                             'will fail.')
    parser.add_argument('-p', '--pool', dest='pool', default='bvt')
    parser.add_argument('-t', '--timeout_min', dest='timeout_min', type=int,
                        default=DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB,
                        help='Time in mins to wait before abort the jobs we '
                             'are waiting on. Only for the asynchronous suites '
                             'triggered by create_and_return flag.')
    parser.add_argument('-ud', '--num_duts', dest='num_duts',
                        default=dict(DEFAULT_NUM_DUTS),
                        type=ast.literal_eval,
                        help="Python dict literal that specifies the required"
                        " number of DUTs for each board. E.g {'gandof':4}")
    parser.add_argument('-c', '--continue_on_failure', action='store_true',
                        dest='continue_on_failure',
                        help='All tests continue to run when there is failure')
    parser.add_argument('-sl', '--service_respawn_limit', type=int,
                        default=DEFAULT_SERVICE_RESPAWN_LIMIT,
                        help='If a service crashes more than this, the test '
                             'push is considered failed.')
 
    arguments = parser.parse_args(argv[1:])
 
    # Get latest stable build as default build.
    version_map = AFE.get_stable_version_map(AFE.CROS_IMAGE_TYPE)
    if not arguments.build:
        arguments.build = version_map.get_image_name(arguments.board)
    if not arguments.shard_build:
        arguments.shard_build = version_map.get_image_name(
            arguments.shard_board)
    return arguments
 
 
def do_run_suite(suite_name, arguments, use_shard=False,
                 create_and_return=False):
    """Call run_suite to run a suite job, and return the suite job id.
 
    The script waits the suite job to finish before returning the suite job id.
    Also it will echo the run_suite output to stdout.
 
    @param suite_name: Name of a suite, e.g., dummy.
    @param arguments: Arguments for run_suite command.
    @param use_shard: If true, suite is scheduled for shard board.
    @param create_and_return: If True, run_suite just creates the suite, print
                              the job id, then finish immediately.
 
    @return: Suite job ID.
 
    """
    if use_shard:
        board = arguments.shard_board
        build = arguments.shard_build
    else:
        board = arguments.board
        build = arguments.build
 
    # Remove cros-version label to force provision.
    hosts = AFE.get_hosts(label=constants.Labels.BOARD_PREFIX+board,
                          locked=False)
    for host in hosts:
        labels_to_remove = [
                l for l in host.labels
                if l.startswith(provision.CROS_VERSION_PREFIX)]
        if labels_to_remove:
            AFE.run('host_remove_labels', id=host.id, labels=labels_to_remove)
 
        # Test repair work flow on shards, powerwash test will timeout after 7m.
        if use_shard and not create_and_return:
            powerwash_dut_to_test_repair(host.hostname, timeout=420)
 
    current_dir = os.path.dirname(os.path.realpath(__file__))
    cmd = [os.path.join(current_dir, RUN_SUITE_COMMAND),
           '-s', suite_name,
           '-b', board,
           '-i', build,
           '-p', arguments.pool,
           '--minimum_duts', str(arguments.num_duts[board])]
    if create_and_return:
        cmd += ['-c']
 
    suite_job_id = None
 
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
 
    while True:
        line = proc.stdout.readline()
 
        # Break when run_suite process completed.
        if not line and proc.poll() != None:
            break
        print line.rstrip()
        _run_suite_output.append(line.rstrip())
 
        if not suite_job_id:
            m = re.match(SUITE_JOB_START_INFO_REGEX, line)
            if m and m.group(1):
                suite_job_id = int(m.group(1))
                _all_suite_ids.append(suite_job_id)
 
    if not suite_job_id:
        raise TestPushException('Failed to retrieve suite job ID.')
 
    # If create_and_return specified, wait for the suite to finish.
    if create_and_return:
        end = time.time() + arguments.timeout_min * 60
        while not AFE.get_jobs(id=suite_job_id, finished=True):
            if time.time() < end:
                time.sleep(10)
            else:
                AFE.run('abort_host_queue_entries', job=suite_job_id)
                raise TestPushException(
                        'Asynchronous suite triggered by create_and_return '
                        'flag has timed out after %d mins. Aborting it.' %
                        arguments.timeout_min)
 
    print 'Suite job %s is completed.' % suite_job_id
    return suite_job_id
 
 
def check_dut_image(build, suite_job_id):
    """Confirm all DUTs used for the suite are imaged to expected build.
 
    @param build: Expected build to be imaged.
    @param suite_job_id: job ID of the suite job.
    @raise TestPushException: If a DUT does not have expected build imaged.
    """
    print 'Checking image installed in DUTs...'
    job_ids = [job.id for job in
               models.Job.objects.filter(parent_job_id=suite_job_id)]
    hqes = [models.HostQueueEntry.objects.filter(job_id=job_id)[0]
            for job_id in job_ids]
    hostnames = set([hqe.host.hostname for hqe in hqes])
    for hostname in hostnames:
        found_build = site_utils.get_build_from_afe(hostname, AFE)
        if found_build != build:
            raise TestPushException('DUT is not imaged properly. Host %s has '
                                    'build %s, while build %s is expected.' %
                                    (hostname, found_build, build))
 
 
def test_suite(suite_name, expected_results, arguments, use_shard=False,
               create_and_return=False):
    """Call run_suite to start a suite job and verify results.
 
    @param suite_name: Name of a suite, e.g., dummy
    @param expected_results: A dictionary of test name to test result.
    @param arguments: Arguments for run_suite command.
    @param use_shard: If true, suite is scheduled for shard board.
    @param create_and_return: If True, run_suite just creates the suite, print
                              the job id, then finish immediately.
    """
    suite_job_id = do_run_suite(suite_name, arguments, use_shard,
                                create_and_return)
 
    # Confirm all DUTs used for the suite are imaged to expected build.
    # hqe.host_id for jobs running in shard is not synced back to master db,
    # therefore, skip verifying dut build for jobs running in shard.
    build_expected = arguments.build
    if not use_shard:
        check_dut_image(build_expected, suite_job_id)
 
    # Verify test results are the expected results.
    verify_test_results(suite_job_id, expected_results)
 
 
def verify_test_results(job_id, expected_results):
    """Verify the test results with the expected results.
 
    @param job_id: id of the running jobs. For suite job, it is suite_job_id.
    @param expected_results: A dictionary of test name to test result.
    @raise TestPushException: If verify fails.
    """
    print 'Comparing test results...'
    test_views = site_utils.get_test_views_from_tko(job_id, TKO)
    summary = test_push_common.summarize_push(test_views, expected_results,
                                              _IGNORED_TESTS)
 
    # Test link to log can be loaded.
    job_name = '%s-%s' % (job_id, getpass.getuser())
    log_link = URL_PATTERN % (rpc_client_lib.add_protocol(URL_HOST), job_name)
    try:
        urllib2.urlopen(log_link).read()
    except urllib2.URLError:
        summary.append('Failed to load page for link to log: %s.' % log_link)
 
    if summary:
        raise TestPushException('\n'.join(summary))
 
def test_suite_wrapper(queue, suite_name, expected_results, arguments,
                       use_shard=False, create_and_return=False):
    """Wrapper to call test_suite. Handle exception and pipe it to parent
    process.
 
    @param queue: Queue to save exception to be accessed by parent process.
    @param suite_name: Name of a suite, e.g., dummy
    @param expected_results: A dictionary of test name to test result.
    @param arguments: Arguments for run_suite command.
    @param use_shard: If true, suite is scheduled for shard board.
    @param create_and_return: If True, run_suite just creates the suite, print
                              the job id, then finish immediately.
    """
    try:
        test_suite(suite_name, expected_results, arguments, use_shard,
                   create_and_return)
    except Exception:
        # Store the whole exc_info leads to a PicklingError.
        except_type, except_value, tb = sys.exc_info()
        queue.put((except_type, except_value, traceback.extract_tb(tb)))
 
 
def check_queue(queue):
    """Check the queue for any exception being raised.
 
    @param queue: Queue used to store exception for parent process to access.
    @raise: Any exception found in the queue.
    """
    if queue.empty():
        return
    exc_info = queue.get()
    # Raise the exception with original backtrace.
    print 'Original stack trace of the exception:\n%s' % exc_info[2]
    raise exc_info[0](exc_info[1])
 
 
def _run_test_suites(arguments):
    """Run the actual tests that comprise the test_push."""
    # Use daemon flag will kill child processes when parent process fails.
    use_daemon = not arguments.continue_on_failure
    queue = multiprocessing.Queue()
 
    push_to_prod_suite = multiprocessing.Process(
            target=test_suite_wrapper,
            args=(queue, PUSH_TO_PROD_SUITE,
                  test_push_common.EXPECTED_TEST_RESULTS, arguments))
    push_to_prod_suite.daemon = use_daemon
    push_to_prod_suite.start()
 
    # suite test with --create_and_return flag
    asynchronous_suite = multiprocessing.Process(
            target=test_suite_wrapper,
            args=(queue, DUMMY_SUITE,
                  test_push_common.EXPECTED_TEST_RESULTS_DUMMY,
                  arguments, True, True))
    asynchronous_suite.daemon = True
    asynchronous_suite.start()
 
    while push_to_prod_suite.is_alive() or asynchronous_suite.is_alive():
        check_queue(queue)
        time.sleep(5)
    check_queue(queue)
    push_to_prod_suite.join()
    asynchronous_suite.join()
 
 
def check_service_crash(respawn_limit, start_time):
  """Check whether scheduler or host_scheduler crash during testing.
 
  Since the testing push is kicked off at the beginning of a given hour, the way
  to check whether a service is crashed is to check whether the times of the
  service being respawn during testing push is over the respawn_limit.
 
  @param respawn_limit: The maximum number of times the service is allowed to
                        be respawn.
  @param start_time: The time that testing push is kicked off.
  """
  def _parse(filename_prefix, filename):
    """Helper method to parse the time of the log.
 
    @param filename_prefix: The prefix of the filename.
    @param filename: The name of the log file.
    """
    return datetime.datetime.strptime(filename[len(filename_prefix):],
                                      "%Y-%m-%d-%H.%M.%S")
 
  services = ['scheduler', 'host_scheduler']
  logs = os.listdir('%s/logs/' % AUTOTEST_DIR)
  curr_time = datetime.datetime.now()
 
  error_msg = ''
  for service in services:
    log_prefix = '%s.log.' % service
    respawn_count = sum(1 for l in logs if l.startswith(log_prefix)
                        and start_time <= _parse(log_prefix, l) <= curr_time)
 
    if respawn_count > respawn_limit:
      error_msg += ('%s has been respawned %s times during testing push at %s. '
                    'It is very likely crashed. Please check!\n' %
                    (service, respawn_count,
                     start_time.strftime("%Y-%m-%d-%H")))
  if error_msg:
    raise TestPushException(error_msg)
 
 
_SUCCESS_MSG = """
All staging tests completed successfully.
 
Instructions for pushing to prod are available at
https://goto.google.com/autotest-to-prod
"""
 
 
def _main(arguments):
    """Run test and promote repo branches if tests succeed.
 
    @param arguments: command line arguments.
    """
 
    # TODO Use chromite.lib.parallel.Manager instead, to workaround the
    # too-long-tmp-path problem.
    mpmanager = multiprocessing.Manager()
    # These are globals used by other functions in this module to communicate
    # back from worker processes.
    global _run_suite_output
    _run_suite_output = mpmanager.list()
    global _all_suite_ids
    _all_suite_ids = mpmanager.list()
 
    try:
        start_time = datetime.datetime.now()
        reverify_all_push_duts()
        time.sleep(15) # Wait for the verify test to start.
        check_dut_inventory(arguments.num_duts, arguments.pool)
        _run_test_suites(arguments)
        check_service_crash(arguments.service_respawn_limit, start_time)
        print _SUCCESS_MSG
    except Exception:
        # Abort running jobs unless flagged to continue when there is a failure.
        if not arguments.continue_on_failure:
            for suite_id in _all_suite_ids:
                if AFE.get_jobs(id=suite_id, finished=False):
                    AFE.run('abort_host_queue_entries', job=suite_id)
        raise
 
 
def main():
    """Entry point."""
    arguments = parse_arguments(sys.argv)
    _main(arguments)
 
 
if __name__ == '__main__':
    main()