## This file is part of Scapy
|
## See http://www.secdev.org/projects/scapy for more informations
|
## Copyright (C) Philippe Biondi <phil@secdev.org>
|
## This program is published under a GPLv2 license
|
|
"""
|
Packet class. Binding mechanism. fuzz() method.
|
"""
|
|
from __future__ import absolute_import
|
from __future__ import print_function
|
import re
|
import time,itertools
|
import copy
|
import subprocess
|
|
from scapy.fields import StrField, ConditionalField, Emph, PacketListField, BitField, \
|
MultiEnumField, EnumField, FlagsField
|
from scapy.config import conf
|
from scapy.compat import *
|
from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass
|
from scapy.volatile import VolatileValue
|
from scapy.utils import import_hexcap,tex_escape,colgen,get_temp_file, \
|
ContextManagerSubprocess
|
from scapy.error import Scapy_Exception, log_runtime
|
from scapy.consts import PYX
|
import scapy.modules.six as six
|
|
try:
|
import pyx
|
except ImportError:
|
pass
|
|
|
class RawVal:
|
def __init__(self, val=""):
|
self.val = val
|
def __str__(self):
|
return str(self.val)
|
def __bytes__(self):
|
return raw(self.val)
|
def __repr__(self):
|
return "<RawVal [%r]>" % self.val
|
|
|
class Packet(six.with_metaclass(Packet_metaclass, BasePacket)):
|
__slots__ = [
|
"time", "sent_time", "name", "default_fields",
|
"overload_fields", "overloaded_fields", "fields", "fieldtype",
|
"packetfields",
|
"original", "explicit", "raw_packet_cache",
|
"raw_packet_cache_fields", "_pkt", "post_transforms",
|
# then payload and underlayer
|
"payload", "underlayer",
|
"name",
|
# used for sr()
|
"_answered",
|
# used when sniffing
|
"direction", "sniffed_on"
|
]
|
name = None
|
fields_desc = []
|
overload_fields = {}
|
payload_guess = []
|
show_indent = 1
|
show_summary = True
|
|
@classmethod
|
def from_hexcap(cls):
|
return cls(import_hexcap())
|
|
@classmethod
|
def upper_bonds(self):
|
for fval,upper in self.payload_guess:
|
print("%-20s %s" % (upper.__name__, ", ".join("%-12s" % ("%s=%r"%i) for i in six.iteritems(fval))))
|
|
@classmethod
|
def lower_bonds(self):
|
for lower,fval in six.iteritems(self._overload_fields):
|
print("%-20s %s" % (lower.__name__, ", ".join("%-12s" % ("%s=%r"%i) for i in six.iteritems(fval))))
|
|
def _unpickle(self, dlist):
|
"""Used to unpack pickling"""
|
self.__init__(b"".join(dlist))
|
return self
|
|
def __reduce__(self):
|
"""Used by pickling methods"""
|
return (self.__class__, (), (self.build(),))
|
|
def __reduce_ex__(self, proto):
|
"""Used by pickling methods"""
|
return self.__reduce__()
|
|
def __getstate__(self):
|
"""Mark object as pickable"""
|
return self.__reduce__()[2]
|
|
def __setstate__(self, state):
|
"""Rebuild state using pickable methods"""
|
return self._unpickle(state)
|
|
def __deepcopy__(self, memo):
|
"""Used by copy.deepcopy"""
|
return self.copy()
|
|
def __init__(self, _pkt=b"", post_transform=None, _internal=0, _underlayer=None, **fields):
|
self.time = time.time()
|
self.sent_time = None
|
self.name = (self.__class__.__name__
|
if self._name is None else
|
self._name)
|
self.default_fields = {}
|
self.overload_fields = self._overload_fields
|
self.overloaded_fields = {}
|
self.fields = {}
|
self.fieldtype = {}
|
self.packetfields = []
|
self.payload = NoPayload()
|
self.init_fields()
|
self.underlayer = _underlayer
|
self.original = _pkt
|
self.explicit = 0
|
self.raw_packet_cache = None
|
self.raw_packet_cache_fields = None
|
if _pkt:
|
self.dissect(_pkt)
|
if not _internal:
|
self.dissection_done(self)
|
for f, v in six.iteritems(fields):
|
self.fields[f] = self.get_field(f).any2i(self, v)
|
if isinstance(post_transform, list):
|
self.post_transforms = post_transform
|
elif post_transform is None:
|
self.post_transforms = []
|
else:
|
self.post_transforms = [post_transform]
|
|
def init_fields(self):
|
"""
|
Initialize each fields of the fields_desc dict
|
"""
|
self.do_init_fields(self.fields_desc)
|
|
def do_init_fields(self, flist):
|
"""
|
Initialize each fields of the fields_desc dict
|
"""
|
for f in flist:
|
self.default_fields[f.name] = copy.deepcopy(f.default)
|
self.fieldtype[f.name] = f
|
if f.holds_packets:
|
self.packetfields.append(f)
|
|
def dissection_done(self,pkt):
|
"""DEV: will be called after a dissection is completed"""
|
self.post_dissection(pkt)
|
self.payload.dissection_done(pkt)
|
|
def post_dissection(self, pkt):
|
"""DEV: is called after the dissection of the whole packet"""
|
pass
|
|
def get_field(self, fld):
|
"""DEV: returns the field instance from the name of the field"""
|
return self.fieldtype[fld]
|
|
def add_payload(self, payload):
|
if payload is None:
|
return
|
elif not isinstance(self.payload, NoPayload):
|
self.payload.add_payload(payload)
|
else:
|
if isinstance(payload, Packet):
|
self.payload = payload
|
payload.add_underlayer(self)
|
for t in self.aliastypes:
|
if t in payload.overload_fields:
|
self.overloaded_fields = payload.overload_fields[t]
|
break
|
elif isinstance(payload, bytes):
|
self.payload = conf.raw_layer(load=payload)
|
else:
|
raise TypeError("payload must be either 'Packet' or 'bytes', not [%s]" % repr(payload))
|
def remove_payload(self):
|
self.payload.remove_underlayer(self)
|
self.payload = NoPayload()
|
self.overloaded_fields = {}
|
def add_underlayer(self, underlayer):
|
self.underlayer = underlayer
|
def remove_underlayer(self,other):
|
self.underlayer = None
|
def copy(self):
|
"""Returns a deep copy of the instance."""
|
clone = self.__class__()
|
clone.fields = self.copy_fields_dict(self.fields)
|
clone.default_fields = self.copy_fields_dict(self.default_fields)
|
clone.overloaded_fields = self.overloaded_fields.copy()
|
clone.underlayer = self.underlayer
|
clone.explicit = self.explicit
|
clone.raw_packet_cache = self.raw_packet_cache
|
clone.raw_packet_cache_fields = self.copy_fields_dict(
|
self.raw_packet_cache_fields
|
)
|
clone.post_transforms = self.post_transforms[:]
|
clone.payload = self.payload.copy()
|
clone.payload.add_underlayer(clone)
|
clone.time = self.time
|
return clone
|
|
def getfieldval(self, attr):
|
if attr in self.fields:
|
return self.fields[attr]
|
if attr in self.overloaded_fields:
|
return self.overloaded_fields[attr]
|
if attr in self.default_fields:
|
return self.default_fields[attr]
|
return self.payload.getfieldval(attr)
|
|
def getfield_and_val(self, attr):
|
if attr in self.fields:
|
return self.get_field(attr),self.fields[attr]
|
if attr in self.overloaded_fields:
|
return self.get_field(attr),self.overloaded_fields[attr]
|
if attr in self.default_fields:
|
return self.get_field(attr),self.default_fields[attr]
|
|
def __getattr__(self, attr):
|
try:
|
fld, v = self.getfield_and_val(attr)
|
except TypeError:
|
return self.payload.__getattr__(attr)
|
if fld is not None:
|
return fld.i2h(self, v)
|
return v
|
|
def setfieldval(self, attr, val):
|
if attr in self.default_fields:
|
fld = self.get_field(attr)
|
if fld is None:
|
any2i = lambda x,y: y
|
else:
|
any2i = fld.any2i
|
self.fields[attr] = any2i(self, val)
|
self.explicit = 0
|
self.raw_packet_cache = None
|
self.raw_packet_cache_fields = None
|
elif attr == "payload":
|
self.remove_payload()
|
self.add_payload(val)
|
else:
|
self.payload.setfieldval(attr,val)
|
|
def __setattr__(self, attr, val):
|
if attr in self.__all_slots__:
|
return object.__setattr__(self, attr, val)
|
try:
|
return self.setfieldval(attr,val)
|
except AttributeError:
|
pass
|
return object.__setattr__(self, attr, val)
|
|
def delfieldval(self, attr):
|
if attr in self.fields:
|
del(self.fields[attr])
|
self.explicit = 0 # in case a default value must be explicited
|
self.raw_packet_cache = None
|
self.raw_packet_cache_fields = None
|
elif attr in self.default_fields:
|
pass
|
elif attr == "payload":
|
self.remove_payload()
|
else:
|
self.payload.delfieldval(attr)
|
|
def __delattr__(self, attr):
|
if attr == "payload":
|
return self.remove_payload()
|
if attr in self.__all_slots__:
|
return object.__delattr__(self, attr)
|
try:
|
return self.delfieldval(attr)
|
except AttributeError:
|
pass
|
return object.__delattr__(self, attr)
|
|
def _superdir(self):
|
"""
|
Return a list of slots and methods, including those from subclasses.
|
"""
|
attrs = set()
|
cls = self.__class__
|
if hasattr(cls, '__all_slots__'):
|
attrs.update(cls.__all_slots__)
|
for bcls in cls.__mro__:
|
if hasattr(bcls, '__dict__'):
|
attrs.update(bcls.__dict__)
|
return attrs
|
|
def __dir__(self):
|
"""
|
Add fields to tab completion list.
|
"""
|
return sorted(itertools.chain(self._superdir(), self.default_fields))
|
|
def __repr__(self):
|
s = ""
|
ct = conf.color_theme
|
for f in self.fields_desc:
|
if isinstance(f, ConditionalField) and not f._evalcond(self):
|
continue
|
if f.name in self.fields:
|
val = f.i2repr(self, self.fields[f.name])
|
elif f.name in self.overloaded_fields:
|
val = f.i2repr(self, self.overloaded_fields[f.name])
|
else:
|
continue
|
if isinstance(f, Emph) or f in conf.emph:
|
ncol = ct.emph_field_name
|
vcol = ct.emph_field_value
|
else:
|
ncol = ct.field_name
|
vcol = ct.field_value
|
|
|
s += " %s%s%s" % (ncol(f.name),
|
ct.punct("="),
|
vcol(val))
|
return "%s%s %s %s%s%s"% (ct.punct("<"),
|
ct.layer_name(self.__class__.__name__),
|
s,
|
ct.punct("|"),
|
repr(self.payload),
|
ct.punct(">"))
|
def __str__(self):
|
return str(self.build())
|
def __bytes__(self):
|
return self.build()
|
def __div__(self, other):
|
if isinstance(other, Packet):
|
cloneA = self.copy()
|
cloneB = other.copy()
|
cloneA.add_payload(cloneB)
|
return cloneA
|
elif isinstance(other, (bytes, str)):
|
return self/conf.raw_layer(load=other)
|
else:
|
return other.__rdiv__(self)
|
__truediv__ = __div__
|
def __rdiv__(self, other):
|
if isinstance(other, (bytes, str)):
|
return conf.raw_layer(load=other)/self
|
else:
|
raise TypeError
|
__rtruediv__ = __rdiv__
|
def __mul__(self, other):
|
if isinstance(other, int):
|
return [self]*other
|
else:
|
raise TypeError
|
def __rmul__(self,other):
|
return self.__mul__(other)
|
|
def __nonzero__(self):
|
return True
|
__bool__ = __nonzero__
|
def __len__(self):
|
return len(self.__bytes__())
|
def copy_field_value(self, fieldname, value):
|
return self.get_field(fieldname).do_copy(value)
|
def copy_fields_dict(self, fields):
|
if fields is None:
|
return None
|
return {fname: self.copy_field_value(fname, fval)
|
for fname, fval in six.iteritems(fields)}
|
def self_build(self, field_pos_list=None):
|
"""
|
Create the default layer regarding fields_desc dict
|
|
:param field_pos_list:
|
"""
|
if self.raw_packet_cache is not None:
|
for fname, fval in six.iteritems(self.raw_packet_cache_fields):
|
if self.getfieldval(fname) != fval:
|
self.raw_packet_cache = None
|
self.raw_packet_cache_fields = None
|
break
|
if self.raw_packet_cache is not None:
|
return self.raw_packet_cache
|
p=b""
|
for f in self.fields_desc:
|
val = self.getfieldval(f.name)
|
if isinstance(val, RawVal):
|
sval = raw(val)
|
p += sval
|
if field_pos_list is not None:
|
field_pos_list.append( (f.name, sval.encode("string_escape"), len(p), len(sval) ) )
|
else:
|
p = f.addfield(self, p, val)
|
return p
|
|
def do_build_payload(self):
|
"""
|
Create the default version of the payload layer
|
|
:return: a string of payload layer
|
"""
|
return self.payload.do_build()
|
|
def do_build(self):
|
"""
|
Create the default version of the layer
|
|
:return: a string of the packet with the payload
|
"""
|
if not self.explicit:
|
self = next(iter(self))
|
pkt = self.self_build()
|
for t in self.post_transforms:
|
pkt = t(pkt)
|
pay = self.do_build_payload()
|
if self.raw_packet_cache is None:
|
return self.post_build(pkt, pay)
|
else:
|
return pkt + pay
|
|
def build_padding(self):
|
return self.payload.build_padding()
|
|
def build(self):
|
"""
|
Create the current layer
|
|
:return: string of the packet with the payload
|
"""
|
p = self.do_build()
|
p += self.build_padding()
|
p = self.build_done(p)
|
return p
|
|
def post_build(self, pkt, pay):
|
"""
|
DEV: called right after the current layer is build.
|
|
:param str pkt: the current packet (build by self_buil function)
|
:param str pay: the packet payload (build by do_build_payload function)
|
:return: a string of the packet with the payload
|
"""
|
return pkt+pay
|
|
def build_done(self, p):
|
return self.payload.build_done(p)
|
|
def do_build_ps(self):
|
p = b""
|
pl = []
|
q = b""
|
for f in self.fields_desc:
|
if isinstance(f, ConditionalField) and not f._evalcond(self):
|
continue
|
p = f.addfield(self, p, self.getfieldval(f.name) )
|
if isinstance(p, bytes):
|
r = p[len(q):]
|
q = p
|
else:
|
r = b""
|
pl.append( (f, f.i2repr(self,self.getfieldval(f.name)), r) )
|
|
pkt,lst = self.payload.build_ps(internal=1)
|
p += pkt
|
lst.append( (self, pl) )
|
|
return p,lst
|
|
def build_ps(self,internal=0):
|
p,lst = self.do_build_ps()
|
# if not internal:
|
# pkt = self
|
# while pkt.haslayer(conf.padding_layer):
|
# pkt = pkt.getlayer(conf.padding_layer)
|
# lst.append( (pkt, [ ("loakjkjd", pkt.load, pkt.load) ] ) )
|
# p += pkt.load
|
# pkt = pkt.payload
|
return p,lst
|
|
|
def psdump(self, filename=None, **kargs):
|
"""
|
psdump(filename=None, layer_shift=0, rebuild=1)
|
|
Creates an EPS file describing a packet. If filename is not provided a
|
temporary file is created and gs is called.
|
|
:param filename: the file's filename
|
"""
|
canvas = self.canvas_dump(**kargs)
|
if filename is None:
|
fname = get_temp_file(autoext=".eps")
|
canvas.writeEPSfile(fname)
|
with ContextManagerSubprocess("psdump()", conf.prog.psreader):
|
subprocess.Popen([conf.prog.psreader, fname])
|
else:
|
canvas.writeEPSfile(filename)
|
|
def pdfdump(self, filename=None, **kargs):
|
"""
|
pdfdump(filename=None, layer_shift=0, rebuild=1)
|
|
Creates a PDF file describing a packet. If filename is not provided a
|
temporary file is created and xpdf is called.
|
|
:param filename: the file's filename
|
"""
|
canvas = self.canvas_dump(**kargs)
|
if filename is None:
|
fname = get_temp_file(autoext=".pdf")
|
canvas.writePDFfile(fname)
|
with ContextManagerSubprocess("pdfdump()", conf.prog.pdfreader):
|
subprocess.Popen([conf.prog.pdfreader, fname])
|
else:
|
canvas.writePDFfile(filename)
|
|
|
def canvas_dump(self, layer_shift=0, rebuild=1):
|
if PYX == 0:
|
raise ImportError("PyX and its depedencies must be installed")
|
canvas = pyx.canvas.canvas()
|
if rebuild:
|
p,t = self.__class__(raw(self)).build_ps()
|
else:
|
p,t = self.build_ps()
|
YTXT=len(t)
|
for n,l in t:
|
YTXT += len(l)
|
YTXT = float(YTXT)
|
YDUMP=YTXT
|
|
XSTART = 1
|
XDSTART = 10
|
y = 0.0
|
yd = 0.0
|
xd = 0
|
XMUL= 0.55
|
YMUL = 0.4
|
|
backcolor=colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb)
|
forecolor=colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb)
|
# backcolor=makecol(0.376, 0.729, 0.525, 1.0)
|
|
|
def hexstr(x):
|
s = []
|
for c in x:
|
s.append("%02x" % orb(c))
|
return " ".join(s)
|
|
|
def make_dump_txt(x,y,txt):
|
return pyx.text.text(XDSTART+x*XMUL, (YDUMP-y)*YMUL, r"\tt{%s}"%hexstr(txt), [pyx.text.size.Large])
|
|
def make_box(o):
|
return pyx.box.rect(o.left(), o.bottom(), o.width(), o.height(), relcenter=(0.5,0.5))
|
|
def make_frame(lst):
|
if len(lst) == 1:
|
b = lst[0].bbox()
|
b.enlarge(pyx.unit.u_pt)
|
return b.path()
|
else:
|
fb = lst[0].bbox()
|
fb.enlarge(pyx.unit.u_pt)
|
lb = lst[-1].bbox()
|
lb.enlarge(pyx.unit.u_pt)
|
if len(lst) == 2 and fb.left() > lb.right():
|
return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()),
|
pyx.path.lineto(fb.left(), fb.top()),
|
pyx.path.lineto(fb.left(), fb.bottom()),
|
pyx.path.lineto(fb.right(), fb.bottom()),
|
pyx.path.moveto(lb.left(), lb.top()),
|
pyx.path.lineto(lb.right(), lb.top()),
|
pyx.path.lineto(lb.right(), lb.bottom()),
|
pyx.path.lineto(lb.left(), lb.bottom()))
|
else:
|
# XXX
|
gb = lst[1].bbox()
|
if gb != lb:
|
gb.enlarge(pyx.unit.u_pt)
|
kb = lst[-2].bbox()
|
if kb != gb and kb != lb:
|
kb.enlarge(pyx.unit.u_pt)
|
return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()),
|
pyx.path.lineto(fb.right(), fb.top()),
|
pyx.path.lineto(fb.right(), kb.bottom()),
|
pyx.path.lineto(lb.right(), kb.bottom()),
|
pyx.path.lineto(lb.right(), lb.bottom()),
|
pyx.path.lineto(lb.left(), lb.bottom()),
|
pyx.path.lineto(lb.left(), gb.top()),
|
pyx.path.lineto(fb.left(), gb.top()),
|
pyx.path.closepath(),)
|
|
|
def make_dump(s, shift=0, y=0, col=None, bkcol=None, larg=16):
|
c = pyx.canvas.canvas()
|
tlist = []
|
while s:
|
dmp,s = s[:larg-shift],s[larg-shift:]
|
txt = make_dump_txt(shift, y, dmp)
|
tlist.append(txt)
|
shift += len(dmp)
|
if shift >= 16:
|
shift = 0
|
y += 1
|
if col is None:
|
col = pyx.color.rgb.red
|
if bkcol is None:
|
col = pyx.color.rgb.white
|
c.stroke(make_frame(tlist),[col,pyx.deco.filled([bkcol]),pyx.style.linewidth.Thick])
|
for txt in tlist:
|
c.insert(txt)
|
return c, tlist[-1].bbox(), shift, y
|
|
|
last_shift,last_y=0,0.0
|
while t:
|
bkcol = next(backcolor)
|
proto,fields = t.pop()
|
y += 0.5
|
pt = pyx.text.text(XSTART, (YTXT-y)*YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % proto.name, [ pyx.text.size.Large])
|
y += 1
|
ptbb=pt.bbox()
|
ptbb.enlarge(pyx.unit.u_pt*2)
|
canvas.stroke(ptbb.path(),[pyx.color.rgb.black, pyx.deco.filled([bkcol])])
|
canvas.insert(pt)
|
for fname, fval, fdump in fields:
|
col = next(forecolor)
|
ft = pyx.text.text(XSTART, (YTXT-y)*YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fname.name))
|
if isinstance(fval, str):
|
if len(fval) > 18:
|
fval = fval[:18]+"[...]"
|
else:
|
fval=""
|
vt = pyx.text.text(XSTART+3, (YTXT-y)*YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval))
|
y += 1.0
|
if fdump:
|
dt,target,last_shift,last_y = make_dump(fdump, last_shift, last_y, col, bkcol)
|
|
dtb = dt.bbox()
|
dtb=target
|
vtb = vt.bbox()
|
bxvt = make_box(vtb)
|
bxdt = make_box(dtb)
|
dtb.enlarge(pyx.unit.u_pt)
|
try:
|
if yd < 0:
|
cnx = pyx.connector.curve(bxvt,bxdt,absangle1=0, absangle2=-90)
|
else:
|
cnx = pyx.connector.curve(bxvt,bxdt,absangle1=0, absangle2=90)
|
except:
|
pass
|
else:
|
canvas.stroke(cnx,[pyx.style.linewidth.thin,pyx.deco.earrow.small,col])
|
|
canvas.insert(dt)
|
|
canvas.insert(ft)
|
canvas.insert(vt)
|
last_y += layer_shift
|
|
return canvas
|
|
|
|
def extract_padding(self, s):
|
"""
|
DEV: to be overloaded to extract current layer's padding.
|
|
:param str s: the current layer
|
:return: a couple of strings (actual layer, padding)
|
"""
|
return s,None
|
|
def post_dissect(self, s):
|
"""DEV: is called right after the current layer has been dissected"""
|
return s
|
|
def pre_dissect(self, s):
|
"""DEV: is called right before the current layer is dissected"""
|
return s
|
|
def do_dissect(self, s):
|
s = raw(s)
|
_raw = s
|
self.raw_packet_cache_fields = {}
|
for f in self.fields_desc:
|
if not s:
|
break
|
s, fval = f.getfield(self, s)
|
# We need to track fields with mutable values to discard
|
# .raw_packet_cache when needed.
|
if f.islist or f.holds_packets or f.ismutable:
|
self.raw_packet_cache_fields[f.name] = f.do_copy(fval)
|
self.fields[f.name] = fval
|
assert(_raw.endswith(raw(s)))
|
self.raw_packet_cache = _raw[:-len(s)] if s else _raw
|
self.explicit = 1
|
return s
|
|
def do_dissect_payload(self, s):
|
"""
|
Perform the dissection of the layer's payload
|
|
:param str s: the raw layer
|
"""
|
if s:
|
cls = self.guess_payload_class(s)
|
try:
|
p = cls(s, _internal=1, _underlayer=self)
|
except KeyboardInterrupt:
|
raise
|
except:
|
if conf.debug_dissector:
|
if isinstance(cls,type) and issubclass(cls,Packet):
|
log_runtime.error("%s dissector failed" % cls.__name__)
|
else:
|
log_runtime.error("%s.guess_payload_class() returned [%s]" % (self.__class__.__name__,repr(cls)))
|
if cls is not None:
|
raise
|
p = conf.raw_layer(s, _internal=1, _underlayer=self)
|
self.add_payload(p)
|
|
def dissect(self, s):
|
s = self.pre_dissect(s)
|
|
s = self.do_dissect(s)
|
|
s = self.post_dissect(s)
|
|
payl,pad = self.extract_padding(s)
|
self.do_dissect_payload(payl)
|
if pad and conf.padding:
|
self.add_payload(conf.padding_layer(pad))
|
|
|
def guess_payload_class(self, payload):
|
"""
|
DEV: Guesses the next payload class from layer bonds.
|
Can be overloaded to use a different mechanism.
|
|
:param str payload: the layer's payload
|
:return: the payload class
|
"""
|
for t in self.aliastypes:
|
for fval, cls in t.payload_guess:
|
ok = 1
|
for k, v in six.iteritems(fval):
|
if not hasattr(self, k) or v != self.getfieldval(k):
|
ok = 0
|
break
|
if ok:
|
return cls
|
return self.default_payload_class(payload)
|
|
def default_payload_class(self, payload):
|
"""
|
DEV: Returns the default payload class if nothing has been found by the
|
guess_payload_class() method.
|
|
:param str payload: the layer's payload
|
:return: the default payload class define inside the configuration file
|
"""
|
return conf.raw_layer
|
|
def hide_defaults(self):
|
"""Removes fields' values that are the same as default values."""
|
for k, v in list(self.fields.items()): # use list(): self.fields is modified in the loop
|
v = self.fields[k]
|
if k in self.default_fields:
|
if self.default_fields[k] == v:
|
del self.fields[k]
|
self.payload.hide_defaults()
|
|
def clone_with(self, payload=None, **kargs):
|
pkt = self.__class__()
|
pkt.explicit = 1
|
pkt.fields = kargs
|
pkt.default_fields = self.copy_fields_dict(self.default_fields)
|
pkt.overloaded_fields = self.overloaded_fields.copy()
|
pkt.time = self.time
|
pkt.underlayer = self.underlayer
|
pkt.post_transforms = self.post_transforms
|
pkt.raw_packet_cache = self.raw_packet_cache
|
pkt.raw_packet_cache_fields = self.copy_fields_dict(
|
self.raw_packet_cache_fields
|
)
|
if payload is not None:
|
pkt.add_payload(payload)
|
return pkt
|
|
def __iter__(self):
|
def loop(todo, done, self=self):
|
if todo:
|
eltname = todo.pop()
|
elt = self.getfieldval(eltname)
|
if not isinstance(elt, Gen):
|
if self.get_field(eltname).islist:
|
elt = SetGen([elt])
|
else:
|
elt = SetGen(elt)
|
for e in elt:
|
done[eltname]=e
|
for x in loop(todo[:], done):
|
yield x
|
else:
|
if isinstance(self.payload,NoPayload):
|
payloads = [None]
|
else:
|
payloads = self.payload
|
for payl in payloads:
|
done2=done.copy()
|
for k in done2:
|
if isinstance(done2[k], VolatileValue):
|
done2[k] = done2[k]._fix()
|
pkt = self.clone_with(payload=payl, **done2)
|
yield pkt
|
|
if self.explicit or self.raw_packet_cache is not None:
|
todo = []
|
done = self.fields
|
else:
|
todo = [k for (k,v) in itertools.chain(six.iteritems(self.default_fields),
|
six.iteritems(self.overloaded_fields))
|
if isinstance(v, VolatileValue)] + list(self.fields.keys())
|
done = {}
|
return loop(todo, done)
|
|
def __gt__(self, other):
|
"""True if other is an answer from self (self ==> other)."""
|
if isinstance(other, Packet):
|
return other < self
|
elif isinstance(other, bytes):
|
return 1
|
else:
|
raise TypeError((self, other))
|
def __lt__(self, other):
|
"""True if self is an answer from other (other ==> self)."""
|
if isinstance(other, Packet):
|
return self.answers(other)
|
elif isinstance(other, bytes):
|
return 1
|
else:
|
raise TypeError((self, other))
|
|
def __eq__(self, other):
|
if not isinstance(other, self.__class__):
|
return False
|
for f in self.fields_desc:
|
if f not in other.fields_desc:
|
return False
|
if self.getfieldval(f.name) != other.getfieldval(f.name):
|
return False
|
return self.payload == other.payload
|
|
def __ne__(self, other):
|
return not self.__eq__(other)
|
|
def hashret(self):
|
"""DEV: returns a string that has the same value for a request and its answer."""
|
return self.payload.hashret()
|
def answers(self, other):
|
"""DEV: true if self is an answer from other"""
|
if other.__class__ == self.__class__:
|
return self.payload.answers(other.payload)
|
return 0
|
|
def haslayer(self, cls):
|
"""true if self has a layer that is an instance of cls. Superseded by "cls in self" syntax."""
|
if self.__class__ == cls or self.__class__.__name__ == cls:
|
return 1
|
for f in self.packetfields:
|
fvalue_gen = self.getfieldval(f.name)
|
if fvalue_gen is None:
|
continue
|
if not f.islist:
|
fvalue_gen = SetGen(fvalue_gen,_iterpacket=0)
|
for fvalue in fvalue_gen:
|
if isinstance(fvalue, Packet):
|
ret = fvalue.haslayer(cls)
|
if ret:
|
return ret
|
return self.payload.haslayer(cls)
|
|
def getlayer(self, cls, nb=1, _track=None, _subclass=False, **flt):
|
"""Return the nb^th layer that is an instance of cls, matching flt
|
values.
|
|
"""
|
if _subclass:
|
match = lambda cls1, cls2: issubclass(cls1, cls2)
|
else:
|
match = lambda cls1, cls2: cls1 == cls2
|
if isinstance(cls, int):
|
nb = cls+1
|
cls = None
|
if isinstance(cls, str) and "." in cls:
|
ccls,fld = cls.split(".",1)
|
else:
|
ccls,fld = cls,None
|
if cls is None or match(self.__class__, cls) or self.__class__.__name__ == ccls:
|
if all(self.getfieldval(fldname) == fldvalue
|
for fldname, fldvalue in six.iteritems(flt)):
|
if nb == 1:
|
if fld is None:
|
return self
|
else:
|
return self.getfieldval(fld)
|
else:
|
nb -=1
|
for f in self.packetfields:
|
fvalue_gen = self.getfieldval(f.name)
|
if fvalue_gen is None:
|
continue
|
if not f.islist:
|
fvalue_gen = SetGen(fvalue_gen,_iterpacket=0)
|
for fvalue in fvalue_gen:
|
if isinstance(fvalue, Packet):
|
track=[]
|
ret = fvalue.getlayer(cls, nb=nb, _track=track,
|
_subclass=_subclass)
|
if ret is not None:
|
return ret
|
nb = track[0]
|
return self.payload.getlayer(cls, nb=nb, _track=_track,
|
_subclass=_subclass, **flt)
|
|
def firstlayer(self):
|
q = self
|
while q.underlayer is not None:
|
q = q.underlayer
|
return q
|
|
def __getitem__(self, cls):
|
if isinstance(cls, slice):
|
lname = cls.start
|
if cls.stop:
|
ret = self.getlayer(cls.start, nb=cls.stop, **(cls.step or {}))
|
else:
|
ret = self.getlayer(cls.start, **(cls.step or {}))
|
else:
|
lname = cls
|
ret = self.getlayer(cls)
|
if ret is None:
|
if isinstance(lname, Packet_metaclass):
|
lname = lname.__name__
|
elif not isinstance(lname, bytes):
|
lname = repr(lname)
|
raise IndexError("Layer [%s] not found" % lname)
|
return ret
|
|
def __delitem__(self, cls):
|
del(self[cls].underlayer.payload)
|
|
def __setitem__(self, cls, val):
|
self[cls].underlayer.payload = val
|
|
def __contains__(self, cls):
|
""""cls in self" returns true if self has a layer which is an instance of cls."""
|
return self.haslayer(cls)
|
|
def route(self):
|
return (None,None,None)
|
|
def fragment(self, *args, **kargs):
|
return self.payload.fragment(*args, **kargs)
|
|
|
def display(self,*args,**kargs): # Deprecated. Use show()
|
"""Deprecated. Use show() method."""
|
self.show(*args,**kargs)
|
|
def _show_or_dump(self, dump=False, indent=3, lvl="", label_lvl="", first_call=True):
|
"""
|
Internal method that shows or dumps a hierarchical view of a packet.
|
Called by show.
|
|
:param dump: determine if it prints or returns the string value
|
:param int indent: the size of indentation for each layer
|
:param str lvl: additional information about the layer lvl
|
:param str label_lvl: additional information about the layer fields
|
:param first_call: determine if the current function is the first
|
:return: return a hierarchical view if dump, else print it
|
"""
|
|
if dump:
|
from scapy.themes import AnsiColorTheme
|
ct = AnsiColorTheme() # No color for dump output
|
else:
|
ct = conf.color_theme
|
s = "%s%s %s %s \n" % (label_lvl,
|
ct.punct("###["),
|
ct.layer_name(self.name),
|
ct.punct("]###"))
|
for f in self.fields_desc:
|
if isinstance(f, ConditionalField) and not f._evalcond(self):
|
continue
|
if isinstance(f, Emph) or f in conf.emph:
|
ncol = ct.emph_field_name
|
vcol = ct.emph_field_value
|
else:
|
ncol = ct.field_name
|
vcol = ct.field_value
|
fvalue = self.getfieldval(f.name)
|
if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and isinstance(fvalue, list)):
|
s += "%s \\%-10s\\\n" % (label_lvl+lvl, ncol(f.name))
|
fvalue_gen = SetGen(fvalue,_iterpacket=0)
|
for fvalue in fvalue_gen:
|
s += fvalue._show_or_dump(dump=dump, indent=indent, label_lvl=label_lvl+lvl+" |", first_call=False)
|
else:
|
begn = "%s %-10s%s " % (label_lvl+lvl,
|
ncol(f.name),
|
ct.punct("="),)
|
reprval = f.i2repr(self,fvalue)
|
if isinstance(reprval, str):
|
reprval = reprval.replace("\n", "\n"+" "*(len(label_lvl)
|
+len(lvl)
|
+len(f.name)
|
+4))
|
s += "%s%s\n" % (begn,vcol(reprval))
|
if self.payload:
|
s += self.payload._show_or_dump(dump=dump, indent=indent, lvl=lvl+(" "*indent*self.show_indent), label_lvl=label_lvl, first_call=False)
|
|
if first_call and not dump:
|
print(s)
|
else:
|
return s
|
|
def show(self, dump=False, indent=3, lvl="", label_lvl=""):
|
"""
|
Prints or returns (when "dump" is true) a hierarchical view of the
|
packet.
|
|
:param dump: determine if it prints or returns the string value
|
:param int indent: the size of indentation for each layer
|
:param str lvl: additional information about the layer lvl
|
:param str label_lvl: additional information about the layer fields
|
:return: return a hierarchical view if dump, else print it
|
"""
|
return self._show_or_dump(dump, indent, lvl, label_lvl)
|
|
def show2(self, dump=False, indent=3, lvl="", label_lvl=""):
|
"""
|
Prints or returns (when "dump" is true) a hierarchical view of an
|
assembled version of the packet, so that automatic fields are
|
calculated (checksums, etc.)
|
|
:param dump: determine if it prints or returns the string value
|
:param int indent: the size of indentation for each layer
|
:param str lvl: additional information about the layer lvl
|
:param str label_lvl: additional information about the layer fields
|
:return: return a hierarchical view if dump, else print it
|
"""
|
return self.__class__(raw(self)).show(dump, indent, lvl, label_lvl)
|
|
def sprintf(self, fmt, relax=1):
|
"""sprintf(format, [relax=1]) -> str
|
where format is a string that can include directives. A directive begins and
|
ends by % and has the following format %[fmt[r],][cls[:nb].]field%.
|
|
fmt is a classic printf directive, "r" can be appended for raw substitution
|
(ex: IP.flags=0x18 instead of SA), nb is the number of the layer we want
|
(ex: for IP/IP packets, IP:2.src is the src of the upper IP layer).
|
Special case : "%.time%" is the creation time.
|
Ex : p.sprintf("%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% "
|
"%03xr,IP.proto% %r,TCP.flags%")
|
|
Moreover, the format string can include conditional statements. A conditional
|
statement looks like : {layer:string} where layer is a layer name, and string
|
is the string to insert in place of the condition if it is true, i.e. if layer
|
is present. If layer is preceded by a "!", the result is inverted. Conditions
|
can be imbricated. A valid statement can be :
|
p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet")
|
p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}")
|
|
A side effect is that, to obtain "{" and "}" characters, you must use
|
"%(" and "%)".
|
"""
|
|
escape = { "%": "%",
|
"(": "{",
|
")": "}" }
|
|
|
# Evaluate conditions
|
while "{" in fmt:
|
i = fmt.rindex("{")
|
j = fmt[i+1:].index("}")
|
cond = fmt[i+1:i+j+1]
|
k = cond.find(":")
|
if k < 0:
|
raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)"%cond)
|
cond,format = cond[:k],cond[k+1:]
|
res = False
|
if cond[0] == "!":
|
res = True
|
cond = cond[1:]
|
if self.haslayer(cond):
|
res = not res
|
if not res:
|
format = ""
|
fmt = fmt[:i]+format+fmt[i+j+2:]
|
|
# Evaluate directives
|
s = ""
|
while "%" in fmt:
|
i = fmt.index("%")
|
s += fmt[:i]
|
fmt = fmt[i+1:]
|
if fmt and fmt[0] in escape:
|
s += escape[fmt[0]]
|
fmt = fmt[1:]
|
continue
|
try:
|
i = fmt.index("%")
|
sfclsfld = fmt[:i]
|
fclsfld = sfclsfld.split(",")
|
if len(fclsfld) == 1:
|
f = "s"
|
clsfld = fclsfld[0]
|
elif len(fclsfld) == 2:
|
f,clsfld = fclsfld
|
else:
|
raise Scapy_Exception
|
if "." in clsfld:
|
cls,fld = clsfld.split(".")
|
else:
|
cls = self.__class__.__name__
|
fld = clsfld
|
num = 1
|
if ":" in cls:
|
cls,num = cls.split(":")
|
num = int(num)
|
fmt = fmt[i+1:]
|
except:
|
raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "..."))
|
else:
|
if fld == "time":
|
val = time.strftime("%H:%M:%S.%%06i", time.localtime(self.time)) % int((self.time-int(self.time))*1000000)
|
elif cls == self.__class__.__name__ and hasattr(self, fld):
|
if num > 1:
|
val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f,cls,num-1,fld), relax)
|
f = "s"
|
elif f[-1] == "r": # Raw field value
|
val = getattr(self,fld)
|
f = f[:-1]
|
if not f:
|
f = "s"
|
else:
|
val = getattr(self,fld)
|
if fld in self.fieldtype:
|
val = self.fieldtype[fld].i2repr(self,val)
|
else:
|
val = self.payload.sprintf("%%%s%%" % sfclsfld, relax)
|
f = "s"
|
s += ("%"+f) % val
|
|
s += fmt
|
return s
|
|
def mysummary(self):
|
"""DEV: can be overloaded to return a string that summarizes the layer.
|
Only one mysummary() is used in a whole packet summary: the one of the upper layer,
|
except if a mysummary() also returns (as a couple) a list of layers whose
|
mysummary() must be called if they are present."""
|
return ""
|
|
def _do_summary(self):
|
found, s, needed = self.payload._do_summary()
|
ret = ""
|
if not found or self.__class__ in needed:
|
ret = self.mysummary()
|
if isinstance(ret, tuple):
|
ret,n = ret
|
needed += n
|
if ret or needed:
|
found = 1
|
if not ret:
|
ret = self.__class__.__name__ if self.show_summary else ""
|
if self.__class__ in conf.emph:
|
impf = []
|
for f in self.fields_desc:
|
if f in conf.emph:
|
impf.append("%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name))))
|
ret = "%s [%s]" % (ret," ".join(impf))
|
if ret and s:
|
ret = "%s / %s" % (ret, s)
|
else:
|
ret = "%s%s" % (ret,s)
|
return found,ret,needed
|
|
def summary(self, intern=0):
|
"""Prints a one line summary of a packet."""
|
found,s,needed = self._do_summary()
|
return s
|
|
|
def lastlayer(self,layer=None):
|
"""Returns the uppest layer of the packet"""
|
return self.payload.lastlayer(self)
|
|
def decode_payload_as(self,cls):
|
"""Reassembles the payload and decode it using another packet class"""
|
s = raw(self.payload)
|
self.payload = cls(s, _internal=1, _underlayer=self)
|
pp = self
|
while pp.underlayer is not None:
|
pp = pp.underlayer
|
self.payload.dissection_done(pp)
|
|
def command(self):
|
"""Returns a string representing the command you have to type to obtain the same packet"""
|
f = []
|
for fn,fv in self.fields.items():
|
fld = self.get_field(fn)
|
if isinstance(fv, Packet):
|
fv = fv.command()
|
elif fld.islist and fld.holds_packets and isinstance(fv, list):
|
fv = "[%s]" % ",".join( map(Packet.command, fv))
|
elif isinstance(fld, FlagsField):
|
fv = int(fv)
|
else:
|
fv = repr(fv)
|
f.append("%s=%s" % (fn, fv))
|
c = "%s(%s)" % (self.__class__.__name__, ", ".join(f))
|
pc = self.payload.command()
|
if pc:
|
c += "/"+pc
|
return c
|
|
class NoPayload(Packet):
|
def __new__(cls, *args, **kargs):
|
singl = cls.__dict__.get("__singl__")
|
if singl is None:
|
cls.__singl__ = singl = Packet.__new__(cls)
|
Packet.__init__(singl)
|
return singl
|
def __init__(self, *args, **kargs):
|
pass
|
def dissection_done(self,pkt):
|
return
|
def add_payload(self, payload):
|
raise Scapy_Exception("Can't add payload to NoPayload instance")
|
def remove_payload(self):
|
pass
|
def add_underlayer(self,underlayer):
|
pass
|
def remove_underlayer(self,other):
|
pass
|
def copy(self):
|
return self
|
def __repr__(self):
|
return ""
|
def __str__(self):
|
return ""
|
def __bytes__(self):
|
return b""
|
def __nonzero__(self):
|
return False
|
__bool__ = __nonzero__
|
def do_build(self):
|
return b""
|
def build(self):
|
return b""
|
def build_padding(self):
|
return b""
|
def build_done(self, p):
|
return p
|
def build_ps(self, internal=0):
|
return b"",[]
|
def getfieldval(self, attr):
|
raise AttributeError(attr)
|
def getfield_and_val(self, attr):
|
raise AttributeError(attr)
|
def setfieldval(self, attr, val):
|
raise AttributeError(attr)
|
def delfieldval(self, attr):
|
raise AttributeError(attr)
|
def hide_defaults(self):
|
pass
|
def __iter__(self):
|
return iter([])
|
def __eq__(self, other):
|
if isinstance(other, NoPayload):
|
return True
|
return False
|
def hashret(self):
|
return b""
|
def answers(self, other):
|
return isinstance(other, NoPayload) or isinstance(other, conf.padding_layer)
|
def haslayer(self, cls):
|
return 0
|
def getlayer(self, cls, nb=1, _track=None, **flt):
|
if _track is not None:
|
_track.append(nb)
|
return None
|
def fragment(self, *args, **kargs):
|
raise Scapy_Exception("cannot fragment this packet")
|
def show(self, indent=3, lvl="", label_lvl=""):
|
pass
|
def sprintf(self, fmt, relax):
|
if relax:
|
return "??"
|
else:
|
raise Scapy_Exception("Format not found [%s]"%fmt)
|
def _do_summary(self):
|
return 0,"",[]
|
def lastlayer(self,layer):
|
return layer
|
def command(self):
|
return ""
|
|
####################
|
## packet classes ##
|
####################
|
|
|
class Raw(Packet):
|
name = "Raw"
|
fields_desc = [ StrField("load", "") ]
|
def answers(self, other):
|
return 1
|
# s = raw(other)
|
# t = self.load
|
# l = min(len(s), len(t))
|
# return s[:l] == t[:l]
|
def mysummary(self):
|
cs = conf.raw_summary
|
if cs:
|
if callable(cs):
|
return "Raw %s" % cs(self.load)
|
else:
|
return "Raw %r" % self.load
|
return Packet.mysummary(self)
|
|
class Padding(Raw):
|
name = "Padding"
|
def self_build(self):
|
return b""
|
def build_padding(self):
|
return (raw(self.load) if self.raw_packet_cache is None
|
else self.raw_packet_cache) + self.payload.build_padding()
|
|
conf.raw_layer = Raw
|
conf.padding_layer = Padding
|
if conf.default_l2 is None:
|
conf.default_l2 = Raw
|
|
#################
|
## Bind layers ##
|
#################
|
|
|
def bind_bottom_up(lower, upper, __fval=None, **fval):
|
if __fval is not None:
|
fval.update(__fval)
|
lower.payload_guess = lower.payload_guess[:]
|
lower.payload_guess.append((fval, upper))
|
|
|
def bind_top_down(lower, upper, __fval=None, **fval):
|
if __fval is not None:
|
fval.update(__fval)
|
upper._overload_fields = upper._overload_fields.copy()
|
upper._overload_fields[lower] = fval
|
|
@conf.commands.register
|
def bind_layers(lower, upper, __fval=None, **fval):
|
"""Bind 2 layers on some specific fields' values"""
|
if __fval is not None:
|
fval.update(__fval)
|
bind_top_down(lower, upper, **fval)
|
bind_bottom_up(lower, upper, **fval)
|
|
def split_bottom_up(lower, upper, __fval=None, **fval):
|
if __fval is not None:
|
fval.update(__fval)
|
def do_filter(xxx_todo_changeme,upper=upper,fval=fval):
|
(f,u) = xxx_todo_changeme
|
if u != upper:
|
return True
|
for k in fval:
|
if k not in f or f[k] != fval[k]:
|
return True
|
return False
|
lower.payload_guess = [x for x in lower.payload_guess if do_filter(x)]
|
|
def split_top_down(lower, upper, __fval=None, **fval):
|
if __fval is not None:
|
fval.update(__fval)
|
if lower in upper._overload_fields:
|
ofval = upper._overload_fields[lower]
|
for k in fval:
|
if k not in ofval or ofval[k] != fval[k]:
|
return
|
upper._overload_fields = upper._overload_fields.copy()
|
del(upper._overload_fields[lower])
|
|
@conf.commands.register
|
def split_layers(lower, upper, __fval=None, **fval):
|
"""Split 2 layers previously bound"""
|
if __fval is not None:
|
fval.update(__fval)
|
split_bottom_up(lower, upper, **fval)
|
split_top_down(lower, upper, **fval)
|
|
|
@conf.commands.register
|
def ls(obj=None, case_sensitive=False, verbose=False):
|
"""List available layers, or infos on a given layer class or name"""
|
is_string = isinstance(obj, six.string_types)
|
|
if obj is None or is_string:
|
if obj is None:
|
all_layers = sorted(conf.layers, key=lambda x: x.__name__)
|
else:
|
pattern = re.compile(obj, 0 if case_sensitive else re.I)
|
all_layers = sorted((layer for layer in conf.layers
|
if (pattern.search(layer.__name__ or '')
|
or pattern.search(layer.name or ''))),
|
key=lambda x: x.__name__)
|
for layer in all_layers:
|
print("%-10s : %s" % (layer.__name__, layer._name))
|
|
else:
|
is_pkt = isinstance(obj, Packet)
|
if (isinstance(obj, type) and issubclass(obj, Packet)) or is_pkt:
|
for f in obj.fields_desc:
|
cur_fld = f
|
attrs = []
|
long_attrs = []
|
while isinstance(cur_fld, (Emph, ConditionalField)):
|
if isinstance(cur_fld, ConditionalField):
|
attrs.append(cur_fld.__class__.__name__[:4])
|
cur_fld = cur_fld.fld
|
if verbose and isinstance(cur_fld, EnumField) \
|
and hasattr(cur_fld, "i2s"):
|
if len(cur_fld.i2s) < 50:
|
long_attrs.extend(
|
"%s: %d" % (strval, numval)
|
for numval, strval in
|
sorted(six.iteritems(cur_fld.i2s))
|
)
|
elif isinstance(cur_fld, MultiEnumField):
|
fld_depend = cur_fld.depends_on(obj.__class__
|
if is_pkt else obj)
|
attrs.append("Depends on %s" % fld_depend.name)
|
if verbose:
|
cur_i2s = cur_fld.i2s_multi.get(
|
cur_fld.depends_on(obj if is_pkt else obj()), {}
|
)
|
if len(cur_i2s) < 50:
|
long_attrs.extend(
|
"%s: %d" % (strval, numval)
|
for numval, strval in
|
sorted(six.iteritems(cur_i2s))
|
)
|
elif verbose and isinstance(cur_fld, FlagsField):
|
names = cur_fld.names
|
long_attrs.append(", ".join(names))
|
class_name = "%s (%s)" % (
|
cur_fld.__class__.__name__,
|
", ".join(attrs)) if attrs else cur_fld.__class__.__name__
|
if isinstance(cur_fld, BitField):
|
class_name += " (%d bit%s)" % (cur_fld.size,
|
"s" if cur_fld.size > 1
|
else "")
|
print("%-10s : %-35s =" % (f.name, class_name), end=' ')
|
if is_pkt:
|
print("%-15r" % (getattr(obj, f.name),), end=' ')
|
print("(%r)" % (f.default,))
|
for attr in long_attrs:
|
print("%-15s%s" % ("", attr))
|
if is_pkt and not isinstance(obj.payload, NoPayload):
|
print("--")
|
ls(obj.payload)
|
|
else:
|
print("Not a packet class or name. Type 'ls()' to list packet classes.")
|
|
|
|
#############
|
## Fuzzing ##
|
#############
|
|
@conf.commands.register
|
def fuzz(p, _inplace=0):
|
"""Transform a layer into a fuzzy layer by replacing some default values by random objects"""
|
if not _inplace:
|
p = p.copy()
|
q = p
|
while not isinstance(q, NoPayload):
|
for f in q.fields_desc:
|
if isinstance(f, PacketListField):
|
for r in getattr(q, f.name):
|
print("fuzzing", repr(r))
|
fuzz(r, _inplace=1)
|
elif f.default is not None:
|
if not isinstance(f, ConditionalField) or f._evalcond(q):
|
rnd = f.randval()
|
if rnd is not None:
|
q.default_fields[f.name] = rnd
|
q = q.payload
|
return p
|