ronnie
2022-10-14 1504bb53e29d3d46222c0b3ea994fc494b48e153
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
import logging
import unittest
import re
import csv
import common
import os
 
from itertools import imap
from autotest_lib.server.cros import resource_monitor
from autotest_lib.server.hosts import abstract_ssh
from autotest_lib.server import utils
 
class HostMock(abstract_ssh.AbstractSSHHost):
    """Mocks a host object.

    Emulates the `top`, `kill` and `rm` commands by tracking three pieces of
    state (is top running, is its output file open, where is that file) and
    asserting, through the supplied test case, that commands arrive in a
    sensible order.
    """

    # Pid reported for the emulated background `top` process.
    TOP_PID = '1234'

    def _initialize(self, test_env):
        """Reset the emulated host state.

        @param test_env object providing the assert* methods used to validate
                commands (the tests in this file pass a unittest.TestCase).

        """
        self.top_is_running = False

        # Keep track of whether the top raw output file exists on the system,
        # and if it does, where it is.
        self.top_output_file_path = None

        # Keep track of whether the raw top output file is currently being
        # written to by top.
        self.top_output_file_is_open = False
        self.test_env = test_env


    def get_file(self, src, dest):
        """Do nothing; file retrieval is not emulated.

        @param src ignored.
        @param dest ignored.

        """
        pass


    def called_unsupported_command(self, command):
        """Raises assertion error when called.

        @param command string the unsupported command called.

        """
        raise AssertionError(
                "ResourceMonitor called unsupported command %s" % command)


    def _get_operand(self, cmd_args, cmd_line):
        """Return the first operand of a command, skipping one leading option.

        If the token after the command name starts with '-', it is treated as
        an option and the token after it is returned instead.

        @param cmd_args string_list of command line args.
        @param cmd_line string the command to run (used for error reporting).
        @return string the operand.
        @raises AssertionError if the command line has no operand.

        """
        try:
            if cmd_args[1].startswith('-'):
                return cmd_args[2]
            return cmd_args[1]
        except IndexError:
            # Always raises, so nothing falls through to an implicit None.
            self.called_unsupported_command(cmd_line)


    def _process_top(self, cmd_args, cmd_line):
        """Process top command.

        Requires that top is not already running and that no output file
        exists yet, then records the redirect target as the output file.

        @param cmd_args string_list of command line args.
        @param cmd_line string the command to run.
        @return string the pid of the emulated top process.
        @raises AssertionError if the command has no '>' redirect target.

        """
        self.test_env.assertFalse(self.top_is_running,
                msg="Top must not already be running.")
        self.test_env.assertFalse(self.top_output_file_is_open,
                msg="The top output file should not be being written "
                "to before top is started")
        self.test_env.assertIsNone(self.top_output_file_path,
                msg="The top output file should not exist "
                "before top is started")
        try:
            self.redirect_index = cmd_args.index(">")
            self.top_output_file_path = cmd_args[self.redirect_index + 1]
        except (ValueError, IndexError):
            # ValueError: no '>' token; IndexError: nothing follows the '>'.
            self.called_unsupported_command(cmd_line)

        self.top_is_running = True
        self.top_output_file_is_open = True

        return HostMock.TOP_PID


    def _process_kill(self, cmd_args, cmd_line):
        """Process kill command.

        Only killing the emulated top process, while it is running, is
        accepted.

        @param cmd_args string_list of command line args.
        @param cmd_line string the command to run.

        """
        pid_to_kill = self._get_operand(cmd_args, cmd_line)

        self.test_env.assertEqual(pid_to_kill, HostMock.TOP_PID,
                msg="Wrong pid (%r) killed . Top pid is %r." % (pid_to_kill,
                HostMock.TOP_PID))
        self.test_env.assertTrue(self.top_is_running,
                msg="Top must be running before we try to kill it")

        # Killing top also stops it from writing to its output file.
        self.top_is_running = False
        self.top_output_file_is_open = False


    def _process_rm(self, cmd_args, cmd_line):
        """Process rm command.

        Only removing the top output file, after top has been stopped, is
        accepted.

        @param cmd_args string_list of command line args.
        @param cmd_line string the command to run.

        """
        file_to_rm = self._get_operand(cmd_args, cmd_line)

        self.test_env.assertEqual(file_to_rm, self.top_output_file_path,
                msg="Tried to remove file that is not the top output file.")
        self.test_env.assertFalse(self.top_output_file_is_open,
                msg="Tried to remove top output file while top is still "
                "writing to it.")
        self.test_env.assertFalse(self.top_is_running,
                msg="Top was still running when we tried to remove "
                "the top output file.")
        self.test_env.assertIsNotNone(self.top_output_file_path)

        self.top_output_file_path = None


    def _run_single_cmd(self, cmd_line, *args, **kwargs):
        """Run a single command on host.

        Extra positional and keyword arguments are accepted and ignored.

        @param cmd_line command to run on the host.
        @return the emulated top pid for `top`, None for any other command.

        """
        # Make the input a little nicer: surround every '>' with spaces so the
        # redirect becomes its own token when splitting on whitespace.
        cmd_line = cmd_line.strip()
        cmd_line = re.sub(r">", " > ", cmd_line)

        cmd_args = re.split(r"\s+", cmd_line)
        self.test_env.assertTrue(len(cmd_args) >= 1)
        command = cmd_args[0]
        if command == "top":
            return self._process_top(cmd_args, cmd_line)
        elif command == "kill":
            return self._process_kill(cmd_args, cmd_line)
        elif command == "rm":
            return self._process_rm(cmd_args, cmd_line)
        else:
            logging.warning("Called unemulated command %r", cmd_line)
            return None


    def run(self, cmd_line, *args, **kwargs):
        """Run command(s) on host.

        The command line is split on '&&' and each part is emulated in order.

        @param cmd_line command to run on the host.
        @return CmdResult object with exit_status 0.

        """
        for cmd in cmd_line.split("&&"):
            self._run_single_cmd(cmd)
        return utils.CmdResult(exit_status=0)


    def run_background(self, cmd_line, *args, **kwargs):
        """Run command in background on host.

        @param cmd_line command to run on the host.
        @return whatever emulating the single command returns (the top pid
                for a `top` command).

        """
        return self._run_single_cmd(cmd_line, *args, **kwargs)


    def is_monitoring(self):
        """Return true iff host is currently running top and writing output
            to a file.
        """
        return self.top_is_running and self.top_output_file_is_open and (
            self.top_output_file_path is not None)


    def monitoring_stopped(self):
        """Return true iff is_monitoring() is false.

        NOTE(review): this is the plain negation of is_monitoring(), so it is
        also true when only some of the monitoring conditions hold -- confirm
        that this is the intended definition of "stopped".
        """
        return not self.is_monitoring()
 
 
class ResourceMonitorTest(unittest.TestCase):
    """Exercises starting, stopping and clean-up of ResourceMonitor using a
        HostMock in place of a real host.
    """

    def setUp(self):
        """Build the monitor configuration and the mocked host."""
        self.topoutfile = '/tmp/resourcemonitorunittest-1234'
        self.monitor_period = 1
        self.rm_conf = resource_monitor.ResourceMonitorConfig(
                rawresult_output_filename=self.topoutfile,
                monitor_period=self.monitor_period)
        self.host = HostMock(self)


    def _make_monitor(self):
        """Return a new ResourceMonitor bound to the mocked host."""
        return resource_monitor.ResourceMonitor(self.host, self.rm_conf)


    def test_normal_operation(self):
        """Repeated start/stop cycles toggle monitoring without errors."""
        self.assertFalse(self.host.is_monitoring())
        with self._make_monitor() as monitor:
            self.assertFalse(self.host.is_monitoring())
            for _ in range(3):
                monitor.start()
                self.assertTrue(self.host.is_monitoring())
                monitor.stop()
                self.assertTrue(self.host.monitoring_stopped())
        self.assertTrue(self.host.monitoring_stopped())


    def test_forgot_to_stop_monitor(self):
        """Leaving the with-block without calling stop() still stops the
            monitor.
        """
        self.assertFalse(self.host.is_monitoring())
        with self._make_monitor() as monitor:
            self.assertFalse(self.host.is_monitoring())
            monitor.start()
            self.assertTrue(self.host.is_monitoring())
        self.assertTrue(self.host.monitoring_stopped())


    def test_unexpected_interruption_while_monitoring(self):
        """A KeyboardInterrupt raised while monitoring still stops the monitor.

        NOTE(review): the final assertion is only reached if leaving the
        with-block suppresses the KeyboardInterrupt -- confirm against
        ResourceMonitor.__exit__.
        """
        self.assertFalse(self.host.is_monitoring())

        with self._make_monitor() as monitor:
            self.assertFalse(self.host.is_monitoring())
            monitor.start()
            self.assertTrue(self.host.is_monitoring())
            raise KeyboardInterrupt

        self.assertTrue(self.host.monitoring_stopped())
 
 
class ResourceMonitorResultTest(unittest.TestCase):
    """Functional tests for ResourceMonitorParsedResult."""

    def setUp(self):
        """Locate the 'res_resource_monitor' directory next to this file."""
        self._res_dir = os.path.join(
                            os.path.dirname(os.path.realpath(__file__)),
                            'res_resource_monitor')


    def run_with_test_data(self, testdata_file, testans_file):
        """Parses testdata_file with the parser, and checks that results
            are the same as those in testans_file.

        The first row of the answer file must match the parser's column
        names; each remaining row is converted to a UtilValues tuple and the
        resulting list is compared with the parsed values.

        @param testdata_file string filename containing top output to test.
        @param testans_file string filename containing answers to the test.

        """
        result_cls = resource_monitor.ResourceMonitorParsedResult
        parsed_results = result_cls(testdata_file)
        # Binary mode is kept from the original; this file targets the
        # Python 2 csv module (it imports itertools.imap).
        with open(testans_file, "rb") as testans:
            csvreader = csv.reader(testans)
            # The builtin next() works on Python 2.6+ and 3, unlike the
            # Python-2-only .next() method.
            columns = next(csvreader)
            self.assertEqual(list(columns), result_cls._columns)
            utils_over_time = [result_cls.UtilValues._make(row)
                               for row in csvreader]
        self.assertEqual(utils_over_time, parsed_results._utils_over_time)


    def test_full_data(self):
        """General test with many possible changes to input."""
        self.run_with_test_data(
                os.path.join(self._res_dir, 'top_test_data.txt'),
                os.path.join(self._res_dir, 'top_test_data_ans.csv'))


    def test_whitespace_ridden(self):
        """Tests resilience to arbitrary whitespace characters between fields"""
        self.run_with_test_data(
                os.path.join(self._res_dir, 'top_whitespace_ridden.txt'),
                os.path.join(self._res_dir, 'top_whitespace_ridden_ans.csv'))


    def test_field_order_changed(self):
        """Tests resilience to changes in the order of fields
            (e.g. if the Mem free/used fields change order in the input).
        """
        self.run_with_test_data(
                os.path.join(self._res_dir, 'top_field_order_changed.txt'),
                os.path.join(self._res_dir, 'top_field_order_changed_ans.csv'))
 
 
# Run every test case defined in this module when the file is executed
# directly rather than imported.
if __name__ == '__main__':
    unittest.main()