# Copyright 2015 The Chromium OS Authors. All rights reserved.
|
# Use of this source code is governed by a BSD-style license that can be
|
# found in the LICENSE file.
|
|
import logging
|
|
from chromite.lib import metrics
|
|
|
DRONE_ACCESSIBILITY_METRIC = metrics.Boolean(
|
'chromeos/autotest/scheduler/drone_accessibility')
|
|
class DroneTaskQueueException(Exception):
|
"""Generic task queue exception."""
|
pass
|
|
|
class DroneTaskQueue(object):
|
"""A manager to run queued tasks in drones and gather results from them."""
|
|
def __init__(self):
|
self.results = dict()
|
|
|
def get_results(self):
|
"""Get a results dictionary keyed on drones.
|
|
@return: A dictionary of return values from drones.
|
"""
|
results_copy = self.results.copy()
|
self.results.clear()
|
return results_copy
|
|
|
def execute(self, drones, wait=True):
|
"""Invoke methods via SSH to a drone.
|
|
@param drones: A list of drones with calls to execute.
|
@param wait: If True, this method will only return when all the drones
|
have returned the result of their respective invocations of
|
drone_utility. The `results` map will be cleared.
|
If False, the caller must clear the map before the next invocation
|
of `execute`, by calling `get_results`.
|
|
@return: A dictionary keyed on the drones, containing a list of return
|
values from the execution of drone_utility.
|
|
@raises DroneTaskQueueException: If the results map isn't empty at the
|
time of invocation.
|
"""
|
if self.results:
|
raise DroneTaskQueueException(
|
'Cannot clobber results map: %s, it should be cleared '
|
'through get_results.' % self.results)
|
for drone in drones:
|
if not drone.get_calls():
|
logging.debug("Drone %s has no work, skipping. crbug.com/853861"
|
, drone)
|
continue
|
metric_fields = {
|
'drone_hostname': drone.hostname,
|
'call_count': len(drone.get_calls())
|
}
|
drone_reachable = True
|
try:
|
drone_results = drone.execute_queued_calls()
|
logging.debug("Drone %s scheduled. crbug.com/853861", drone)
|
except IOError:
|
drone_reachable = False
|
logging.error(
|
"Drone %s is not reachable by the scheduler.", drone)
|
continue
|
finally:
|
DRONE_ACCESSIBILITY_METRIC.set(
|
drone_reachable, fields=metric_fields)
|
if drone in self.results:
|
raise DroneTaskQueueException(
|
'Task queue has recorded results for drone %s: %s' %
|
(drone, self.results))
|
self.results[drone] = drone_results
|
return self.get_results() if wait else None
|