lin
2025-08-14 dae8bad597b6607a449b32bf76c523423f7720ed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/bin/python
#
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
import datetime as datetime_base
import logging
from datetime import datetime
 
import common
 
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import time_utils
from autotest_lib.server import utils
from autotest_lib.server.cros.dynamic_suite import reporting_utils
from autotest_lib.server.lib import status_history
 
CONFIG = global_config.global_config
 
 
class DUTsNotAvailableError(utils.TestLabException):
    """Raised when a DUT label combination is not available in the lab."""
 
 
class NotEnoughDutsError(utils.TestLabException):
    """Rasied when the lab doesn't have the minimum number of duts."""
 
    def __init__(self, labels, num_available, num_required, hosts):
        """Initialize instance.
 
        Please pass arguments by keyword.
 
        @param labels: Labels required, including board an pool labels.
        @param num_available: Number of available hosts.
        @param num_required: Number of hosts required.
        @param hosts: Sequence of Host instances for given board and pool.
        """
        self.labels = labels
        self.num_available = num_available
        self.num_required = num_required
        self.hosts = hosts
        self.bug_id = None
        self.suite_name = None
        self.build = None
 
 
    def __repr__(self):
        return (
            '<{cls} at 0x{id:x} with'
            ' labels={this.labels!r},'
            ' num_available={this.num_available!r},'
            ' num_required={this.num_required!r},'
            ' bug_id={this.bug_id!r},'
            ' suite_name={this.suite_name!r},'
            ' build={this.build!r}>'
            .format(cls=type(self).__name__, id=id(self), this=self)
        )
 
 
    def __str__(self):
        msg_parts = [
            'Not enough DUTs for requirements: {this.labels};'
            ' required: {this.num_required}, found: {this.num_available}'
        ]
        format_dict = {'this': self}
        if self.bug_id is not None:
            msg_parts.append('bug: {bug_url}')
            format_dict['bug_url'] = reporting_utils.link_crbug(self.bug_id)
        if self.suite_name is not None:
            msg_parts.append('suite: {this.suite_name}')
        if self.build is not None:
            msg_parts.append('build: {this.build}')
        return ', '.join(msg_parts).format(**format_dict)
 
 
    def add_bug_id(self, bug_id):
        """Add crbug id associated with this exception.
 
        @param bug_id  crbug id whose str() value is used in a crbug URL.
        """
        self.bug_id = bug_id
 
 
    def add_suite_name(self, suite_name):
        """Add name of test suite that needed the DUTs.
 
        @param suite_name  Name of test suite.
        """
        self.suite_name = suite_name
 
 
    def add_build(self, build):
        """Add name of build of job that needed the DUTs.
 
        @param build  Name of build.
        """
        self.build = build
 
 
class SimpleTimer(object):
    """A simple timer used to periodically check if a deadline has passed."""
 
    def _reset(self):
        """Reset the deadline."""
        if not self.interval_hours or self.interval_hours < 0:
            logging.error('Bad interval %s', self.interval_hours)
            self.deadline = None
            return
        self.deadline = datetime.now() + datetime_base.timedelta(
                hours=self.interval_hours)
 
 
    def __init__(self, interval_hours=0.5):
        """Initialize a simple periodic deadline timer.
 
        @param interval_hours: Interval of the deadline.
        """
        self.interval_hours = interval_hours
        self._reset()
 
 
    def poll(self):
        """Poll the timer to see if we've hit the deadline.
 
        This method resets the deadline if it has passed. If the deadline
        hasn't been set, or the current time is less than the deadline, the
        method returns False.
 
        @return: True if the deadline has passed, False otherwise.
        """
        if not self.deadline or datetime.now() < self.deadline:
            return False
        self._reset()
        return True
 
 
class JobTimer(object):
    """Utility class capable of measuring job timeouts.
    """
 
    # Format used in datetime - string conversion.
    time_format = '%m-%d-%Y [%H:%M:%S]'
 
    def __init__(self, job_created_time, timeout_mins):
        """JobTimer constructor.
 
        @param job_created_time: float representing the time a job was
            created. Eg: time.time()
        @param timeout_mins: float representing the timeout in minutes.
        """
        self.job_created_time = datetime.fromtimestamp(job_created_time)
        self.timeout_hours = datetime_base.timedelta(hours=timeout_mins/60.0)
        self.debug_output_timer = SimpleTimer(interval_hours=0.5)
        self.past_halftime = False
 
 
    @classmethod
    def format_time(cls, datetime_obj):
        """Get the string formatted version of the datetime object.
 
        @param datetime_obj: A datetime.datetime object.
            Eg: datetime.datetime.now()
 
        @return: A formatted string containing the date/time of the
            input datetime.
        """
        return datetime_obj.strftime(cls.time_format)
 
 
    def elapsed_time(self):
        """Get the time elapsed since this job was created.
 
        @return: A timedelta object representing the elapsed time.
        """
        return datetime.now() - self.job_created_time
 
 
    def is_suite_timeout(self):
        """Check if the suite timed out.
 
        @return: True if more than timeout_hours has elapsed since the suite job
            was created.
        """
        if self.elapsed_time() >= self.timeout_hours:
            logging.info('Suite timed out. Started on %s, timed out on %s',
                         self.format_time(self.job_created_time),
                         self.format_time(datetime.now()))
            return True
        return False
 
 
    def first_past_halftime(self):
        """Check if we just crossed half time.
 
        This method will only return True once, the first time it is called
        after a job's elapsed time is past half its timeout.
 
        @return True: If this is the first call of the method after halftime.
        """
        if (not self.past_halftime and
            self.elapsed_time() > self.timeout_hours/2):
            self.past_halftime = True
            return True
        return False
 
 
class RPCHelper(object):
    """A class to help diagnose a suite run through the rpc interface.
    """
 
    def __init__(self, rpc_interface):
        """Constructor for rpc helper class.
 
        @param rpc_interface: An rpc object, eg: A RetryingAFE instance.
        """
        self.rpc_interface = rpc_interface
 
 
    def diagnose_pool(self, labels, time_delta_hours, limit=10):
        """Log diagnostic information about a timeout for a board/pool.
 
        @param labels: DUT label dependencies, including board and pool
                       labels.
        @param time_delta_hours: The time from which we should log information.
            This is a datetime.timedelta object, as stored by the JobTimer.
        @param limit: The maximum number of jobs per host, to log.
 
        @raises proxy.JSONRPCException: For exceptions thrown across the wire.
        """
        end_time = datetime.now()
        start_time = end_time - time_delta_hours
        host_histories = status_history.HostJobHistory.get_multiple_histories(
                self.rpc_interface,
                time_utils.to_epoch_time(start_time),
                time_utils.to_epoch_time(end_time),
                labels,
        )
        if not host_histories:
            logging.error('No hosts found for labels %r', labels)
            return
        status_map = {
            status_history.UNUSED: 'Unused',
            status_history.UNKNOWN: 'No job history',
            status_history.WORKING: 'Working',
            status_history.BROKEN: 'Failed repair'
        }
        for history in host_histories:
            count = 0
            job_info =''
            for job in history:
                start_time = (
                        time_utils.epoch_time_to_date_string(job.start_time))
                job_info += ('%s %s started on: %s status %s\n' %
                        (job.id, job.name, start_time, job.job_status))
                count += 1
                if count >= limit:
                    break
            host = history.host
            logging.error('host: %s, status: %s, locked: %s '
                          'diagnosis: %s\n'
                          'labels: %s\nLast %s jobs within %s:\n'
                          '%s',
                          history.hostname, host.status, host.locked,
                          status_map[history.last_diagnosis()[0]],
                          host.labels, limit, time_delta_hours,
                          job_info)
 
 
    def check_dut_availability(self, labels, minimum_duts=0,
                               skip_duts_check=False):
        """Check if DUT availability for a given board and pool is less than
        minimum.
 
        @param labels: DUT label dependencies, including board and pool
                       labels.
        @param minimum_duts: Minimum Number of available machines required to
                             run the suite. Default is set to 0, which means do
                             not force the check of available machines before
                             running the suite.
        @param skip_duts_check: If True, skip minimum available DUTs check.
        @raise: NotEnoughDutsError if DUT availability is lower than minimum.
        @raise: DUTsNotAvailableError if no host found for requested
                board/pool.
        """
        if minimum_duts == 0:
            return
 
        hosts = self.rpc_interface.get_hosts(
                invalid=False, multiple_labels=labels)
        if not hosts:
            raise DUTsNotAvailableError(
                    'No hosts found for labels %r. The test lab '
                    'currently does not cover test for those DUTs.' %
                    (labels,))
 
        if skip_duts_check:
            # Bypass minimum avilable DUTs check
            logging.debug('skip_duts_check is on, do not enforce minimum '
                          'DUTs check.')
            return
 
        if len(hosts) < minimum_duts:
            logging.debug('The total number of DUTs for %r is %d, '
                          'which is less than %d, the required minimum '
                          'number of available DUTS', labels, len(hosts),
                          minimum_duts)
 
        available_hosts = 0
        for host in hosts:
            if host.is_available():
                available_hosts += 1
        logging.debug('%d of %d DUTs are available for %r.',
                      available_hosts, len(hosts), labels)
        if available_hosts < minimum_duts:
            raise NotEnoughDutsError(
                labels=labels,
                num_available=available_hosts,
                num_required=minimum_duts,
                hosts=hosts)
 
 
    def diagnose_job(self, job_id, instance_server):
        """Diagnose a suite job.
 
        Logs information about the jobs that are still to run in the suite.
 
        @param job_id: The id of the suite job to get information about.
            No meaningful information gets logged if the id is for a sub-job.
        @param instance_server: The instance server.
            Eg: cautotest, cautotest-cq, localhost.
        """
        incomplete_jobs = self.rpc_interface.get_jobs(
                parent_job_id=job_id, summary=True,
                hostqueueentry__complete=False)
        if incomplete_jobs:
            logging.info('\n%s printing summary of incomplete jobs (%s):\n',
                         JobTimer.format_time(datetime.now()),
                         len(incomplete_jobs))
            for job in incomplete_jobs:
                logging.info('%s: %s', job.testname[job.testname.rfind('/')+1:],
                             reporting_utils.link_job(job.id, instance_server))
        else:
            logging.info('All jobs in suite have already completed.')