#!/usr/bin/env python
|
# Copyright 2017 The Chromium OS Authors. All rights reserved.
|
# Use of this source code is governed by a BSD-style license that can be
|
# found in the LICENSE file.
|
|
"""Standalone service to monitor AFE servers and report to ts_mon"""
|
import sys
|
import time
|
import logging
|
import multiprocessing
|
import urllib2
|
|
import common
|
from autotest_lib.client.common_lib import global_config
|
from autotest_lib.frontend.afe.json_rpc import proxy
|
from autotest_lib.server import frontend
|
# import needed to setup host_attributes
|
# pylint: disable=unused-import
|
from autotest_lib.server import site_host_attributes
|
from autotest_lib.site_utils import server_manager_utils
|
from chromite.lib import commandline
|
from chromite.lib import metrics
|
from chromite.lib import ts_mon_config
|
|
METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc'

# Every metric name emitted by this service is nested under METRIC_ROOT.
METRIC_RPC_CALL_DURATIONS = '%s/rpc_call_durations' % METRIC_ROOT
METRIC_TICK = '%s/tick' % METRIC_ROOT
METRIC_MONITOR_ERROR = '%s/afe_monitor_error' % METRIC_ROOT
|
|
# Maps an exception's exact type to the failure_reason recorded for it.
# AfeMonitor.run_cmd records exceptions whose type is not a key here as
# 'Unknown' and re-raises them.
FAILURE_REASONS = {proxy.JSONRPCException: 'JSONRPCException'}
|
|
def afe_rpc_call(hostname):
    """Perform one rpc call set on server

    Used as the pool.map() target in poll_rpc_servers.  Any exception,
    including one raised while constructing the monitor, is counted in
    METRIC_MONITOR_ERROR and logged instead of being propagated, so one bad
    server cannot abort the map over all servers.

    @param hostname: server's hostname to poll
    """
    try:
        # Construct inside the try: an exception from here would otherwise
        # be re-raised by pool.map() in the polling process and end polling.
        AfeMonitor(hostname).run()
    except Exception:
        metrics.Counter(METRIC_MONITOR_ERROR).increment(
                fields={'target_hostname': hostname})
        logging.exception('Exception when running against host %s', hostname)
|
|
|
def update_shards(shards, shards_lock, period=600, stop_event=None):
    """Keep a shared list of shards in sync with server_manager_utils.

    Loops until stop_event is set (forever when stop_event is None).  Every
    `period` seconds it fetches the current shards with
    server_manager_utils.get_shards() and adds/removes entries of `shards`
    in place so the list matches.  If the lookup raises, the error is logged
    and the list is left unchanged until the next period, so one failed
    lookup does not end the updater.

    @param shards: list of shards to be updated in place (may be a
            multiprocessing manager list shared with other processes)
    @param shards_lock: shared lock held while reading/mutating shards
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling; the wait
            between polls ends as soon as it is set
    """
    while stop_event is None or not stop_event.is_set():
        start_time = time.time()

        logging.debug('Updating Shards')
        try:
            new_shards = set(server_manager_utils.get_shards())
        except Exception:
            logging.exception('Failed to fetch shards; retrying in %s seconds',
                              period)
            new_shards = None

        if new_shards is not None:
            with shards_lock:
                current_shards = set(shards)
                rm_shards = current_shards - new_shards
                add_shards = new_shards - current_shards
                for s in rm_shards:
                    shards.remove(s)
                if add_shards:
                    shards.extend(add_shards)

            if rm_shards:
                logging.info('Servers left production: %s', str(rm_shards))
            if add_shards:
                logging.info('Servers entered production: %s',
                             str(add_shards))

        wait_time = (start_time + period) - time.time()
        if wait_time > 0:
            if stop_event is None:
                time.sleep(wait_time)
            else:
                # Event.wait() returns early once the event is set, so a
                # stop request does not have to sit out the whole period.
                stop_event.wait(wait_time)
|
|
|
def poll_rpc_servers(servers, servers_lock, shards=None, period=60,
                     stop_event=None, processes=None):
    """Blocking function that polls all servers and shards

    Every `period` seconds, runs afe_rpc_call once for each distinct entry
    of servers + shards on a process pool, then increments METRIC_TICK.
    Returns once stop_event is set (never, when stop_event is None).  The
    pool is shut down on the way out, including when an exception escapes.

    @param servers: list of servers to poll
    @param servers_lock: lock to be used when accessing servers or shards
    @param shards: list of shards to poll, or None to poll only servers
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    @param processes: number of pool workers; defaults to 4 per CPU
    """
    if processes is None:
        processes = multiprocessing.cpu_count() * 4
    pool = multiprocessing.Pool(processes=processes)
    try:
        while stop_event is None or not stop_event.is_set():
            start_time = time.time()
            with servers_lock:
                # Snapshot under the lock; the lists may be updated by another
                # process.  set.union(None) would raise, so map the None
                # default to an empty tuple.
                all_servers = set(servers).union(
                        shards if shards is not None else ())

            logging.debug('Starting Server Polling: %s', ', '.join(all_servers))
            pool.map(afe_rpc_call, all_servers)
            logging.debug('Finished Server Polling')

            metrics.Counter(METRIC_TICK).increment()

            wait_time = (start_time + period) - time.time()
            if wait_time > 0:
                if stop_event is None:
                    time.sleep(wait_time)
                else:
                    # Returns early once stop_event is set.
                    stop_event.wait(wait_time)
    finally:
        # Release the worker processes instead of leaking them.
        pool.terminate()
        pool.join()
|
|
|
class RpcFlightRecorder(object):
    """Monitors a list of AFE servers (and optionally their shards).

    Polling and shard updating run in child processes that share state via a
    multiprocessing Manager.
    """

    def __init__(self, servers, with_shards=True, poll_period=60):
        """
        @param servers: list of afe services to monitor
        @param with_shards: also record status on shards
        @param poll_period: frequency to poll all services, in seconds
        """
        # The manager hosts the list and lock objects so the child processes
        # started in start() operate on the same data.
        self._manager = multiprocessing.Manager()

        self._poll_period = poll_period

        self._servers = self._manager.list(servers)
        # Guards both self._servers and self._shards.
        self._servers_lock = self._manager.RLock()

        self._with_shards = with_shards
        self._shards = self._manager.list()
        self._update_shards_ps = None
        self._poll_rpc_server_ps = None

        self._stop_event = multiprocessing.Event()

    def start(self):
        """Call to start recorder"""
        if self._with_shards:
            self._update_shards_ps = multiprocessing.Process(
                    name='update_shards',
                    target=update_shards,
                    args=[self._shards, self._servers_lock],
                    kwargs={'stop_event': self._stop_event})
            self._update_shards_ps.start()

        self._poll_rpc_server_ps = multiprocessing.Process(
                name='poll_rpc_servers',
                target=poll_rpc_servers,
                args=[self._servers, self._servers_lock],
                kwargs={'shards': self._shards,
                        'period': self._poll_period,
                        'stop_event': self._stop_event})
        self._poll_rpc_server_ps.start()

    def close(self):
        """Send close event to all sub processes"""
        self._stop_event.set()

    def terminate(self):
        """Stop the child processes and shut down the shared-state manager."""
        self.close()
        for process in (self._poll_rpc_server_ps, self._update_shards_ps):
            if process:
                process.terminate()
                # terminate() only sends the signal; join (bounded) so the
                # child is reaped rather than left as a zombie.
                process.join(10)

        if self._manager:
            self._manager.shutdown()

    # Backwards-compatible alias for the original misspelled method name.
    termitate = terminate

    def join(self, timeout=None):
        """Block until the child processes exit.

        @param timeout: passed to each process's join() in turn, so the
                total wait can exceed timeout
        """
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.join(timeout)

        if self._update_shards_ps:
            self._update_shards_ps.join(timeout)
|
|
def _failed(fields, msg_str, reason, err=None):
|
"""Mark current run failed
|
|
@param fields, ts_mon fields to mark as failed
|
@param msg_str, message string to be filled
|
@param reason: why it failed
|
@param err: optional error to log more debug info
|
"""
|
fields['success'] = False
|
fields['failure_reason'] = reason
|
logging.warning("%s failed - %s", msg_str, reason)
|
if err:
|
logging.debug("%s fail_err - %s", msg_str, str(err))
|
|
class AfeMonitor(object):
    """Runs a fixed set of rpc probes against one AFE frontend."""

    def __init__(self, hostname):
        """
        @param hostname: hostname of server to monitor, string
        """
        self._hostname = hostname
        # Every metric emitted by this monitor is tagged with its target.
        self._metric_fields = {'target_hostname': hostname}
        self._afe = frontend.AFE(server=hostname)

    def run_cmd(self, cmd, expected=None):
        """Send a single rpc and record its latency and outcome.

        The call is timed under METRIC_RPC_CALL_DURATIONS.  A result that
        differs from `expected`, an HTTP error, or an exception type listed in
        FAILURE_REASONS marks the sample as failed; any other exception is
        marked 'Unknown' and then re-raised.

        @param cmd: name of the rpc to invoke
        @param expected: value the rpc should return; None disables the check
        """
        timer_fields = dict(self._metric_fields, command=cmd, success=True,
                            failure_reason='')
        with metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS,
                                  fields=timer_fields, scale=0.001) as f:
            msg_str = "%s:%s" % (self._hostname, cmd)
            try:
                result = self._afe.run(cmd)
                logging.debug("%s result = %s", msg_str, result)
                if expected is not None and expected != result:
                    _failed(f, msg_str, 'IncorrectResponse')
            except urllib2.HTTPError as e:
                _failed(f, msg_str, 'HTTPError:%d' % e.code)
            except Exception as e:
                error_type = type(e)
                _failed(f, msg_str,
                        FAILURE_REASONS.get(error_type, 'Unknown'), err=e)
                # Only the exception types listed in FAILURE_REASONS are
                # considered handled.
                if error_type not in FAILURE_REASONS:
                    raise
        if f['success']:
            logging.info("%s success", msg_str)

    def run(self):
        """Probe the server: get_server_time, then ping_db expecting [True]."""
        probes = (('get_server_time', None), ('ping_db', [True]))
        for cmd, expected in probes:
            self.run_cmd(cmd, expected)
|
|
|
def get_parser():
    """Build the command-line parser for this service.

    @returns: chromite commandline.ArgumentParser with the --afe,
            --poll-period and --no-shards options
    """
    parser = commandline.ArgumentParser(description=__doc__)
    option_specs = (
        (('-a', '--afe'),
         dict(action='append', default=[],
              help='Autotest FrontEnd server to monitor')),
        (('-p', '--poll-period'),
         dict(type=int, default=60, help='Frequency to poll AFE servers')),
        (('--no-shards',),
         dict(action='store_false', dest='with_shards',
              help='Disable shard updating')),
    )
    for flags, kwargs in option_specs:
        parser.add_argument(*flags, **kwargs)
    return parser
|
|
|
def main(argv):
    """Main function

    Parses options, falls back to the configured global AFE when no --afe is
    given, and runs the flight recorder until its processes exit.  The
    recorder's child processes and manager are always torn down on the way
    out.

    @param argv: commandline arguments passed
    """
    parser = get_parser()
    options = parser.parse_args(argv[1:])

    if not options.afe:
        options.afe = [global_config.global_config.get_config_value(
                'SERVER', 'global_afe_hostname', default='cautotest')]

    with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder',
                                             indirect=True):
        flight_recorder = RpcFlightRecorder(options.afe,
                                            with_shards=options.with_shards,
                                            poll_period=options.poll_period)
        flight_recorder.start()
        try:
            flight_recorder.join()
        finally:
            # If join() is interrupted (e.g. KeyboardInterrupt) or returns,
            # stop any remaining child processes and the manager.  Otherwise
            # the interpreter would wait on the non-daemonic children at exit.
            flight_recorder.termitate()
|
|
|
if __name__ == '__main__':
    # Start the service only when executed directly, never on import.
    main(argv=sys.argv)
|