## This file is part of Scapy
|
## See http://www.secdev.org/projects/scapy for more informations
|
## Copyright (C) Philippe Biondi <phil@secdev.org>
|
## This program is published under a GPLv2 license
|
|
"""
|
Main module for interactive startup.
|
"""
|
|
from __future__ import absolute_import
|
from __future__ import print_function
|
|
import sys, os, getopt, re, code
|
import gzip, glob
|
import importlib
|
import logging
|
from random import choice
|
import types
|
import io
|
|
# Never add any global import, in main.py, that would trigger a warning messsage
|
# before the console handlers gets added in interact()
|
from scapy.error import log_interactive, log_loading, log_scapy, warning
|
import scapy.modules.six as six
|
from scapy.themes import DefaultTheme, apply_ipython_style
|
|
IGNORED = list(six.moves.builtins.__dict__)
|
|
GLOBKEYS = []
|
|
LAYER_ALIASES = {
|
"tls": "tls.all"
|
}
|
|
QUOTES = [
|
("Craft packets like it is your last day on earth.", "Lao-Tze"),
|
("Craft packets like I craft my beer.", "Jean De Clerck"),
|
("Craft packets before they craft you.", "Socrate"),
|
("Craft me if you can.", "IPv6 layer"),
|
("To craft a packet, you have to be a packet, and learn how to swim in the "
|
"wires and in the waves.", "Jean-Claude Van Damme"),
|
]
|
|
def _probe_config_file(cf):
|
cf_path = os.path.join(os.path.expanduser("~"), cf)
|
try:
|
os.stat(cf_path)
|
except OSError:
|
return None
|
else:
|
return cf_path
|
|
def _read_config_file(cf, _globals=globals(), _locals=locals(), interactive=True):
|
"""Read a config file: execute a python file while loading scapy, that may contain
|
some pre-configured values.
|
|
If _globals or _locals are specified, they will be updated with the loaded vars.
|
This allows an external program to use the function. Otherwise, vars are only available
|
from inside the scapy console.
|
|
params:
|
- _globals: the globals() vars
|
- _locals: the locals() vars
|
- interactive: specified whether or not errors should be printed using the scapy console or
|
raised.
|
|
ex, content of a config.py file:
|
'conf.verb = 42\n'
|
Manual loading:
|
>>> _read_config_file("./config.py"))
|
>>> conf.verb
|
42
|
"""
|
log_loading.debug("Loading config file [%s]", cf)
|
try:
|
exec(compile(open(cf).read(), cf, 'exec'), _globals, _locals)
|
except IOError as e:
|
if interactive:
|
raise
|
log_loading.warning("Cannot read config file [%s] [%s]", cf, e)
|
except Exception as e:
|
if interactive:
|
raise
|
log_loading.exception("Error during evaluation of config file [%s]", cf)
|
|
def _validate_local(x):
|
"""Returns whether or not a variable should be imported.
|
Will return False for any default modules (sys), or if
|
they are detected as private vars (starting with a _)"""
|
global IGNORED
|
return x[0] != "_" and not x in IGNORED
|
|
DEFAULT_PRESTART_FILE = _probe_config_file(".scapy_prestart.py")
|
DEFAULT_STARTUP_FILE = _probe_config_file(".scapy_startup.py")
|
SESSION = None
|
|
def _usage():
|
print("""Usage: scapy.py [-s sessionfile] [-c new_startup_file] [-p new_prestart_file] [-C] [-P]
|
-C: do not read startup file
|
-P: do not read pre-startup file""")
|
sys.exit(0)
|
|
|
######################
|
## Extension system ##
|
######################
|
|
|
def _load(module, globals_dict=None, symb_list=None):
|
"""Loads a Python module to make variables, objects and functions
|
available globally.
|
|
The idea is to load the module using importlib, then copy the
|
symbols to the global symbol table.
|
|
"""
|
if globals_dict is None:
|
globals_dict = six.moves.builtins.__dict__
|
try:
|
mod = importlib.import_module(module)
|
if '__all__' in mod.__dict__:
|
# import listed symbols
|
for name in mod.__dict__['__all__']:
|
if symb_list is not None:
|
symb_list.append(name)
|
globals_dict[name] = mod.__dict__[name]
|
else:
|
# only import non-private symbols
|
for name, sym in six.iteritems(mod.__dict__):
|
if _validate_local(name):
|
if symb_list is not None:
|
symb_list.append(name)
|
globals_dict[name] = sym
|
except Exception:
|
log_interactive.error("Loading module %s", module, exc_info=True)
|
|
def load_module(name):
|
"""Loads a Scapy module to make variables, objects and functions
|
available globally.
|
|
"""
|
_load("scapy.modules."+name)
|
|
def load_layer(name, globals_dict=None, symb_list=None):
|
"""Loads a Scapy layer module to make variables, objects and functions
|
available globally.
|
|
"""
|
_load("scapy.layers." + LAYER_ALIASES.get(name, name),
|
globals_dict=globals_dict, symb_list=symb_list)
|
|
def load_contrib(name):
|
"""Loads a Scapy contrib module to make variables, objects and
|
functions available globally.
|
|
If no contrib module can be found with the given name, try to find
|
a layer module, since a contrib module may become a layer module.
|
|
"""
|
try:
|
importlib.import_module("scapy.contrib." + name)
|
_load("scapy.contrib." + name)
|
except ImportError:
|
# if layer not found in contrib, try in layers
|
load_layer(name)
|
|
def list_contrib(name=None):
|
if name is None:
|
name="*.py"
|
elif "*" not in name and "?" not in name and not name.endswith(".py"):
|
name += ".py"
|
name = os.path.join(os.path.dirname(__file__), "contrib", name)
|
for f in sorted(glob.glob(name)):
|
mod = os.path.basename(f)
|
if mod.startswith("__"):
|
continue
|
if mod.endswith(".py"):
|
mod = mod[:-3]
|
desc = { "description":"-", "status":"?", "name":mod }
|
for l in io.open(f, errors="replace"):
|
p = l.find("scapy.contrib.")
|
if p >= 0:
|
p += 14
|
q = l.find("=", p)
|
key = l[p:q].strip()
|
value = l[q+1:].strip()
|
desc[key] = value
|
print("%(name)-20s: %(description)-40s status=%(status)s" % desc)
|
|
|
|
|
|
|
##############################
|
## Session saving/restoring ##
|
##############################
|
|
def update_ipython_session(session):
|
"""Updates IPython session with a custom one"""
|
try:
|
get_ipython().user_ns.update(session)
|
except:
|
pass
|
|
def save_session(fname=None, session=None, pickleProto=-1):
|
"""Save current Scapy session to the file specified in the fname arg.
|
|
params:
|
- fname: file to save the scapy session in
|
- session: scapy session to use. If None, the console one will be used
|
- pickleProto: pickle proto version (default: -1 = latest)"""
|
from scapy import utils
|
if fname is None:
|
fname = conf.session
|
if not fname:
|
conf.session = fname = utils.get_temp_file(keep=True)
|
log_interactive.info("Use [%s] as session file" % fname)
|
|
if session is None:
|
try:
|
session = get_ipython().user_ns
|
except:
|
session = six.moves.builtins.__dict__["scapy_session"]
|
|
to_be_saved = session.copy()
|
if "__builtins__" in to_be_saved:
|
del(to_be_saved["__builtins__"])
|
|
for k in list(to_be_saved):
|
i = to_be_saved[k]
|
if hasattr(i, "__module__") and (k[0] == "_" or i.__module__.startswith("IPython")):
|
del(to_be_saved[k])
|
if isinstance(i, ConfClass):
|
del(to_be_saved[k])
|
elif isinstance(i, (type, type, types.ModuleType)):
|
if k[0] != "_":
|
log_interactive.error("[%s] (%s) can't be saved.", k, type(to_be_saved[k]))
|
del(to_be_saved[k])
|
|
try:
|
os.rename(fname, fname+".bak")
|
except OSError:
|
pass
|
|
f=gzip.open(fname,"wb")
|
six.moves.cPickle.dump(to_be_saved, f, pickleProto)
|
f.close()
|
del f
|
|
def load_session(fname=None):
|
"""Load current Scapy session from the file specified in the fname arg.
|
This will erase any existing session.
|
|
params:
|
- fname: file to load the scapy session from"""
|
if fname is None:
|
fname = conf.session
|
try:
|
s = six.moves.cPickle.load(gzip.open(fname,"rb"))
|
except IOError:
|
try:
|
s = six.moves.cPickle.load(open(fname,"rb"))
|
except IOError:
|
# Raise "No such file exception"
|
raise
|
|
scapy_session = six.moves.builtins.__dict__["scapy_session"]
|
scapy_session.clear()
|
scapy_session.update(s)
|
update_ipython_session(scapy_session)
|
|
log_loading.info("Loaded session [%s]" % fname)
|
|
def update_session(fname=None):
|
"""Update current Scapy session from the file specified in the fname arg.
|
|
params:
|
- fname: file to load the scapy session from"""
|
if fname is None:
|
fname = conf.session
|
try:
|
s = six.moves.cPickle.load(gzip.open(fname,"rb"))
|
except IOError:
|
s = six.moves.cPickle.load(open(fname,"rb"))
|
scapy_session = six.moves.builtins.__dict__["scapy_session"]
|
scapy_session.update(s)
|
update_ipython_session(scapy_session)
|
|
def init_session(session_name, mydict=None):
|
global SESSION
|
global GLOBKEYS
|
|
scapy_builtins = {k: v for k, v in six.iteritems(importlib.import_module(".all", "scapy").__dict__) if _validate_local(k)}
|
six.moves.builtins.__dict__.update(scapy_builtins)
|
GLOBKEYS.extend(scapy_builtins)
|
GLOBKEYS.append("scapy_session")
|
scapy_builtins=None # XXX replace with "with" statement
|
|
if session_name:
|
try:
|
os.stat(session_name)
|
except OSError:
|
log_loading.info("New session [%s]" % session_name)
|
else:
|
try:
|
try:
|
SESSION = six.moves.cPickle.load(gzip.open(session_name,"rb"))
|
except IOError:
|
SESSION = six.moves.cPickle.load(open(session_name,"rb"))
|
log_loading.info("Using session [%s]" % session_name)
|
except EOFError:
|
log_loading.error("Error opening session [%s]" % session_name)
|
except AttributeError:
|
log_loading.error("Error opening session [%s]. Attribute missing" % session_name)
|
|
if SESSION:
|
if "conf" in SESSION:
|
conf.configure(SESSION["conf"])
|
SESSION["conf"] = conf
|
else:
|
conf.session = session_name
|
SESSION = {"conf":conf}
|
else:
|
SESSION = {"conf": conf}
|
|
six.moves.builtins.__dict__["scapy_session"] = SESSION
|
|
if mydict is not None:
|
six.moves.builtins.__dict__["scapy_session"].update(mydict)
|
update_ipython_session(mydict)
|
GLOBKEYS.extend(mydict)
|
|
################
|
##### Main #####
|
################
|
|
def scapy_delete_temp_files():
|
for f in conf.temp_files:
|
try:
|
os.unlink(f)
|
except:
|
pass
|
del(conf.temp_files[:])
|
|
def _prepare_quote(quote, author, max_len=78):
|
"""This function processes a quote and returns a string that is ready
|
to be used in the fancy prompt.
|
|
"""
|
quote = quote.split(' ')
|
max_len -= 6
|
lines = []
|
cur_line = []
|
def _len(line):
|
return sum(len(elt) for elt in line) + len(line) - 1
|
while quote:
|
if not cur_line or (_len(cur_line) + len(quote[0]) - 1 <= max_len):
|
cur_line.append(quote.pop(0))
|
continue
|
lines.append(' | %s' % ' '.join(cur_line))
|
cur_line = []
|
if cur_line:
|
lines.append(' | %s' % ' '.join(cur_line))
|
cur_line = []
|
lines.append(' | %s-- %s' % (" " * (max_len - len(author) - 5), author))
|
return lines
|
|
def interact(mydict=None,argv=None,mybanner=None,loglevel=20):
|
global SESSION
|
global GLOBKEYS
|
|
console_handler = logging.StreamHandler()
|
console_handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
|
log_scapy.addHandler(console_handler)
|
|
from scapy.config import conf
|
conf.color_theme = DefaultTheme()
|
conf.interactive = True
|
if loglevel is not None:
|
conf.logLevel = loglevel
|
|
STARTUP_FILE = DEFAULT_STARTUP_FILE
|
PRESTART_FILE = DEFAULT_PRESTART_FILE
|
|
session_name = None
|
|
if argv is None:
|
argv = sys.argv
|
|
try:
|
opts = getopt.getopt(argv[1:], "hs:Cc:Pp:d")
|
for opt, parm in opts[0]:
|
if opt == "-h":
|
_usage()
|
elif opt == "-s":
|
session_name = parm
|
elif opt == "-c":
|
STARTUP_FILE = parm
|
elif opt == "-C":
|
STARTUP_FILE = None
|
elif opt == "-p":
|
PRESTART_FILE = parm
|
elif opt == "-P":
|
PRESTART_FILE = None
|
elif opt == "-d":
|
conf.logLevel = max(1, conf.logLevel-10)
|
|
if len(opts[1]) > 0:
|
raise getopt.GetoptError("Too many parameters : [%s]" % " ".join(opts[1]))
|
|
|
except getopt.GetoptError as msg:
|
log_loading.error(msg)
|
sys.exit(1)
|
|
init_session(session_name, mydict)
|
|
if STARTUP_FILE:
|
_read_config_file(STARTUP_FILE, interactive=True)
|
if PRESTART_FILE:
|
_read_config_file(PRESTART_FILE, interactive=True)
|
|
if conf.fancy_prompt:
|
|
the_logo = [
|
" ",
|
" aSPY//YASa ",
|
" apyyyyCY//////////YCa ",
|
" sY//////YSpcs scpCY//Pp ",
|
" ayp ayyyyyyySCP//Pp syY//C ",
|
" AYAsAYYYYYYYY///Ps cY//S",
|
" pCCCCY//p cSSps y//Y",
|
" SPPPP///a pP///AC//Y",
|
" A//A cyP////C",
|
" p///Ac sC///a",
|
" P////YCpc A//A",
|
" scccccp///pSP///p p//Y",
|
" sY/////////y caa S//P",
|
" cayCyayP//Ya pY/Ya",
|
" sY/PsY////YCc aC//Yp ",
|
" sc sccaCY//PCypaapyCP//YSs ",
|
" spCPY//////YPSps ",
|
" ccaacs ",
|
" ",
|
]
|
|
the_banner = [
|
"",
|
"",
|
" |",
|
" | Welcome to Scapy",
|
" | Version %s" % conf.version,
|
" |",
|
" | https://github.com/secdev/scapy",
|
" |",
|
" | Have fun!",
|
" |",
|
]
|
|
quote, author = choice(QUOTES)
|
the_banner.extend(_prepare_quote(quote, author, max_len=39))
|
the_banner.append(" |")
|
the_banner = "\n".join(
|
logo + banner for logo, banner in six.moves.zip_longest(
|
(conf.color_theme.logo(line) for line in the_logo),
|
(conf.color_theme.success(line) for line in the_banner),
|
fillvalue=""
|
)
|
)
|
else:
|
the_banner = "Welcome to Scapy (%s)" % conf.version
|
if mybanner is not None:
|
the_banner += "\n"
|
the_banner += mybanner
|
|
if not conf.interactive_shell or conf.interactive_shell.lower() in [
|
"ipython", "auto"
|
]:
|
try:
|
import IPython
|
from IPython.terminal.embed import InteractiveShellEmbed
|
except ImportError:
|
log_loading.warning(
|
"IPython not available. Using standard Python shell "
|
"instead.\nAutoCompletion, History are disabled."
|
)
|
IPYTHON = False
|
else:
|
IPYTHON = True
|
else:
|
IPYTHON = False
|
|
init_session(session_name, mydict)
|
|
if IPYTHON:
|
banner = the_banner + " using IPython %s\n" % IPython.__version__
|
try:
|
from traitlets.config.loader import Config
|
except ImportError:
|
log_loading.warning(
|
"traitlets not available. Some Scapy shell features won't be "
|
"available."
|
)
|
try:
|
ipshell = InteractiveShellEmbed(
|
banner1=banner,
|
user_ns=SESSION,
|
)
|
except:
|
code.interact(banner = the_banner, local=SESSION)
|
else:
|
cfg = Config()
|
try:
|
get_ipython
|
except NameError:
|
# Set "classic" prompt style when launched from run_scapy(.bat) files
|
# Register and apply scapy color+prompt style
|
apply_ipython_style(shell=cfg.TerminalInteractiveShell)
|
cfg.TerminalInteractiveShell.confirm_exit = False
|
cfg.TerminalInteractiveShell.separate_in = u''
|
cfg.TerminalInteractiveShell.hist_file = conf.histfile
|
# configuration can thus be specified here.
|
try:
|
ipshell = InteractiveShellEmbed(config=cfg,
|
banner1=banner,
|
hist_file=conf.histfile if conf.histfile else None,
|
user_ns=SESSION)
|
except (AttributeError, TypeError):
|
log_loading.warning("IPython too old. Won't support history and color style.")
|
try:
|
ipshell = InteractiveShellEmbed(
|
banner1=banner,
|
user_ns=SESSION,
|
)
|
except:
|
code.interact(banner = the_banner, local=SESSION)
|
ipshell(local_ns=SESSION)
|
else:
|
code.interact(banner = the_banner, local=SESSION)
|
|
if conf.session:
|
save_session(conf.session, SESSION)
|
|
for k in GLOBKEYS:
|
try:
|
del(six.moves.builtins.__dict__[k])
|
except:
|
pass
|
|
if __name__ == "__main__":
|
interact()
|