## This file is part of Scapy
|
## See http://www.secdev.org/projects/scapy for more information
|
## Copyright (C) Philippe Biondi <phil@secdev.org>
|
## This program is published under a GPLv2 license
|
|
"""
|
Generators and packet meta classes.
|
"""
|
|
###############
|
## Generators ##
|
################
|
|
from __future__ import absolute_import
|
import re,random,socket
|
import types
|
from scapy.modules.six.moves import range
|
|
class Gen(object):
    """Base class for generators; iterating a bare ``Gen`` yields nothing."""
    __slots__ = []

    def __iter__(self):
        # Default: an iterator over no elements. Subclasses override this.
        nothing = []
        return iter(nothing)
|
|
class SetGen(Gen):
    """Generator over a set of values.

    ``values`` may be:
      * a list or a ``BasePacketList``: copied into a list as is;
      * a tuple of 2 or 3 int-convertible items ``(start, end[, step])``:
        turned into a single range that *includes* ``end``;
      * anything else: wrapped in a one-element list.

    ``_iterpacket`` controls whether ``BasePacket`` elements are themselves
    iterated (truthy) or yielded as single objects (falsy).
    """

    def __init__(self, values, _iterpacket=1):
        self._iterpacket = _iterpacket

        if isinstance(values, (list, BasePacketList)):
            self.values = list(values)
            return

        looks_like_range = (
            isinstance(values, tuple)
            and 2 <= len(values) <= 3
            and all(hasattr(item, "__int__") for item in values)
        )
        if not looks_like_range:
            self.values = [values]
            return

        # We use values[1] + 1 as stop value for (x)range to maintain
        # the behavior of using tuples as field `values`
        start = int(values[0])
        stop = int(values[1]) + 1
        step = tuple(int(v) for v in values[2:])
        self.values = [range(start, stop, *step)]

    def transf(self, element):
        """Identity transformation: return ``element`` unchanged."""
        return element

    def __iter__(self):
        lazy_types = (range, types.GeneratorType)
        for item in self.values:
            if isinstance(item, Gen):
                # Packets are only expanded when _iterpacket is set.
                expand = self._iterpacket or not isinstance(item, BasePacket)
            else:
                expand = False
            if expand or isinstance(item, lazy_types):
                for sub in item:
                    yield sub
            else:
                yield item

    def __repr__(self):
        return "<SetGen %r>" % self.values
|
|
class Net(Gen):
    """Generate a list of IPs from a network address or a name.

    ``net`` is a dotted quad in which every octet may be a number, a
    ``lo-hi`` range or ``*``, optionally followed by ``/prefixlen``.
    Anything that does not match ``ip_regex`` is treated as a host name and
    resolved with ``socket.gethostbyname``.
    """
    name = "ip"
    ip_regex = re.compile(r"^(\*|[0-2]?[0-9]?[0-9](-[0-2]?[0-9]?[0-9])?)\.(\*|[0-2]?[0-9]?[0-9](-[0-2]?[0-9]?[0-9])?)\.(\*|[0-2]?[0-9]?[0-9](-[0-2]?[0-9]?[0-9])?)\.(\*|[0-2]?[0-9]?[0-9](-[0-2]?[0-9]?[0-9])?)(/[0-3]?[0-9])?$")

    @staticmethod
    def _parse_digit(a, netmask):
        """Return the half-open ``(start, stop)`` range covered by one octet.

        a: octet text, one of ``"*"``, ``"lo-hi"`` or a plain number.
        netmask: number of low-order bits of this octet left free by the
            prefix length; clamped to 0..8.
        """
        netmask = min(8, max(netmask, 0))
        if a == "*":
            a = (0, 256)
        elif a.find("-") >= 0:
            x, y = [int(d) for d in a.split('-')]
            if x > y:
                # A reversed range collapses to its first bound.
                y = x
            a = (x & (0xff << netmask),
                 max(y, (x | (0xff >> (8 - netmask)))) + 1)
        else:
            a = (int(a) & (0xff << netmask),
                 (int(a) | (0xff >> (8 - netmask))) + 1)
        return a

    @classmethod
    def _parse_net(cls, net):
        """Parse ``net`` into ``([(start, stop)] * 4, prefixlen)``.

        The prefix length defaults to 32 when ``net`` has no ``/`` part.
        May raise whatever ``socket.gethostbyname`` raises when ``net`` is
        not in dotted form and cannot be resolved.
        """
        tmp = net.split('/') + ["32"]
        if not cls.ip_regex.match(net):
            tmp[0] = socket.gethostbyname(tmp[0])
        netmask = int(tmp[1])
        ret_list = [cls._parse_digit(x, y - netmask)
                    for (x, y) in zip(tmp[0].split('.'), [8, 16, 24, 32])]
        return ret_list, netmask

    def __init__(self, net):
        self.repr = net
        self.parsed, self.netmask = self._parse_net(net)

    def __str__(self):
        """Return the first address produced by iterating this network."""
        try:
            return next(self.__iter__())
        except StopIteration:
            return None

    def __iter__(self):
        # The first octet varies fastest, the last octet slowest.
        for d in range(*self.parsed[3]):
            for c in range(*self.parsed[2]):
                for b in range(*self.parsed[1]):
                    for a in range(*self.parsed[0]):
                        yield "%i.%i.%i.%i" % (a, b, c, d)

    def choice(self):
        """Return one random address, drawing every octet independently."""
        ip = []
        for v in self.parsed:
            ip.append(str(random.randint(v[0], v[1] - 1)))
        return ".".join(ip)

    def __repr__(self):
        return "Net(%r)" % self.repr

    def _other_parsed(self, other):
        """Return the octet ranges of ``other`` (Net-like object or string)."""
        if hasattr(other, "parsed"):
            return other.parsed
        return self._parse_net(other)[0]

    def __eq__(self, other):
        """Two networks are equal when their four octet ranges are equal.

        ``None`` and other empty operands compare unequal instead of being
        handed to the parser (which would fail or trigger a name lookup).
        """
        # NOTE: on Python 3, defining __eq__ without __hash__ leaves
        # instances unhashable; that is unchanged here.
        if not other:
            return False
        return self.parsed == self._other_parsed(other)

    def __ne__(self, other):
        # Python 2 does not derive != from ==; keep both consistent.
        return not self == other

    def __contains__(self, other):
        """Return True when every octet range of ``other`` lies within ours."""
        p2 = self._other_parsed(other)
        for (a1, b1), (a2, b2) in zip(self.parsed, p2):
            if a1 > a2 or b1 < b2:
                return False
        return True

    def __rcontains__(self, other):
        """Reflected containment: is this network inside ``other``?"""
        return self in self.__class__(other)
|
|
|
class OID(Gen):
    """Generate OID strings from a dotted pattern.

    Each dot-separated component of ``oid`` is either literal text or a
    ``lo-hi`` integer range. Iterating yields every combination of the
    ranges, the leftmost range varying fastest.
    """
    name = "OID"

    def __init__(self, oid):
        self.oid = oid
        # (lo, hi) bounds of every ranged component, in order of appearance.
        self.cmpt = []
        fmt = []
        for i in oid.split("."):
            if "-" in i:
                fmt.append("%i")
                self.cmpt.append(tuple(map(int, i.split("-"))))
            else:
                fmt.append(i)
        # Format string with one "%i" placeholder per ranged component.
        self.fmt = ".".join(fmt)

    def __repr__(self):
        return "OID(%r)" % self.oid

    def __iter__(self):
        ii = [k[0] for k in self.cmpt]
        while True:
            yield self.fmt % tuple(ii)
            # Odometer-style increment: bump the first counter that has not
            # reached its upper bound, resetting the ones before it.
            i = 0
            while True:
                if i >= len(ii):
                    # Every counter wrapped around: iteration is complete.
                    # A plain return is required here; raising StopIteration
                    # inside a generator is turned into RuntimeError by
                    # PEP 479 (Python 3.7+).
                    return
                if ii[i] < self.cmpt[i][1]:
                    ii[i] += 1
                    break
                else:
                    ii[i] = self.cmpt[i][0]
                i += 1
|
|
|
|
######################################
|
## Packet abstract and base classes ##
|
######################################
|
|
class Packet_metaclass(type):
    """Metaclass that builds packet classes from their ``fields_desc``."""

    def __new__(cls, name, bases, dct):
        """Create the packet class.

        Steps, in order:
          * resolve ``fields_desc`` (expanding references to other packet
            classes, or inheriting the first base's list);
          * apply per-class default overrides given as class attributes
            named after a field;
          * default ``__slots__`` to ``[]`` and rename ``name`` /
            ``overload_fields`` to ``_name`` / ``_overload_fields``;
          * compute ``__all_slots__`` and ``aliastypes``, then run the
            registration hooks and register the class in
            ``config.conf.layers``.
        """
        if "fields_desc" in dct:  # perform resolution of references to other packets
            current_fld = dct["fields_desc"]
            resolved_fld = []
            for f in current_fld:
                if isinstance(f, Packet_metaclass):  # reference to another fields_desc
                    for f2 in f.fields_desc:
                        resolved_fld.append(f2)
                else:
                    resolved_fld.append(f)
        else:  # look for a fields_desc in parent classes
            resolved_fld = None
            for b in bases:
                if hasattr(b, "fields_desc"):
                    resolved_fld = b.fields_desc
                    break

        if resolved_fld:  # perform default value replacements
            final_fld = []
            for f in resolved_fld:
                if f.name in dct:
                    # Copy so the field object shared with other classes
                    # keeps its own default.
                    f = f.copy()
                    f.default = dct[f.name]
                    del dct[f.name]
                final_fld.append(f)

            dct["fields_desc"] = final_fld

        if "__slots__" not in dct:
            dct["__slots__"] = []
        for attr in ["name", "overload_fields"]:
            try:
                dct["_%s" % attr] = dct.pop(attr)
            except KeyError:
                pass
        newcls = super(Packet_metaclass, cls).__new__(cls, name, bases, dct)
        # Union of the slots declared anywhere in the MRO.
        newcls.__all_slots__ = set(
            slot
            for klass in newcls.__mro__ if hasattr(klass, "__slots__")
            for slot in klass.__slots__
        )

        if hasattr(newcls, "aliastypes"):
            newcls.aliastypes = [newcls] + newcls.aliastypes
        else:
            newcls.aliastypes = [newcls]

        if hasattr(newcls, "register_variant"):
            newcls.register_variant()
        for f in newcls.fields_desc:
            if hasattr(f, "register_owner"):
                f.register_owner(newcls)
        # Imported here rather than at module level, as in the original.
        from scapy import config
        config.conf.layers.register(newcls)
        return newcls

    def __getattr__(self, attr):
        """Return the field of this class named ``attr``.

        Raises AttributeError when no entry of ``fields_desc`` has that name.
        """
        for k in self.fields_desc:
            if k.name == attr:
                return k
        raise AttributeError(attr)

    def __call__(cls, *args, **kargs):
        """Instantiate the class, letting ``dispatch_hook`` pick the type.

        When the class itself defines ``dispatch_hook``, it is called with
        the constructor arguments and its result becomes the class to
        instantiate. If the hook fails, ``config.conf.raw_layer`` is used
        instead, unless ``config.conf.debug_dissector`` is set, in which
        case the error is re-raised.
        """
        if "dispatch_hook" in cls.__dict__:
            try:
                cls = cls.dispatch_hook(*args, **kargs)
            except Exception:
                # Only real errors trigger the fallback; KeyboardInterrupt
                # and SystemExit must propagate instead of being turned
                # into a raw layer.
                from scapy import config
                if config.conf.debug_dissector:
                    raise
                cls = config.conf.raw_layer
        i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)
        i.__init__(*args, **kargs)
        return i
|
|
class Field_metaclass(type):
    """Metaclass giving every class an empty ``__slots__`` unless it sets one."""

    def __new__(cls, name, bases, dct):
        # Leave an explicit __slots__ untouched; otherwise declare none.
        dct.setdefault("__slots__", [])
        return super(Field_metaclass, cls).__new__(cls, name, bases, dct)
|
|
class NewDefaultValues(Packet_metaclass):
    """Deprecated metaclass kept for compatibility (no longer needed).

    Classes that still declare
        __metaclass__ = NewDefaultValues
    can drop that line and should keep working.
    """

    def __new__(cls, name, bases, dct):
        from scapy.error import log_loading
        import traceback
        try:
            # Walk the call stack looking for the frame whose source line
            # starts with "class"; the trailing sentinel supplies the
            # ("??", -1) location when no such frame exists.
            frames = traceback.extract_stack() + [("??", -1, None, "")]
            for filename, lineno, _, text in frames:
                if text.startswith("class"):
                    break
        except:
            # Any failure is re-raised unchanged.
            filename, lineno = "??", -1
            raise
        log_loading.warning("Deprecated (no more needed) use of NewDefaultValues (%s l. %i).", filename, lineno)

        return super(NewDefaultValues, cls).__new__(cls, name, bases, dct)
|
|
class BasePacket(Gen):
    """Base class for packets; a ``Gen`` that declares no slots of its own."""
    __slots__ = []
|
|
|
#############################
|
## Packet list base class ##
|
#############################
|
|
class BasePacketList(object):
    """Base class for packet lists; declares no slots of its own."""
    __slots__ = []
|