liyujie
2025-08-28 786ff4f4ca2374bdd9177f2e24b503d43e7a3b93
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
#!/usr/bin/python
 
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
import logging
import socket
import sys
import time
 
import common
 
from autotest_lib.client.cros import dhcp_handling_rule
from autotest_lib.client.cros import dhcp_packet
from autotest_lib.client.cros import dhcp_test_server
 
# Relative location of the binary fixtures read by the tests below; it is
# joined directly onto file names, so the trailing slash is required.
TEST_DATA_PATH_PREFIX = "client/cros/dhcp_test_data/"

# Serialized classless static routes: a /18 entry (0x12) followed by a /0
# (default) entry.  Must describe the same routes as the parsed list below.
TEST_CLASSLESS_STATIC_ROUTE_DATA = (
        "\x12\x0a\x09\xc0\xac\x1f\x9b\x0a"
        "\x00\xc0\xa8\x00\xfe")

# The same routes in parsed form; each entry reads as
# (prefix length, destination, router).
TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED = [
        (18, "10.9.192.0", "172.31.155.10"),
        (0, "0.0.0.0", "192.168.0.254"),
]

# Domain search list as it may arrive on the wire.  The trailing "\xC0\x04"
# stands in for the "google.com" suffix that starts at offset 4.
TEST_DOMAIN_SEARCH_LIST_COMPRESSED = (
        "\x03eng" "\x06google" "\x03com" "\x00"
        "\x09marketing" "\xC0\x04")

# What parsing the compressed list should yield, in order.
TEST_DOMAIN_SEARCH_LIST_PARSED = ("eng.google.com", "marketing.google.com")

# At this time, we don't support the compression allowed in the RFC when
# serializing, so every name is written out in full.  This is correct and
# sufficient for our purposes.
TEST_DOMAIN_SEARCH_LIST_EXPECTED = (
        "\x03eng" "\x06google" "\x03com" "\x00"
        "\x09marketing" "\x06google" "\x03com" "\x00")
 
def bin2hex(byte_str, justification=20):
    """
    Render a byte string as printable hex for use in failure messages.

    Every byte becomes the three characters "xNN" (NN being two lowercase
    hex digits) and the result is split into lines holding at most
    |justification| bytes each.

    Accepts a Python 2 str as well as bytes/bytearray objects, whose items
    are already integers when iterated.

    @param byte_str: the bytes to render.
    @param justification: number of bytes per output line; a value that is
            not positive yields no usable grouping (zero is rejected by
            range()).
    @return the hex rendering, lines joined with newlines; "" for empty
            input.
    """
    # Items are one-character strings for str input and ints for
    # bytes/bytearray input, so only call ord() when it is needed.
    chars = ["x%02x" % (c if isinstance(c, int) else ord(c))
             for c in byte_str]
    return "\n".join("".join(chars[i:i + justification])
                     for i in range(0, len(chars), justification))
 
def test_packet_serialization():
    """
    Check that a DISCOVERY packet read from disk survives a round trip.

    Reads dhcp_discovery.log from TEST_DATA_PATH_PREFIX, parses it into a
    DhcpPacket and serializes it again; the output has to match the input
    byte for byte.

    @return True if the round trip reproduced the file contents, False
            otherwise (a reason is printed on failure).
    """
    # The with block closes the file even when read() raises.
    with open(TEST_DATA_PATH_PREFIX + "dhcp_discovery.log", "rb") as log_file:
        binary_discovery_packet = log_file.read()
    discovery_packet = dhcp_packet.DhcpPacket(byte_str=binary_discovery_packet)
    if not discovery_packet.is_valid:
        # Say why we failed, like every other failure path does.
        print("Failed to parse the discovery packet read from disk.")
        return False
    generated_string = discovery_packet.to_binary_string()
    if generated_string is None:
        print("Failed to generate string from packet object.")
        return False
    if generated_string != binary_discovery_packet:
        print("Packets didn't match: ")
        print("Generated: \n%s" % bin2hex(generated_string))
        print("Expected: \n%s" % bin2hex(binary_discovery_packet))
        return False
    print("test_packet_serialization PASSED")
    return True
 
def test_classless_static_route_parsing():
    """
    Check that the classless static route option payload parses correctly.

    Unpacks TEST_CLASSLESS_STATIC_ROUTE_DATA and compares the result with
    TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED.

    @return True if the parsed routes match the expectation, False
            otherwise (the mismatch is printed).
    """
    parsed_routes = dhcp_packet.ClasslessStaticRoutesOption.unpack(
            TEST_CLASSLESS_STATIC_ROUTE_DATA)
    if parsed_routes != TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED:
        # This message used to talk about a "domain list", which pointed
        # readers of a failure log at the wrong option.
        print("Parsed binary classless static route list and got %s but "
              "expected %s" %
              (repr(parsed_routes),
               repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED)))
        return False
    print("test_classless_static_route_parsing PASSED")
    return True
 
def test_classless_static_route_serialization():
    """
    Check that parsed classless static routes serialize to the known bytes.

    Packs TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED and compares the output
    with TEST_CLASSLESS_STATIC_ROUTE_DATA.

    @return True if the serialized bytes match, False otherwise (both
            forms are printed as hex).
    """
    routes = TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED
    wanted = TEST_CLASSLESS_STATIC_ROUTE_DATA
    packed = dhcp_packet.ClasslessStaticRoutesOption.pack(routes)
    if packed != wanted:
        # 100 bytes per line is wide enough to keep each dump on one line.
        details = (repr(routes), bin2hex(wanted, 100), bin2hex(packed, 100))
        print("Expected to serialize %s to %s but instead got %s." % details)
        return False
    print("test_classless_static_route_serialization PASSED")
    return True
 
def test_domain_search_list_parsing():
    """
    Check that a compressed domain search list parses into its names.

    Unpacks TEST_DOMAIN_SEARCH_LIST_COMPRESSED and compares the names, in
    order, with TEST_DOMAIN_SEARCH_LIST_PARSED.

    @return True if the parsed names match the expectation, False
            otherwise (the mismatch is printed).
    """
    parsed_domains = dhcp_packet.DomainListOption.unpack(
            TEST_DOMAIN_SEARCH_LIST_COMPRESSED)
    # Order matters too, and the expectation is a tuple.
    parsed_domains = tuple(parsed_domains)
    if parsed_domains != TEST_DOMAIN_SEARCH_LIST_PARSED:
        # Report the value we actually compared against; the serialized
        # TEST_DOMAIN_SEARCH_LIST_EXPECTED bytes are not what parsing yields.
        print("Parsed binary domain list and got %s but expected %s" %
              (parsed_domains, TEST_DOMAIN_SEARCH_LIST_PARSED))
        return False
    print("test_domain_search_list_parsing PASSED")
    return True
 
def test_domain_search_list_serialization():
    """
    Check that a list of domain names serializes to the known bytes.

    Packs TEST_DOMAIN_SEARCH_LIST_PARSED and compares the output with
    TEST_DOMAIN_SEARCH_LIST_EXPECTED.

    @return True if the serialized bytes match, False otherwise (both
            forms are printed as hex).
    """
    domains = TEST_DOMAIN_SEARCH_LIST_PARSED
    wanted = TEST_DOMAIN_SEARCH_LIST_EXPECTED
    packed = dhcp_packet.DomainListOption.pack(domains)
    if packed != wanted:
        # 100 bytes per line is wide enough to keep each dump on one line.
        details = (domains, bin2hex(wanted, 100), bin2hex(packed, 100))
        print("Expected to serialize %s to %s but instead got %s." % details)
        return False
    print("test_domain_search_list_serialization PASSED")
    return True
 
def receive_packet(a_socket, timeout_seconds=1.0):
    """
    Wait for one datagram on |a_socket| and parse it as a DHCP packet.

    The socket is polled until data arrives or |timeout_seconds| have
    elapsed; socket.timeout raised by individual reads is ignored.

    @param a_socket: socket to read from.  The retry loop only makes
            progress if reads on it time out (the caller in this file sets
            a 0.1 second timeout).
    @param timeout_seconds: overall time budget for the wait.
    @return the parsed DhcpPacket, or None if nothing arrived in time or
            the payload was not a valid packet.
    """
    payload = None
    deadline = time.time() + timeout_seconds
    while payload is None:
        if time.time() >= deadline:
            break
        try:
            payload, _ = a_socket.recvfrom(1024)
        except socket.timeout:
            # We expect many timeouts; just try again.
            continue

    if payload is None:
        print("Timed out before we received a response from the server.")
        return None

    print("Client received a packet of length %d from the server." %
          len(payload))
    response = dhcp_packet.DhcpPacket(byte_str=payload)
    if response.is_valid:
        return response

    print("Received an invalid response from DHCP server.")
    return None
 
def test_simple_server_exchange(server):
    """
    Walk a DISCOVERY/OFFER/REQUEST/ACK negotiation against |server|.

    Installs two handling rules on the server (answer a discovery, then
    answer a request), plays the client side over a loopback UDP socket and
    checks both the packets the client receives and the server's own
    verdict.

    @param server: a started test server that listens on 127.0.0.1:8067 and
            replies to port 8068 (test_server_dialogue builds one).
    @return True if the OFFER and the ACK both carried the expected address
            and the server reports that the test passed, False otherwise (a
            reason is printed on failure).
    """
    intended_ip = "127.0.0.42"
    subnet_mask = "255.255.255.0"
    server_ip = "127.0.0.1"
    lease_time_seconds = 60
    test_timeout = 3.0
    mac_addr = "\x01\x02\x03\x04\x05\x06"
    # Build up our packets and have them request some default option values,
    # like the IP we're being assigned and the address of the server assigning
    # it.
    discovery_message = dhcp_packet.DhcpPacket.create_discovery_packet(mac_addr)
    discovery_message.set_option(
            dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
            dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
    request_message = dhcp_packet.DhcpPacket.create_request_packet(
            discovery_message.transaction_id,
            mac_addr)
    request_message.set_option(
            dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
            dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
    # This is the pool of settings the DHCP server will seem to draw from to
    # answer queries from the client.  This information is written into packets
    # through the handling rules.
    dhcp_server_config = {
            dhcp_packet.OPTION_SERVER_ID : server_ip,
            dhcp_packet.OPTION_SUBNET_MASK : subnet_mask,
            dhcp_packet.OPTION_IP_LEASE_TIME : lease_time_seconds,
            dhcp_packet.OPTION_REQUESTED_IP : intended_ip,
            }
    # Build up the handling rules for the server and start the test.
    rules = []
    rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
            intended_ip,
            server_ip,
            dhcp_server_config))
    rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
            intended_ip,
            server_ip,
            dhcp_server_config))
    rules[-1].is_final_handler = True
    server.start_test(rules, test_timeout)
    # Because we don't want to require root permissions to run these tests,
    # listen on the loopback device, don't broadcast, and don't use reserved
    # ports (like the actual DHCP ports).  Use 8068/8067 instead.
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        client_socket.bind(("127.0.0.1", 8068))
        client_socket.settimeout(0.1)
        client_socket.sendto(discovery_message.to_binary_string(),
                             (server_ip, 8067))

        offer_packet = receive_packet(client_socket)
        if offer_packet is None:
            return False

        if offer_packet.message_type != dhcp_packet.MESSAGE_TYPE_OFFER:
            print("Type of DHCP response is not offer.")
            return False

        if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
            print("Server didn't offer the IP we expected.")
            return False

        print("Offer looks good to the client, sending request.")
        # In real tests, dhcpcd formats all the DISCOVERY and REQUEST
        # messages.  In our unit test, we have to do this ourselves by
        # echoing the offered values back in the request.
        for option in (dhcp_packet.OPTION_SERVER_ID,
                       dhcp_packet.OPTION_SUBNET_MASK,
                       dhcp_packet.OPTION_IP_LEASE_TIME,
                       dhcp_packet.OPTION_REQUESTED_IP):
            request_message.set_option(option,
                                       offer_packet.get_option(option))
        # Send the REQUEST message.
        client_socket.sendto(request_message.to_binary_string(),
                             (server_ip, 8067))
        ack_packet = receive_packet(client_socket)
        if ack_packet is None:
            return False

        if ack_packet.message_type != dhcp_packet.MESSAGE_TYPE_ACK:
            print("Type of DHCP response is not acknowledgement.")
            return False

        # Inspect the ACK itself; the OFFER was already validated above, so
        # re-reading it here could never detect a bad address in the ACK.
        if ack_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
            print("Server didn't give us the IP we expected.")
            return False

        print("Waiting for the server to finish.")
        server.wait_for_test_to_finish()
        print("Server agrees that the test is over.")
        if not server.last_test_passed:
            print("Server is unhappy with the test result.")
            return False

        print("test_simple_server_exchange PASSED.")
        return True
    finally:
        # Release port 8068 on every exit path, including failures.
        client_socket.close()
 
def test_server_dialogue():
    """
    Start a loopback DHCP test server and run the simple exchange on it.

    The server listens on 127.0.0.1:8067 and sends its replies to
    127.0.0.1:8068.  It is stopped again before returning, even if the
    exchange raises.

    @return the result of test_simple_server_exchange, or False if the
            server did not come up healthy.
    """
    server = dhcp_test_server.DhcpTestServer(ingress_address="127.0.0.1",
                                             ingress_port=8067,
                                             broadcast_address="127.0.0.1",
                                             broadcast_port=8068)
    server.start()
    ret = False
    try:
        if server.is_healthy:
            ret = test_simple_server_exchange(server)
        else:
            print("Server isn't healthy, aborting.")
    finally:
        # Always ask the server to stop so an exception in the exchange
        # does not leave it running behind our back.
        print("Sending server stop() signal.")
        server.stop()
        print("Stop signal sent.")
    return ret
 
def run_tests():
    """
    Run every test in this file and report the combined result.

    Debug output of the "dhcp" logger is routed to a stream handler first.
    Every test runs even when an earlier one has failed.

    @return 0 if all tests passed, -1 otherwise.
    """
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    dhcp_logger = logging.getLogger("dhcp")
    dhcp_logger.setLevel(logging.DEBUG)
    dhcp_logger.addHandler(handler)

    tests = (
            test_packet_serialization,
            test_classless_static_route_parsing,
            test_classless_static_route_serialization,
            test_domain_search_list_parsing,
            test_domain_search_list_serialization,
            test_server_dialogue,
            )
    all_passed = True
    for test in tests:
        # &= instead of "and" so that no test gets skipped after a failure.
        all_passed &= test()

    if not all_passed:
        print("Some tests FAILED")
        return -1
    print("All tests PASSED.")
    return 0
 
if __name__ == "__main__":
    # Hand the suite's result (0 or -1) to the shell as the exit status.
    exit_status = run_tests()
    sys.exit(exit_status)