liyujie
2025-08-28 786ff4f4ca2374bdd9177f2e24b503d43e7a3b93
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
 
"""Return information about routing table entries
 
Read and parse the system routing table. There are
four classes defined here: NetworkRoutes, which contains
information about all routes; IPv4Route, which describes
a single IPv4 routing table entry; IPv6Route, which
does the same for IPv6; and Route, which has common code
for IPv4Route and IPv6Route.
"""
 
ROUTES_V4_FILE = "/proc/net/route"
ROUTES_V6_FILE = "/proc/net/ipv6_route"
 
# The following constants are from <net/route.h>
RTF_UP      = 0x0001
RTF_GATEWAY = 0x0002
RTF_HOST    = 0x0004
# IPv6 constants from <net/route.h>
RTF_DEFAULT = 0x10000
 
import socket
import struct
 
class Route(object):
    def __init__(self, iface, dest, gway, flags, mask):
        self.interface = iface
        self.destination = dest
        self.gateway = gway
        self.flagbits = flags
        self.netmask = mask
 
    def __str__(self):
        flags = ""
        if self.flagbits & RTF_UP:
            flags += "U"
        if self.flagbits & RTF_GATEWAY:
            flags += "G"
        if self.flagbits & RTF_HOST:
            flags += "H"
        if self.flagbits & RTF_DEFAULT:
            flags += "D"
        return "<%s dest: %s gway: %s mask: %s flags: %s>" % (
                self.interface,
                self._intToIp(self.destination),
                self._intToIp(self.gateway),
                self._intToIp(self.netmask),
                flags)
 
    def isUsable(self):
        return self.flagbits & RTF_UP
 
    def isHostRoute(self):
        return self.flagbits & RTF_HOST
 
    def isGatewayRoute(self):
        return self.flagbits & RTF_GATEWAY
 
    def isInterfaceRoute(self):
        return (self.flagbits & RTF_GATEWAY) == 0
 
    def matches(self, ip):
        try:
            return (self._ipToInt(ip) & self.netmask) == self.destination
        except socket.error:
            return False
 
 
class IPv4Route(Route):
    def __init__(self, iface, dest, gway, flags, mask):
        super(IPv4Route, self).__init__(
            iface, int(dest, 16), int(gway, 16), int(flags, 16), int(mask, 16))
 
    def _intToIp(self, addr):
        return socket.inet_ntoa(struct.pack('@I', addr))
 
    def _ipToInt(self, ip):
        return struct.unpack('I', socket.inet_aton(ip))[0]
 
    def isDefaultRoute(self):
        return (self.flagbits & RTF_GATEWAY) and self.destination == 0
 
def parseIPv4Routes(routelist):
    # The first line is headers that will allow us
    # to correctly interpret the values in the following
    # lines
    headers = routelist[0].split()
    col_map = {token: pos for (pos, token) in enumerate(headers)}
 
    routes = []
    for routeline in routelist[1:]:
        route = routeline.split()
        interface = route[col_map["Iface"]]
        destination = route[col_map["Destination"]]
        gateway = route[col_map["Gateway"]]
        flags = route[col_map["Flags"]]
        mask = route[col_map["Mask"]]
        routes.append(IPv4Route(interface, destination, gateway, flags, mask))
 
    return routes
 
 
class IPv6Route(Route):
    def __init__(self, iface, dest, gway, flags, plen):
        super(IPv6Route, self).__init__(
            iface,
            long(dest, 16),
            long(gway, 16),
            long(flags, 16),
            # netmask = set first plen bits to 1, all following to 0
            (1 << 128) - (1 << (128 - int(plen, 16))))
 
    def _intToIp(self, addr):
        return socket.inet_ntop(socket.AF_INET6, ("%032x" % addr).decode("hex"))
 
    def _ipToInt(self, ip):
        return long(socket.inet_pton(socket.AF_INET6, ip).encode("hex"), 16)
 
    def isDefaultRoute(self):
        return self.flagbits & RTF_DEFAULT
 
def parseIPv6Routes(routelist):
    # ipv6_route has no headers, so the routing table looks like the following:
    # Dest DestPrefix Src SrcPrefix Gateway Metric RefCnt UseCnt Flags Iface
    routes = []
    for routeline in routelist:
        route = routeline.split()
        interface = route[9]
        destination = route[0]
        gateway = route[4]
        flags = route[8]
        prefix = route[1]
        routes.append(IPv6Route(interface, destination, gateway, flags, prefix))
 
    return routes
 
 
class NetworkRoutes(object):
    def __init__(self, routelist_v4=None, routelist_v6=None):
        if routelist_v4 is None:
            with open(ROUTES_V4_FILE) as routef_v4:
                routelist_v4 = routef_v4.readlines()
 
        self.routes = parseIPv4Routes(routelist_v4)
 
        if routelist_v6 is None:
            with open(ROUTES_V6_FILE) as routef_v6:
                routelist_v6 = routef_v6.readlines()
 
        self.routes += parseIPv6Routes(routelist_v6)
 
    def _filterUsableRoutes(self):
        return (rr for rr in self.routes if rr.isUsable())
 
    def hasDefaultRoute(self, interface):
        return any(rr for rr in self._filterUsableRoutes()
                   if (rr.interface == interface and rr.isDefaultRoute()))
 
    def getDefaultRoutes(self):
        return [rr for rr in self._filterUsableRoutes() if rr.isDefaultRoute()]
 
    def hasInterfaceRoute(self, interface):
        return any(rr for rr in self._filterUsableRoutes()
                   if (rr.interface == interface and rr.isInterfaceRoute()))
 
    def getRouteFor(self, ip):
        for rr in self._filterUsableRoutes():
            if rr.matches(ip):
                return rr
        return None