# Copyright 2018 The Chromium OS Authors. All rights reserved.
|
# Use of this source code is governed by a BSD-style license that can be
|
# found in the LICENSE file.
|
|
"""Return information about routing table entries
|
|
Read and parse the system routing table. There are
|
four classes defined here: NetworkRoutes, which contains
|
information about all routes; IPv4Route, which describes
|
a single IPv4 routing table entry; IPv6Route, which
|
does the same for IPv6; and Route, which has common code
|
for IPv4Route and IPv6Route.
|
"""
|
|
ROUTES_V4_FILE = "/proc/net/route"
|
ROUTES_V6_FILE = "/proc/net/ipv6_route"
|
|
# The following constants are from <net/route.h>
|
RTF_UP = 0x0001
|
RTF_GATEWAY = 0x0002
|
RTF_HOST = 0x0004
|
# IPv6 constants from <net/route.h>
|
RTF_DEFAULT = 0x10000
|
|
import socket
|
import struct
|
|
class Route(object):
|
def __init__(self, iface, dest, gway, flags, mask):
|
self.interface = iface
|
self.destination = dest
|
self.gateway = gway
|
self.flagbits = flags
|
self.netmask = mask
|
|
def __str__(self):
|
flags = ""
|
if self.flagbits & RTF_UP:
|
flags += "U"
|
if self.flagbits & RTF_GATEWAY:
|
flags += "G"
|
if self.flagbits & RTF_HOST:
|
flags += "H"
|
if self.flagbits & RTF_DEFAULT:
|
flags += "D"
|
return "<%s dest: %s gway: %s mask: %s flags: %s>" % (
|
self.interface,
|
self._intToIp(self.destination),
|
self._intToIp(self.gateway),
|
self._intToIp(self.netmask),
|
flags)
|
|
def isUsable(self):
|
return self.flagbits & RTF_UP
|
|
def isHostRoute(self):
|
return self.flagbits & RTF_HOST
|
|
def isGatewayRoute(self):
|
return self.flagbits & RTF_GATEWAY
|
|
def isInterfaceRoute(self):
|
return (self.flagbits & RTF_GATEWAY) == 0
|
|
def matches(self, ip):
|
try:
|
return (self._ipToInt(ip) & self.netmask) == self.destination
|
except socket.error:
|
return False
|
|
|
class IPv4Route(Route):
|
def __init__(self, iface, dest, gway, flags, mask):
|
super(IPv4Route, self).__init__(
|
iface, int(dest, 16), int(gway, 16), int(flags, 16), int(mask, 16))
|
|
def _intToIp(self, addr):
|
return socket.inet_ntoa(struct.pack('@I', addr))
|
|
def _ipToInt(self, ip):
|
return struct.unpack('I', socket.inet_aton(ip))[0]
|
|
def isDefaultRoute(self):
|
return (self.flagbits & RTF_GATEWAY) and self.destination == 0
|
|
def parseIPv4Routes(routelist):
|
# The first line is headers that will allow us
|
# to correctly interpret the values in the following
|
# lines
|
headers = routelist[0].split()
|
col_map = {token: pos for (pos, token) in enumerate(headers)}
|
|
routes = []
|
for routeline in routelist[1:]:
|
route = routeline.split()
|
interface = route[col_map["Iface"]]
|
destination = route[col_map["Destination"]]
|
gateway = route[col_map["Gateway"]]
|
flags = route[col_map["Flags"]]
|
mask = route[col_map["Mask"]]
|
routes.append(IPv4Route(interface, destination, gateway, flags, mask))
|
|
return routes
|
|
|
class IPv6Route(Route):
|
def __init__(self, iface, dest, gway, flags, plen):
|
super(IPv6Route, self).__init__(
|
iface,
|
long(dest, 16),
|
long(gway, 16),
|
long(flags, 16),
|
# netmask = set first plen bits to 1, all following to 0
|
(1 << 128) - (1 << (128 - int(plen, 16))))
|
|
def _intToIp(self, addr):
|
return socket.inet_ntop(socket.AF_INET6, ("%032x" % addr).decode("hex"))
|
|
def _ipToInt(self, ip):
|
return long(socket.inet_pton(socket.AF_INET6, ip).encode("hex"), 16)
|
|
def isDefaultRoute(self):
|
return self.flagbits & RTF_DEFAULT
|
|
def parseIPv6Routes(routelist):
|
# ipv6_route has no headers, so the routing table looks like the following:
|
# Dest DestPrefix Src SrcPrefix Gateway Metric RefCnt UseCnt Flags Iface
|
routes = []
|
for routeline in routelist:
|
route = routeline.split()
|
interface = route[9]
|
destination = route[0]
|
gateway = route[4]
|
flags = route[8]
|
prefix = route[1]
|
routes.append(IPv6Route(interface, destination, gateway, flags, prefix))
|
|
return routes
|
|
|
class NetworkRoutes(object):
|
def __init__(self, routelist_v4=None, routelist_v6=None):
|
if routelist_v4 is None:
|
with open(ROUTES_V4_FILE) as routef_v4:
|
routelist_v4 = routef_v4.readlines()
|
|
self.routes = parseIPv4Routes(routelist_v4)
|
|
if routelist_v6 is None:
|
with open(ROUTES_V6_FILE) as routef_v6:
|
routelist_v6 = routef_v6.readlines()
|
|
self.routes += parseIPv6Routes(routelist_v6)
|
|
def _filterUsableRoutes(self):
|
return (rr for rr in self.routes if rr.isUsable())
|
|
def hasDefaultRoute(self, interface):
|
return any(rr for rr in self._filterUsableRoutes()
|
if (rr.interface == interface and rr.isDefaultRoute()))
|
|
def getDefaultRoutes(self):
|
return [rr for rr in self._filterUsableRoutes() if rr.isDefaultRoute()]
|
|
def hasInterfaceRoute(self, interface):
|
return any(rr for rr in self._filterUsableRoutes()
|
if (rr.interface == interface and rr.isInterfaceRoute()))
|
|
def getRouteFor(self, ip):
|
for rr in self._filterUsableRoutes():
|
if rr.matches(ip):
|
return rr
|
return None
|