ronnie
2022-10-14 1504bb53e29d3d46222c0b3ea994fc494b48e153
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
# Copyright (c) 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
 
import contextlib
import grp
import httplib
import json
import logging
import os
import random
import re
import time
import traceback
import urllib2
 
import common
from autotest_lib.client.bin.result_tools import utils as result_utils
from autotest_lib.client.bin.result_tools import utils_lib as result_utils_lib
from autotest_lib.client.bin.result_tools import view as result_view
from autotest_lib.client.common_lib import lsbrelease_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_queue_entry_states
from autotest_lib.client.common_lib import host_states
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import job_status
 
try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock
 
 
CONFIG = global_config.global_config
 
_SHERIFF_JS = CONFIG.get_config_value('NOTIFICATIONS', 'sheriffs', default='')
_LAB_SHERIFF_JS = CONFIG.get_config_value(
        'NOTIFICATIONS', 'lab_sheriffs', default='')
_CHROMIUM_BUILD_URL = CONFIG.get_config_value(
        'NOTIFICATIONS', 'chromium_build_url', default='')
 
LAB_GOOD_STATES = ('open', 'throttled')
 
ENABLE_DRONE_IN_RESTRICTED_SUBNET = CONFIG.get_config_value(
        'CROS', 'enable_drone_in_restricted_subnet', type=bool,
        default=False)
 
# Wait at most 10 mins for duts to go idle.
IDLE_DUT_WAIT_TIMEOUT = 600
 
# Mapping between board name and build target. This is for special case handling
# for certain Android board that the board name and build target name does not
# match.
ANDROID_TARGET_TO_BOARD_MAP = {
        'seed_l8150': 'gm4g_sprout',
        'bat_land': 'bat'
        }
ANDROID_BOARD_TO_TARGET_MAP = {
        'gm4g_sprout': 'seed_l8150',
        'bat': 'bat_land'
        }
# Prefix for the metrics name for result size information.
RESULT_METRICS_PREFIX = 'chromeos/autotest/result_collection/'
 
class TestLabException(Exception):
    """Exception raised when the Test Lab blocks a test or suite."""
    pass
 
 
class ParseBuildNameException(Exception):
    """Raised when ParseBuildName() cannot parse a build name."""
    pass
 
 
class Singleton(type):
    """Enforce that only one client class is instantiated per process."""
    _instances = {}
 
    def __call__(cls, *args, **kwargs):
        """Fetch the instance of a class to use for subsequent calls."""
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(
                    *args, **kwargs)
        return cls._instances[cls]
 
class EmptyAFEHost(object):
    """Object to represent an AFE host object when there is no AFE."""
 
    def __init__(self):
        """
        We'll be setting the instance attributes as we use them.  Right now
        we only use attributes and labels but as time goes by and other
        attributes are used from an actual AFE Host object (check
        rpc_interfaces.get_hosts()), we'll add them in here so users won't be
        perplexed why their host's afe_host object complains that attribute
        doesn't exist.
        """
        self.attributes = {}
        self.labels = []
 
 
def ParseBuildName(name):
    """Format a build name, given board, type, milestone, and manifest num.
 
    @param name: a build name, e.g. 'x86-alex-release/R20-2015.0.0' or a
                 relative build name, e.g. 'x86-alex-release/LATEST'
 
    @return board: board the manifest is for, e.g. x86-alex.
    @return type: one of 'release', 'factory', or 'firmware'
    @return milestone: (numeric) milestone the manifest was associated with.
                        Will be None for relative build names.
    @return manifest: manifest number, e.g. '2015.0.0'.
                      Will be None for relative build names.
 
    """
    match = re.match(r'(trybot-)?(?P<board>[\w-]+?)(?:-chrome)?(?:-chromium)?'
                     r'-(?P<type>\w+)/(R(?P<milestone>\d+)-'
                     r'(?P<manifest>[\d.ab-]+)|LATEST)',
                     name)
    if match and len(match.groups()) >= 5:
        return (match.group('board'), match.group('type'),
                match.group('milestone'), match.group('manifest'))
    raise ParseBuildNameException('%s is a malformed build name.' % name)
 
 
def get_labels_from_afe(hostname, label_prefix, afe):
    """Retrieve a host's specific labels from the AFE.
 
    Looks for the host labels that have the form <label_prefix>:<value>
    and returns the "<value>" part of the label. None is returned
    if there is not a label matching the pattern
 
    @param hostname: hostname of given DUT.
    @param label_prefix: prefix of label to be matched, e.g., |board:|
    @param afe: afe instance.
 
    @returns A list of labels that match the prefix or 'None'
 
    """
    labels = afe.get_labels(name__startswith=label_prefix,
                            host__hostname__in=[hostname])
    if labels:
        return [l.name.split(label_prefix, 1)[1] for l in labels]
 
 
def get_label_from_afe(hostname, label_prefix, afe):
    """Retrieve a host's specific label from the AFE.
 
    Looks for a host label that has the form <label_prefix>:<value>
    and returns the "<value>" part of the label. None is returned
    if there is not a label matching the pattern
 
    @param hostname: hostname of given DUT.
    @param label_prefix: prefix of label to be matched, e.g., |board:|
    @param afe: afe instance.
    @returns the label that matches the prefix or 'None'
 
    """
    labels = get_labels_from_afe(hostname, label_prefix, afe)
    if labels and len(labels) == 1:
        return labels[0]
 
 
def get_board_from_afe(hostname, afe):
    """Retrieve given host's board from its labels in the AFE.
 
    Looks for a host label of the form "board:<board>", and
    returns the "<board>" part of the label.  `None` is returned
    if there is not a single, unique label matching the pattern.
 
    @param hostname: hostname of given DUT.
    @param afe: afe instance.
    @returns board from label, or `None`.
 
    """
    return get_label_from_afe(hostname, constants.BOARD_PREFIX, afe)
 
 
def get_build_from_afe(hostname, afe):
    """Retrieve the current build for given host from the AFE.
 
    Looks through the host's labels in the AFE to determine its build.
 
    @param hostname: hostname of given DUT.
    @param afe: afe instance.
    @returns The current build or None if it could not find it or if there
             were multiple build labels assigned to this host.
 
    """
    prefix = provision.CROS_VERSION_PREFIX
    build = get_label_from_afe(hostname, prefix + ':', afe)
    if build:
        return build
    return None
 
 
# TODO(fdeng): fix get_sheriffs crbug.com/483254
def get_sheriffs(lab_only=False):
    """
    Polls the javascript file that holds the identity of the sheriff and
    parses it's output to return a list of chromium sheriff email addresses.
    The javascript file can contain the ldap of more than one sheriff, eg:
    document.write('sheriff_one, sheriff_two').
 
    @param lab_only: if True, only pulls lab sheriff.
    @return: A list of chroium.org sheriff email addresses to cc on the bug.
             An empty list if failed to parse the javascript.
    """
    sheriff_ids = []
    sheriff_js_list = _LAB_SHERIFF_JS.split(',')
    if not lab_only:
        sheriff_js_list.extend(_SHERIFF_JS.split(','))
 
    for sheriff_js in sheriff_js_list:
        try:
            url_content = utils.urlopen('%s%s'% (
                _CHROMIUM_BUILD_URL, sheriff_js)).read()
        except (ValueError, IOError) as e:
            logging.warning('could not parse sheriff from url %s%s: %s',
                             _CHROMIUM_BUILD_URL, sheriff_js, str(e))
        except (urllib2.URLError, httplib.HTTPException) as e:
            logging.warning('unexpected error reading from url "%s%s": %s',
                             _CHROMIUM_BUILD_URL, sheriff_js, str(e))
        else:
            ldaps = re.search(r"document.write\('(.*)'\)", url_content)
            if not ldaps:
                logging.warning('Could not retrieve sheriff ldaps for: %s',
                                 url_content)
                continue
            sheriff_ids += ['%s@chromium.org' % alias.replace(' ', '')
                            for alias in ldaps.group(1).split(',')]
    return sheriff_ids
 
 
def remote_wget(source_url, dest_path, ssh_cmd):
    """wget source_url from localhost to dest_path on remote host using ssh.
 
    @param source_url: The complete url of the source of the package to send.
    @param dest_path: The path on the remote host's file system where we would
        like to store the package.
    @param ssh_cmd: The ssh command to use in performing the remote wget.
    """
    wget_cmd = ("wget -O - %s | %s 'cat >%s'" %
                (source_url, ssh_cmd, dest_path))
    utils.run(wget_cmd)
 
 
_MAX_LAB_STATUS_ATTEMPTS = 5
def _get_lab_status(status_url):
    """Grabs the current lab status and message.
 
    @returns The JSON object obtained from the given URL.
 
    """
    retry_waittime = 1
    for _ in range(_MAX_LAB_STATUS_ATTEMPTS):
        try:
            response = urllib2.urlopen(status_url)
        except IOError as e:
            logging.debug('Error occurred when grabbing the lab status: %s.',
                          e)
            time.sleep(retry_waittime)
            continue
        # Check for successful response code.
        if response.getcode() == 200:
            return json.load(response)
        time.sleep(retry_waittime)
    return None
 
 
def _decode_lab_status(lab_status, build):
    """Decode lab status, and report exceptions as needed.
 
    Take a deserialized JSON object from the lab status page, and
    interpret it to determine the actual lab status.  Raise
    exceptions as required to report when the lab is down.
 
    @param build: build name that we want to check the status of.
 
    @raises TestLabException Raised if a request to test for the given
                             status and build should be blocked.
    """
    # First check if the lab is up.
    if not lab_status['general_state'] in LAB_GOOD_STATES:
        raise TestLabException('Chromium OS Test Lab is closed: '
                               '%s.' % lab_status['message'])
 
    # Check if the build we wish to use is disabled.
    # Lab messages should be in the format of:
    #    Lab is 'status' [regex ...] (comment)
    # If the build name matches any regex, it will be blocked.
    build_exceptions = re.search('\[(.*)\]', lab_status['message'])
    if not build_exceptions or not build:
        return
    for build_pattern in build_exceptions.group(1).split():
        if re.match(build_pattern, build):
            raise TestLabException('Chromium OS Test Lab is closed: '
                                   '%s matches %s.' % (
                                           build, build_pattern))
    return
 
 
def is_in_lab():
    """Check if current Autotest instance is in lab
 
    @return: True if the Autotest instance is in lab.
    """
    test_server_name = CONFIG.get_config_value('SERVER', 'hostname')
    return test_server_name.startswith('cautotest')
 
 
def check_lab_status(build):
    """Check if the lab status allows us to schedule for a build.
 
    Checks if the lab is down, or if testing for the requested build
    should be blocked.
 
    @param build: Name of the build to be scheduled for testing.
 
    @raises TestLabException Raised if a request to test for the given
                             status and build should be blocked.
 
    """
    # Ensure we are trying to schedule on the actual lab.
    if not is_in_lab():
        return
 
    # Download the lab status from its home on the web.
    status_url = CONFIG.get_config_value('CROS', 'lab_status_url')
    json_status = _get_lab_status(status_url)
    if json_status is None:
        # We go ahead and say the lab is open if we can't get the status.
        logging.warning('Could not get a status from %s', status_url)
        return
    _decode_lab_status(json_status, build)
 
 
def host_in_lab(hostname):
    return (not utils.in_moblab_ssp()
            and not lsbrelease_utils.is_moblab()
            and utils.host_is_in_lab_zone(hostname))
 
 
def lock_host_with_labels(afe, lock_manager, labels):
    """Lookup and lock one host that matches the list of input labels.
 
    @param afe: An instance of the afe class, as defined in server.frontend.
    @param lock_manager: A lock manager capable of locking hosts, eg the
        one defined in server.cros.host_lock_manager.
    @param labels: A list of labels to look for on hosts.
 
    @return: The hostname of a host matching all labels, and locked through the
        lock_manager. The hostname will be as specified in the database the afe
        object is associated with, i.e if it exists in afe_hosts with a .cros
        suffix, the hostname returned will contain a .cros suffix.
 
    @raises: error.NoEligibleHostException: If no hosts matching the list of
        input labels are available.
    @raises: error.TestError: If unable to lock a host matching the labels.
    """
    potential_hosts = afe.get_hosts(multiple_labels=labels)
    if not potential_hosts:
        raise error.NoEligibleHostException(
                'No devices found with labels %s.' % labels)
 
    # This prevents errors where a fault might seem repeatable
    # because we lock, say, the same packet capturer for each test run.
    random.shuffle(potential_hosts)
    for host in potential_hosts:
        if lock_manager.lock([host.hostname]):
            logging.info('Locked device %s with labels %s.',
                         host.hostname, labels)
            return host.hostname
        else:
            logging.info('Unable to lock device %s with labels %s.',
                         host.hostname, labels)
 
    raise error.TestError('Could not lock a device with labels %s' % labels)
 
 
def get_test_views_from_tko(suite_job_id, tko):
    """Get test name and result for given suite job ID.
 
    @param suite_job_id: ID of suite job.
    @param tko: an instance of TKO as defined in server/frontend.py.
    @return: A dictionary of test status keyed by test name, e.g.,
             {'dummy_Fail.Error': 'ERROR', 'dummy_Fail.NAError': 'TEST_NA'}
    @raise: Exception when there is no test view found.
 
    """
    views = tko.run('get_detailed_test_views', afe_job_id=suite_job_id)
    relevant_views = filter(job_status.view_is_relevant, views)
    if not relevant_views:
        raise Exception('Failed to retrieve job results.')
 
    test_views = {}
    for view in relevant_views:
        test_views[view['test_name']] = view['status']
 
    return test_views
 
 
def get_data_key(prefix, suite, build, board):
    """
    Constructs a key string from parameters.
 
    @param prefix: Prefix for the generating key.
    @param suite: a suite name. e.g., bvt-cq, bvt-inline, dummy
    @param build: The build string. This string should have a consistent
        format eg: x86-mario-release/R26-3570.0.0. If the format of this
        string changes such that we can't determine build_type or branch
        we give up and use the parametes we're sure of instead (suite,
        board). eg:
            1. build = x86-alex-pgo-release/R26-3570.0.0
               branch = 26
               build_type = pgo-release
            2. build = lumpy-paladin/R28-3993.0.0-rc5
               branch = 28
               build_type = paladin
    @param board: The board that this suite ran on.
    @return: The key string used for a dictionary.
    """
    try:
        _board, build_type, branch = ParseBuildName(build)[:3]
    except ParseBuildNameException as e:
        logging.error(str(e))
        branch = 'Unknown'
        build_type = 'Unknown'
    else:
        embedded_str = re.search(r'x86-\w+-(.*)', _board)
        if embedded_str:
            build_type = embedded_str.group(1) + '-' + build_type
 
    data_key_dict = {
        'prefix': prefix,
        'board': board,
        'branch': branch,
        'build_type': build_type,
        'suite': suite,
    }
    return ('%(prefix)s.%(board)s.%(build_type)s.%(branch)s.%(suite)s'
            % data_key_dict)
 
 
def setup_logging(logfile=None, prefix=False):
    """Setup basic logging with all logging info stripped.
 
    Calls to logging will only show the message. No severity is logged.
 
    @param logfile: If specified dump output to a file as well.
    @param prefix: Flag for log prefix. Set to True to add prefix to log
        entries to include timestamp and log level. Default is False.
    """
    # TODO (xixuan): Delete this code when finishing replacing run_suite.py &
    # abort_suite.py in skylab.
    # Remove all existing handlers. client/common_lib/logging_config adds
    # a StreamHandler to logger when modules are imported, e.g.,
    # autotest_lib.client.bin.utils. A new StreamHandler will be added here to
    # log only messages, not severity.
    logging.getLogger().handlers = []
 
    if prefix:
        log_format = '%(asctime)s %(levelname)-5s| %(message)s'
    else:
        log_format = '%(message)s'
 
    screen_handler = logging.StreamHandler()
    screen_handler.setFormatter(logging.Formatter(log_format))
    logging.getLogger().addHandler(screen_handler)
    logging.getLogger().setLevel(logging.INFO)
    if logfile:
        file_handler = logging.FileHandler(logfile)
        file_handler.setFormatter(logging.Formatter(log_format))
        file_handler.setLevel(logging.DEBUG)
        logging.getLogger().addHandler(file_handler)
 
 
def is_shard():
    """Determines if this instance is running as a shard.
 
    Reads the global_config value shard_hostname in the section SHARD.
 
    @return True, if shard_hostname is set, False otherwise.
    """
    hostname = CONFIG.get_config_value('SHARD', 'shard_hostname', default=None)
    return bool(hostname)
 
 
def get_global_afe_hostname():
    """Read the hostname of the global AFE from the global configuration."""
    return CONFIG.get_config_value('SERVER', 'global_afe_hostname')
 
 
def is_restricted_user(username):
    """Determines if a user is in a restricted group.
 
    User in restricted group only have access to master.
 
    @param username: A string, representing a username.
 
    @returns: True if the user is in a restricted group.
    """
    if not username:
        return False
 
    restricted_groups = CONFIG.get_config_value(
            'AUTOTEST_WEB', 'restricted_groups', default='').split(',')
    for group in restricted_groups:
        try:
            if group and username in grp.getgrnam(group).gr_mem:
                return True
        except KeyError as e:
            logging.debug("%s is not a valid group.", group)
    return False
 
 
def get_special_task_status(is_complete, success, is_active):
    """Get the status of a special task.
 
    Emulate a host queue entry status for a special task
    Although SpecialTasks are not HostQueueEntries, it is helpful to
    the user to present similar statuses.
 
    @param is_complete    Boolean if the task is completed.
    @param success        Boolean if the task succeeded.
    @param is_active      Boolean if the task is active.
 
    @return The status of a special task.
    """
    if is_complete:
        if success:
            return host_queue_entry_states.Status.COMPLETED
        return host_queue_entry_states.Status.FAILED
    if is_active:
        return host_queue_entry_states.Status.RUNNING
    return host_queue_entry_states.Status.QUEUED
 
 
def get_special_task_exec_path(hostname, task_id, task_name, time_requested):
    """Get the execution path of the SpecialTask.
 
    This method returns different paths depending on where a
    the task ran:
        * Master: hosts/hostname/task_id-task_type
        * Shard: Master_path/time_created
    This is to work around the fact that a shard can fail independent
    of the master, and be replaced by another shard that has the same
    hosts. Without the time_created stamp the logs of the tasks running
    on the second shard will clobber the logs from the first in google
    storage, because task ids are not globally unique.
 
    @param hostname        Hostname
    @param task_id         Special task id
    @param task_name       Special task name (e.g., Verify, Repair, etc)
    @param time_requested  Special task requested time.
 
    @return An execution path for the task.
    """
    results_path = 'hosts/%s/%s-%s' % (hostname, task_id, task_name.lower())
 
    # If we do this on the master it will break backward compatibility,
    # as there are tasks that currently don't have timestamps. If a host
    # or job has been sent to a shard, the rpc for that host/job will
    # be redirected to the shard, so this global_config check will happen
    # on the shard the logs are on.
    if not is_shard():
        return results_path
 
    # Generate a uid to disambiguate special task result directories
    # in case this shard fails. The simplest uid is the job_id, however
    # in rare cases tasks do not have jobs associated with them (eg:
    # frontend verify), so just use the creation timestamp. The clocks
    # between a shard and master should always be in sync. Any discrepancies
    # will be brought to our attention in the form of job timeouts.
    uid = time_requested.strftime('%Y%d%m%H%M%S')
 
    # TODO: This is a hack, however it is the easiest way to achieve
    # correctness. There is currently some debate over the future of
    # tasks in our infrastructure and refactoring everything right
    # now isn't worth the time.
    return '%s/%s' % (results_path, uid)
 
 
def get_job_tag(id, owner):
    """Returns a string tag for a job.
 
    @param id    Job id
    @param owner Job owner
 
    """
    return '%s-%s' % (id, owner)
 
 
def get_hqe_exec_path(tag, execution_subdir):
    """Returns a execution path to a HQE's results.
 
    @param tag               Tag string for a job associated with a HQE.
    @param execution_subdir  Execution sub-directory string of a HQE.
 
    """
    return os.path.join(tag, execution_subdir)
 
 
def is_inside_chroot():
    """Check if the process is running inside chroot.
 
    This is a wrapper around chromite.lib.cros_build_lib.IsInsideChroot(). The
    method checks if cros_build_lib can be imported first.
 
    @return: True if the process is running inside chroot or cros_build_lib
             cannot be imported.
 
    """
    try:
        # TODO(crbug.com/739466) This module import is delayed because it adds
        # 1-2 seconds to the module import time and most users of site_utils
        # don't need it. The correct fix is to break apart site_utils into more
        # meaningful chunks.
        from chromite.lib import cros_build_lib
    except ImportError:
        logging.warn('Unable to import chromite. Can not detect chroot. '
                     'Defaulting to False')
        return False
    return cros_build_lib.IsInsideChroot()
 
 
def parse_job_name(name):
    """Parse job name to get information including build, board and suite etc.
 
    Suite job created by run_suite follows the naming convention of:
    [build]-test_suites/control.[suite]
    For example: lumpy-release/R46-7272.0.0-test_suites/control.bvt
    The naming convention is defined in rpc_interface.create_suite_job.
 
    Test job created by suite job follows the naming convention of:
    [build]/[suite]/[test name]
    For example: lumpy-release/R46-7272.0.0/bvt/login_LoginSuccess
    The naming convention is defined in
    server/cros/dynamic_suite/tools.create_job_name
 
    Note that pgo and chrome-perf builds will fail the method. Since lab does
    not run test for these builds, they can be ignored.
    Also, tests for Launch Control builds have different naming convention.
    The build ID will be used as build_version.
 
    @param name: Name of the job.
 
    @return: A dictionary containing the test information. The keyvals include:
             build: Name of the build, e.g., lumpy-release/R46-7272.0.0
             build_version: The version of the build, e.g., R46-7272.0.0
             board: Name of the board, e.g., lumpy
             suite: Name of the test suite, e.g., bvt
 
    """
    info = {}
    suite_job_regex = '([^/]*/[^/]*(?:/\d+)?)-test_suites/control\.(.*)'
    test_job_regex = '([^/]*/[^/]*(?:/\d+)?)/([^/]+)/.*'
    match = re.match(suite_job_regex, name)
    if not match:
        match = re.match(test_job_regex, name)
    if match:
        info['build'] = match.groups()[0]
        info['suite'] = match.groups()[1]
        info['build_version'] = info['build'].split('/')[1]
        try:
            info['board'], _, _, _ = ParseBuildName(info['build'])
        except ParseBuildNameException:
            # Try to parse it as Launch Control build
            # Launch Control builds have name format:
            # branch/build_target-build_type/build_id.
            try:
                _, target, build_id = utils.parse_launch_control_build(
                        info['build'])
                build_target, _ = utils.parse_launch_control_target(target)
                if build_target:
                    info['board'] = build_target
                    info['build_version'] = build_id
            except ValueError:
                pass
    return info
 
 
def verify_not_root_user():
    """Simple function to error out if running with uid == 0"""
    if os.getuid() == 0:
        raise error.IllegalUser('This script can not be ran as root.')
 
 
def get_hostname_from_machine(machine):
    """Lookup hostname from a machine string or dict.
 
    @returns: Machine hostname in string format.
    """
    hostname, _ = get_host_info_from_machine(machine)
    return hostname
 
 
def get_host_info_from_machine(machine):
    """Lookup host information from a machine string or dict.
 
    @returns: Tuple of (hostname, afe_host)
    """
    if isinstance(machine, dict):
        return (machine['hostname'], machine['afe_host'])
    else:
        return (machine, EmptyAFEHost())
 
 
def get_afe_host_from_machine(machine):
    """Return the afe_host from the machine dict if possible.
 
    @returns: AFE host object.
    """
    _, afe_host = get_host_info_from_machine(machine)
    return afe_host
 
 
def get_connection_pool_from_machine(machine):
    """Returns the ssh_multiplex.ConnectionPool from machine if possible."""
    if not isinstance(machine, dict):
        return None
    return machine.get('connection_pool')
 
 
def get_creds_abspath(creds_file):
    """Returns the abspath of the credentials file.
 
    If creds_file is already an absolute path, just return it.
    Otherwise, assume it is located in the creds directory
    specified in global_config and return the absolute path.
 
    @param: creds_path, a path to the credentials.
    @return: An absolute path to the credentials file.
    """
    if not creds_file:
        return None
    if os.path.isabs(creds_file):
        return creds_file
    creds_dir = CONFIG.get_config_value('SERVER', 'creds_dir', default='')
    if not creds_dir or not os.path.exists(creds_dir):
        creds_dir = common.autotest_dir
    return os.path.join(creds_dir, creds_file)
 
 
def SetupTsMonGlobalState(*args, **kwargs):
    """Import-safe wrap around chromite.lib.ts_mon_config's setup function.
 
    @param *args: Args to pass through.
    @param **kwargs: Kwargs to pass through.
    """
    try:
        # TODO(crbug.com/739466) This module import is delayed because it adds
        # 1-2 seconds to the module import time and most users of site_utils
        # don't need it. The correct fix is to break apart site_utils into more
        # meaningful chunks.
        from chromite.lib import ts_mon_config
    except ImportError:
        logging.warn('Unable to import chromite. Monarch is disabled.')
        return TrivialContextManager()
 
    try:
        context = ts_mon_config.SetupTsMonGlobalState(*args, **kwargs)
        if hasattr(context, '__exit__'):
            return context
    except Exception as e:
        logging.warning('Caught an exception trying to setup ts_mon, '
                        'monitoring is disabled: %s', e, exc_info=True)
    return TrivialContextManager()
 
 
@contextlib.contextmanager
def TrivialContextManager(*args, **kwargs):
    """Context manager that does nothing.
 
    @param *args: Ignored args
    @param **kwargs: Ignored kwargs.
    """
    yield
 
 
def wait_for_idle_duts(duts, afe, max_wait=IDLE_DUT_WAIT_TIMEOUT):
    """Wait for the hosts to all go idle.
 
    @param duts: List of duts to check for idle state.
    @param afe: afe instance.
    @param max_wait: Max wait time in seconds to wait for duts to be idle.
 
    @returns Boolean True if all hosts are idle or False if any hosts did not
            go idle within max_wait.
    """
    start_time = time.time()
    # We make a shallow copy since we're going to be modifying active_dut_list.
    active_dut_list = duts[:]
    while active_dut_list:
        # Let's rate-limit how often we hit the AFE.
        time.sleep(1)
 
        # Check if we've waited too long.
        if (time.time() - start_time) > max_wait:
            return False
 
        idle_duts = []
        # Get the status for the duts and see if they're in the idle state.
        afe_hosts = afe.get_hosts(active_dut_list)
        idle_duts = [afe_host.hostname for afe_host in afe_hosts
                     if afe_host.status in host_states.IDLE_STATES]
 
        # Take out idle duts so we don't needlessly check them
        # next time around.
        for idle_dut in idle_duts:
            active_dut_list.remove(idle_dut)
 
        logging.info('still waiting for following duts to go idle: %s',
                     active_dut_list)
    return True
 
 
@contextlib.contextmanager
def lock_duts_and_wait(duts, afe, lock_msg='default lock message',
                       max_wait=IDLE_DUT_WAIT_TIMEOUT):
    """Context manager to lock the duts and wait for them to go idle.
 
    @param duts: List of duts to lock.
    @param afe: afe instance.
    @param lock_msg: message for afe on locking this host.
    @param max_wait: Max wait time in seconds to wait for duts to be idle.
 
    @returns Boolean lock_success where True if all duts locked successfully or
             False if we timed out waiting too long for hosts to go idle.
    """
    try:
        locked_duts = []
        duts.sort()
        for dut in duts:
            if afe.lock_host(dut, lock_msg, fail_if_locked=True):
                locked_duts.append(dut)
            else:
                logging.info('%s already locked', dut)
        yield wait_for_idle_duts(locked_duts, afe, max_wait)
    finally:
        afe.unlock_hosts(locked_duts)
 
 
def _get_default_size_info(path):
    """Get the default result size information.
 
    In case directory summary is failed to build, assume the test result is not
    throttled and all result sizes are the size of existing test results.
 
    @return: A namedtuple of result size informations, including:
            client_result_collected_KB: The total size (in KB) of test results
                    collected from test device. Set to be the total size of the
                    given path.
            original_result_total_KB: The original size (in KB) of test results
                    before being trimmed. Set to be the total size of the given
                    path.
            result_uploaded_KB: The total size (in KB) of test results to be
                    uploaded. Set to be the total size of the given path.
            result_throttled: True if test results collection is throttled.
                    It's set to False in this default behavior.
    """
    total_size = file_utils.get_directory_size_kibibytes(path);
    return result_utils_lib.ResultSizeInfo(
            client_result_collected_KB=total_size,
            original_result_total_KB=total_size,
            result_uploaded_KB=total_size,
            result_throttled=False)
 
 
def _report_result_size_metrics(result_size_info):
    """Report result sizes information to metrics.
 
    @param result_size_info: A ResultSizeInfo namedtuple containing information
            of test result sizes.
    """
    fields = {'result_throttled' : result_size_info.result_throttled}
    metrics.Counter(RESULT_METRICS_PREFIX + 'client_result_collected_KB',
                    description='The total size (in KB) of test results '
                    'collected from test device. Set to be the total size of '
                    'the given path.'
                    ).increment_by(result_size_info.client_result_collected_KB,
                                   fields=fields)
    metrics.Counter(RESULT_METRICS_PREFIX + 'original_result_total_KB',
                    description='The original size (in KB) of test results '
                    'before being trimmed.'
                    ).increment_by(result_size_info.original_result_total_KB,
                                   fields=fields)
    metrics.Counter(RESULT_METRICS_PREFIX + 'result_uploaded_KB',
                    description='The total size (in KB) of test results to be '
                    'uploaded.'
                    ).increment_by(result_size_info.result_uploaded_KB,
                                   fields=fields)
 
 
@metrics.SecondsTimerDecorator(
        'chromeos/autotest/result_collection/collect_result_sizes_duration')
def collect_result_sizes(path, log=logging.debug):
    """Collect the result sizes information and build result summary.
 
    It first tries to merge directory summaries and calculate the result sizes
    including:
    client_result_collected_KB: The volume in KB that's transfered from the test
            device.
    original_result_total_KB: The volume in KB that's the original size of the
            result files before being trimmed.
    result_uploaded_KB: The volume in KB that will be uploaded.
    result_throttled: Indicating if the result files were throttled.
 
    If directory summary merging failed for any reason, fall back to use the
    total size of the given result directory.
 
    @param path: Path of the result directory to get size information.
    @param log: The logging method, default to logging.debug
    @return: A ResultSizeInfo namedtuple containing information of test result
             sizes.
    """
    try:
        client_collected_bytes, summary, files = result_utils.merge_summaries(
                path)
        result_size_info = result_utils_lib.get_result_size_info(
                client_collected_bytes, summary)
        html_file = os.path.join(path, result_view.DEFAULT_RESULT_SUMMARY_NAME)
        result_view.build(client_collected_bytes, summary, html_file)
 
        # Delete all summary files after final view is built.
        for summary_file in files:
            os.remove(summary_file)
    except:
        log('Failed to calculate result sizes based on directory summaries for '
            'directory %s. Fall back to record the total size.\nException: %s' %
            (path, traceback.format_exc()))
        result_size_info = _get_default_size_info(path)
 
    _report_result_size_metrics(result_size_info)
 
    return result_size_info