lin
2025-07-30 fcd736bf35fd93b563e9bbf594f2aa7b62028cc9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
## This file is part of Scapy
## Copyright (C) 2017 Maxence Tury
## This program is published under a GPLv2 license
 
"""
Common TLS 1.3 fields & bindings.
 
This module covers the record layer, along with the ChangeCipherSpec, Alert and
ApplicationData submessages. For the Handshake type, see tls_handshake.py.
 
See the TLS class documentation for more information.
"""
 
import struct
 
from scapy.config import conf
from scapy.error import log_runtime
from scapy.fields import *
from scapy.packet import *
from scapy.layers.tls.session import _GenericTLSSessionInheritance
from scapy.layers.tls.basefields import (_TLSVersionField, _tls_version,
                                         _TLSMACField, _TLSLengthField, _tls_type)
from scapy.layers.tls.record import _TLSMsgListField
from scapy.layers.tls.crypto.cipher_aead import AEADTagError
from scapy.layers.tls.crypto.cipher_stream import Cipher_NULL
from scapy.layers.tls.crypto.ciphers import CipherError
 
 
###############################################################################
### TLS Record Protocol                                                     ###
###############################################################################
 
class TLSInnerPlaintext(_GenericTLSSessionInheritance):
    name = "TLS Inner Plaintext"
    fields_desc = [ _TLSMsgListField("msg", []),
                    ByteEnumField("type", None, _tls_type),
                    XStrField("pad", "") ]
 
    def pre_dissect(self, s):
        """
        We need to parse the padding and type as soon as possible,
        else we won't be able to parse the message list...
        """
        if len(s) < 1:
            raise Exception("Invalid InnerPlaintext (too short).")
 
        l = len(s) - 1
        if s[-1] != b"\x00":
            msg_len = l
        else:
            n = 1
            while s[-n] != b"\x00" and n < l:
                n += 1
            msg_len = l - n
        self.fields_desc[0].length_from = lambda pkt: msg_len
 
        self.type = struct.unpack("B", s[msg_len:msg_len+1])[0]
 
        return s
 
class _TLSInnerPlaintextField(PacketField):
    def __init__(self, name, default, *args, **kargs):
        super(_TLSInnerPlaintextField, self).__init__(name,
                                                      default,
                                                      TLSInnerPlaintext)
 
    def m2i(self, pkt, m):
        return self.cls(m, tls_session=pkt.tls_session)
 
    def getfield(self, pkt, s):
        tag_len = pkt.tls_session.rcs.mac_len
        frag_len = pkt.len - tag_len
        if frag_len < 1:
            warning("InnerPlaintext should at least contain a byte type!")
            return s, None
        remain, i = super(_TLSInnerPlaintextField, self).getfield(pkt, s[:frag_len])
        # remain should be empty here
        return remain + s[frag_len:], i
 
    def i2m(self, pkt, p):
        if isinstance(p, _GenericTLSSessionInheritance):
            p.tls_session = pkt.tls_session
            if not pkt.tls_session.frozen:
                return p.raw_stateful()
        return raw(p)
 
 
class TLS13(_GenericTLSSessionInheritance):
    __slots__ = ["deciphered_len"]
    name = "TLS 1.3"
    fields_desc = [ ByteEnumField("type", 0x17, _tls_type),
                    _TLSVersionField("version", 0x0301, _tls_version),
                    _TLSLengthField("len", None),
                    _TLSInnerPlaintextField("inner", TLSInnerPlaintext()),
                    _TLSMACField("auth_tag", None) ]
 
    def __init__(self, *args, **kargs):
        self.deciphered_len = kargs.get("deciphered_len", None)
        super(TLS13, self).__init__(*args, **kargs)
 
 
    ### Parsing methods
 
    def _tls_auth_decrypt(self, s):
        """
        Provided with the record header and AEAD-ciphered data, return the
        sliced and clear tuple (TLSInnerPlaintext, tag). Note that
        we still return the slicing of the original input in case of decryption
        failure. Also, if the integrity check fails, a warning will be issued,
        but we still return the sliced (unauthenticated) plaintext.
        """
        rcs = self.tls_session.rcs
        read_seq_num = struct.pack("!Q", rcs.seq_num)
        rcs.seq_num += 1
        try:
            return rcs.cipher.auth_decrypt(b"", s, read_seq_num)
        except CipherError as e:
            return e.args
        except AEADTagError as e:
            pkt_info = self.firstlayer().summary()
            log_runtime.info("TLS: record integrity check failed [%s]", pkt_info)
            return e.args
 
    def pre_dissect(self, s):
        """
        Decrypt, verify and decompress the message.
        """
        if len(s) < 5:
            raise Exception("Invalid record: header is too short.")
 
        if isinstance(self.tls_session.rcs.cipher, Cipher_NULL):
            self.deciphered_len = None
            return s
        else:
            msglen = struct.unpack('!H', s[3:5])[0]
            hdr, efrag, r = s[:5], s[5:5+msglen], s[msglen+5:]
            frag, auth_tag = self._tls_auth_decrypt(efrag)
            self.deciphered_len = len(frag)
            return hdr + frag + auth_tag + r
 
    def post_dissect(self, s):
        """
        Commit the pending read state if it has been triggered. We update
        nothing if the prcs was not set, as this probably means that we're
        working out-of-context (and we need to keep the default rcs).
        """
        if self.tls_session.triggered_prcs_commit:
            if self.tls_session.prcs is not None:
                self.tls_session.rcs = self.tls_session.prcs
                self.tls_session.prcs = None
            self.tls_session.triggered_prcs_commit = False
        return s
 
    def do_dissect_payload(self, s):
        """
        Try to dissect the following data as a TLS message.
        Note that overloading .guess_payload_class() would not be enough,
        as the TLS session to be used would get lost.
        """
        if s:
            try:
                p = TLS(s, _internal=1, _underlayer=self,
                        tls_session = self.tls_session)
            except KeyboardInterrupt:
                raise
            except:
                p = conf.raw_layer(s, _internal=1, _underlayer=self)
            self.add_payload(p)
 
 
    ### Building methods
 
    def _tls_auth_encrypt(self, s):
        """
        Return the TLSCiphertext.encrypted_record for AEAD ciphers.
        """
        wcs = self.tls_session.wcs
        write_seq_num = struct.pack("!Q", wcs.seq_num)
        wcs.seq_num += 1
        return wcs.cipher.auth_encrypt(s, b"", write_seq_num)
 
    def post_build(self, pkt, pay):
        """
        Apply the previous methods according to the writing cipher type.
        """
        # Compute the length of TLSPlaintext fragment
        hdr, frag = pkt[:5], pkt[5:]
        if not isinstance(self.tls_session.rcs.cipher, Cipher_NULL):
            frag = self._tls_auth_encrypt(frag)
 
        if self.len is not None:
            # The user gave us a 'len', let's respect this ultimately
            hdr = hdr[:3] + struct.pack("!H", self.len)
        else:
            # Update header with the length of TLSCiphertext.inner
            hdr = hdr[:3] + struct.pack("!H", len(frag))
 
        # Now we commit the pending write state if it has been triggered. We
        # update nothing if the pwcs was not set. This probably means that
        # we're working out-of-context (and we need to keep the default wcs).
        if self.tls_session.triggered_pwcs_commit:
            if self.tls_session.pwcs is not None:
                self.tls_session.wcs = self.tls_session.pwcs
                self.tls_session.pwcs = None
            self.tls_session.triggered_pwcs_commit = False
 
        return hdr + frag + pay