lin
2025-08-14 dae8bad597b6607a449b32bf76c523423f7720ed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
#!/usr/bin/env python
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
"""Standalone service to monitor AFE servers and report to ts_mon"""
import sys
import time
import logging
import multiprocessing
import urllib2
 
import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.server import frontend
# import needed to setup host_attributes
# pylint: disable=unused-import
from autotest_lib.server import site_host_attributes
from autotest_lib.site_utils import server_manager_utils
from chromite.lib import commandline
from chromite.lib import metrics
from chromite.lib import ts_mon_config
 
METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc'
METRIC_RPC_CALL_DURATIONS = METRIC_ROOT + '/rpc_call_durations'
METRIC_TICK = METRIC_ROOT + '/tick'
METRIC_MONITOR_ERROR = METRIC_ROOT + '/afe_monitor_error'
 
FAILURE_REASONS = {
        proxy.JSONRPCException: 'JSONRPCException',
        }
 
def afe_rpc_call(hostname):
    """Perform one rpc call set on server
 
    @param hostname: server's hostname to poll
    """
    afe_monitor = AfeMonitor(hostname)
    try:
        afe_monitor.run()
    except Exception as e:
        metrics.Counter(METRIC_MONITOR_ERROR).increment(
                fields={'target_hostname': hostname})
        logging.exception('Exception when running against host %s', hostname)
 
 
def update_shards(shards, shards_lock, period=600, stop_event=None):
    """Updates dict of shards
 
    @param shards: list of shards to be updated
    @param shards_lock: shared lock for accessing shards
    @param period: time between polls
    @param stop_event: Event that can be set to stop polling
    """
    while(not stop_event or not stop_event.is_set()):
        start_time = time.time()
 
        logging.debug('Updating Shards')
        new_shards = set(server_manager_utils.get_shards())
 
        with shards_lock:
            current_shards = set(shards)
            rm_shards = current_shards - new_shards
            add_shards = new_shards - current_shards
 
            if rm_shards:
                for s in rm_shards:
                    shards.remove(s)
 
            if add_shards:
                shards.extend(add_shards)
 
        if rm_shards:
            logging.info('Servers left production: %s', str(rm_shards))
 
        if add_shards:
            logging.info('Servers entered production: %s',
                    str(add_shards))
 
        wait_time = (start_time + period) - time.time()
        if wait_time > 0:
            time.sleep(wait_time)
 
 
def poll_rpc_servers(servers, servers_lock, shards=None, period=60,
                     stop_event=None):
    """Blocking function that polls all servers and shards
 
    @param servers: list of servers to poll
    @param servers_lock: lock to be used when accessing servers or shards
    @param shards: list of shards to poll
    @param period: time between polls
    @param stop_event: Event that can be set to stop polling
    """
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 4)
 
    while(not stop_event or not stop_event.is_set()):
        start_time = time.time()
        with servers_lock:
            all_servers = set(servers).union(shards)
 
        logging.debug('Starting Server Polling: %s', ', '.join(all_servers))
        pool.map(afe_rpc_call, all_servers)
 
        logging.debug('Finished Server Polling')
 
        metrics.Counter(METRIC_TICK).increment()
 
        wait_time = (start_time + period) - time.time()
        if wait_time > 0:
            time.sleep(wait_time)
 
 
class RpcFlightRecorder(object):
    """Monitors a list of AFE"""
    def __init__(self, servers, with_shards=True, poll_period=60):
        """
        @param servers: list of afe services to monitor
        @param with_shards: also record status on shards
        @param poll_period: frequency to poll all services, in seconds
        """
        self._manager = multiprocessing.Manager()
 
        self._poll_period = poll_period
 
        self._servers = self._manager.list(servers)
        self._servers_lock = self._manager.RLock()
 
        self._with_shards = with_shards
        self._shards = self._manager.list()
        self._update_shards_ps = None
        self._poll_rpc_server_ps = None
 
        self._stop_event = multiprocessing.Event()
 
    def start(self):
        """Call to start recorder"""
        if(self._with_shards):
            shard_args = [self._shards, self._servers_lock]
            shard_kwargs = {'stop_event': self._stop_event}
            self._update_shards_ps = multiprocessing.Process(
                    name='update_shards',
                    target=update_shards,
                    args=shard_args,
                    kwargs=shard_kwargs)
 
            self._update_shards_ps.start()
 
        poll_args = [self._servers, self._servers_lock]
        poll_kwargs= {'shards':self._shards,
                     'period':self._poll_period,
                     'stop_event':self._stop_event}
        self._poll_rpc_server_ps = multiprocessing.Process(
                name='poll_rpc_servers',
                target=poll_rpc_servers,
                args=poll_args,
                kwargs=poll_kwargs)
 
        self._poll_rpc_server_ps.start()
 
    def close(self):
        """Send close event to all sub processes"""
        self._stop_event.set()
 
 
    def termitate(self):
        """Terminate processes"""
        self.close()
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.terminate()
 
        if self._update_shards_ps:
            self._update_shards_ps.terminate()
 
        if self._manager:
            self._manager.shutdown()
 
 
    def join(self, timeout=None):
        """Blocking call until closed and processes complete
 
        @param timeout: passed to each process, so could be >timeout"""
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.join(timeout)
 
        if self._update_shards_ps:
            self._update_shards_ps.join(timeout)
 
def _failed(fields, msg_str, reason, err=None):
    """Mark current run failed
 
    @param fields, ts_mon fields to mark as failed
    @param msg_str, message string to be filled
    @param reason: why it failed
    @param err: optional error to log more debug info
    """
    fields['success'] = False
    fields['failure_reason'] = reason
    logging.warning("%s failed - %s", msg_str, reason)
    if err:
        logging.debug("%s fail_err - %s", msg_str, str(err))
 
class AfeMonitor(object):
    """Object that runs rpc calls against the given afe frontend"""
 
    def __init__(self, hostname):
        """
        @param hostname: hostname of server to monitor, string
        """
        self._hostname = hostname
        self._afe = frontend.AFE(server=self._hostname)
        self._metric_fields = {'target_hostname': self._hostname}
 
 
    def run_cmd(self, cmd, expected=None):
        """Runs rpc command and log metrics
 
        @param cmd: string of rpc command to send
        @param expected: expected result of rpc
        """
        metric_fields = self._metric_fields.copy()
        metric_fields['command'] = cmd
        metric_fields['success'] = True
        metric_fields['failure_reason'] = ''
 
        with metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS,
                fields=dict(metric_fields), scale=0.001) as f:
 
            msg_str = "%s:%s" % (self._hostname, cmd)
 
 
            try:
                result = self._afe.run(cmd)
                logging.debug("%s result = %s", msg_str, result)
                if expected is not None and expected != result:
                    _failed(f, msg_str, 'IncorrectResponse')
 
            except urllib2.HTTPError as e:
                _failed(f, msg_str, 'HTTPError:%d' % e.code)
 
            except Exception as e:
                _failed(f, msg_str, FAILURE_REASONS.get(type(e), 'Unknown'),
                        err=e)
 
                if type(e) not in FAILURE_REASONS:
                    raise
 
            if f['success']:
                logging.info("%s success", msg_str)
 
 
    def run(self):
        """Tests server and returns the result"""
        self.run_cmd('get_server_time')
        self.run_cmd('ping_db', [True])
 
 
def get_parser():
    """Returns argparse parser"""
    parser = commandline.ArgumentParser(description=__doc__)
 
    parser.add_argument('-a', '--afe', action='append', default=[],
                        help='Autotest FrontEnd server to monitor')
 
    parser.add_argument('-p', '--poll-period', type=int, default=60,
                        help='Frequency to poll AFE servers')
 
    parser.add_argument('--no-shards', action='store_false', dest='with_shards',
                        help='Disable shard updating')
 
    return parser
 
 
def main(argv):
    """Main function
 
    @param argv: commandline arguments passed
    """
    parser = get_parser()
    options = parser.parse_args(argv[1:])
 
 
    if not options.afe:
        options.afe = [global_config.global_config.get_config_value(
                        'SERVER', 'global_afe_hostname', default='cautotest')]
 
    with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder',
                                             indirect=True):
        flight_recorder = RpcFlightRecorder(options.afe,
                                            with_shards=options.with_shards,
                                            poll_period=options.poll_period)
 
        flight_recorder.start()
        flight_recorder.join()
 
 
if __name__ == '__main__':
    main(sys.argv)