lin
2025-07-30 fcd736bf35fd93b563e9bbf594f2aa7b62028cc9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
## This file is part of Scapy
## Copyright (C) 2017 Maxence Tury
## This program is published under a GPLv2 license
 
"""
SSLv2 handshake fields & logic.
"""
 
import math
 
from scapy.error import log_runtime, warning
from scapy.fields import *
from scapy.packet import Packet, Raw, Padding
from scapy.layers.tls.cert import Cert, PrivKey, PubKey
from scapy.layers.tls.basefields import _tls_version, _TLSVersionField
from scapy.layers.tls.handshake import _CipherSuitesField
from scapy.layers.tls.keyexchange import _TLSSignatureField, _TLSSignature
from scapy.layers.tls.session import (_GenericTLSSessionInheritance,
                                      readConnState, writeConnState)
from scapy.layers.tls.crypto.suites import (_tls_cipher_suites,
                                            _tls_cipher_suites_cls,
                                            _GenericCipherSuite,
                                            _GenericCipherSuiteMetaclass,
                                            get_usable_ciphersuites,
                                            SSL_CK_DES_192_EDE3_CBC_WITH_MD5)
 
 
###############################################################################
### Generic SSLv2 Handshake message                                         ###
###############################################################################
 
# Symbolic names of the SSLv2 handshake message types, indexed by the value
# of the 'msgtype' byte.
_sslv2_handshake_type = {
    0: "error",
    1: "client_hello",
    2: "client_master_key",
    3: "client_finished",
    4: "server_hello",
    5: "server_verify",
    6: "server_finished",
    7: "request_certificate",
    8: "client_certificate",
}
 
 
class _SSLv2Handshake(_GenericTLSSessionInheritance):
    """
    Common parent of the SSLv2 handshake message classes.
    It also serves as the fallback class for handshake messages whose type
    is not recognized.
    """
    name = "SSLv2 Handshake Generic message"
    fields_desc = [
        ByteEnumField("msgtype", None, _sslv2_handshake_type),
    ]

    def guess_payload_class(self, p):
        # Anything left after a handshake message is treated as Padding.
        return Padding

    def tls_session_update(self, msg_str):
        """
        Record the message in the TLS session, both as the string 'msg_str'
        and as the packet object itself. The same update applies after a
        build and after a dissection.
        """
        session = self.tls_session
        session.handshake_messages.append(msg_str)
        session.handshake_messages_parsed.append(self)
 
 
###############################################################################
### Error                                                                   ###
###############################################################################
 
# Symbolic names of the codes carried by the SSLv2 Error message.
_tls_error_code = {
    1: "no_cipher",
    2: "no_certificate",
    4: "bad_certificate",
    6: "unsupported_certificate_type",
}
 
class SSLv2Error(_SSLv2Handshake):
    """
    SSLv2 Error message: the message type (0) followed by an error code
    taken from _tls_error_code.
    """
    name = "SSLv2 Handshake - Error"
    fields_desc = [
        ByteEnumField("msgtype", 0, _sslv2_handshake_type),
        ShortEnumField("code", None, _tls_error_code),
    ]
 
 
###############################################################################
### ClientHello                                                             ###
###############################################################################
 
class _SSLv2CipherSuitesField(_CipherSuitesField):
    """
    List of SSLv2 cipher suites. Each suite is an integer which is written
    on (and read from) exactly 3 bytes, big-endian.
    """
    def __init__(self, name, default, dico, length_from=None):
        _CipherSuitesField.__init__(self, name, default, dico,
                                    length_from=length_from)
        # SSLv2 items are 3 bytes long, which no single struct format
        # covers: packing and unpacking are done by hand in i2m()/m2i().
        self.itemfmt = b""
        self.itemsize = 3

    def i2m(self, pkt, val):
        """
        Return the concatenation of the 3-byte encodings of the suites in
        'val'. A None value is built as an empty list.
        """
        if val is None:
            return b""
        # One byte for the high part, two bytes for the low part.
        return b"".join(struct.pack(">BH", x >> 16, x & 0x00ffff)
                        for x in val)

    def m2i(self, pkt, m):
        """
        Split the string 'm' into 3-byte items and return the matching list
        of integers.
        """
        res = []
        while m:
            # Left-pad the item with a null byte to read it as a 32-bit int.
            res.append(struct.unpack("!I", b"\x00" + m[:3])[0])
            m = m[3:]
        return res
 
 
class SSLv2ClientHello(_SSLv2Handshake):
    """
    SSLv2 ClientHello.

    The three length fields come first; the variable-length data follow in
    the same order as their lengths: cipher suites, session id, challenge.
    """
    name = "SSLv2 Handshake - Client Hello"
    fields_desc = [ ByteEnumField("msgtype", 1, _sslv2_handshake_type),
                    _TLSVersionField("version", 0x0002, _tls_version),

                    FieldLenField("cipherslen", None, fmt="!H",
                                  length_of="ciphers"),
                    FieldLenField("sidlen", None, fmt="!H",
                                  length_of="sid"),
                    FieldLenField("challengelen", None, fmt="!H",
                                  length_of="challenge"),

                    # The version 2 ClientHello layout puts the cipher specs
                    # before the session id; the other order only yields the
                    # right bytes when the session id is empty.
                    _SSLv2CipherSuitesField("ciphers",
                                      [SSL_CK_DES_192_EDE3_CBC_WITH_MD5],
                                      _tls_cipher_suites,
                                      length_from=lambda pkt: pkt.cipherslen),
                    XStrLenField("sid", b"",
                                 length_from=lambda pkt:pkt.sidlen),
                    XStrLenField("challenge", b"",
                                 length_from=lambda pkt:pkt.challengelen) ]

    def tls_session_update(self, msg_str):
        """
        On top of the generic bookkeeping, store in the session the
        advertised version, the proposed cipher suites and the challenge.
        """
        super(SSLv2ClientHello, self).tls_session_update(msg_str)
        self.tls_session.advertised_tls_version = self.version
        self.tls_session.sslv2_common_cs = self.ciphers
        self.tls_session.sslv2_challenge = self.challenge
 
 
###############################################################################
### ServerHello                                                             ###
###############################################################################
 
class _SSLv2CertDataField(StrLenField):
    """
    Certificate data of known length. The internal value is a Cert instance
    whenever the data could be parsed as one, and the raw string otherwise.
    """
    def getfield(self, pkt, s):
        """
        Consume the number of bytes given by length_from (0 when no
        length_from was provided) and return (remaining string, value).
        """
        length = 0
        if self.length_from is not None:
            length = self.length_from(pkt)
        data = s[:length]
        try:
            certdata = Cert(data)
        except Exception:
            # Not a certificate Cert can parse: keep the raw bytes. Only
            # regular errors are caught, so that KeyboardInterrupt and
            # SystemExit still propagate.
            certdata = data
        return s[length:], certdata

    def i2len(self, pkt, i):
        # For a Cert, the length on the wire is the one of its DER encoding.
        if isinstance(i, Cert):
            return len(i.der)
        return len(i)

    def i2m(self, pkt, i):
        # A Cert is built from its DER encoding, anything else as is.
        if isinstance(i, Cert):
            return i.der
        return i
 
 
class SSLv2ServerHello(_SSLv2Handshake):
    """
    SSLv2 ServerHello message.
    """
    name = "SSLv2 Handshake - Server Hello"
    fields_desc = [
        ByteEnumField("msgtype", 4, _sslv2_handshake_type),
        ByteField("sid_hit", 0),
        ByteEnumField("certtype", 1, {1: "x509_cert"}),
        _TLSVersionField("version", 0x0002, _tls_version),
        # Lengths of the three variable-size fields which follow.
        FieldLenField("certlen", None, fmt="!H", length_of="cert"),
        FieldLenField("cipherslen", None, fmt="!H", length_of="ciphers"),
        FieldLenField("connection_idlen", None, fmt="!H",
                      length_of="connection_id"),
        _SSLv2CertDataField("cert", b"",
                            length_from=lambda pkt: pkt.certlen),
        _SSLv2CipherSuitesField("ciphers", [], _tls_cipher_suites,
                                length_from=lambda pkt: pkt.cipherslen),
        XStrLenField("connection_id", b"",
                     length_from=lambda pkt: pkt.connection_idlen),
    ]

    def tls_session_update(self, msg_str):
        """
        Narrow the session cipher suites down to the ones also listed in this
        message, then store the connection id, the version and the server
        certificate.
        XXX Something should be done about the session ID here.
        """
        super(SSLv2ServerHello, self).tls_session_update(msg_str)

        session = self.tls_session
        # The order of the suites already recorded in the session is kept.
        session.sslv2_common_cs = [suite for suite in session.sslv2_common_cs
                                   if suite in self.ciphers]
        session.sslv2_connection_id = self.connection_id
        session.tls_version = self.version
        if self.cert is not None:
            session.server_certs = [self.cert]
 
 
###############################################################################
### ClientMasterKey                                                         ###
###############################################################################
 
class _SSLv2CipherSuiteField(EnumField):
    """
    A single SSLv2 cipher suite, i.e. an integer written on 3 bytes.
    """
    def __init__(self, name, default, dico):
        EnumField.__init__(self, name, default, dico)

    def i2m(self, pkt, val):
        # An unset suite is built as an empty string.
        if val is None:
            return b""
        high = val >> 16
        low = val & 0x00ffff
        return struct.pack(">BH", high, low)

    def addfield(self, pkt, s, val):
        encoded = self.i2m(pkt, val)
        return s + encoded

    def m2i(self, pkt, m):
        # Left-pad the first 3 bytes to read them as a 32-bit integer.
        (suite,) = struct.unpack("!I", b"\x00" + m[:3])
        return suite

    def getfield(self, pkt, s):
        remain = s[3:]
        return remain, self.m2i(pkt, s)
 
class _SSLv2EncryptedKeyField(XStrLenField):
    """
    Encrypted key field whose representation also shows the decrypted key
    of the packet, whenever the packet has one.
    """
    def i2repr(self, pkt, x):
        parent = super(_SSLv2EncryptedKeyField, self)
        shown = parent.i2repr(pkt, x)
        clear = pkt.decryptedkey
        if clear is None:
            return shown
        return shown + "    [decryptedkey= %s]" % parent.i2repr(pkt, clear)
 
class SSLv2ClientMasterKey(_SSLv2Handshake):
    """
    SSLv2 ClientMasterKey.

    On top of its fields, the packet holds a 'decryptedkey' attribute, which
    is the cleartext matching 'encryptedkey'. It may be passed to the
    constructor; after a dissection it is None when the session holds no
    server RSA key to decrypt with.
    """
    __slots__ = ["decryptedkey"]
    name = "SSLv2 Handshake - Client Master Key"
    fields_desc = [ ByteEnumField("msgtype", 2, _sslv2_handshake_type),
                    _SSLv2CipherSuiteField("cipher", None, _tls_cipher_suites),

                    FieldLenField("clearkeylen", None, fmt="!H",
                                  length_of="clearkey"),
                    FieldLenField("encryptedkeylen", None, fmt="!H",
                                  length_of="encryptedkey"),
                    FieldLenField("keyarglen", None, fmt="!H",
                                  length_of="keyarg"),

                    # Defaults are bytes: post_build() compares these fields
                    # with b"" and tls_session_update() concatenates them.
                    XStrLenField("clearkey", b"",
                                length_from=lambda pkt: pkt.clearkeylen),
                    _SSLv2EncryptedKeyField("encryptedkey", b"",
                                length_from=lambda pkt: pkt.encryptedkeylen),
                    XStrLenField("keyarg", b"",
                                length_from=lambda pkt: pkt.keyarglen) ]

    def __init__(self, *args, **kargs):
        """
        When post_building, the packets fields are updated (this is somewhat
        non-standard). We might need these fields later, but calling __str__
        on a new packet (i.e. not dissected from a raw string) applies
        post_build to an object different from the original one... unless
        we hackishly always set self.explicit to 1.
        """
        # 'decryptedkey' is not a field: take it out of the keyword
        # arguments before they reach the generic constructor.
        if "decryptedkey" in kargs:
            self.decryptedkey = kargs["decryptedkey"]
            del kargs["decryptedkey"]
        else:
            self.decryptedkey = b""
        super(SSLv2ClientMasterKey, self).__init__(*args, **kargs)
        self.explicit = 1

    def pre_dissect(self, s):
        """
        Locate the encrypted key inside the raw message 's' and try to
        decrypt it with the server RSA key of the session. The string is
        returned unchanged.
        """
        clearkeylen = struct.unpack("!H", s[4:6])[0]
        encryptedkeylen = struct.unpack("!H", s[6:8])[0]
        # 10 = msgtype (1) + cipher (3) + the three length fields (3 * 2)
        encryptedkeystart = 10 + clearkeylen
        encryptedkey = s[encryptedkeystart:encryptedkeystart+encryptedkeylen]
        if self.tls_session.server_rsa_key:
            self.decryptedkey = \
                    self.tls_session.server_rsa_key.decrypt(encryptedkey)
        else:
            self.decryptedkey = None
        return s

    def post_build(self, pkt, pay):
        """
        Fill in whatever was left unset (cipher, keys, key argument, lengths)
        and rebuild the whole message from these values. The packet fields
        are updated along the way.
        """
        if self.cipher is None:
            # No cipher was set: pick one among the suites shared by both
            # Hellos, or fall back to a hard-coded value.
            common_cs = self.tls_session.sslv2_common_cs
            cs_vals = get_usable_ciphersuites(common_cs, "SSLv2")
            if len(cs_vals) == 0:
                warning("No known common cipher suite between SSLv2 Hellos.")
                cs_val = 0x0700c0
                cipher = b"\x07\x00\xc0"
            else:
                cs_val = cs_vals[0]         #XXX choose the best one
                cipher = struct.pack(">BH", cs_val >> 16, cs_val & 0x00ffff)
            cs_cls = _tls_cipher_suites_cls[cs_val]
            self.cipher = cs_val
        else:
            # The cipher was already built: read it back from the string.
            cipher = pkt[1:4]
            cs_val = struct.unpack("!I", b"\x00" + cipher)[0]
            if cs_val not in _tls_cipher_suites_cls:
                warning("Unknown ciphersuite %d from ClientMasterKey" % cs_val)
                cs_cls = None
            else:
                cs_cls = _tls_cipher_suites_cls[cs_val]

        if cs_cls:
            if (self.encryptedkey == b"" and
                len(self.tls_session.server_certs) > 0):
                # else, the user is responsible for export slicing & encryption
                key = randstring(cs_cls.cipher_alg.key_len)

                # With an export suite, all but the last 5 bytes of the key
                # go in clear and only these last 5 bytes get encrypted.
                if self.clearkey == b"" and cs_cls.kx_alg.export:
                    self.clearkey = key[:-5]

                if self.decryptedkey == b"":
                    if cs_cls.kx_alg.export:
                        self.decryptedkey = key[-5:]
                    else:
                        self.decryptedkey = key

                pubkey = self.tls_session.server_certs[0].pubKey
                self.encryptedkey = pubkey.encrypt(self.decryptedkey)

            if self.keyarg == b"" and cs_cls.cipher_alg.type == "block":
                self.keyarg = randstring(cs_cls.cipher_alg.block_size)

        # Lengths explicitly set by the user are kept as they are.
        clearkey = self.clearkey or b""
        if self.clearkeylen is None:
            self.clearkeylen = len(clearkey)
        clearkeylen = struct.pack("!H", self.clearkeylen)

        encryptedkey = self.encryptedkey or b""
        if self.encryptedkeylen is None:
            self.encryptedkeylen = len(encryptedkey)
        encryptedkeylen = struct.pack("!H", self.encryptedkeylen)

        keyarg = self.keyarg or b""
        if self.keyarglen is None:
            self.keyarglen = len(keyarg)
        keyarglen = struct.pack("!H", self.keyarglen)

        s = (chb(pkt[0]) + cipher
             + clearkeylen + encryptedkeylen + keyarglen
             + clearkey + encryptedkey + keyarg)
        return s + pay

    def tls_session_update(self, msg_str):
        """
        Prepare the pending read and write connection states for the chosen
        cipher suite. When the decrypted key is known, also set the master
        secret, derive the keys, and trigger the commit of both states.
        """
        super(SSLv2ClientMasterKey, self).tls_session_update(msg_str)

        s = self.tls_session
        cs_val = self.cipher
        if cs_val not in _tls_cipher_suites_cls:
            warning("Unknown cipher suite %d from ClientMasterKey" % cs_val)
            cs_cls = None
        else:
            cs_cls = _tls_cipher_suites_cls[cs_val]

        tls_version = s.tls_version or 0x0002
        connection_end = s.connection_end
        # The pending states start from the current sequence numbers.
        wcs_seq_num = s.wcs.seq_num
        s.pwcs = writeConnState(ciphersuite=cs_cls,
                                            connection_end=connection_end,
                                            seq_num=wcs_seq_num,
                                            tls_version=tls_version)
        rcs_seq_num = s.rcs.seq_num
        s.prcs = readConnState(ciphersuite=cs_cls,
                                           connection_end=connection_end,
                                           seq_num=rcs_seq_num,
                                           tls_version=tls_version)

        if self.decryptedkey is not None:
            s.master_secret = self.clearkey + self.decryptedkey
            s.compute_sslv2_km_and_derive_keys()

            # With a block cipher, the key argument is used as the IV.
            if s.pwcs.cipher.type == "block":
                s.pwcs.cipher.iv = self.keyarg
            if s.prcs.cipher.type == "block":
                s.prcs.cipher.iv = self.keyarg

            s.triggered_prcs_commit = True
            s.triggered_pwcs_commit = True
 
 
###############################################################################
### ServerVerify                                                            ###
###############################################################################
 
class SSLv2ServerVerify(_SSLv2Handshake):
    """
    In order to parse a ServerVerify, the exact message string should be
    fed to the class. This is how SSLv2 defines the challenge length...
    """
    name = "SSLv2 Handshake - Server Verify"
    fields_desc = [ ByteEnumField("msgtype", 5, _sslv2_handshake_type),
                    XStrField("challenge", b"") ]

    def build(self, *args, **kargs):
        """
        Build the message, after the challenge has been taken from the
        session if it was left unset.
        """
        fval = self.getfieldval("challenge")
        # The field defaults to an empty string, hence both None and empty
        # values have to be treated as unset.
        if not fval:
            self.challenge = self.tls_session.sslv2_challenge
        return super(SSLv2ServerVerify, self).build(*args, **kargs)

    def post_dissection(self, pkt):
        """
        Log a message when the received challenge differs from the one
        stored in the session (if any).
        """
        s = self.tls_session
        if s.sslv2_challenge is not None:
            if self.challenge != s.sslv2_challenge:
                pkt_info = pkt.firstlayer().summary()
                log_runtime.info("TLS: invalid ServerVerify received [%s]", pkt_info)
 
 
###############################################################################
### RequestCertificate                                                      ###
###############################################################################
 
class SSLv2RequestCertificate(_SSLv2Handshake):
    """
    In order to parse a RequestCertificate, the exact message string should be
    fed to the class. This is how SSLv2 defines the challenge length...
    """
    name = "SSLv2 Handshake - Request Certificate"
    fields_desc = [ ByteEnumField("msgtype", 7, _sslv2_handshake_type),
                    ByteEnumField("authtype", 1, {1: "md5_with_rsa"}),
                    # The default is bytes: the challenge ends up in the
                    # session, where it is concatenated with other strings
                    # to make up the data signed in the ClientCertificate.
                    XStrField("challenge", b"") ]

    def tls_session_update(self, msg_str):
        """
        On top of the generic bookkeeping, store the challenge in the
        session for the ClientCertificate to come.
        """
        super(SSLv2RequestCertificate, self).tls_session_update(msg_str)
        self.tls_session.sslv2_challenge_clientcert = self.challenge
 
 
###############################################################################
### ClientCertificate                                                       ###
###############################################################################
 
class SSLv2ClientCertificate(_SSLv2Handshake):
    """
    SSLv2 ClientCertificate.
    """
    name = "SSLv2 Handshake - Client Certificate"
    fields_desc = [ ByteEnumField("msgtype", 8, _sslv2_handshake_type),

                    ByteEnumField("certtype", 1, {1: "x509_cert"}),
                    FieldLenField("certlen", None, fmt="!H",
                                  length_of="certdata"),
                    FieldLenField("responselen", None, fmt="!H",
                                  length_of="responsedata"),

                    _SSLv2CertDataField("certdata", b"",
                                      length_from=lambda pkt: pkt.certlen),
                    _TLSSignatureField("responsedata", None,
                                length_from=lambda pkt: pkt.responselen) ]

    def build(self, *args, **kargs):
        """
        Build the message. When 'responsedata' was left unset, it is computed
        as a signature (with the client key of the session) over the key
        material, the certificate challenge and the server certificate, or
        set to an empty string if the session lacks any of these. A value
        provided by the user is built as is.
        """
        s = self.tls_session
        if self.getfieldval("responsedata") is None:
            can_sign = (s.sslv2_key_material is not None and
                        s.sslv2_challenge_clientcert is not None and
                        len(s.server_certs) > 0)
            if can_sign:
                m = (s.sslv2_key_material +
                     s.sslv2_challenge_clientcert +
                     s.server_certs[0].der)
                self.responsedata = _TLSSignature(tls_session=s)
                self.responsedata._update_sig(m, s.client_key)
            else:
                self.responsedata = b""
        return super(SSLv2ClientCertificate, self).build(*args, **kargs)

    def post_dissection_tls_session_update(self, msg_str):
        """
        Update the session, then check the received signature against the
        client certificate whenever the session holds all the needed data.
        A failed check is only logged.
        """
        self.tls_session_update(msg_str)

        s = self.tls_session
        test = (len(s.client_certs) > 0 and
                s.sslv2_key_material is not None and
                s.sslv2_challenge_clientcert is not None and
                len(s.server_certs) > 0)
        if test:
            m = (s.sslv2_key_material +
                 s.sslv2_challenge_clientcert +
                 s.server_certs[0].der)
            sig_test = self.responsedata._verify_sig(m, s.client_certs[0])
            if not sig_test:
                pkt_info = self.firstlayer().summary()
                log_runtime.info("TLS: invalid client CertificateVerify signature [%s]", pkt_info)

    def tls_session_update(self, msg_str):
        """
        On top of the generic bookkeeping, store the certificate (if any) as
        the client certificate of the session.
        """
        super(SSLv2ClientCertificate, self).tls_session_update(msg_str)
        if self.certdata:
            self.tls_session.client_certs = [self.certdata]
 
 
###############################################################################
### Finished                                                                ###
###############################################################################
 
class SSLv2ClientFinished(_SSLv2Handshake):
    """
    In order to parse a ClientFinished, the exact message string should be fed
    to the class. SSLv2 does not offer any other way to know the c_id length.
    """
    name = "SSLv2 Handshake - Client Finished"
    fields_desc = [ ByteEnumField("msgtype", 3, _sslv2_handshake_type),
                    XStrField("connection_id", b"") ]

    def build(self, *args, **kargs):
        """
        Build the message, after the connection id has been taken from the
        session if it was left unset.
        """
        fval = self.getfieldval("connection_id")
        # Any empty value (None, b"" or "") is treated as unset, so that the
        # test does not depend on the string type of the field value.
        if not fval:
            self.connection_id = self.tls_session.sslv2_connection_id
        return super(SSLv2ClientFinished, self).build(*args, **kargs)

    def post_dissection(self, pkt):
        """
        Log a message when the received connection id differs from the one
        stored in the session (if any).
        """
        s = self.tls_session
        if s.sslv2_connection_id is not None:
            if self.connection_id != s.sslv2_connection_id:
                pkt_info = pkt.firstlayer().summary()
                log_runtime.info("TLS: invalid client Finished received [%s]", pkt_info)
 
 
class SSLv2ServerFinished(_SSLv2Handshake):
    """
    In order to parse a ServerFinished, the exact message string should be fed
    to the class. SSLv2 does not offer any other way to know the sid length.
    """
    name = "SSLv2 Handshake - Server Finished"
    fields_desc = [ ByteEnumField("msgtype", 6, _sslv2_handshake_type),
                    XStrField("sid", b"") ]

    def build(self, *args, **kargs):
        """
        Build the message, after the session id has been taken from the
        session if it was left unset.
        """
        fval = self.getfieldval("sid")
        # Any empty value (None, b"" or "") is treated as unset, so that the
        # test does not depend on the string type of the field value.
        if not fval:
            self.sid = self.tls_session.sid
        return super(SSLv2ServerFinished, self).build(*args, **kargs)

    def post_dissection_tls_session_update(self, msg_str):
        """
        Update the session, then store the received session id in it.
        """
        self.tls_session_update(msg_str)
        self.tls_session.sid = self.sid
 
 
###############################################################################
### All handshake messages defined in this module                           ###
###############################################################################
 
# Packet class implementing each SSLv2 handshake message, indexed by the
# value of the 'msgtype' byte.
_sslv2_handshake_cls = {
    0: SSLv2Error,
    1: SSLv2ClientHello,
    2: SSLv2ClientMasterKey,
    3: SSLv2ClientFinished,
    4: SSLv2ServerHello,
    5: SSLv2ServerVerify,
    6: SSLv2ServerFinished,
    7: SSLv2RequestCertificate,
    8: SSLv2ClientCertificate,
}