#!/usr/bin/env python2

# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Load generator for devserver.

Example usage:

# Find DUTs in suites pool to test with:
atest host list -b 'pool:suites,board:BOARD' --unlocked -s Ready

# Lock DUTs:
atest host mod -l -r 'quick provision testing' DUT1 DUT2

# Create config file with DUTs to test and builds to use.
# (No trailing commas: the file is parsed with json.load.)
cat >config.json <<EOD
{
  "BOARD": {
    "duts": [
      "chromeosX-rowY-rackZ-hostA",
      "chromeosX-rowY-rackZ-hostB"
    ],
    "versions": [
      "auron_paine-paladin/R65-10208.0.0-rc2",
      "auron_paine-paladin/R65-10208.0.0-rc3",
      "auron_paine-paladin/R65-10209.0.0-rc1"
    ]
  }
}
EOD

# Do 100 total provisions, aiming to have 10 active simultaneously.
loadtest.py $DS config.json --simultaneous 10 --total 100

# Unlock DUTs:
atest host mod -u DUT1 DUT2
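
# While a test is running, send SIGUSR1 to log the status of all active
# provisions (handled by Runner.signal_handler):
kill -USR1 $(pgrep -f loadtest.py)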
"""

import collections
import datetime
import json
import random
import re
import signal
import subprocess
import sys
import time

import common
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros import dev_server
from chromite.lib import commandline
from chromite.lib import cros_logging as logging
from chromite.lib import locking
from chromite.lib import parallel

# Payloads to stage.
PAYLOADS = ['quick_provision', 'stateful']

# Number of seconds between full status checks.
STATUS_POLL_SECONDS = 2

# Number of successes/failures before blacklisting a DUT.
# A BLACKLIST_TOTAL_SUCCESS of 0 disables success-based blacklisting.
BLACKLIST_CONSECUTIVE_FAILURE = 2
BLACKLIST_TOTAL_SUCCESS = 0
BLACKLIST_TOTAL_FAILURE = 5

def get_parser():
    """Creates the argparse parser."""
    parser = commandline.ArgumentParser(description=__doc__)
    parser.add_argument('server', type=str, action='store',
                        help='Devserver to load test.')
    parser.add_argument('config', type=str, action='store',
                        help='Path to JSON config file. '
                             'Config file is indexed by board with keys of '
                             '"duts" and "versions", each a list.')
    parser.add_argument('--blacklist-consecutive', '-C', type=int,
                        action='store',
                        help=('Consecutive number of failures before '
                              'blacklisting DUT (default %d).') %
                             BLACKLIST_CONSECUTIVE_FAILURE,
                        default=BLACKLIST_CONSECUTIVE_FAILURE)
    parser.add_argument('--blacklist-success', '-S', type=int, action='store',
                        help=('Total number of successes before blacklisting '
                              'DUT; 0 disables (default %d).') %
                             BLACKLIST_TOTAL_SUCCESS,
                        default=BLACKLIST_TOTAL_SUCCESS)
    parser.add_argument('--blacklist-total', '-T', type=int, action='store',
                        help=('Total number of failures before blacklisting '
                              'DUT (default %d).') % BLACKLIST_TOTAL_FAILURE,
                        default=BLACKLIST_TOTAL_FAILURE)
    parser.add_argument('--boards', '-b', type=str, action='store',
                        help='Comma-separated list of boards to provision.')
    parser.add_argument('--dryrun', '-n', action='store_true', dest='dryrun',
                        help='Do not attempt to provision.')
    parser.add_argument('--duts', '-d', type=str, action='store',
                        help='Comma-separated list of duts to provision.')
    parser.add_argument('--outputlog', '-l', type=str, action='store',
                        help='Path to append JSON entries to.')
    parser.add_argument('--output', '-o', type=str, action='store',
                        help='Path to write JSON file to.')
    parser.add_argument('--ping', '-p', action='store_true',
                        help='Ping DUTs and blacklist unresponsive ones.')
    parser.add_argument('--simultaneous', '-s', type=int, action='store',
                        help='Number of simultaneous provisions to run.',
                        default=1)
    parser.add_argument('--no-stage', action='store_false',
                        dest='stage', default=True,
                        help='Do not attempt to stage builds.')
    parser.add_argument('--total', '-t', type=int, action='store',
                        help='Number of total provisions to run.',
                        default=0)
    return parser

def make_entry(entry_id, name, status, start_time,
               finish_time=None, parent=None, **kwargs):
    """Generate an event log entry to be stored in Cloud Datastore.

    @param entry_id: A (Kind, id) tuple representing the key.
    @param name: A string identifying the event.
    @param status: A string identifying the status of the event.
    @param start_time: A datetime of the start of the event.
    @param finish_time: A datetime of the finish of the event.
    @param parent: A (Kind, id) tuple representing the parent key.

    @return A dictionary representing the entry suitable for dumping via JSON.
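
    Example (illustrative values; epoch times elided as ...):
        make_entry(('Job', 0), 'hostA', 'pass', start, finish, ('Runner', 1))
        => {'id': ('Job', 0), 'name': 'hostA', 'status': 'pass',
            'start_time': ..., 'finish_time': ..., 'parent': ('Runner', 1)}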
"""
|
entry = {
|
'id': entry_id,
|
'name': name,
|
'status': status,
|
'start_time': time_utils.to_epoch_time(start_time),
|
}
|
if finish_time is not None:
|
entry['finish_time'] = time_utils.to_epoch_time(finish_time)
|
if parent is not None:
|
entry['parent'] = parent
|
return entry
|
|
class Job(object):
    """Tracks a single provision job.
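
    Constructing a Job triggers the provision on the devserver (unless
    dryrun is set); callers poll check() until it reports completion and
    then export the result via as_entry().
    """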
    def __init__(self, ds, host_name, build_name,
                 entry_id=0, parent=None, board=None,
                 start_active=0,
                 force_update=False, full_update=False,
                 clobber_stateful=True, quick_provision=True,
                 ping=False, dryrun=False):

        self.ds = ds
        self.host_name = host_name
        self.build_name = build_name

        self.entry_id = ('Job', entry_id)
        self.parent = parent
        self.board = board
        self.start_active = start_active
        self.end_active = None
        self.check_active_sum = 0
        self.check_active_count = 0

        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.trigger_response = None

        self.ping = ping
        self.pre_ping = None
        self.post_ping = None

        self.kwargs = {
            'host_name': host_name,
            'build_name': build_name,
            'force_update': force_update,
            'full_update': full_update,
            'clobber_stateful': clobber_stateful,
            'quick_provision': quick_provision,
        }

        if dryrun:
            self.finish_time = datetime.datetime.now()
            self.raised_error = None
            self.success = True
            self.pid = 0
        else:
            if self.ping:
                self.pre_ping = ping_dut(self.host_name)
            # Kick off the provision; raised_error, success and pid are
            # filled in once check() observes completion.
            self.trigger_response = ds._trigger_auto_update(**self.kwargs)

    def as_entry(self):
        """Generate an entry for exporting to datastore."""
        entry = make_entry(self.entry_id, self.host_name,
                           'pass' if self.success else 'fail',
                           self.start_time, self.finish_time, self.parent)
        entry.update({
            'build_name': self.build_name,
            'board': self.board,
            'devserver': self.ds.hostname,
            'start_active': self.start_active,
            'end_active': self.end_active,
            'force_update': self.kwargs['force_update'],
            'full_update': self.kwargs['full_update'],
            'clobber_stateful': self.kwargs['clobber_stateful'],
            'quick_provision': self.kwargs['quick_provision'],
            'elapsed': int(self.elapsed().total_seconds()),
            'trigger_response': self.trigger_response,
            'pre_ping': self.pre_ping,
            'post_ping': self.post_ping,
        })
        if self.check_active_count:
            entry['avg_active'] = (self.check_active_sum /
                                   self.check_active_count)
        return entry

    def check(self, active_count):
        """Checks if a job has completed.

        @param active_count: Number of active provisions at time of the check.
        @return: True if the job has completed, False otherwise.
        """
        if self.finish_time is not None:
            return True

        self.check_active_sum += active_count
        self.check_active_count += 1

        finished, raised_error, pid = self.ds.check_for_auto_update_finished(
            self.trigger_response, wait=False, **self.kwargs)
        if finished:
            self.finish_time = datetime.datetime.now()
            self.raised_error = raised_error
            self.success = self.raised_error is None
            self.pid = pid
            self.end_active = active_count
            if self.ping:
                self.post_ping = ping_dut(self.host_name)

        return finished

    def elapsed(self):
        """Determine the elapsed time of the task."""
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time

class Runner(object):
    """Parallel provision load test runner.
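
    Typical use, as in main():
        runner = Runner(ds, duts, config, simultaneous=10, total=100)
        runner.stage_all()
        runner.loop()
    """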
    def __init__(self, ds, duts, config, simultaneous=1, total=0,
                 outputlog=None, ping=False, blacklist_consecutive=None,
                 blacklist_success=None, blacklist_total=None, dryrun=False):
        self.ds = ds
        self.duts = duts
        self.config = config
        self.start_time = datetime.datetime.now()
        self.finish_time = None
        self.simultaneous = simultaneous
        self.total = total
        self.outputlog = outputlog
        self.ping = ping
        self.blacklist_consecutive = blacklist_consecutive
        self.blacklist_success = blacklist_success
        self.blacklist_total = blacklist_total
        self.dryrun = dryrun

        self.active = []
        self.started = 0
        self.completed = []
        # Track DUTs which have failed multiple times.
        self.dut_blacklist = set()
        # Track versions of each DUT to provision in order.
        self.last_versions = {}

        # id for the parent entry.
        # TODO: This isn't the most unique.
        self.entry_id = ('Runner',
                         int(time_utils.to_epoch_time(datetime.datetime.now())))

        # ids for the job entries.
        self.next_id = 0

        if self.outputlog:
            dump_entries_as_json([self.as_entry()], self.outputlog)

    def signal_handler(self, signum, frame):
        """Signal handler to dump current status."""
        logging.info('Received signal %s', signum)
        if signum == signal.SIGUSR1:
            now = datetime.datetime.now()
            logging.info('%d active provisions, %d completed provisions, '
                         '%s elapsed:',
                         len(self.active), len(self.completed),
                         now - self.start_time)
            for job in self.active:
                logging.info('  %s -> %s, %s elapsed',
                             job.host_name, job.build_name,
                             now - job.start_time)

    def as_entry(self):
        """Generate an entry for exporting to datastore."""
        entry = make_entry(self.entry_id, 'Runner', 'pass',
                           self.start_time, self.finish_time)
        entry.update({
            'devserver': self.ds.hostname,
        })
        return entry

    def get_completed_entries(self):
        """Retrieves all completed jobs as entries for datastore."""
        entries = [self.as_entry()]
        entries.extend([job.as_entry() for job in self.completed])
        return entries

    def get_next_id(self):
        """Get the next Job id."""
        entry_id = self.next_id
        self.next_id += 1
        return entry_id

    def spawn(self, host_name, build_name):
        """Spawn a single provision job."""
        job = Job(self.ds, host_name, build_name,
                  entry_id=self.get_next_id(), parent=self.entry_id,
                  board=self.get_dut_board_type(host_name),
                  start_active=len(self.active), ping=self.ping,
                  dryrun=self.dryrun)
        self.active.append(job)
        logging.info('Provision (%d) of %s to %s started',
                     job.entry_id[1], job.host_name, job.build_name)
        self.last_versions[host_name] = build_name
        self.started += 1

    def replenish(self):
        """Replenish the number of active provisions to match goals."""
        # simultaneous == 0 means no cap on concurrency; total == 0 means
        # keep going until interrupted.
        while ((self.simultaneous == 0 or
                len(self.active) < self.simultaneous) and
               (self.total == 0 or self.started < self.total)):
            host_name = self.find_idle_dut()
            if host_name:
                build_name = self.find_build_for_dut(host_name)
                self.spawn(host_name, build_name)
            elif self.simultaneous:
                logging.warn('Insufficient DUTs to satisfy goal')
                return False
            else:
                return len(self.active) > 0
        return True

    def check_all(self):
        """Check the status of outstanding provisions."""
        still_active = []
        for job in self.active:
            if job.check(len(self.active)):
                logging.info('Provision (%d) of %s to %s %s in %s: %s',
                             job.entry_id[1], job.host_name, job.build_name,
                             'completed' if job.success else 'failed',
                             job.elapsed(), job.raised_error)
                entry = job.as_entry()
                logging.debug(json.dumps(entry))
                if self.outputlog:
                    dump_entries_as_json([entry], self.outputlog)
                self.completed.append(job)
                if self.should_blacklist(job.host_name):
                    logging.error('Blacklisting DUT %s', job.host_name)
                    self.dut_blacklist.add(job.host_name)
            else:
                still_active.append(job)
        self.active = still_active

    def should_blacklist(self, host_name):
        """Determines if a given DUT should be blacklisted.
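
        A DUT is blacklisted after blacklist_total total failures,
        blacklist_consecutive consecutive failures, or, if the threshold
        is nonzero, blacklist_success total successes.
        """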
        jobs = [job for job in self.completed if job.host_name == host_name]
        total = 0
        consecutive = 0
        successes = 0
        for job in jobs:
            if not job.success:
                total += 1
                consecutive += 1
                if ((self.blacklist_total is not None and
                     total >= self.blacklist_total) or
                    (self.blacklist_consecutive is not None and
                     consecutive >= self.blacklist_consecutive)):
                    return True
            else:
                successes += 1
                # A blacklist_success threshold of 0 (the default) disables
                # this check; otherwise one success would blacklist the DUT.
                if (self.blacklist_success and
                        successes >= self.blacklist_success):
                    return True
                consecutive = 0
        return False

    def find_idle_dut(self):
        """Find an idle DUT to provision."""
        active_duts = {job.host_name for job in self.active}
        idle_duts = [d for d in self.duts
                     if d not in active_duts | self.dut_blacklist]
        return random.choice(idle_duts) if idle_duts else None

    def get_dut_board_type(self, host_name):
        """Determine the board type of a DUT."""
        return self.duts[host_name]

    def get_board_versions(self, board):
        """Determine the versions to provision for a board."""
        return self.config[board]['versions']

    def find_build_for_dut(self, host_name):
        """Determine the next build to provision on a DUT.

        Cycles through the board's configured versions in order, wrapping
        around after the last one.
        """
        board = self.get_dut_board_type(host_name)
        versions = self.get_board_versions(board)
        last_version = self.last_versions.get(host_name)
        try:
            last_index = versions.index(last_version)
        except ValueError:
            return versions[0]
        return versions[(last_index + 1) % len(versions)]

    def stage(self, build):
        """Stage artifacts for a given build."""
        logging.debug('Staging %s', build)
        self.ds.stage_artifacts(build, PAYLOADS)

    def stage_all(self):
        """Stage all necessary artifacts."""
        boards = set(self.duts.values())
        logging.info('Staging for %d boards', len(boards))
        funcs = []
        for board in boards:
            for build in self.get_board_versions(board):
                # Bind the current build via a default argument so each
                # closure stages its own build rather than the loop's last.
                funcs.append(lambda build_=build: self.stage(build_))
        parallel.RunParallelSteps(funcs)

    def loop(self):
        """Run the main provision loop.
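
        Checks all active provisions every STATUS_POLL_SECONDS, spawning
        replacements as jobs finish, until the total goal is reached.

        @return: True if the goal was reached, False if interrupted or if
            no DUTs remain usable.
        """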
        # Install a signal handler for status updates.
        old_handler = signal.signal(signal.SIGUSR1, self.signal_handler)
        # Restart interrupted system calls so SIGUSR1 doesn't abort sleeps.
        signal.siginterrupt(signal.SIGUSR1, False)

        try:
            while True:
                self.check_all()
                if self.total != 0 and len(self.completed) >= self.total:
                    break
                if not self.replenish() and len(self.active) == 0:
                    logging.error('Unable to replenish with no active '
                                  'provisions')
                    return False
                logging.debug('%d provisions active', len(self.active))
                time.sleep(STATUS_POLL_SECONDS)
            return True
        except KeyboardInterrupt:
            return False
        finally:
            self.finish_time = datetime.datetime.now()
            # Clean up signal handler.
            signal.signal(signal.SIGUSR1, old_handler)

    def elapsed(self):
        """Determine the elapsed time of the task."""
        finish_time = self.finish_time or datetime.datetime.now()
        return finish_time - self.start_time

def dump_entries_as_json(entries, output_file):
    """Dump event log entries as JSON to a file.

    Entries are written one JSON object per line (JSON Lines), so the
    output can be appended to and parsed incrementally.

    @param entries: A list of event log entries to dump.
    @param output_file: The file to write to.
    """
    # Write the entries out as JSON.
    logging.debug('Dumping %d entries', len(entries))
    for e in entries:
        json.dump(e, output_file, sort_keys=True)
        output_file.write('\n')
        output_file.flush()

def ping_dut(hostname):
    """Checks if a host is responsive to pings."""
    # Bare lab DUT hostnames resolve via the .cros domain.
    if re.match(r'^chromeos\d+-row\d+-rack\d+-host\d+$', hostname):
        hostname += '.cros'

    response = subprocess.call(['/bin/ping', '-c1', '-w2', hostname],
                               stdout=subprocess.PIPE)
    return response == 0

def main(argv):
    """Load generator for a devserver."""
    parser = get_parser()
    options = parser.parse_args(argv)

    # Parse devserver.
    if options.server:
        if re.match(r'^https?://', options.server):
            server = options.server
        else:
            server = 'http://%s/' % options.server
        ds = dev_server.ImageServer(server)
    else:
        parser.print_usage()
        logging.error('Must specify devserver')
        sys.exit(1)

    # Parse config file and determine master list of duts and their board
    # type, filtering by board or DUT name if specified.
    duts = {}
    if options.config:
        with open(options.config, 'r') as f:
            config = json.load(f)
        boards = (options.boards.split(',')
                  if options.boards else config.keys())
        duts_specified = (set(options.duts.split(','))
                          if options.duts else None)
        for board in boards:
            duts.update({dut: board for dut in config[board]['duts']
                         if duts_specified is None or
                         dut in duts_specified})
        logging.info('Config file %s: %d boards, %d duts',
                     options.config, len(boards), len(duts))
    else:
        parser.print_usage()
        logging.error('Must specify config file')
        sys.exit(1)

    if options.ping:
        logging.info('Performing ping tests')
        duts_alive = {}
        for dut, board in duts.items():
            if ping_dut(dut):
                duts_alive[dut] = board
            else:
                logging.error('Ignoring DUT %s (%s) for failing initial '
                              'ping check', dut, board)
        duts = duts_alive
        logging.info('After ping tests: %d boards, %d duts',
                     len(set(duts.values())), len(duts))

    # Set up the test runner and stage all the builds.
    outputlog = open(options.outputlog, 'a') if options.outputlog else None
    runner = Runner(ds, duts, config,
                    simultaneous=options.simultaneous, total=options.total,
                    outputlog=outputlog, ping=options.ping,
                    blacklist_consecutive=options.blacklist_consecutive,
                    blacklist_success=options.blacklist_success,
                    blacklist_total=options.blacklist_total,
                    dryrun=options.dryrun)
    if options.stage:
        runner.stage_all()

    # Run all the provisions, holding a lock on the config file so that
    # concurrent loadtests against the same DUTs are serialized.
    with locking.FileLock(options.config, blocking=True).lock():
        completed = runner.loop()
    logging.info('%s in %s', 'Completed' if completed else 'Interrupted',
                 runner.elapsed())
    # Write all entries as JSON.
    entries = runner.get_completed_entries()
    if options.output:
        with open(options.output, 'w') as f:
            dump_entries_as_json(entries, f)
    else:
        dump_entries_as_json(entries, sys.stdout)
    logging.info('Summary: %s',
                 dict(collections.Counter([e['status'] for e in entries
                                           if e['name'] != 'Runner'])))

    # List blacklisted DUTs.
    if runner.dut_blacklist:
        logging.warn('Blacklisted DUTs:')
        for host_name in runner.dut_blacklist:
            logging.warn('  %s', host_name)

if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))