liyujie
2025-08-28 b3810562527858a3b3d98ffa6e9c9c5b0f4a9a8e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
import logging
 
from chromite.lib import metrics
 
 
DRONE_ACCESSIBILITY_METRIC = metrics.Boolean(
    'chromeos/autotest/scheduler/drone_accessibility')
 
class DroneTaskQueueException(Exception):
    """Generic task queue exception."""
    pass
 
 
class DroneTaskQueue(object):
    """A manager to run queued tasks in drones and gather results from them."""
 
    def __init__(self):
        self.results = dict()
 
 
    def get_results(self):
        """Get a results dictionary keyed on drones.
 
        @return: A dictionary of return values from drones.
        """
        results_copy = self.results.copy()
        self.results.clear()
        return results_copy
 
 
    def execute(self, drones, wait=True):
        """Invoke methods via SSH to a drone.
 
        @param drones: A list of drones with calls to execute.
        @param wait: If True, this method will only return when all the drones
            have returned the result of their respective invocations of
            drone_utility. The `results` map will be cleared.
            If False, the caller must clear the map before the next invocation
            of `execute`, by calling `get_results`.
 
        @return: A dictionary keyed on the drones, containing a list of return
            values from the execution of drone_utility.
 
        @raises DroneTaskQueueException: If the results map isn't empty at the
            time of invocation.
        """
        if self.results:
            raise DroneTaskQueueException(
                    'Cannot clobber results map: %s, it should be cleared '
                    'through get_results.' % self.results)
        for drone in drones:
            if not drone.get_calls():
                logging.debug("Drone %s has no work, skipping. crbug.com/853861"
                              , drone)
                continue
            metric_fields = {
                'drone_hostname': drone.hostname,
                'call_count': len(drone.get_calls())
            }
            drone_reachable = True
            try:
                drone_results = drone.execute_queued_calls()
                logging.debug("Drone %s scheduled. crbug.com/853861", drone)
            except IOError:
                drone_reachable = False
                logging.error(
                    "Drone %s is not reachable by the scheduler.", drone)
                continue
            finally:
                DRONE_ACCESSIBILITY_METRIC.set(
                  drone_reachable, fields=metric_fields)
            if drone in self.results:
                raise DroneTaskQueueException(
                        'Task queue has recorded results for drone %s: %s' %
                        (drone, self.results))
            self.results[drone] = drone_results
        return self.get_results() if wait else None