## This file is part of Scapy
|
## Copyright (C) 2017 Maxence Tury
|
## This program is published under a GPLv2 license
|
|
"""
|
Common TLS 1.3 fields & bindings.
|
|
This module covers the record layer, along with the ChangeCipherSpec, Alert and
|
ApplicationData submessages. For the Handshake type, see tls_handshake.py.
|
|
See the TLS class documentation for more information.
|
"""
|
|
import struct
|
|
from scapy.config import conf
|
from scapy.error import log_runtime
|
from scapy.fields import *
|
from scapy.packet import *
|
from scapy.layers.tls.session import _GenericTLSSessionInheritance
|
from scapy.layers.tls.basefields import (_TLSVersionField, _tls_version,
|
_TLSMACField, _TLSLengthField, _tls_type)
|
from scapy.layers.tls.record import _TLSMsgListField
|
from scapy.layers.tls.crypto.cipher_aead import AEADTagError
|
from scapy.layers.tls.crypto.cipher_stream import Cipher_NULL
|
from scapy.layers.tls.crypto.ciphers import CipherError
|
|
|
###############################################################################
|
### TLS Record Protocol ###
|
###############################################################################
|
|
class TLSInnerPlaintext(_GenericTLSSessionInheritance):
    """
    Decrypted content of a TLS 1.3 record.

    On the wire the fields appear in this order: the message list ("msg"),
    one content-type byte ("type"), then the padding ("pad").
    """
    name = "TLS Inner Plaintext"
    fields_desc = [ _TLSMsgListField("msg", []),
                    ByteEnumField("type", None, _tls_type),
                    XStrField("pad", "") ]

    def pre_dissect(self, s):
        """
        We need to parse the padding and type as soon as possible,
        else we won't be able to parse the message list...

        The padding is the run of zero bytes at the end of `s`; the byte
        right before it is the content type, and everything before that
        byte is the message list. This method sets the length callback of
        the "msg" field and self.type accordingly.

        :param s: the raw inner plaintext.
        :return: `s`, unchanged.
        :raises Exception: if `s` is empty.
        """
        if len(s) < 1:
            raise Exception("Invalid InnerPlaintext (too short).")

        # rstrip() behaves the same on Python 2 str and Python 3 bytes,
        # whereas indexing yields a 1-char str on the former and an int on
        # the latter (which made byte-by-byte comparisons unreliable).
        unpadded = s.rstrip(b"\x00")

        # The last non-zero byte is the type, so the messages stop one byte
        # earlier. If `s` holds nothing but zeros, no type can be located:
        # we then fall back to an empty message list and let the first byte
        # be read as the type, instead of failing on a negative offset.
        msg_len = max(len(unpadded) - 1, 0)
        self.fields_desc[0].length_from = lambda pkt: msg_len

        self.type = struct.unpack("B", s[msg_len:msg_len+1])[0]

        return s
|
|
class _TLSInnerPlaintextField(PacketField):
    """
    PacketField holding a TLSInnerPlaintext, with the TLS session of the
    enclosing packet handed over to the inner packet.
    """

    def __init__(self, name, default, *args, **kargs):
        # The packet class is always TLSInnerPlaintext; any extra positional
        # or keyword argument is accepted but not forwarded.
        parent = super(_TLSInnerPlaintextField, self)
        parent.__init__(name, default, TLSInnerPlaintext)

    def m2i(self, pkt, m):
        """Dissect `m` as an inner plaintext bound to pkt's TLS session."""
        session = pkt.tls_session
        return self.cls(m, tls_session=session)

    def getfield(self, pkt, s):
        """
        Dissect the first (pkt.len - tag length) bytes of `s` as the inner
        plaintext, and return (remaining bytes, inner packet).

        If that length is lower than 1, a warning is issued and `s` is
        returned untouched along with None.
        """
        inner_len = pkt.len - pkt.tls_session.rcs.mac_len
        if inner_len < 1:
            warning("InnerPlaintext should at least contain a byte type!")
            return s, None

        inner_bytes = s[:inner_len]
        trailing = s[inner_len:]
        parent = super(_TLSInnerPlaintextField, self)
        leftover, inner = parent.getfield(pkt, inner_bytes)
        # leftover should be empty here
        return leftover + trailing, inner

    def i2m(self, pkt, p):
        """
        Build `p`. When `p` is session-aware, it first gets pkt's TLS
        session, and it is built through raw_stateful() unless that session
        is frozen. In every other case, raw() is used.
        """
        if not isinstance(p, _GenericTLSSessionInheritance):
            return raw(p)

        p.tls_session = pkt.tls_session
        if pkt.tls_session.frozen:
            return raw(p)
        return p.raw_stateful()
|
|
|
class TLS13(_GenericTLSSessionInheritance):
    """
    TLS 1.3 record: a 5-byte header (type, version, len), followed by the
    inner plaintext and the authentication tag.

    While dissecting, the fragment is deciphered with the session's read
    connection state (rcs); while building, it is ciphered with the write
    connection state (wcs). No ciphering happens when the relevant cipher
    is Cipher_NULL.
    """
    __slots__ = ["deciphered_len"]
    name = "TLS 1.3"
    fields_desc = [ ByteEnumField("type", 0x17, _tls_type),
                    _TLSVersionField("version", 0x0301, _tls_version),
                    _TLSLengthField("len", None),
                    _TLSInnerPlaintextField("inner", TLSInnerPlaintext()),
                    _TLSMACField("auth_tag", None) ]

    def __init__(self, *args, **kargs):
        # deciphered_len is set again by pre_dissect(): None with a NULL
        # cipher, else the length of the deciphered fragment.
        self.deciphered_len = kargs.get("deciphered_len", None)
        super(TLS13, self).__init__(*args, **kargs)


    ### Parsing methods

    def _tls_auth_decrypt(self, s):
        """
        Provided with the AEAD-ciphered data (i.e. the record without its
        header), return the sliced and clear tuple (TLSInnerPlaintext, tag).
        Note that we still return the slicing of the original input in case
        of decryption failure. Also, if the integrity check fails, an
        info-level message will be logged, but we still return the sliced
        (unauthenticated) plaintext.

        The read sequence number is incremented before the decryption is
        attempted, so it advances whether or not the decryption succeeds.
        """
        rcs = self.tls_session.rcs
        # 64-bit big-endian sequence number, handed over to the cipher
        read_seq_num = struct.pack("!Q", rcs.seq_num)
        rcs.seq_num += 1
        try:
            # No additional data is passed along (empty first argument).
            return rcs.cipher.auth_decrypt(b"", s, read_seq_num)
        except CipherError as e:
            # The exception arguments are returned in place of the result.
            return e.args
        except AEADTagError as e:
            pkt_info = self.firstlayer().summary()
            log_runtime.info("TLS: record integrity check failed [%s]", pkt_info)
            return e.args

    def pre_dissect(self, s):
        """
        Decrypt and verify the message.

        With a NULL read cipher, `s` is returned as is. Otherwise the
        fragment announced by the header length is deciphered, and the
        returned string is header + clear fragment + tag + trailing data.
        The header is left untouched, so its length field still refers to
        the ciphered fragment.

        :raises Exception: if `s` is shorter than the 5-byte header.
        """
        if len(s) < 5:
            raise Exception("Invalid record: header is too short.")

        if isinstance(self.tls_session.rcs.cipher, Cipher_NULL):
            self.deciphered_len = None
            return s
        else:
            msglen = struct.unpack('!H', s[3:5])[0]
            hdr, efrag, r = s[:5], s[5:5+msglen], s[msglen+5:]
            frag, auth_tag = self._tls_auth_decrypt(efrag)
            self.deciphered_len = len(frag)
            return hdr + frag + auth_tag + r

    def post_dissect(self, s):
        """
        Commit the pending read state if it has been triggered. We update
        nothing if the prcs was not set, as this probably means that we're
        working out-of-context (and we need to keep the default rcs).
        """
        if self.tls_session.triggered_prcs_commit:
            if self.tls_session.prcs is not None:
                self.tls_session.rcs = self.tls_session.prcs
                self.tls_session.prcs = None
            self.tls_session.triggered_prcs_commit = False
        return s

    def do_dissect_payload(self, s):
        """
        Try to dissect the following data as a TLS message.
        Note that overloading .guess_payload_class() would not be enough,
        as the TLS session to be used would get lost.

        If anything goes wrong, the data is kept as a raw payload.
        """
        if s:
            try:
                # TLS is not among the module-level imports, so it is
                # brought into scope here. Without it, the call below raised
                # a NameError which was silently caught, and the data always
                # ended up as a raw payload.
                from scapy.layers.tls.record import TLS
                p = TLS(s, _internal=1, _underlayer=self,
                        tls_session=self.tls_session)
            except Exception:
                # KeyboardInterrupt and SystemExit do not derive from
                # Exception, so they propagate to the caller.
                p = conf.raw_layer(s, _internal=1, _underlayer=self)
            self.add_payload(p)


    ### Building methods

    def _tls_auth_encrypt(self, s):
        """
        Return the TLSCiphertext.encrypted_record for AEAD ciphers.

        The write sequence number is incremented as a side effect.
        """
        wcs = self.tls_session.wcs
        # 64-bit big-endian sequence number, handed over to the cipher
        write_seq_num = struct.pack("!Q", wcs.seq_num)
        wcs.seq_num += 1
        # No additional data is passed along (empty second argument).
        return wcs.cipher.auth_encrypt(s, b"", write_seq_num)

    def post_build(self, pkt, pay):
        """
        Apply the previous methods according to the writing cipher type.

        :param pkt: the built record (5-byte header, then the fragment).
        :param pay: the built payload, appended untouched.
        :return: header (with its length updated) + fragment + payload.
        """
        # Compute the length of TLSPlaintext fragment
        hdr, frag = pkt[:5], pkt[5:]
        # We are building, hence the *write* state is the one that matters:
        # this is also the state _tls_auth_encrypt() relies on.
        if not isinstance(self.tls_session.wcs.cipher, Cipher_NULL):
            frag = self._tls_auth_encrypt(frag)

        if self.len is not None:
            # The user gave us a 'len', let's respect this ultimately
            hdr = hdr[:3] + struct.pack("!H", self.len)
        else:
            # Update header with the length of TLSCiphertext.inner
            hdr = hdr[:3] + struct.pack("!H", len(frag))

        # Now we commit the pending write state if it has been triggered. We
        # update nothing if the pwcs was not set. This probably means that
        # we're working out-of-context (and we need to keep the default wcs).
        if self.tls_session.triggered_pwcs_commit:
            if self.tls_session.pwcs is not None:
                self.tls_session.wcs = self.tls_session.pwcs
                self.tls_session.pwcs = None
            self.tls_session.triggered_pwcs_commit = False

        return hdr + frag + pay
|