ronnie
2022-10-14 1504bb53e29d3d46222c0b3ea994fc494b48e153
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
"""
Programmable testing DHCP server.
 
Simple DHCP server you can program with expectations of future packets and
responses to those packets.  The server is basically a thin wrapper around a
server socket with some utility logic to make setting up tests easier.  To write
a test, you start a server, construct a sequence of handling rules.
 
Handling rules let you set up expectations of future packets of certain types.
Handling rules are processed in order, and only the first remaining handler
handles a given packet.  In theory you could write the entire test into a single
handling rule and keep an internal state machine for how far that handler has
gotten through the test.  This would be poor style however.  Correct style is to
write (or reuse) a handler for each packet the server should see, leading us to
a happy land where any conceivable packet handler has already been written for
us.
 
Example usage:
 
# Start up the DHCP server, which will ignore packets until a test is started
server = DhcpTestServer(interface="veth_master")
server.start()
 
# Given a list of handling rules, start a test with a 30 sec timeout.
handling_rules = []
handling_rules.append(DhcpHandlingRule_RespondToDiscovery(intended_ip,
                                                          intended_subnet_mask,
                                                          dhcp_server_ip,
                                                          lease_time_seconds)
server.start_test(handling_rules, 30.0)
 
# Trigger DHCP clients to do various test related actions
...
 
# Get results
server.wait_for_test_to_finish()
if (server.last_test_passed):
    ...
else:
    ...
 
 
Note that if you make changes, make sure that the tests in dhcp_unittest.py
still pass.
"""
 
import logging
import socket
import threading
import time
import traceback
 
from autotest_lib.client.cros import dhcp_packet
from autotest_lib.client.cros import dhcp_handling_rule
 
# From socket.h
SO_BINDTODEVICE = 25
 
class DhcpTestServer(threading.Thread):
    def __init__(self,
                 interface=None,
                 ingress_address="<broadcast>",
                 ingress_port=67,
                 broadcast_address="255.255.255.255",
                 broadcast_port=68):
        super(DhcpTestServer, self).__init__()
        self._mutex = threading.Lock()
        self._ingress_address = ingress_address
        self._ingress_port = ingress_port
        self._broadcast_port = broadcast_port
        self._broadcast_address = broadcast_address
        self._socket = None
        self._interface = interface
        self._stopped = False
        self._test_in_progress = False
        self._last_test_passed = False
        self._test_timeout = 0
        self._handling_rules = []
        self._logger = logging.getLogger("dhcp.test_server")
        self._exception = None
        self.daemon = False
 
    @property
    def stopped(self):
        with self._mutex:
            return self._stopped
 
    @property
    def is_healthy(self):
        with self._mutex:
            return self._socket is not None
 
    @property
    def test_in_progress(self):
        with self._mutex:
            return self._test_in_progress
 
    @property
    def last_test_passed(self):
        with self._mutex:
            return self._last_test_passed
 
    @property
    def current_rule(self):
        """
        Return the currently active DhcpHandlingRule.
        """
        with self._mutex:
            return self._handling_rules[0]
 
    def start(self):
        """
        Start the DHCP server.  Only call this once.
        """
        if self.is_alive():
            return False
        self._logger.info("DhcpTestServer started; opening sockets.")
        try:
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self._logger.info("Opening socket on '%s' port %d." %
                              (self._ingress_address, self._ingress_port))
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            if self._interface is not None:
                self._logger.info("Binding to %s" % self._interface)
                self._socket.setsockopt(socket.SOL_SOCKET,
                                        SO_BINDTODEVICE,
                                        self._interface)
            self._socket.bind((self._ingress_address, self._ingress_port))
            # Wait 100 ms for a packet, then return, thus keeping the thread
            # active but mostly idle.
            self._socket.settimeout(0.1)
        except socket.error, socket_error:
            self._logger.error("Socket error: %s." % str(socket_error))
            self._logger.error(traceback.format_exc())
            if not self._socket is None:
                self._socket.close()
            self._socket = None
            self._logger.error("Failed to open server socket.  Aborting.")
            return
        super(DhcpTestServer, self).start()
 
    def stop(self):
        """
        Stop the DHCP server and free its socket.
        """
        with self._mutex:
            self._stopped = True
 
    def start_test(self, handling_rules, test_timeout_seconds):
        """
        Start a new test using |handling_rules|.  The server will call the
        test successfull if it receives a RESPONSE_IGNORE_SUCCESS (or
        RESPONSE_RESPOND_SUCCESS) from a handling_rule before
        |test_timeout_seconds| passes.  If the timeout passes without that
        message, the server runs out of handling rules, or a handling rule
        return RESPONSE_FAIL, the test is ended and marked as not passed.
 
        All packets received before start_test() is called are received and
        ignored.
        """
        with self._mutex:
            self._test_timeout = time.time() + test_timeout_seconds
            self._handling_rules = handling_rules
            self._test_in_progress = True
            self._last_test_passed = False
            self._exception = None
 
    def wait_for_test_to_finish(self):
        """
        Block on the test finishing in a CPU friendly way.  Timeouts, successes,
        and failures count as finishes.
        """
        while self.test_in_progress:
            time.sleep(0.1)
        if self._exception:
            raise self._exception
 
    def abort_test(self):
        """
        Abort a test prematurely, counting the test as a failure.
        """
        with self._mutex:
            self._logger.info("Manually aborting test.")
            self._end_test_unsafe(False)
 
    def _teardown(self):
        with self._mutex:
            self._socket.close()
            self._socket = None
 
    def _end_test_unsafe(self, passed):
        if not self._test_in_progress:
            return
        if passed:
            self._logger.info("DHCP server says test passed.")
        else:
            self._logger.info("DHCP server says test failed.")
        self._test_in_progress = False
        self._last_test_passed = passed
 
    def _send_response_unsafe(self, packet):
        if packet is None:
            self._logger.error("Handling rule failed to return a packet.")
            return False
        self._logger.debug("Sending response: %s" % packet)
        binary_string = packet.to_binary_string()
        if binary_string is None or len(binary_string) < 1:
            self._logger.error("Packet failed to serialize to binary string.")
            return False
 
        self._socket.sendto(binary_string,
                            (self._broadcast_address, self._broadcast_port))
        return True
 
    def _loop_body(self):
        with self._mutex:
            if self._test_in_progress and self._test_timeout < time.time():
                # The test has timed out, so we abort it.  However, we should
                # continue to accept packets, so we fall through.
                self._logger.error("Test in progress has timed out.")
                self._end_test_unsafe(False)
            try:
                data, _ = self._socket.recvfrom(1024)
                self._logger.info("Server received packet of length %d." %
                                   len(data))
            except socket.timeout:
                # No packets available, lets return and see if the server has
                # been shut down in the meantime.
                return
 
            # Receive packets when no test is in progress, just don't process
            # them.
            if not self._test_in_progress:
                return
 
            packet = dhcp_packet.DhcpPacket(byte_str=data)
            if not packet.is_valid:
                self._logger.warning("Server received an invalid packet over a "
                                     "DHCP port?")
                return
 
            logging.debug("Server received a DHCP packet: %s." % packet)
            if len(self._handling_rules) < 1:
                self._logger.info("No handling rule for packet: %s." %
                                  str(packet))
                self._end_test_unsafe(False)
                return
 
            handling_rule = self._handling_rules[0]
            response_code = handling_rule.handle(packet)
            logging.info("Handler gave response: %d" % response_code)
            if response_code & dhcp_handling_rule.RESPONSE_POP_HANDLER:
                self._handling_rules.pop(0)
 
            if response_code & dhcp_handling_rule.RESPONSE_HAVE_RESPONSE:
                for response_instance in range(
                        handling_rule.response_packet_count):
                    response = handling_rule.respond(packet)
                    if not self._send_response_unsafe(response):
                        self._logger.error(
                                "Failed to send packet, ending test.")
                        self._end_test_unsafe(False)
                        return
 
            if response_code & dhcp_handling_rule.RESPONSE_TEST_FAILED:
                self._logger.info("Handling rule %s rejected packet %s." %
                                  (handling_rule, packet))
                self._end_test_unsafe(False)
                return
 
            if response_code & dhcp_handling_rule.RESPONSE_TEST_SUCCEEDED:
                self._end_test_unsafe(True)
                return
 
    def run(self):
        """
        Main method of the thread.  Never call this directly, since it assumes
        some setup done in start().
        """
        with self._mutex:
            if self._socket is None:
                self._logger.error("Failed to create server socket, exiting.")
                return
 
        self._logger.info("DhcpTestServer entering handling loop.")
        while not self.stopped:
            try:
                self._loop_body()
                # Python does not have waiting queues on Lock objects.
                # Give other threads a change to hold the mutex by
                # forcibly releasing the GIL while we sleep.
                time.sleep(0.01)
            except Exception as e:
                with self._mutex:
                    self._end_test_unsafe(False)
                    self._exception = e
        with self._mutex:
            self._end_test_unsafe(False)
        self._logger.info("DhcpTestServer closing sockets.")
        self._teardown()
        self._logger.info("DhcpTestServer exiting.")