huangcm
2025-09-01 53d8e046ac1bf2ebe94f671983e3d3be059df91a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
#!/usr/bin/python
#pylint: disable-msg=C0111
 
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
"""Host scheduler.
 
If run as a standalone service, the host scheduler ensures the following:
    1. Hosts will not be assigned to multiple hqes simultaneously. The process
       of assignment in this case refers to the modification of the host_id
       column of a row in the afe_host_queue_entries table, to reflect the host
       id of a leased host matching the dependencies of the job.
    2. Hosts that are not being used by active hqes or incomplete special tasks
       will be released back to the available hosts pool, for acquisition by
       subsequent hqes.
In addition to these guarantees, the host scheduler also confirms that no 2
active hqes/special tasks are assigned the same host, and sets the leased bit
for hosts needed by frontend special tasks. The need for the latter is only
apparent when viewed in the context of the job-scheduler (monitor_db), which
runs special tasks only after their hosts have been leased.
 
** Suport minimum duts requirement for suites (non-inline mode) **
 
Each suite can specify the minimum number of duts it requires by
dropping a 'suite_min_duts' job keyval which defaults to 0.
 
When suites are competing for duts, if any suite has not got minimum duts
it requires, the host scheduler will try to meet the requirement first,
even if other suite may have higher priority or earlier timestamp. Once
all suites' minimum duts requirement have been fullfilled, the host
scheduler will allocate the rest of duts based on job priority and suite job id.
This is to prevent low priority suites from starving when sharing pool with
high-priority suites.
 
Note:
    1. Prevent potential starvation:
       We need to carefully choose |suite_min_duts| for both low and high
       priority suites. If a high priority suite didn't specify it but a low
       priority one does, the high priority suite can be starved!
    2. Restart requirement:
       Restart host scheduler if you manually released a host by setting
       leased=0 in db. This is needed because host scheduler maintains internal
       state of host assignment for suites.
    3. Exchanging duts triggers provisioning:
       TODO(fdeng): There is a chance two suites can exchange duts,
       if the two suites are for different builds, the exchange
       will trigger provisioning. This can be optimized by preferring getting
       hosts with the same build.
"""
 
import argparse
import collections
import datetime
import logging
import os
import signal
import sys
import time
 
import common
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend import setup_django_environment
 
# This import needs to come earlier to avoid using autotest's version of
# httplib2, which is out of date.
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock
 
from autotest_lib.client.common_lib import global_config
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import rdb_utils
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.site_utils import server_manager_utils
 
 
_db_manager = None
_shutdown = False
_tick_pause_sec = global_config.global_config.get_config_value(
        'SCHEDULER', 'tick_pause_sec', type=int, default=5)
_monitor_db_host_acquisition = global_config.global_config.get_config_value(
        'SCHEDULER', 'inline_host_acquisition', type=bool, default=True)
_METRICS_PREFIX = 'chromeos/autotest/host_scheduler'
 
class SuiteRecorder(object):
    """Recording the host assignment for suites.
 
    The recorder holds two things:
        * suite_host_num, records how many duts a suite is holding,
          which is a map <suite_job_id -> num_of_hosts>
        * hosts_to_suites, records which host is assigned to which
          suite, it is a map <host_id -> suite_job_id>
    The two datastructure got updated when a host is assigned to or released
    by a job.
 
    The reason to maintain hosts_to_suites is that, when a host is released,
    we need to know which suite it was leased to. Querying the db for the
    latest completed job that has run on a host is slow.  Therefore, we go with
    an alternative: keeping a <host id, suite job id> map
    in memory (for 10K hosts, the map should take less than 1M memory on
    64-bit machine with python 2.7)
 
    """
 
 
    def __init__(self, job_query_manager):
        """Initialize.
 
        @param job_queue_manager: A JobQueueryManager object.
        """
        self.job_query_manager = job_query_manager
        self.suite_host_num, self.hosts_to_suites = (
                self.job_query_manager.get_suite_host_assignment())
 
 
    def record_assignment(self, queue_entry):
        """Record that the hqe has got a host.
 
        @param queue_entry: A scheduler_models.HostQueueEntry object which has
                            got a host.
        """
        parent_id = queue_entry.job.parent_job_id
        if not parent_id:
            return
        if self.hosts_to_suites.get(queue_entry.host_id, None) == parent_id:
            logging.error('HQE (id: %d, parent_job_id: %d, host: %s) '
                          'seems already recorded', queue_entry.id,
                          parent_id, queue_entry.host.hostname)
            return
        num_hosts = self.suite_host_num.get(parent_id, 0)
        self.suite_host_num[parent_id] = num_hosts + 1
        self.hosts_to_suites[queue_entry.host_id] = parent_id
        logging.debug('Suite %d got host %s, currently holding %d hosts',
                      parent_id, queue_entry.host.hostname,
                      self.suite_host_num[parent_id])
 
 
    def record_release(self, hosts):
        """Update the record with host releasing event.
 
        @param hosts: A list of scheduler_models.Host objects.
        """
        for host in hosts:
            if host.id in self.hosts_to_suites:
                parent_job_id = self.hosts_to_suites.pop(host.id)
                count = self.suite_host_num[parent_job_id] - 1
                if count == 0:
                    del self.suite_host_num[parent_job_id]
                else:
                    self.suite_host_num[parent_job_id] = count
                logging.debug(
                        'Suite %d releases host %s, currently holding %d hosts',
                        parent_job_id, host.hostname, count)
 
 
    def get_min_duts(self, suite_job_ids):
        """Figure out min duts to request.
 
        Given a set ids of suite jobs, figure out minimum duts to request for
        each suite. It is determined by two factors: min_duts specified
        for each suite in its job keyvals, and how many duts a suite is
        currently holding.
 
        @param suite_job_ids: A set of suite job ids.
 
        @returns: A dictionary, the key is suite_job_id, the value
                  is the minimum number of duts to request.
        """
        suite_min_duts = self.job_query_manager.get_min_duts_of_suites(
                suite_job_ids)
        for parent_id in suite_job_ids:
            min_duts = suite_min_duts.get(parent_id, 0)
            cur_duts = self.suite_host_num.get(parent_id, 0)
            suite_min_duts[parent_id] = max(0, min_duts - cur_duts)
        logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s',
                      suite_min_duts)
        return suite_min_duts
 
 
class BaseHostScheduler(object):
    """Base class containing host acquisition logic.
 
    This class contains all the core host acquisition logic needed by the
    scheduler to run jobs on hosts. It is only capable of releasing hosts
    back to the rdb through its tick, any other action must be instigated by
    the job scheduler.
    """
 
 
    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])
 
 
    def __init__(self):
        self.host_query_manager = query_managers.AFEHostQueryManager()
 
 
    def _release_hosts(self):
        """Release hosts to the RDB.
 
        Release all hosts that are ready and are currently not being used by an
        active hqe, and don't have a new special task scheduled against them.
 
        @return a list of hosts that are released.
        """
        release_hosts = self.host_query_manager.find_unused_healty_hosts()
        release_hostnames = [host.hostname for host in release_hosts]
        if release_hostnames:
            self.host_query_manager.set_leased(
                    False, hostname__in=release_hostnames)
        return release_hosts
 
 
    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Schedule a job on a host.
 
        Scheduling a job involves:
            1. Setting the active bit on the queue_entry.
            2. Scheduling a special task on behalf of the queue_entry.
        Performing these actions will lead the job scheduler through a chain of
        events, culminating in running the test and collecting results from
        the host.
 
        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.
        """
        if queue_entry.host_id is None:
            queue_entry.set_host(host)
        elif host.id != queue_entry.host_id:
                raise rdb_utils.RDBException('The rdb returned host: %s '
                        'but the job:%s was already assigned a host: %s. ' %
                        (host.hostname, queue_entry.job_id,
                         queue_entry.host.hostname))
        queue_entry.update_field('active', True)
 
        # TODO: crbug.com/373936. The host scheduler should only be assigning
        # jobs to hosts, but the criterion we use to release hosts depends
        # on it not being used by an active hqe. Since we're activating the
        # hqe here, we also need to schedule its first prejob task. OTOH,
        # we could converge to having the host scheduler manager all special
        # tasks, since their only use today is to verify/cleanup/reset a host.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()
 
 
    def acquire_hosts(self, host_jobs):
        """Accquire hosts for given jobs.
 
        This method sends jobs that need hosts to rdb.
        Child class can override this method to pipe more args
        to rdb.
 
        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
 
        @param return: A generator that yields an rdb_hosts.RDBClientHostWrapper
                       for each host acquired on behalf of a queue_entry,
                       or None if a host wasn't found.
        """
        return rdb_lib.acquire_hosts(host_jobs)
 
 
    def find_hosts_for_jobs(self, host_jobs):
        """Find and verify hosts for a list of jobs.
 
        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A generator of tuples of the form (host, queue_entry) for each
            valid host-queue_entry assignment.
        """
        hosts = self.acquire_hosts(host_jobs)
        for host, job in zip(hosts, host_jobs):
            if host:
                yield self.host_assignment(host, job)
 
 
    def tick(self):
        """Schedule core host management activities."""
        self._release_hosts()
 
 
class HostScheduler(BaseHostScheduler):
    """A scheduler capable managing host acquisition for new jobs."""
 
 
    def __init__(self):
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()
        # Keeping track on how many hosts each suite is holding
        # {suite_job_id: num_hosts}
        self._suite_recorder = SuiteRecorder(self.job_query_manager)
 
 
    def _record_host_assignment(self, host, queue_entry):
        """Record that |host| is assigned to |queue_entry|.
 
        Record:
            1. How long it takes to assign a host to a job in metadata db.
            2. Record host assignment of a suite.
 
        @param host: A Host object.
        @param queue_entry: A HostQueueEntry object.
        """
        secs_in_queued = (datetime.datetime.now() -
                          queue_entry.job.created_on).total_seconds()
        self._suite_recorder.record_assignment(queue_entry)
 
 
    @metrics.SecondsTimerDecorator(
            '%s/schedule_jobs_duration' % _METRICS_PREFIX)
    def _schedule_jobs(self):
        """Schedule new jobs against hosts."""
 
        new_jobs_with_hosts = 0
        queue_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        unverified_host_jobs = [job for job in queue_entries
                                if not job.is_hostless()]
        if unverified_host_jobs:
            for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
                self.schedule_host_job(acquisition.host, acquisition.job)
                self._record_host_assignment(acquisition.host, acquisition.job)
                new_jobs_with_hosts += 1
            metrics.Counter('%s/new_jobs_with_hosts' % _METRICS_PREFIX
                            ).increment_by(new_jobs_with_hosts)
 
        num_jobs_without_hosts = (len(unverified_host_jobs) -
                                  new_jobs_with_hosts)
        metrics.Gauge('%s/current_jobs_without_hosts' % _METRICS_PREFIX
                      ).set(num_jobs_without_hosts)
 
        metrics.Counter('%s/tick' % _METRICS_PREFIX).increment()
 
    @metrics.SecondsTimerDecorator('%s/lease_hosts_duration' % _METRICS_PREFIX)
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend."""
        # We really don't need to get all the special tasks here, just the ones
        # without hqes, but reusing the method used by the scheduler ensures
        # we prioritize the same way.
        lease_hostnames = [
                task.host.hostname for task in
                self.job_query_manager.get_prioritized_special_tasks(
                    only_tasks_with_leased_hosts=False)
                if task.queue_entry_id is None and not task.host.leased]
        # Leasing a leased hosts here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        #    an active hqe or another similar frontend task, but doing so will
        #    have already precluded it from the list of tasks returned by the
        #    job_query_manager.
        # 2. The unleasing is done based on global conditions. Eg: Even if a
        #    task has already leased a host and we lease it again, the
        #    host scheduler won't release the host till both tasks are complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)
 
 
    def acquire_hosts(self, host_jobs):
        """Override acquire_hosts.
 
        This method overrides the method in parent class.
        It figures out a set of suites that |host_jobs| belong to;
        and get min_duts requirement for each suite.
        It pipes min_duts for each suite to rdb.
 
        """
        parent_job_ids = set([q.job.parent_job_id
                              for q in host_jobs if q.job.parent_job_id])
        suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
        return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)
 
 
    @metrics.SecondsTimerDecorator('%s/tick_time' % _METRICS_PREFIX)
    def tick(self):
        logging.info('Calling new tick.')
        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()
        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()
        logging.info('Releasing unused hosts.')
        released_hosts = self._release_hosts()
        logging.info('Updating suite assignment with released hosts')
        self._suite_recorder.record_release(released_hosts)
        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()
 
 
class DummyHostScheduler(BaseHostScheduler):
    """A dummy host scheduler that doesn't acquire or release hosts."""
 
    def __init__(self):
        pass
 
 
    def tick(self):
        pass
 
 
def handle_signal(signum, frame):
    """Sigint handler so we don't crash mid-tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
 
 
def initialize(testing=False):
    """Initialize the host scheduler."""
    if testing:
        # Don't import testing utilities unless we're in testing mode,
        # as the database imports have side effects.
        from autotest_lib.scheduler import rdb_testing_utils
        rdb_testing_utils.FileDatabaseHelper().initialize_database_for_testing(
                db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            None, timestamped_logfile_prefix='host_scheduler')
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)
    scheduler_models.initialize()
 
 
def parse_arguments(argv):
    """
    Parse command line arguments
 
    @param argv: argument list to parse
    @returns:    parsed arguments.
    """
    parser = argparse.ArgumentParser(description='Host scheduler.')
    parser.add_argument('--testing', action='store_true', default=False,
                        help='Start the host scheduler in testing mode.')
    parser.add_argument('--production',
                        help=('Indicate that scheduler is running in production'
                              ' environment and it can use database that is not'
                              ' hosted in localhost. If it is set to False, '
                              'scheduler will fail if database is not in '
                              'localhost.'),
                        action='store_true', default=False)
    parser.add_argument(
            '--lifetime-hours',
            type=float,
            default=None,
            help='If provided, number of hours the scheduler should run for. '
                 'At the expiry of this time, the process will exit '
                 'gracefully.',
    )
    parser.add_argument(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon',
            type=str,
            default=None,
    )
    options = parser.parse_args(argv)
 
    return options
 
 
def main():
    if _monitor_db_host_acquisition:
        logging.info('Please set inline_host_acquisition=False in the shadow '
                     'config before starting the host scheduler.')
        sys.exit(0)
    try:
        options = parse_arguments(sys.argv[1:])
        scheduler_lib.check_production_settings(options)
 
        # If server database is enabled, check if the server has role
        # `host_scheduler`. If the server does not have host_scheduler role,
        # exception will be raised and host scheduler will not continue to run.
        if server_manager_utils.use_server_db():
            server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                         role='host_scheduler')
 
        initialize(options.testing)
 
        with ts_mon_config.SetupTsMonGlobalState(
                'autotest_host_scheduler',
                indirect=True,
                debug_file=options.metrics_file,
        ):
            metrics.Counter('%s/start' % _METRICS_PREFIX).increment()
            process_start_time = time.time()
            host_scheduler = HostScheduler()
            minimum_tick_sec = global_config.global_config.get_config_value(
                    'SCHEDULER', 'host_scheduler_minimum_tick_sec', type=float)
            while not _shutdown:
                if _lifetime_expired(options.lifetime_hours,
                                     process_start_time):
                    break
                start = time.time()
                host_scheduler.tick()
                curr_tick_sec = time.time() - start
                if (minimum_tick_sec > curr_tick_sec):
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
            logging.info('Shutdown request recieved. Bye! Bye!')
    except server_manager_utils.ServerActionError:
        # This error is expected when the server is not in primary status
        # for host-scheduler role. Thus do not send email for it.
        raise
    except Exception:
        metrics.Counter('%s/uncaught_exception' % _METRICS_PREFIX).increment()
        raise
    finally:
        email_manager.manager.send_queued_emails()
        if _db_manager:
            _db_manager.disconnect()
 
 
def _lifetime_expired(lifetime_hours, process_start_time):
    """Returns True if we've expired the process lifetime, False otherwise.
 
    Also sets the global _shutdown so that any background processes also take
    the cue to exit.
    """
    if lifetime_hours is None:
        return False
    if time.time() - process_start_time > lifetime_hours * 3600:
        logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                     lifetime_hours)
        global _shutdown
        _shutdown = True
        return True
    return False
 
 
if __name__ == '__main__':
    main()