## This file is part of Scapy
|
## See http://www.secdev.org/projects/scapy for more informations
|
## Copyright (C) Philippe Biondi <phil@secdev.org>
|
## This program is published under a GPLv2 license
|
|
"""
|
General utility functions.
|
"""
|
|
from __future__ import absolute_import
|
from __future__ import print_function
|
import os, sys, socket, types
|
import random, time
|
import gzip, zlib
|
import re, struct, array
|
import subprocess
|
import tempfile
|
|
import warnings
|
import scapy.modules.six as six
|
from scapy.modules.six.moves import range
|
warnings.filterwarnings("ignore","tempnam",RuntimeWarning, __name__)
|
|
from scapy.config import conf
|
from scapy.consts import DARWIN, WINDOWS
|
from scapy.data import MTU
|
from scapy.compat import *
|
from scapy.error import log_runtime, log_loading, log_interactive, Scapy_Exception, warning
|
from scapy.base_classes import BasePacketList
|
|
###########
|
## Tools ##
|
###########
|
|
def get_temp_file(keep=False, autoext=""):
|
"""Create a temporary file and return its name. When keep is False,
|
the file is deleted when scapy exits.
|
|
"""
|
fname = tempfile.NamedTemporaryFile(prefix="scapy", suffix=autoext,
|
delete=False).name
|
if not keep:
|
conf.temp_files.append(fname)
|
return fname
|
|
def sane_color(x):
|
r=""
|
for i in x:
|
j = orb(i)
|
if (j < 32) or (j >= 127):
|
r=r+conf.color_theme.not_printable(".")
|
else:
|
r=r+chr(j)
|
return r
|
|
def sane(x):
|
r=""
|
for i in x:
|
j = orb(i)
|
if (j < 32) or (j >= 127):
|
r=r+"."
|
else:
|
r=r+chr(j)
|
return r
|
|
@conf.commands.register
|
def restart():
|
"""Restarts scapy"""
|
if not conf.interactive or not os.path.isfile(sys.argv[0]):
|
raise OSError("Scapy was not started from console")
|
if WINDOWS:
|
os._exit(subprocess.call([sys.executable] + sys.argv))
|
os.execv(sys.executable, [sys.executable] + sys.argv)
|
|
def lhex(x):
|
if type(x) in six.integer_types:
|
return hex(x)
|
elif isinstance(x, tuple):
|
return "(%s)" % ", ".join(map(lhex, x))
|
elif isinstance(x, list):
|
return "[%s]" % ", ".join(map(lhex, x))
|
else:
|
return x
|
|
@conf.commands.register
|
def hexdump(x, dump=False):
|
""" Build a tcpdump like hexadecimal view
|
|
:param x: a Packet
|
:param dump: define if the result must be printed or returned in a variable
|
:returns: a String only when dump=True
|
"""
|
s = ""
|
x = raw(x)
|
l = len(x)
|
i = 0
|
while i < l:
|
s += "%04x " % i
|
for j in range(16):
|
if i+j < l:
|
s += "%02X" % orb(x[i+j])
|
else:
|
s += " "
|
if j%16 == 7:
|
s += ""
|
s += " "
|
s += sane_color(x[i:i+16])
|
i += 16
|
s += "\n"
|
# remove trailing \n
|
if s.endswith("\n"):
|
s = s[:-1]
|
if dump:
|
return s
|
else:
|
print(s)
|
|
|
@conf.commands.register
|
def linehexdump(x, onlyasc=0, onlyhex=0, dump=False):
|
""" Build an equivalent view of hexdump() on a single line
|
|
Note that setting both onlyasc and onlyhex to 1 results in a empty output
|
|
:param x: a Packet
|
:param onlyasc: 1 to display only the ascii view
|
:param onlyhex: 1 to display only the hexadecimal view
|
:param dump: print the view if False
|
:returns: a String only when dump=True
|
"""
|
s = ""
|
x = raw(x)
|
l = len(x)
|
if not onlyasc:
|
for i in range(l):
|
s += "%02X" % orb(x[i])
|
if not onlyhex: # separate asc & hex if both are displayed
|
s += " "
|
if not onlyhex:
|
s += sane_color(x)
|
if dump:
|
return s
|
else:
|
print(s)
|
|
@conf.commands.register
|
def chexdump(x, dump=False):
|
""" Build a per byte hexadecimal representation
|
|
Example:
|
>>> chexdump(IP())
|
0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01
|
|
:param x: a Packet
|
:param dump: print the view if False
|
:returns: a String only if dump=True
|
"""
|
x = raw(x)
|
s = ", ".join("%#04x" % orb(x) for x in x)
|
if dump:
|
return s
|
else:
|
print(s)
|
|
@conf.commands.register
|
def hexstr(x, onlyasc=0, onlyhex=0):
|
s = []
|
if not onlyasc:
|
s.append(" ".join("%02x" % orb(b) for b in x))
|
if not onlyhex:
|
s.append(sane(x))
|
return " ".join(s)
|
|
def repr_hex(s):
|
""" Convert provided bitstring to a simple string of hex digits """
|
return "".join("%02x" % orb(x) for x in s)
|
|
@conf.commands.register
|
def hexdiff(x,y):
|
"""Show differences between 2 binary strings"""
|
x=raw(x)[::-1]
|
y=raw(y)[::-1]
|
SUBST=1
|
INSERT=1
|
d = {(-1, -1): (0, (-1, -1))}
|
for j in range(len(y)):
|
d[-1,j] = d[-1,j-1][0]+INSERT, (-1,j-1)
|
for i in range(len(x)):
|
d[i,-1] = d[i-1,-1][0]+INSERT, (i-1,-1)
|
|
for j in range(len(y)):
|
for i in range(len(x)):
|
d[i,j] = min( ( d[i-1,j-1][0]+SUBST*(x[i] != y[j]), (i-1,j-1) ),
|
( d[i-1,j][0]+INSERT, (i-1,j) ),
|
( d[i,j-1][0]+INSERT, (i,j-1) ) )
|
|
|
backtrackx = []
|
backtracky = []
|
i=len(x)-1
|
j=len(y)-1
|
while not (i == j == -1):
|
i2,j2 = d[i,j][1]
|
backtrackx.append(x[i2+1:i+1])
|
backtracky.append(y[j2+1:j+1])
|
i,j = i2,j2
|
|
|
|
x = y = i = 0
|
colorize = { 0: lambda x:x,
|
-1: conf.color_theme.left,
|
1: conf.color_theme.right }
|
|
dox=1
|
doy=0
|
l = len(backtrackx)
|
while i < l:
|
separate=0
|
linex = backtrackx[i:i+16]
|
liney = backtracky[i:i+16]
|
xx = sum(len(k) for k in linex)
|
yy = sum(len(k) for k in liney)
|
if dox and not xx:
|
dox = 0
|
doy = 1
|
if dox and linex == liney:
|
doy=1
|
|
if dox:
|
xd = y
|
j = 0
|
while not linex[j]:
|
j += 1
|
xd -= 1
|
print(colorize[doy-dox]("%04x" % xd), end=' ')
|
x += xx
|
line=linex
|
else:
|
print(" ", end=' ')
|
if doy:
|
yd = y
|
j = 0
|
while not liney[j]:
|
j += 1
|
yd -= 1
|
print(colorize[doy-dox]("%04x" % yd), end=' ')
|
y += yy
|
line=liney
|
else:
|
print(" ", end=' ')
|
|
print(" ", end=' ')
|
|
cl = ""
|
for j in range(16):
|
if i+j < l:
|
if line[j]:
|
col = colorize[(linex[j]!=liney[j])*(doy-dox)]
|
print(col("%02X" % orb(line[j])), end=' ')
|
if linex[j]==liney[j]:
|
cl += sane_color(line[j])
|
else:
|
cl += col(sane(line[j]))
|
else:
|
print(" ", end=' ')
|
cl += " "
|
else:
|
print(" ", end=' ')
|
if j == 7:
|
print("", end=' ')
|
|
|
print(" ",cl)
|
|
if doy or not yy:
|
doy=0
|
dox=1
|
i += 16
|
else:
|
if yy:
|
dox=0
|
doy=1
|
else:
|
i += 16
|
|
if struct.pack("H",1) == b"\x00\x01": # big endian
|
def checksum(pkt):
|
if len(pkt) % 2 == 1:
|
pkt += b"\0"
|
s = sum(array.array("H", pkt))
|
s = (s >> 16) + (s & 0xffff)
|
s += s >> 16
|
s = ~s
|
return s & 0xffff
|
else:
|
def checksum(pkt):
|
if len(pkt) % 2 == 1:
|
pkt += b"\0"
|
s = sum(array.array("H", pkt))
|
s = (s >> 16) + (s & 0xffff)
|
s += s >> 16
|
s = ~s
|
return (((s>>8)&0xff)|s<<8) & 0xffff
|
|
|
def _fletcher16(charbuf):
|
# This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>
|
c0 = c1 = 0
|
for char in charbuf:
|
c0 += orb(char)
|
c1 += c0
|
|
c0 %= 255
|
c1 %= 255
|
return (c0,c1)
|
|
@conf.commands.register
|
def fletcher16_checksum(binbuf):
|
""" Calculates Fletcher-16 checksum of the given buffer.
|
|
Note:
|
If the buffer contains the two checkbytes derived from the Fletcher-16 checksum
|
the result of this function has to be 0. Otherwise the buffer has been corrupted.
|
"""
|
(c0,c1)= _fletcher16(binbuf)
|
return (c1 << 8) | c0
|
|
|
@conf.commands.register
|
def fletcher16_checkbytes(binbuf, offset):
|
""" Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string.
|
|
Including the bytes into the buffer (at the position marked by offset) the
|
global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify
|
the integrity of the buffer on the receiver side.
|
|
For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B.
|
"""
|
|
# This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>
|
if len(binbuf) < offset:
|
raise Exception("Packet too short for checkbytes %d" % len(binbuf))
|
|
binbuf = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:]
|
(c0,c1)= _fletcher16(binbuf)
|
|
x = ((len(binbuf) - offset - 1) * c0 - c1) % 255
|
|
if (x <= 0):
|
x += 255
|
|
y = 510 - c0 - x
|
|
if (y > 255):
|
y -= 255
|
return chb(x) + chb(y)
|
|
|
def mac2str(mac):
|
return b"".join(chb(int(x, 16)) for x in mac.split(':'))
|
|
def str2mac(s):
|
if isinstance(s, str):
|
return ("%02x:"*6)[:-1] % tuple(map(ord, s))
|
return ("%02x:"*6)[:-1] % tuple(s)
|
|
def randstring(l):
|
"""
|
Returns a random string of length l (l >= 0)
|
"""
|
return b"".join(struct.pack('B', random.randint(0, 255)) for _ in range(l))
|
|
def zerofree_randstring(l):
|
"""
|
Returns a random string of length l (l >= 0) without zero in it.
|
"""
|
return b"".join(struct.pack('B', random.randint(1, 255)) for _ in range(l))
|
|
def strxor(s1, s2):
|
"""
|
Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2
|
must be of same length.
|
"""
|
return b"".join(map(lambda x,y:chb(orb(x)^orb(y)), s1, s2))
|
|
def strand(s1, s2):
|
"""
|
Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2
|
must be of same length.
|
"""
|
return b"".join(map(lambda x,y:chb(orb(x)&orb(y)), s1, s2))
|
|
|
# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470
|
try:
|
socket.inet_aton("255.255.255.255")
|
except socket.error:
|
def inet_aton(x):
|
if x == "255.255.255.255":
|
return b"\xff"*4
|
else:
|
return socket.inet_aton(x)
|
else:
|
inet_aton = socket.inet_aton
|
|
inet_ntoa = socket.inet_ntoa
|
from scapy.pton_ntop import *
|
|
|
def atol(x):
|
try:
|
ip = inet_aton(x)
|
except socket.error:
|
ip = inet_aton(socket.gethostbyname(x))
|
return struct.unpack("!I", ip)[0]
|
def ltoa(x):
|
return inet_ntoa(struct.pack("!I", x&0xffffffff))
|
|
def itom(x):
|
return (0xffffffff00000000>>x)&0xffffffff
|
|
class ContextManagerSubprocess(object):
|
"""
|
Context manager that eases checking for unknown command.
|
|
Example:
|
>>> with ContextManagerSubprocess("my custom message"):
|
>>> subprocess.Popen(["unknown_command"])
|
|
"""
|
def __init__(self, name, prog):
|
self.name = name
|
self.prog = prog
|
|
def __enter__(self):
|
pass
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
if isinstance(exc_value, (OSError, TypeError)):
|
msg = "%s: executing %r failed" % (self.name, self.prog) if self.prog else "Could not execute %s, is it installed ?" % self.name
|
if not conf.interactive:
|
raise OSError(msg)
|
else:
|
log_runtime.error(msg, exc_info=True)
|
return True # Suppress the exception
|
|
class ContextManagerCaptureOutput(object):
|
"""
|
Context manager that intercept the console's output.
|
|
Example:
|
>>> with ContextManagerCaptureOutput() as cmco:
|
... print("hey")
|
... assert cmco.get_output() == "hey"
|
"""
|
def __init__(self):
|
self.result_export_object = ""
|
try:
|
import mock
|
except:
|
raise ImportError("The mock module needs to be installed !")
|
def __enter__(self):
|
import mock
|
def write(s, decorator=self):
|
decorator.result_export_object += s
|
mock_stdout = mock.Mock()
|
mock_stdout.write = write
|
self.bck_stdout = sys.stdout
|
sys.stdout = mock_stdout
|
return self
|
def __exit__(self, *exc):
|
sys.stdout = self.bck_stdout
|
return False
|
def get_output(self, eval_bytes=False):
|
if self.result_export_object.startswith("b'") and eval_bytes:
|
return plain_str(eval(self.result_export_object))
|
return self.result_export_object
|
|
def do_graph(graph,prog=None,format=None,target=None,type=None,string=None,options=None):
|
"""do_graph(graph, prog=conf.prog.dot, format="svg",
|
target="| conf.prog.display", options=None, [string=1]):
|
string: if not None, simply return the graph string
|
graph: GraphViz graph description
|
format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T" option
|
target: filename or redirect. Defaults pipe to Imagemagick's display program
|
prog: which graphviz program to use
|
options: options to be passed to prog"""
|
|
if format is None:
|
if WINDOWS:
|
format = "png" # use common format to make sure a viewer is installed
|
else:
|
format = "svg"
|
if string:
|
return graph
|
if type is not None:
|
format=type
|
if prog is None:
|
prog = conf.prog.dot
|
start_viewer=False
|
if target is None:
|
if WINDOWS:
|
target = get_temp_file(autoext="."+format)
|
start_viewer = True
|
else:
|
with ContextManagerSubprocess("do_graph()", conf.prog.display):
|
target = subprocess.Popen([conf.prog.display],
|
stdin=subprocess.PIPE).stdin
|
if format is not None:
|
format = "-T%s" % format
|
if isinstance(target, str):
|
if target.startswith('|'):
|
target = subprocess.Popen(target[1:].lstrip(), shell=True,
|
stdin=subprocess.PIPE).stdin
|
elif target.startswith('>'):
|
target = open(target[1:].lstrip(), "wb")
|
else:
|
target = open(os.path.abspath(target), "wb")
|
proc = subprocess.Popen("\"%s\" %s %s" % (prog, options or "", format or ""),
|
shell=True, stdin=subprocess.PIPE, stdout=target)
|
proc.stdin.write(raw(graph))
|
try:
|
target.close()
|
except:
|
pass
|
if start_viewer:
|
# Workaround for file not found error: We wait until tempfile is written.
|
waiting_start = time.time()
|
while not os.path.exists(target.name):
|
time.sleep(0.1)
|
if time.time() - waiting_start > 3:
|
warning("Temporary file '%s' could not be written. Graphic will not be displayed.", tempfile)
|
break
|
else:
|
if conf.prog.display == conf.prog._default:
|
os.startfile(target.name)
|
else:
|
with ContextManagerSubprocess("do_graph()", conf.prog.display):
|
subprocess.Popen([conf.prog.display, target.name])
|
|
_TEX_TR = {
|
"{":"{\\tt\\char123}",
|
"}":"{\\tt\\char125}",
|
"\\":"{\\tt\\char92}",
|
"^":"\\^{}",
|
"$":"\\$",
|
"#":"\\#",
|
"~":"\\~",
|
"_":"\\_",
|
"&":"\\&",
|
"%":"\\%",
|
"|":"{\\tt\\char124}",
|
"~":"{\\tt\\char126}",
|
"<":"{\\tt\\char60}",
|
">":"{\\tt\\char62}",
|
}
|
|
def tex_escape(x):
|
s = ""
|
for c in x:
|
s += _TEX_TR.get(c,c)
|
return s
|
|
def colgen(*lstcol,**kargs):
|
"""Returns a generator that mixes provided quantities forever
|
trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default"""
|
if len(lstcol) < 2:
|
lstcol *= 2
|
trans = kargs.get("trans", lambda x,y,z: (x,y,z))
|
while True:
|
for i in range(len(lstcol)):
|
for j in range(len(lstcol)):
|
for k in range(len(lstcol)):
|
if i != j or j != k or k != i:
|
yield trans(lstcol[(i+j)%len(lstcol)],lstcol[(j+k)%len(lstcol)],lstcol[(k+i)%len(lstcol)])
|
|
def incremental_label(label="tag%05i", start=0):
|
while True:
|
yield label % start
|
start += 1
|
|
def binrepr(val):
|
return bin(val)[2:]
|
|
def long_converter(s):
|
return int(s.replace('\n', '').replace(' ', ''), 16)
|
|
#########################
|
#### Enum management ####
|
#########################
|
|
class EnumElement:
|
_value=None
|
def __init__(self, key, value):
|
self._key = key
|
self._value = value
|
def __repr__(self):
|
return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value)
|
def __getattr__(self, attr):
|
return getattr(self._value, attr)
|
def __str__(self):
|
return self._key
|
def __bytes__(self):
|
return raw(self.__str__())
|
def __hash__(self):
|
return self._value
|
def __int__(self):
|
return int(self._value)
|
def __eq__(self, other):
|
return self._value == int(other)
|
def __neq__(self, other):
|
return not self.__eq__(other)
|
|
|
class Enum_metaclass(type):
|
element_class = EnumElement
|
def __new__(cls, name, bases, dct):
|
rdict={}
|
for k,v in six.iteritems(dct):
|
if isinstance(v, int):
|
v = cls.element_class(k,v)
|
dct[k] = v
|
rdict[v] = k
|
dct["__rdict__"] = rdict
|
return super(Enum_metaclass, cls).__new__(cls, name, bases, dct)
|
def __getitem__(self, attr):
|
return self.__rdict__[attr]
|
def __contains__(self, val):
|
return val in self.__rdict__
|
def get(self, attr, val=None):
|
return self.__rdict__.get(attr, val)
|
def __repr__(self):
|
return "<%s>" % self.__dict__.get("name", self.__name__)
|
|
|
|
###################
|
## Object saving ##
|
###################
|
|
|
def export_object(obj):
|
print(bytes_base64(gzip.zlib.compress(six.moves.cPickle.dumps(obj, 2), 9)))
|
|
def import_object(obj=None):
|
if obj is None:
|
obj = sys.stdin.read()
|
return six.moves.cPickle.loads(gzip.zlib.decompress(base64_bytes(obj.strip())))
|
|
|
def save_object(fname, obj):
|
"""Pickle a Python object"""
|
|
fd = gzip.open(fname, "wb")
|
six.moves.cPickle.dump(obj, fd)
|
fd.close()
|
|
def load_object(fname):
|
"""unpickle a Python object"""
|
return six.moves.cPickle.load(gzip.open(fname,"rb"))
|
|
@conf.commands.register
|
def corrupt_bytes(s, p=0.01, n=None):
|
"""Corrupt a given percentage or number of bytes from a string"""
|
s = array.array("B",raw(s))
|
l = len(s)
|
if n is None:
|
n = max(1,int(l*p))
|
for i in random.sample(range(l), n):
|
s[i] = (s[i]+random.randint(1,255))%256
|
return s.tostring()
|
|
@conf.commands.register
|
def corrupt_bits(s, p=0.01, n=None):
|
"""Flip a given percentage or number of bits from a string"""
|
s = array.array("B",raw(s))
|
l = len(s)*8
|
if n is None:
|
n = max(1,int(l*p))
|
for i in random.sample(range(l), n):
|
s[i // 8] ^= 1 << (i % 8)
|
return s.tostring()
|
|
|
|
|
#############################
|
## pcap capture file stuff ##
|
#############################
|
|
@conf.commands.register
|
def wrpcap(filename, pkt, *args, **kargs):
|
"""Write a list of packets to a pcap file
|
|
filename: the name of the file to write packets to, or an open,
|
writable file-like object. The file descriptor will be
|
closed at the end of the call, so do not use an object you
|
do not want to close (e.g., running wrpcap(sys.stdout, [])
|
in interactive mode will crash Scapy).
|
gz: set to 1 to save a gzipped capture
|
linktype: force linktype value
|
endianness: "<" or ">", force endianness
|
sync: do not bufferize writes to the capture file
|
|
"""
|
with PcapWriter(filename, *args, **kargs) as fdesc:
|
fdesc.write(pkt)
|
|
@conf.commands.register
|
def rdpcap(filename, count=-1):
|
"""Read a pcap or pcapng file and return a packet list
|
|
count: read only <count> packets
|
|
"""
|
with PcapReader(filename) as fdesc:
|
return fdesc.read_all(count=count)
|
|
|
class PcapReader_metaclass(type):
|
"""Metaclass for (Raw)Pcap(Ng)Readers"""
|
|
def __new__(cls, name, bases, dct):
|
"""The `alternative` class attribute is declared in the PcapNg
|
variant, and set here to the Pcap variant.
|
|
"""
|
newcls = super(PcapReader_metaclass, cls).__new__(cls, name, bases, dct)
|
if 'alternative' in dct:
|
dct['alternative'].alternative = newcls
|
return newcls
|
|
def __call__(cls, filename):
|
"""Creates a cls instance, use the `alternative` if that
|
fails.
|
|
"""
|
i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)
|
filename, fdesc, magic = cls.open(filename)
|
try:
|
i.__init__(filename, fdesc, magic)
|
except Scapy_Exception:
|
if "alternative" in cls.__dict__:
|
cls = cls.__dict__["alternative"]
|
i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)
|
try:
|
i.__init__(filename, fdesc, magic)
|
except Scapy_Exception:
|
raise
|
try:
|
i.f.seek(-4, 1)
|
except:
|
pass
|
raise Scapy_Exception("Not a supported capture file")
|
|
return i
|
|
@staticmethod
|
def open(filename):
|
"""Open (if necessary) filename, and read the magic."""
|
if isinstance(filename, six.string_types):
|
try:
|
fdesc = gzip.open(filename,"rb")
|
magic = fdesc.read(4)
|
except IOError:
|
fdesc = open(filename, "rb")
|
magic = fdesc.read(4)
|
else:
|
fdesc = filename
|
filename = (fdesc.name
|
if hasattr(fdesc, "name") else
|
"No name")
|
magic = fdesc.read(4)
|
return filename, fdesc, magic
|
|
|
class RawPcapReader(six.with_metaclass(PcapReader_metaclass)):
|
"""A stateful pcap reader. Each packet is returned as a string"""
|
def __init__(self, filename, fdesc, magic):
|
self.filename = filename
|
self.f = fdesc
|
if magic == b"\xa1\xb2\xc3\xd4": # big endian
|
self.endian = ">"
|
self.nano = False
|
elif magic == b"\xd4\xc3\xb2\xa1": # little endian
|
self.endian = "<"
|
self.nano = False
|
elif magic == b"\xa1\xb2\x3c\x4d": # big endian, nanosecond-precision
|
self.endian = ">"
|
self.nano = True
|
elif magic == b"\x4d\x3c\xb2\xa1": # little endian, nanosecond-precision
|
self.endian = "<"
|
self.nano = True
|
else:
|
raise Scapy_Exception(
|
"Not a pcap capture file (bad magic: %r)" % magic
|
)
|
hdr = self.f.read(20)
|
if len(hdr)<20:
|
raise Scapy_Exception("Invalid pcap file (too short)")
|
vermaj, vermin, tz, sig, snaplen, linktype = struct.unpack(
|
self.endian + "HHIIII", hdr
|
)
|
self.linktype = linktype
|
|
def __iter__(self):
|
return self
|
|
def next(self):
|
"""implement the iterator protocol on a set of packets in a pcap file"""
|
pkt = self.read_packet()
|
if pkt == None:
|
raise StopIteration
|
return pkt
|
__next__ = next
|
|
|
def read_packet(self, size=MTU):
|
"""return a single packet read from the file
|
|
returns None when no more packets are available
|
"""
|
hdr = self.f.read(16)
|
if len(hdr) < 16:
|
return None
|
sec,usec,caplen,wirelen = struct.unpack(self.endian+"IIII", hdr)
|
s = self.f.read(caplen)[:size]
|
return s,(sec,usec,wirelen) # caplen = len(s)
|
|
|
def dispatch(self, callback):
|
"""call the specified callback routine for each packet read
|
|
This is just a convenience function for the main loop
|
that allows for easy launching of packet processing in a
|
thread.
|
"""
|
for p in self:
|
callback(p)
|
|
def read_all(self,count=-1):
|
"""return a list of all packets in the pcap file
|
"""
|
res=[]
|
while count != 0:
|
count -= 1
|
p = self.read_packet()
|
if p is None:
|
break
|
res.append(p)
|
return res
|
|
def recv(self, size=MTU):
|
""" Emulate a socket
|
"""
|
return self.read_packet(size=size)[0]
|
|
def fileno(self):
|
return self.f.fileno()
|
|
def close(self):
|
return self.f.close()
|
|
def __enter__(self):
|
return self
|
|
def __exit__(self, exc_type, exc_value, tracback):
|
self.close()
|
|
|
class PcapReader(RawPcapReader):
|
def __init__(self, filename, fdesc, magic):
|
RawPcapReader.__init__(self, filename, fdesc, magic)
|
try:
|
self.LLcls = conf.l2types[self.linktype]
|
except KeyError:
|
warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype,self.linktype))
|
self.LLcls = conf.raw_layer
|
def read_packet(self, size=MTU):
|
rp = RawPcapReader.read_packet(self, size=size)
|
if rp is None:
|
return None
|
s,(sec,usec,wirelen) = rp
|
|
try:
|
p = self.LLcls(s)
|
except KeyboardInterrupt:
|
raise
|
except:
|
if conf.debug_dissector:
|
raise
|
p = conf.raw_layer(s)
|
p.time = sec + (0.000000001 if self.nano else 0.000001) * usec
|
return p
|
def read_all(self,count=-1):
|
res = RawPcapReader.read_all(self, count)
|
from scapy import plist
|
return plist.PacketList(res,name = os.path.basename(self.filename))
|
def recv(self, size=MTU):
|
return self.read_packet(size=size)
|
|
|
class RawPcapNgReader(RawPcapReader):
|
"""A stateful pcapng reader. Each packet is returned as a
|
string.
|
|
"""
|
|
alternative = RawPcapReader
|
|
def __init__(self, filename, fdesc, magic):
|
self.filename = filename
|
self.f = fdesc
|
# A list of (linktype, snaplen, tsresol); will be populated by IDBs.
|
self.interfaces = []
|
self.blocktypes = {
|
1: self.read_block_idb,
|
2: self.read_block_pkt,
|
3: self.read_block_spb,
|
6: self.read_block_epb,
|
}
|
if magic != b"\x0a\x0d\x0d\x0a": # PcapNg:
|
raise Scapy_Exception(
|
"Not a pcapng capture file (bad magic: %r)" % magic
|
)
|
# see https://github.com/pcapng/pcapng
|
blocklen, magic = self.f.read(4), self.f.read(4)
|
if magic == b"\x1a\x2b\x3c\x4d":
|
self.endian = ">"
|
elif magic == b"\x4d\x3c\x2b\x1a":
|
self.endian = "<"
|
else:
|
raise Scapy_Exception("Not a pcapng capture file (bad magic)")
|
try:
|
self.f.seek(0)
|
except:
|
pass
|
|
def read_packet(self, size=MTU):
|
"""Read blocks until it reaches either EOF or a packet, and
|
returns None or (packet, (linktype, sec, usec, wirelen)),
|
where packet is a string.
|
|
"""
|
while True:
|
try:
|
blocktype, blocklen = struct.unpack(self.endian + "2I",
|
self.f.read(8))
|
except struct.error:
|
return None
|
block = self.f.read(blocklen - 12)
|
if blocklen % 4:
|
pad = self.f.read(4 - (blocklen % 4))
|
warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. "
|
"Ignored padding %r" % (blocklen, pad))
|
try:
|
if (blocklen,) != struct.unpack(self.endian + 'I',
|
self.f.read(4)):
|
warning("PcapNg: Invalid pcapng block (bad blocklen)")
|
except struct.error:
|
return None
|
res = self.blocktypes.get(blocktype,
|
lambda block, size: None)(block, size)
|
if res is not None:
|
return res
|
|
def read_block_idb(self, block, _):
|
"""Interface Description Block"""
|
options = block[16:]
|
tsresol = 1000000
|
while len(options) >= 4:
|
code, length = struct.unpack(self.endian + "HH", options[:4])
|
# PCAP Next Generation (pcapng) Capture File Format
|
# 4.2. - Interface Description Block
|
# http://xml2rfc.tools.ietf.org/cgi-bin/xml2rfc.cgi?url=https://raw.githubusercontent.com/pcapng/pcapng/master/draft-tuexen-opsawg-pcapng.xml&modeAsFormat=html/ascii&type=ascii#rfc.section.4.2
|
if code == 9 and length == 1 and len(options) >= 5:
|
tsresol = orb(options[4])
|
tsresol = (2 if tsresol & 128 else 10) ** (tsresol & 127)
|
if code == 0:
|
if length != 0:
|
warning("PcapNg: invalid option length %d for end-of-option" % length)
|
break
|
if length % 4:
|
length += (4 - (length % 4))
|
options = options[4 + length:]
|
self.interfaces.append(struct.unpack(self.endian + "HxxI", block[:8])
|
+ (tsresol,))
|
|
def read_block_epb(self, block, size):
|
"""Enhanced Packet Block"""
|
intid, tshigh, tslow, caplen, wirelen = struct.unpack(
|
self.endian + "5I",
|
block[:20],
|
)
|
return (block[20:20 + caplen][:size],
|
(self.interfaces[intid][0], self.interfaces[intid][2],
|
tshigh, tslow, wirelen))
|
|
def read_block_spb(self, block, size):
|
"""Simple Packet Block"""
|
# "it MUST be assumed that all the Simple Packet Blocks have
|
# been captured on the interface previously specified in the
|
# first Interface Description Block."
|
intid = 0
|
wirelen, = struct.unpack(self.endian + "I", block[:4])
|
caplen = min(wirelen, self.interfaces[intid][1])
|
return (block[4:4 + caplen][:size],
|
(self.interfaces[intid][0], self.interfaces[intid][2],
|
None, None, wirelen))
|
|
def read_block_pkt(self, block, size):
|
"""(Obsolete) Packet Block"""
|
intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack(
|
self.endian + "HH4I",
|
block[:20],
|
)
|
return (block[20:20 + caplen][:size],
|
(self.interfaces[intid][0], self.interfaces[intid][2],
|
tshigh, tslow, wirelen))
|
|
|
class PcapNgReader(RawPcapNgReader):
|
|
alternative = PcapReader
|
|
def __init__(self, filename, fdesc, magic):
|
RawPcapNgReader.__init__(self, filename, fdesc, magic)
|
|
def read_packet(self, size=MTU):
|
rp = RawPcapNgReader.read_packet(self, size=size)
|
if rp is None:
|
return None
|
s, (linktype, tsresol, tshigh, tslow, wirelen) = rp
|
try:
|
p = conf.l2types[linktype](s)
|
except KeyboardInterrupt:
|
raise
|
except:
|
if conf.debug_dissector:
|
raise
|
p = conf.raw_layer(s)
|
if tshigh is not None:
|
p.time = float((tshigh << 32) + tslow) / tsresol
|
return p
|
def read_all(self,count=-1):
|
res = RawPcapNgReader.read_all(self, count)
|
from scapy import plist
|
return plist.PacketList(res, name=os.path.basename(self.filename))
|
def recv(self, size=MTU):
|
return self.read_packet()
|
|
|
class RawPcapWriter:
|
"""A stream PCAP writer with more control than wrpcap()"""
|
def __init__(self, filename, linktype=None, gz=False, endianness="",
|
append=False, sync=False, nano=False):
|
"""
|
filename: the name of the file to write packets to, or an open,
|
writable file-like object.
|
linktype: force linktype to a given value. If None, linktype is taken
|
from the first writer packet
|
gz: compress the capture on the fly
|
endianness: force an endianness (little:"<", big:">"). Default is native
|
append: append packets to the capture file instead of truncating it
|
sync: do not bufferize writes to the capture file
|
nano: use nanosecond-precision (requires libpcap >= 1.5.0)
|
|
"""
|
|
self.linktype = linktype
|
self.header_present = 0
|
self.append = append
|
self.gz = gz
|
self.endian = endianness
|
self.sync = sync
|
self.nano = nano
|
bufsz=4096
|
if sync:
|
bufsz = 0
|
|
if isinstance(filename, six.string_types):
|
self.filename = filename
|
self.f = [open,gzip.open][gz](filename,append and "ab" or "wb", gz and 9 or bufsz)
|
else:
|
self.f = filename
|
self.filename = (filename.name
|
if hasattr(filename, "name") else
|
"No name")
|
|
def fileno(self):
|
return self.f.fileno()
|
|
def _write_header(self, pkt):
|
self.header_present=1
|
|
if self.append:
|
# Even if prone to race conditions, this seems to be
|
# safest way to tell whether the header is already present
|
# because we have to handle compressed streams that
|
# are not as flexible as basic files
|
g = [open,gzip.open][self.gz](self.filename,"rb")
|
if g.read(16):
|
return
|
|
self.f.write(struct.pack(self.endian+"IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4,
|
2, 4, 0, 0, MTU, self.linktype))
|
self.f.flush()
|
|
|
def write(self, pkt):
|
"""accepts either a single packet or a list of packets to be
|
written to the dumpfile
|
|
"""
|
if isinstance(pkt, str):
|
if not self.header_present:
|
self._write_header(pkt)
|
self._write_packet(pkt)
|
else:
|
pkt = pkt.__iter__()
|
if not self.header_present:
|
try:
|
p = next(pkt)
|
except StopIteration:
|
self._write_header(b"")
|
return
|
self._write_header(p)
|
self._write_packet(p)
|
for p in pkt:
|
self._write_packet(p)
|
|
def _write_packet(self, packet, sec=None, usec=None, caplen=None, wirelen=None):
|
"""writes a single packet to the pcap file
|
"""
|
if isinstance(packet, tuple):
|
for pkt in packet:
|
self._write_packet(pkt, sec=sec, usec=usec, caplen=caplen,
|
wirelen=wirelen)
|
return
|
if caplen is None:
|
caplen = len(packet)
|
if wirelen is None:
|
wirelen = caplen
|
if sec is None or usec is None:
|
t=time.time()
|
it = int(t)
|
if sec is None:
|
sec = it
|
if usec is None:
|
usec = int(round((t - it) * (1000000000 if self.nano else 1000000)))
|
self.f.write(struct.pack(self.endian+"IIII", sec, usec, caplen, wirelen))
|
self.f.write(packet)
|
if self.sync:
|
self.f.flush()
|
|
def flush(self):
|
return self.f.flush()
|
|
def close(self):
|
return self.f.close()
|
|
def __enter__(self):
|
return self
|
def __exit__(self, exc_type, exc_value, tracback):
|
self.flush()
|
self.close()
|
|
|
class PcapWriter(RawPcapWriter):
|
"""A stream PCAP writer with more control than wrpcap()"""
|
def _write_header(self, pkt):
|
if isinstance(pkt, tuple) and pkt:
|
pkt = pkt[0]
|
if self.linktype == None:
|
try:
|
self.linktype = conf.l2types[pkt.__class__]
|
except KeyError:
|
warning("PcapWriter: unknown LL type for %s. Using type 1 (Ethernet)", pkt.__class__.__name__)
|
self.linktype = 1
|
RawPcapWriter._write_header(self, pkt)
|
|
def _write_packet(self, packet):
|
if isinstance(packet, tuple):
|
for pkt in packet:
|
self._write_packet(pkt)
|
return
|
sec = int(packet.time)
|
usec = int(round((packet.time - sec) * (1000000000 if self.nano else 1000000)))
|
s = raw(packet)
|
caplen = len(s)
|
RawPcapWriter._write_packet(self, s, sec, usec, caplen, caplen)
|
|
|
re_extract_hexcap = re.compile("^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})")
|
|
@conf.commands.register
|
def import_hexcap():
|
p = ""
|
try:
|
while True:
|
l = input().strip()
|
try:
|
p += re_extract_hexcap.match(l).groups()[2]
|
except:
|
warning("Parsing error during hexcap")
|
continue
|
except EOFError:
|
pass
|
|
p = p.replace(" ","")
|
return p.decode("hex")
|
|
|
|
@conf.commands.register
|
def wireshark(pktlist):
|
"""Run wireshark on a list of packets"""
|
f = get_temp_file()
|
wrpcap(f, pktlist)
|
with ContextManagerSubprocess("wireshark()", conf.prog.wireshark):
|
subprocess.Popen([conf.prog.wireshark, "-r", f])
|
|
@conf.commands.register
|
def tcpdump(pktlist, dump=False, getfd=False, args=None,
|
prog=None, getproc=False, quiet=False):
|
"""Run tcpdump or tshark on a list of packets
|
|
pktlist: a Packet instance, a PacketList instance or a list of Packet
|
instances. Can also be a filename (as a string) or an open
|
file-like object that must be a file format readable by
|
tshark (Pcap, PcapNg, etc.)
|
|
dump: when set to True, returns a string instead of displaying it.
|
getfd: when set to True, returns a file-like object to read data
|
from tcpdump or tshark from.
|
getproc: when set to True, the subprocess.Popen object is returned
|
args: arguments (as a list) to pass to tshark (example for tshark:
|
args=["-T", "json"]). Defaults to ["-n"].
|
prog: program to use (defaults to tcpdump, will work with tshark)
|
quiet: when set to True, the process stderr is discarded
|
|
Examples:
|
|
>>> tcpdump([IP()/TCP(), IP()/UDP()])
|
reading from file -, link-type RAW (Raw IP)
|
16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0
|
16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain]
|
|
>>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark)
|
1 0.000000 127.0.0.1 -> 127.0.0.1 TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0
|
2 0.000459 127.0.0.1 -> 127.0.0.1 UDP 28 53->53 Len=0
|
|
To get a JSON representation of a tshark-parsed PacketList(), one can:
|
>>> import json, pprint
|
>>> json_data = json.load(tcpdump(IP(src="217.25.178.5", dst="45.33.32.156"),
|
... prog=conf.prog.tshark, args=["-T", "json"],
|
... getfd=True))
|
>>> pprint.pprint(json_data)
|
[{u'_index': u'packets-2016-12-23',
|
u'_score': None,
|
u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20',
|
u'frame.encap_type': u'7',
|
[...]
|
u'frame.time_relative': u'0.000000000'},
|
u'ip': {u'ip.addr': u'45.33.32.156',
|
u'ip.checksum': u'0x0000a20d',
|
[...]
|
u'ip.ttl': u'64',
|
u'ip.version': u'4'},
|
u'raw': u'Raw packet data'}},
|
u'_type': u'pcap_file'}]
|
>>> json_data[0]['_source']['layers']['ip']['ip.ttl']
|
u'64'
|
|
"""
|
getfd = getfd or getproc
|
if prog is None:
|
prog = [conf.prog.tcpdump]
|
elif isinstance(prog, six.string_types):
|
prog = [prog]
|
_prog_name = "windump()" if WINDOWS else "tcpdump()"
|
if pktlist is None:
|
with ContextManagerSubprocess(_prog_name, prog[0]):
|
proc = subprocess.Popen(
|
prog + (args if args is not None else []),
|
stdout=subprocess.PIPE if dump or getfd else None,
|
stderr=open(os.devnull) if quiet else None,
|
)
|
elif isinstance(pktlist, six.string_types):
|
with ContextManagerSubprocess(_prog_name, prog[0]):
|
proc = subprocess.Popen(
|
prog + ["-r", pktlist] + (args if args is not None else []),
|
stdout=subprocess.PIPE if dump or getfd else None,
|
stderr=open(os.devnull) if quiet else None,
|
)
|
elif DARWIN:
|
# Tcpdump cannot read from stdin, see
|
# <http://apple.stackexchange.com/questions/152682/>
|
tmpfile = tempfile.NamedTemporaryFile(delete=False)
|
try:
|
tmpfile.writelines(iter(lambda: pktlist.read(1048576), b""))
|
except AttributeError:
|
wrpcap(tmpfile, pktlist)
|
else:
|
tmpfile.close()
|
with ContextManagerSubprocess(_prog_name, prog[0]):
|
proc = subprocess.Popen(
|
prog + ["-r", tmpfile.name] + (args if args is not None else []),
|
stdout=subprocess.PIPE if dump or getfd else None,
|
stderr=open(os.devnull) if quiet else None,
|
)
|
conf.temp_files.append(tmpfile.name)
|
else:
|
with ContextManagerSubprocess(_prog_name, prog[0]):
|
proc = subprocess.Popen(
|
prog + ["-r", "-"] + (args if args is not None else []),
|
stdin=subprocess.PIPE,
|
stdout=subprocess.PIPE if dump or getfd else None,
|
stderr=open(os.devnull) if quiet else None,
|
)
|
try:
|
proc.stdin.writelines(iter(lambda: pktlist.read(1048576), b""))
|
except AttributeError:
|
wrpcap(proc.stdin, pktlist)
|
else:
|
proc.stdin.close()
|
if dump:
|
return b"".join(iter(lambda: proc.stdout.read(1048576), b""))
|
if getproc:
|
return proc
|
if getfd:
|
return proc.stdout
|
proc.wait()
|
|
@conf.commands.register
|
def hexedit(x):
|
x = str(x)
|
f = get_temp_file()
|
open(f,"wb").write(x)
|
with ContextManagerSubprocess("hexedit()", conf.prog.hexedit):
|
subprocess.call([conf.prog.hexedit, f])
|
x = open(f).read()
|
os.unlink(f)
|
return x
|
|
def get_terminal_width():
|
"""Get terminal width if in a window"""
|
if WINDOWS:
|
from ctypes import windll, create_string_buffer
|
# http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
|
h = windll.kernel32.GetStdHandle(-12)
|
csbi = create_string_buffer(22)
|
res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
|
if res:
|
import struct
|
(bufx, bufy, curx, cury, wattr,
|
left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)
|
sizex = right - left + 1
|
#sizey = bottom - top + 1
|
return sizex
|
else:
|
return None
|
else:
|
sizex = 0
|
try:
|
import struct, fcntl, termios
|
s = struct.pack('HHHH', 0, 0, 0, 0)
|
x = fcntl.ioctl(1, termios.TIOCGWINSZ, s)
|
sizex = struct.unpack('HHHH', x)[1]
|
except IOError:
|
pass
|
if not sizex:
|
try:
|
sizex = int(os.environ['COLUMNS'])
|
except:
|
pass
|
if sizex:
|
return sizex
|
else:
|
return None
|
|
def pretty_list(rtlst, header, sortBy=0):
|
"""Pretty list to fit the terminal, and add header"""
|
_l_header = len(header[0])
|
_space = " "
|
# Sort correctly
|
rtlst.sort(key=lambda x: x[sortBy])
|
# Append tag
|
rtlst = header + rtlst
|
# Detect column's width
|
colwidth = [max([len(y) for y in x]) for x in zip(*rtlst)]
|
# Make text fit in box (if exist)
|
# TODO: find a better and more precise way of doing this. That's currently working but very complicated
|
width = get_terminal_width()
|
if width:
|
if sum(colwidth) > width:
|
# Needs to be cropped
|
_med = (width // _l_header) - (1 if WINDOWS else 0) # Windows has a fat window border
|
# Crop biggest until size is correct
|
for i in range(1, len(colwidth)): # Should use while, but this is safer
|
if (sum(colwidth)+6) <= width:
|
break
|
_max = max(colwidth)
|
colwidth = [_med if x == _max else x for x in colwidth]
|
def _crop(x, width):
|
_r = x[:width]
|
if _r != x:
|
_r = x[:width-3]
|
return _r + "..."
|
return _r
|
rtlst = [tuple([_crop(rtlst[j][i], colwidth[i]) for i in range(0, len(rtlst[j]))]) for j in range(0, len(rtlst))]
|
# Recalculate column's width
|
colwidth = [max([len(y) for y in x]) for x in zip(*rtlst)]
|
fmt = _space.join(["%%-%ds"%x for x in colwidth])
|
rt = "\n".join([fmt % x for x in rtlst])
|
return rt
|
|
def __make_table(yfmtfunc, fmtfunc, endline, data, fxyz, sortx=None, sorty=None, seplinefunc=None):
|
vx = {}
|
vy = {}
|
vz = {}
|
vxf = {}
|
vyf = {}
|
l = 0
|
for e in data:
|
xx, yy, zz = [str(s) for s in fxyz(e)]
|
l = max(len(yy),l)
|
vx[xx] = max(vx.get(xx,0), len(xx), len(zz))
|
vy[yy] = None
|
vz[(xx,yy)] = zz
|
|
vxk = list(vx)
|
vyk = list(vy)
|
if sortx:
|
vxk.sort(key=sortx)
|
else:
|
try:
|
vxk.sort(key=int)
|
except:
|
try:
|
vxk.sort(key=atol)
|
except:
|
vxk.sort()
|
if sorty:
|
vyk.sort(key=sorty)
|
else:
|
try:
|
vyk.sort(key=int)
|
except:
|
try:
|
vyk.sort(key=atol)
|
except:
|
vyk.sort()
|
|
|
if seplinefunc:
|
sepline = seplinefunc(l, [vx[x] for x in vxk])
|
print(sepline)
|
|
fmt = yfmtfunc(l)
|
print(fmt % "", end=' ')
|
for x in vxk:
|
vxf[x] = fmtfunc(vx[x])
|
print(vxf[x] % x, end=' ')
|
print(endline)
|
if seplinefunc:
|
print(sepline)
|
for y in vyk:
|
print(fmt % y, end=' ')
|
for x in vxk:
|
print(vxf[x] % vz.get((x,y), "-"), end=' ')
|
print(endline)
|
if seplinefunc:
|
print(sepline)
|
|
def make_table(*args, **kargs):
|
__make_table(lambda l:"%%-%is" % l, lambda l:"%%-%is" % l, "", *args, **kargs)
|
|
def make_lined_table(*args, **kargs):
|
__make_table(lambda l:"%%-%is |" % l, lambda l:"%%-%is |" % l, "",
|
seplinefunc=lambda a,x:"+".join('-'*(y+2) for y in [a-1]+x+[-2]),
|
*args, **kargs)
|
|
def make_tex_table(*args, **kargs):
|
__make_table(lambda l: "%s", lambda l: "& %s", "\\\\", seplinefunc=lambda a,x:"\\hline", *args, **kargs)
|
|
###############################################
|
### WHOIS CLIENT (not available on windows) ###
|
###############################################
|
|
def whois(ip_address):
|
"""Whois client for Python"""
|
whois_ip = str(ip_address)
|
try:
|
query = socket.gethostbyname(whois_ip)
|
except:
|
query = whois_ip
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
s.connect(("whois.ripe.net", 43))
|
s.send(query.encode("utf8") + b"\r\n")
|
answer = b""
|
while True:
|
d = s.recv(4096)
|
answer += d
|
if not d:
|
break
|
s.close()
|
ignore_tag = b"remarks:"
|
# ignore all lines starting with the ignore_tag
|
lines = [ line for line in answer.split(b"\n") if not line or (line and not line.startswith(ignore_tag))]
|
# remove empty lines at the bottom
|
for i in range(1, len(lines)):
|
if not lines[-i].strip():
|
del lines[-i]
|
else:
|
break
|
return b"\n".join(lines[3:])
|