tzh
2024-08-22 c7d0944258c7d0943aa7b2211498fd612971ce27
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
import logging
import sys
import threading
import time
from autotest_lib.client.common_lib import error
 
 
class BaseStressor(threading.Thread):
    """
    Implements common functionality for *Stressor classes.
 
    @var stressor: callable which performs a single stress event.
    """
    def __init__(self, stressor, on_exit=None, escalate_exceptions=True):
        """
        Initialize the ControlledStressor.
 
        @param stressor: callable which performs a single stress event.
        @param on_exit: callable which will be called when the thread finishes.
        @param escalate_exceptions: whether to escalate exceptions to the parent
            thread; defaults to True.
        """
        super(BaseStressor, self).__init__()
        self.daemon = True
        self.stressor = stressor
        self.on_exit = on_exit
        self._escalate_exceptions = escalate_exceptions
        self._exc_info = None
 
 
    def start(self, start_condition=None, start_timeout_secs=None):
        """
        Creates a new thread which will call the run() method.
 
        Optionally takes a wait condition before the stressor loop. Returns
        immediately.
 
        @param start_condition: the new thread will wait until this optional
            callable returns True before running the stressor.
        @param start_timeout_secs: how long to wait for |start_condition| to
            become True, or None to wait forever.
        """
        self._start_condition = start_condition
        self._start_timeout_secs = start_timeout_secs
        super(BaseStressor, self).start()
 
 
    def run(self):
        """
        Wait for |_start_condition|, and then start the stressor loop.
 
        Overloaded from threading.Thread. This is run in a separate thread when
        start() is called.
        """
        try:
            self._wait_for_start_condition()
            self._loop_stressor()
        except Exception as e:
            if self._escalate_exceptions:
                self._exc_info = sys.exc_info()
            raise  # Terminates this thread. Caller continues to run.
        finally:
            if self.on_exit:
              self.on_exit()
 
 
    def _wait_for_start_condition(self):
        """
        Loop until _start_condition() returns True, or _start_timeout_secs
        have elapsed.
 
        @raise error.TestFail if we time out waiting for the start condition
        """
        if self._start_condition is None:
            return
 
        elapsed_secs = 0
        while not self._start_condition():
            if (self._start_timeout_secs and
                    elapsed_secs >= self._start_timeout_secs):
                raise error.TestFail('start condition did not become true '
                                     'within %d seconds' %
                                     self._start_timeout_secs)
            time.sleep(1)
            elapsed_secs += 1
 
 
    def _loop_stressor(self):
        """
        Apply stressor in a loop.
 
        Overloaded by the particular *Stressor.
        """
        raise NotImplementedError
 
 
    def reraise(self):
        """
        Reraise an exception raised in the thread's stress loop.
 
        This is a No-op if no exception was raised.
        """
        if self._exc_info:
            exc_info = self._exc_info
            self._exc_info = None
            raise exc_info[0], exc_info[1], exc_info[2]
 
 
class ControlledStressor(BaseStressor):
    """
    Run a stressor in loop on a separate thread.
 
    Creates a new thread and calls |stressor| in a loop until stop() is called.
    """
    def __init__(self, stressor, on_exit=None, escalate_exceptions=True):
        """
        Initialize the ControlledStressor.
 
        @param stressor: callable which performs a single stress event.
        @param on_exit: callable which will be called when the thread finishes.
        @param escalate_exceptions: whether to escalate exceptions to the parent
            thread when stop() is called; defaults to True.
        """
        self._complete = threading.Event()
        super(ControlledStressor, self).__init__(stressor, on_exit,
                                                 escalate_exceptions)
 
 
    def _loop_stressor(self):
        """Overloaded from parent."""
        iteration_num = 0
        while not self._complete.is_set():
            iteration_num += 1
            logging.info('Stressor iteration: %d', iteration_num)
            self.stressor()
 
 
    def start(self, start_condition=None, start_timeout_secs=None):
        """Start applying the stressor.
 
        Overloaded from parent.
 
        @param start_condition: the new thread will wait to until this optional
            callable returns True before running the stressor.
        @param start_timeout_secs: how long to wait for |start_condition| to
            become True, or None to wait forever.
        """
        self._complete.clear()
        super(ControlledStressor, self).start(start_condition,
                                              start_timeout_secs)
 
 
    def stop(self, timeout=45):
        """
        Stop applying the stressor.
 
        @param timeout: maximum time to wait for a single run of the stressor to
            complete, defaults to 45 seconds.
        """
        self._complete.set()
        self.join(timeout)
        self.reraise()
 
 
class CountedStressor(BaseStressor):
    """
    Run a stressor in a loop on a separate thread a given number of times.
 
    Creates a new thread and calls |stressor| in a loop |iterations| times. The
    calling thread can use wait() to block until the loop completes. If the
    stressor thread terminates with an exception, wait() will propagate that
    exception to the thread that called wait().
    """
    def _loop_stressor(self):
        """Overloaded from parent."""
        for iteration_num in xrange(1, self._iterations + 1):
            logging.info('Stressor iteration: %d of %d',
                         iteration_num, self._iterations)
            self.stressor()
 
 
    def start(self, iterations, start_condition=None, start_timeout_secs=None):
        """
        Apply the stressor a given number of times.
 
        Overloaded from parent.
 
        @param iterations: number of times to apply the stressor.
        @param start_condition: the new thread will wait to until this optional
            callable returns True before running the stressor.
        @param start_timeout_secs: how long to wait for |start_condition| to
            become True, or None to wait forever.
        """
        self._iterations = iterations
        super(CountedStressor, self).start(start_condition, start_timeout_secs)
 
 
    def wait(self, timeout=None):
        """Wait until the stressor completes.
 
        @param timeout: maximum time for the thread to complete, by default
            never times out.
        """
        self.join(timeout)
        self.reraise()