liyujie
2025-08-28 b3810562527858a3b3d98ffa6e9c9c5b0f4a9a8e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
# Copyright Martin J. Bligh, Google Inc 2008
# Released under the GPL v2
 
"""
This class allows you to communicate with the frontend to submit jobs etc
It is designed for writing more sophisiticated server-side control files that
can recursively add and manage other jobs.
 
We turn the JSON dictionaries into real objects that are more idiomatic
 
For docs, see:
    http://www.chromium.org/chromium-os/testing/afe-rpc-infrastructure
    http://docs.djangoproject.com/en/dev/ref/models/querysets/#queryset-api
"""
 
#pylint: disable=missing-docstring
 
import getpass
import os
import re
 
import common
 
from autotest_lib.frontend.afe import rpc_client_lib
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_states
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib import utils
from autotest_lib.tko import db
 
try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock
 
try:
    from autotest_lib.server.site_common import site_utils as server_utils
except:
    from autotest_lib.server import utils as server_utils
form_ntuples_from_machines = server_utils.form_ntuples_from_machines
 
GLOBAL_CONFIG = global_config.global_config
DEFAULT_SERVER = 'autotest'
 
 
def dump_object(header, obj):
    """
    Standard way to print out the frontend objects (eg job, host, acl, label)
    in a human-readable fashion for debugging
    """
    result = header + '\n'
    for key in obj.hash:
        if key == 'afe' or key == 'hash':
            continue
        result += '%20s: %s\n' % (key, obj.hash[key])
    return result
 
 
class RpcClient(object):
    """
    Abstract RPC class for communicating with the autotest frontend
    Inherited for both TKO and AFE uses.
 
    All the constructors go in the afe / tko class.
    Manipulating methods go in the object classes themselves
    """
    def __init__(self, path, user, server, print_log, debug, reply_debug):
        """
        Create a cached instance of a connection to the frontend
 
            user: username to connect as
            server: frontend server to connect to
            print_log: pring a logging message to stdout on every operation
            debug: print out all RPC traffic
        """
        if not user and utils.is_in_container():
            user = GLOBAL_CONFIG.get_config_value('SSP', 'user', default=None)
        if not user:
            user = getpass.getuser()
        if not server:
            if 'AUTOTEST_WEB' in os.environ:
                server = os.environ['AUTOTEST_WEB']
            else:
                server = GLOBAL_CONFIG.get_config_value('SERVER', 'hostname',
                                                        default=DEFAULT_SERVER)
        self.server = server
        self.user = user
        self.print_log = print_log
        self.debug = debug
        self.reply_debug = reply_debug
        headers = {'AUTHORIZATION': self.user}
        rpc_server = rpc_client_lib.add_protocol(server) + path
        if debug:
            print 'SERVER: %s' % rpc_server
            print 'HEADERS: %s' % headers
        self.proxy = rpc_client_lib.get_proxy(rpc_server, headers=headers)
 
 
    def run(self, call, **dargs):
        """
        Make a RPC call to the AFE server
        """
        rpc_call = getattr(self.proxy, call)
        if self.debug:
            print 'DEBUG: %s %s' % (call, dargs)
        try:
            result = utils.strip_unicode(rpc_call(**dargs))
            if self.reply_debug:
                print result
            return result
        except Exception:
            raise
 
 
    def log(self, message):
        if self.print_log:
            print message
 
 
class TKO(RpcClient):
    def __init__(self, user=None, server=None, print_log=True, debug=False,
                 reply_debug=False):
        super(TKO, self).__init__(path='/new_tko/server/noauth/rpc/',
                                  user=user,
                                  server=server,
                                  print_log=print_log,
                                  debug=debug,
                                  reply_debug=reply_debug)
        self._db = None
 
 
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/tko/get_job_status_duration')
    def get_job_test_statuses_from_db(self, job_id):
        """Get job test statuses from the database.
 
        Retrieve a set of fields from a job that reflect the status of each test
        run within a job.
        fields retrieved: status, test_name, reason, test_started_time,
                          test_finished_time, afe_job_id, job_owner, hostname.
 
        @param job_id: The afe job id to look up.
        @returns a TestStatus object of the resulting information.
        """
        if self._db is None:
            self._db = db.db()
        fields = ['status', 'test_name', 'subdir', 'reason',
                  'test_started_time', 'test_finished_time', 'afe_job_id',
                  'job_owner', 'hostname', 'job_tag']
        table = 'tko_test_view_2'
        where = 'job_tag like "%s-%%"' % job_id
        test_status = []
        # Run commit before we query to ensure that we are pulling the latest
        # results.
        self._db.commit()
        for entry in self._db.select(','.join(fields), table, (where, None)):
            status_dict = {}
            for key,value in zip(fields, entry):
                # All callers expect values to be a str object.
                status_dict[key] = str(value)
            # id is used by TestStatus to uniquely identify each Test Status
            # obj.
            status_dict['id'] = [status_dict['reason'], status_dict['hostname'],
                                 status_dict['test_name']]
            test_status.append(status_dict)
 
        return [TestStatus(self, e) for e in test_status]
 
 
    def get_status_counts(self, job, **data):
        entries = self.run('get_status_counts',
                           group_by=['hostname', 'test_name', 'reason'],
                           job_tag__startswith='%s-' % job, **data)
        return [TestStatus(self, e) for e in entries['groups']]
 
 
class _StableVersionMap(object):
    """
    A mapping from board names to strings naming software versions.
 
    The mapping is meant to allow finding a nominally "stable" version
    of software associated with a given board.  The mapping identifies
    specific versions of software that should be installed during
    operations such as repair.
 
    Conceptually, there are multiple version maps, each handling
    different types of image.  For instance, a single board may have
    both a stable OS image (e.g. for CrOS), and a separate stable
    firmware image.
 
    Each different type of image requires a certain amount of special
    handling, implemented by a subclass of `StableVersionMap`.  The
    subclasses take care of pre-processing of arguments, delegating
    actual RPC calls to this superclass.
 
    @property _afe      AFE object through which to make the actual RPC
                        calls.
    @property _android  Value of the `android` parameter to be passed
                        when calling the `get_stable_version` RPC.
    """
 
    def __init__(self, afe):
        self._afe = afe
 
 
    def get_all_versions(self):
        """
        Get all mappings in the stable versions table.
 
        Extracts the full content of the `stable_version` table
        in the AFE database, and returns it as a dictionary
        mapping board names to version strings.
 
        @return A dictionary mapping board names to version strings.
        """
        return self._afe.run('get_all_stable_versions')
 
 
    def get_version(self, board):
        """
        Get the mapping of one board in the stable versions table.
 
        Look up and return the version mapped to the given board in the
        `stable_versions` table in the AFE database.
 
        @param board  The board to be looked up.
 
        @return The version mapped for the given board.
        """
        return self._afe.run('get_stable_version', board=board)
 
 
    def set_version(self, board, version):
        """
        Change the mapping of one board in the stable versions table.
 
        Set the mapping in the `stable_versions` table in the AFE
        database for the given board to the given version.
 
        @param board    The board to be updated.
        @param version  The new version to be assigned to the board.
        """
        self._afe.run('set_stable_version',
                      version=version, board=board)
 
 
    def delete_version(self, board):
        """
        Remove the mapping of one board in the stable versions table.
 
        Remove the mapping in the `stable_versions` table in the AFE
        database for the given board.
 
        @param board    The board to be updated.
        """
        self._afe.run('delete_stable_version', board=board)
 
 
class _OSVersionMap(_StableVersionMap):
    """
    Abstract stable version mapping for full OS images of various types.
    """
 
    def _version_is_valid(self, version):
        return True
 
    def get_all_versions(self):
        versions = super(_OSVersionMap, self).get_all_versions()
        for board in versions.keys():
            if ('/' in board
                    or not self._version_is_valid(versions[board])):
                del versions[board]
        return versions
 
    def get_version(self, board):
        version = super(_OSVersionMap, self).get_version(board)
        return version if self._version_is_valid(version) else None
 
 
def format_cros_image_name(board, version):
    """
    Return an image name for a given `board` and `version`.
 
    This formats `board` and `version` into a string identifying an
    image file.  The string represents part of a URL for access to
    the image.
 
    The returned image name is typically of a form like
    "falco-release/R55-8872.44.0".
    """
    build_pattern = GLOBAL_CONFIG.get_config_value(
            'CROS', 'stable_build_pattern')
    return build_pattern % (board, version)
 
 
class _CrosVersionMap(_OSVersionMap):
    """
    Stable version mapping for Chrome OS release images.
 
    This class manages a mapping of Chrome OS board names to known-good
    release (or canary) images.  The images selected can be installed on
    DUTs during repair tasks, as a way of getting a DUT into a known
    working state.
    """
 
    def _version_is_valid(self, version):
        return version is not None and '/' not in version
 
    def get_image_name(self, board):
        """
        Return the full image name of the stable version for `board`.
 
        This finds the stable version for `board`, and returns a string
        identifying the associated image as for `format_image_name()`,
        above.
 
        @return A string identifying the image file for the stable
                image for `board`.
        """
        return format_cros_image_name(board, self.get_version(board))
 
 
class _SuffixHackVersionMap(_StableVersionMap):
    """
    Abstract super class for mappings using a pseudo-board name.
 
    For non-OS image type mappings, we look them up in the
    `stable_versions` table by constructing a "pseudo-board" from the
    real board name plus a suffix string that identifies the image type.
    So, for instance the name "lulu/firmware" is used to look up the
    FAFT firmware version for lulu boards.
    """
 
    # _SUFFIX - The suffix used in constructing the "pseudo-board"
    # lookup key.  Each subclass must define this value for itself.
    #
    _SUFFIX = None
 
    def get_all_versions(self):
        # Get all the mappings from the AFE, extract just the mappings
        # with our suffix, and replace the pseudo-board name keys with
        # the real board names.
        #
        all_versions = super(
                _SuffixHackVersionMap, self).get_all_versions()
        return {
            board[0 : -len(self._SUFFIX)]: all_versions[board]
                for board in all_versions.keys()
                    if board.endswith(self._SUFFIX)
        }
 
 
    def get_version(self, board):
        board += self._SUFFIX
        return super(_SuffixHackVersionMap, self).get_version(board)
 
 
    def set_version(self, board, version):
        board += self._SUFFIX
        super(_SuffixHackVersionMap, self).set_version(board, version)
 
 
    def delete_version(self, board):
        board += self._SUFFIX
        super(_SuffixHackVersionMap, self).delete_version(board)
 
 
class _FAFTVersionMap(_SuffixHackVersionMap):
    """
    Stable version mapping for firmware versions used in FAFT repair.
 
    When DUTs used for FAFT fail repair, stable firmware may need to be
    flashed directly from original tarballs.  The FAFT firmware version
    mapping finds the appropriate tarball for a given board.
    """
 
    _SUFFIX = '/firmware'
 
    def get_version(self, board):
        # If there's no mapping for `board`, the lookup will return the
        # default CrOS version mapping.  To eliminate that case, we
        # require a '/' character in the version, since CrOS versions
        # won't match that.
        #
        # TODO(jrbarnette):  This is, of course, a hack.  Ultimately,
        # the right fix is to move handling to the RPC server side.
        #
        version = super(_FAFTVersionMap, self).get_version(board)
        return version if '/' in version else None
 
 
class _FirmwareVersionMap(_SuffixHackVersionMap):
    """
    Stable version mapping for firmware supplied in Chrome OS images.
 
    A Chrome OS image bundles a version of the firmware that the
    device should update to when the OS version is installed during
    AU.
 
    Test images suppress the firmware update during AU.  Instead, during
    repair and verify we check installed firmware on a DUT, compare it
    against the stable version mapping for the board, and update when
    the DUT is out-of-date.
    """
 
    _SUFFIX = '/rwfw'
 
    def get_version(self, board):
        # If there's no mapping for `board`, the lookup will return the
        # default CrOS version mapping.  To eliminate that case, we
        # require the version start with "Google_", since CrOS versions
        # won't match that.
        #
        # TODO(jrbarnette):  This is, of course, a hack.  Ultimately,
        # the right fix is to move handling to the RPC server side.
        #
        version = super(_FirmwareVersionMap, self).get_version(board)
        return version if version.startswith('Google_') else None
 
 
class AFE(RpcClient):
 
    # Known image types for stable version mapping objects.
    # CROS_IMAGE_TYPE - Mappings for Chrome OS images.
    # FAFT_IMAGE_TYPE - Mappings for Firmware images for FAFT repair.
    # FIRMWARE_IMAGE_TYPE - Mappings for released RW Firmware images.
    #
    CROS_IMAGE_TYPE = 'cros'
    FAFT_IMAGE_TYPE = 'faft'
    FIRMWARE_IMAGE_TYPE = 'firmware'
 
    _IMAGE_MAPPING_CLASSES = {
        CROS_IMAGE_TYPE: _CrosVersionMap,
        FAFT_IMAGE_TYPE: _FAFTVersionMap,
        FIRMWARE_IMAGE_TYPE: _FirmwareVersionMap,
    }
 
 
    def __init__(self, user=None, server=None, print_log=True, debug=False,
                 reply_debug=False, job=None):
        self.job = job
        super(AFE, self).__init__(path='/afe/server/noauth/rpc/',
                                  user=user,
                                  server=server,
                                  print_log=print_log,
                                  debug=debug,
                                  reply_debug=reply_debug)
 
 
    def get_stable_version_map(self, image_type):
        """
        Return a stable version mapping for the given image type.
 
        @return An object mapping board names to version strings for
                software of the given image type.
        """
        return self._IMAGE_MAPPING_CLASSES[image_type](self)
 
 
    def host_statuses(self, live=None):
        dead_statuses = ['Repair Failed', 'Repairing']
        statuses = self.run('get_static_data')['host_statuses']
        if live == True:
            return list(set(statuses) - set(dead_statuses))
        if live == False:
            return dead_statuses
        else:
            return statuses
 
 
    @staticmethod
    def _dict_for_host_query(hostnames=(), status=None, label=None):
        query_args = {}
        if hostnames:
            query_args['hostname__in'] = hostnames
        if status:
            query_args['status'] = status
        if label:
            query_args['labels__name'] = label
        return query_args
 
 
    def get_hosts(self, hostnames=(), status=None, label=None, **dargs):
        query_args = dict(dargs)
        query_args.update(self._dict_for_host_query(hostnames=hostnames,
                                                    status=status,
                                                    label=label))
        hosts = self.run('get_hosts', **query_args)
        return [Host(self, h) for h in hosts]
 
 
    def get_hostnames(self, status=None, label=None, **dargs):
        """Like get_hosts() but returns hostnames instead of Host objects."""
        # This implementation can be replaced with a more efficient one
        # that does not query for entire host objects in the future.
        return [host_obj.hostname for host_obj in
                self.get_hosts(status=status, label=label, **dargs)]
 
 
    def reverify_hosts(self, hostnames=(), status=None, label=None):
        query_args = dict(locked=False,
                          aclgroup__users__login=self.user)
        query_args.update(self._dict_for_host_query(hostnames=hostnames,
                                                    status=status,
                                                    label=label))
        return self.run('reverify_hosts', **query_args)
 
 
    def repair_hosts(self, hostnames=(), status=None, label=None):
        query_args = dict(locked=False,
                          aclgroup__users__login=self.user)
        query_args.update(self._dict_for_host_query(hostnames=hostnames,
                                                    status=status,
                                                    label=label))
        return self.run('repair_hosts', **query_args)
 
 
    def create_host(self, hostname, **dargs):
        id = self.run('add_host', hostname=hostname, **dargs)
        return self.get_hosts(id=id)[0]
 
 
    def get_host_attribute(self, attr, **dargs):
        host_attrs = self.run('get_host_attribute', attribute=attr, **dargs)
        return [HostAttribute(self, a) for a in host_attrs]
 
 
    def set_host_attribute(self, attr, val, **dargs):
        self.run('set_host_attribute', attribute=attr, value=val, **dargs)
 
 
    def get_labels(self, **dargs):
        labels = self.run('get_labels', **dargs)
        return [Label(self, l) for l in labels]
 
 
    def create_label(self, name, **dargs):
        id = self.run('add_label', name=name, **dargs)
        return self.get_labels(id=id)[0]
 
 
    def get_acls(self, **dargs):
        acls = self.run('get_acl_groups', **dargs)
        return [Acl(self, a) for a in acls]
 
 
    def create_acl(self, name, **dargs):
        id = self.run('add_acl_group', name=name, **dargs)
        return self.get_acls(id=id)[0]
 
 
    def get_users(self, **dargs):
        users = self.run('get_users', **dargs)
        return [User(self, u) for u in users]
 
 
    def generate_control_file(self, tests, **dargs):
        ret = self.run('generate_control_file', tests=tests, **dargs)
        return ControlFile(self, ret)
 
 
    def get_jobs(self, summary=False, **dargs):
        if summary:
            jobs_data = self.run('get_jobs_summary', **dargs)
        else:
            jobs_data = self.run('get_jobs', **dargs)
        jobs = []
        for j in jobs_data:
            job = Job(self, j)
            # Set up some extra information defaults
            job.testname = re.sub('\s.*', '', job.name) # arbitrary default
            job.platform_results = {}
            job.platform_reasons = {}
            jobs.append(job)
        return jobs
 
 
    def get_host_queue_entries(self, **kwargs):
        """Find JobStatus objects matching some constraints.
 
        @param **kwargs: Arguments to pass to the RPC
        """
        entries = self.run('get_host_queue_entries', **kwargs)
        return self._entries_to_statuses(entries)
 
 
    def get_host_queue_entries_by_insert_time(self, **kwargs):
        """Like get_host_queue_entries, but using the insert index table.
 
        @param **kwargs: Arguments to pass to the RPC
        """
        entries = self.run('get_host_queue_entries_by_insert_time', **kwargs)
        return self._entries_to_statuses(entries)
 
 
    def _entries_to_statuses(self, entries):
        """Converts HQEs to JobStatuses
 
        Sadly, get_host_queue_entries doesn't return platforms, we have
        to get those back from an explicit get_hosts queury, then patch
        the new host objects back into the host list.
 
        :param entries: A list of HQEs from get_host_queue_entries or
          get_host_queue_entries_by_insert_time.
        """
        job_statuses = [JobStatus(self, e) for e in entries]
        hostnames = [s.host.hostname for s in job_statuses if s.host]
        hosts = {}
        for host in self.get_hosts(hostname__in=hostnames):
            hosts[host.hostname] = host
        for status in job_statuses:
            if status.host:
                status.host = hosts.get(status.host.hostname)
        # filter job statuses that have either host or meta_host
        return [status for status in job_statuses if (status.host or
                                                      status.meta_host)]
 
 
    def get_special_tasks(self, **data):
        tasks = self.run('get_special_tasks', **data)
        return [SpecialTask(self, t) for t in tasks]
 
 
    def get_host_special_tasks(self, host_id, **data):
        tasks = self.run('get_host_special_tasks',
                         host_id=host_id, **data)
        return [SpecialTask(self, t) for t in tasks]
 
 
    def get_host_status_task(self, host_id, end_time):
        task = self.run('get_host_status_task',
                        host_id=host_id, end_time=end_time)
        return SpecialTask(self, task) if task else None
 
 
    def get_host_diagnosis_interval(self, host_id, end_time, success):
        return self.run('get_host_diagnosis_interval',
                        host_id=host_id, end_time=end_time,
                        success=success)
 
 
    def create_job(self, control_file, name=' ',
                   priority=priorities.Priority.DEFAULT,
                   control_type=control_data.CONTROL_TYPE_NAMES.CLIENT,
                   **dargs):
        id = self.run('create_job', name=name, priority=priority,
                 control_file=control_file, control_type=control_type, **dargs)
        return self.get_jobs(id=id)[0]
 
 
    def abort_jobs(self, jobs):
        """Abort a list of jobs.
 
        Already completed jobs will not be affected.
 
        @param jobs: List of job ids to abort.
        """
        for job in jobs:
            self.run('abort_host_queue_entries', job_id=job)
 
 
    def get_hosts_by_attribute(self, attribute, value):
        """
        Get the list of hosts that share the same host attribute value.
 
        @param attribute: String of the host attribute to check.
        @param value: String of the value that is shared between hosts.
 
        @returns List of hostnames that all have the same host attribute and
                 value.
        """
        return self.run('get_hosts_by_attribute',
                        attribute=attribute, value=value)
 
 
    def lock_host(self, host, lock_reason, fail_if_locked=False):
        """
        Lock the given host with the given lock reason.
 
        Locking a host that's already locked using the 'modify_hosts' rpc
        will raise an exception. That's why fail_if_locked exists so the
        caller can determine if the lock succeeded or failed.  This will
        save every caller from wrapping lock_host in a try-except.
 
        @param host: hostname of host to lock.
        @param lock_reason: Reason for locking host.
        @param fail_if_locked: Return False if host is already locked.
 
        @returns Boolean, True if lock was successful, False otherwise.
        """
        try:
            self.run('modify_hosts',
                     host_filter_data={'hostname': host},
                     update_data={'locked': True,
                                  'lock_reason': lock_reason})
        except Exception:
            return not fail_if_locked
        return True
 
 
    def unlock_hosts(self, locked_hosts):
        """
        Unlock the hosts.
 
        Unlocking a host that's already unlocked will do nothing so we don't
        need any special try-except clause here.
 
        @param locked_hosts: List of hostnames of hosts to unlock.
        """
        self.run('modify_hosts',
                 host_filter_data={'hostname__in': locked_hosts},
                 update_data={'locked': False,
                              'lock_reason': ''})
 
 
class TestResults(object):
    """
    Container class used to hold the results of the tests for a job
    """
    def __init__(self):
        self.good = []
        self.fail = []
        self.pending = []
 
 
    def add(self, result):
        if result.complete_count > result.pass_count:
            self.fail.append(result)
        elif result.incomplete_count > 0:
            self.pending.append(result)
        else:
            self.good.append(result)
 
 
class RpcObject(object):
    """
    Generic object used to construct python objects from rpc calls
    """
    def __init__(self, afe, hash):
        self.afe = afe
        self.hash = hash
        self.__dict__.update(hash)
 
 
    def __str__(self):
        return dump_object(self.__repr__(), self)
 
 
class ControlFile(RpcObject):
    """
    AFE control file object
 
    Fields: synch_count, dependencies, control_file, is_server
    """
    def __repr__(self):
        return 'CONTROL FILE: %s' % self.control_file
 
 
class Label(RpcObject):
    """
    AFE label object
 
    Fields:
        name, invalid, platform, kernel_config, id, only_if_needed
    """
    def __repr__(self):
        return 'LABEL: %s' % self.name
 
 
    def add_hosts(self, hosts):
        # We must use the label's name instead of the id because label ids are
        # not consistent across master-shard.
        return self.afe.run('label_add_hosts', id=self.name, hosts=hosts)
 
 
    def remove_hosts(self, hosts):
        # We must use the label's name instead of the id because label ids are
        # not consistent across master-shard.
        return self.afe.run('label_remove_hosts', id=self.name, hosts=hosts)
 
 
class Acl(RpcObject):
    """
    AFE acl object
 
    Fields:
        users, hosts, description, name, id
    """
    def __repr__(self):
        return 'ACL: %s' % self.name
 
 
    def add_hosts(self, hosts):
        self.afe.log('Adding hosts %s to ACL %s' % (hosts, self.name))
        return self.afe.run('acl_group_add_hosts', self.id, hosts)
 
 
    def remove_hosts(self, hosts):
        self.afe.log('Removing hosts %s from ACL %s' % (hosts, self.name))
        return self.afe.run('acl_group_remove_hosts', self.id, hosts)
 
 
    def add_users(self, users):
        self.afe.log('Adding users %s to ACL %s' % (users, self.name))
        return self.afe.run('acl_group_add_users', id=self.name, users=users)
 
 
class Job(RpcObject):
    """
    AFE job object
 
    Fields:
        name, control_file, control_type, synch_count, reboot_before,
        run_verify, priority, email_list, created_on, dependencies,
        timeout, owner, reboot_after, id
    """
    def __repr__(self):
        return 'JOB: %s' % self.id
 
 
class JobStatus(RpcObject):
    """
    AFE job_status object
 
    Fields:
        status, complete, deleted, meta_host, host, active, execution_subdir, id
    """
    def __init__(self, afe, hash):
        super(JobStatus, self).__init__(afe, hash)
        self.job = Job(afe, self.job)
        if getattr(self, 'host'):
            self.host = Host(afe, self.host)
 
 
    def __repr__(self):
        if self.host and self.host.hostname:
            hostname = self.host.hostname
        else:
            hostname = 'None'
        return 'JOB STATUS: %s-%s' % (self.job.id, hostname)
 
 
class SpecialTask(RpcObject):
    """
    AFE special task object
    """
    def __init__(self, afe, hash):
        super(SpecialTask, self).__init__(afe, hash)
        self.host = Host(afe, self.host)
 
 
    def __repr__(self):
        return 'SPECIAL TASK: %s' % self.id
 
 
class Host(RpcObject):
    """
    AFE host object
 
    Fields:
        status, lock_time, locked_by, locked, hostname, invalid,
        labels, platform, protection, dirty, id
    """
    def __repr__(self):
        return 'HOST OBJECT: %s' % self.hostname
 
 
    def show(self):
        labels = list(set(self.labels) - set([self.platform]))
        print '%-6s %-7s %-7s %-16s %s' % (self.hostname, self.status,
                                           self.locked, self.platform,
                                           ', '.join(labels))
 
 
    def delete(self):
        return self.afe.run('delete_host', id=self.id)
 
 
    def modify(self, **dargs):
        return self.afe.run('modify_host', id=self.id, **dargs)
 
 
    def get_acls(self):
        return self.afe.get_acls(hosts__hostname=self.hostname)
 
 
    def add_acl(self, acl_name):
        self.afe.log('Adding ACL %s to host %s' % (acl_name, self.hostname))
        return self.afe.run('acl_group_add_hosts', id=acl_name,
                            hosts=[self.hostname])
 
 
    def remove_acl(self, acl_name):
        self.afe.log('Removing ACL %s from host %s' % (acl_name, self.hostname))
        return self.afe.run('acl_group_remove_hosts', id=acl_name,
                            hosts=[self.hostname])
 
 
    def get_labels(self):
        return self.afe.get_labels(host__hostname__in=[self.hostname])
 
 
    def add_labels(self, labels):
        self.afe.log('Adding labels %s to host %s' % (labels, self.hostname))
        return self.afe.run('host_add_labels', id=self.id, labels=labels)
 
 
    def remove_labels(self, labels):
        self.afe.log('Removing labels %s from host %s' % (labels,self.hostname))
        return self.afe.run('host_remove_labels', id=self.id, labels=labels)
 
 
    def is_available(self):
        """Check whether DUT host is available.
 
        @return: bool
        """
        return not (self.locked
                    or self.status in host_states.UNAVAILABLE_STATES)
 
 
class User(RpcObject):
    def __repr__(self):
        return 'USER: %s' % self.login
 
 
class TestStatus(RpcObject):
    """
    TKO test status object
 
    Fields:
        test_idx, hostname, testname, id
        complete_count, incomplete_count, group_count, pass_count
    """
    def __repr__(self):
        return 'TEST STATUS: %s' % self.id
 
 
class HostAttribute(RpcObject):
    """
    AFE host attribute object
 
    Fields:
        id, host, attribute, value
    """
    def __repr__(self):
        return 'HOST ATTRIBUTE %d' % self.id