## This file is part of Scapy
|
## Copyright (C) 2017 Maxence Tury
|
## This program is published under a GPLv2 license
|
|
"""
|
SSLv2 handshake fields & logic.
|
"""
|
|
import math
|
|
from scapy.error import log_runtime, warning
|
from scapy.fields import *
|
from scapy.packet import Packet, Raw, Padding
|
from scapy.layers.tls.cert import Cert, PrivKey, PubKey
|
from scapy.layers.tls.basefields import _tls_version, _TLSVersionField
|
from scapy.layers.tls.handshake import _CipherSuitesField
|
from scapy.layers.tls.keyexchange import _TLSSignatureField, _TLSSignature
|
from scapy.layers.tls.session import (_GenericTLSSessionInheritance,
|
readConnState, writeConnState)
|
from scapy.layers.tls.crypto.suites import (_tls_cipher_suites,
|
_tls_cipher_suites_cls,
|
_GenericCipherSuite,
|
_GenericCipherSuiteMetaclass,
|
get_usable_ciphersuites,
|
SSL_CK_DES_192_EDE3_CBC_WITH_MD5)
|
|
|
###############################################################################
|
### Generic SSLv2 Handshake message ###
|
###############################################################################
|
|
_sslv2_handshake_type = { 0: "error", 1: "client_hello",
|
2: "client_master_key", 3: "client_finished",
|
4: "server_hello", 5: "server_verify",
|
6: "server_finished", 7: "request_certificate",
|
8: "client_certificate" }
|
|
|
class _SSLv2Handshake(_GenericTLSSessionInheritance):
|
"""
|
Inherited by other Handshake classes to get post_build().
|
Also used as a fallback for unknown TLS Handshake packets.
|
"""
|
name = "SSLv2 Handshake Generic message"
|
fields_desc = [ ByteEnumField("msgtype", None, _sslv2_handshake_type) ]
|
|
def guess_payload_class(self, p):
|
return Padding
|
|
def tls_session_update(self, msg_str):
|
"""
|
Covers both post_build- and post_dissection- context updates.
|
"""
|
self.tls_session.handshake_messages.append(msg_str)
|
self.tls_session.handshake_messages_parsed.append(self)
|
|
|
###############################################################################
|
### Error ###
|
###############################################################################
|
|
_tls_error_code = { 1: "no_cipher", 2: "no_certificate",
|
4: "bad_certificate", 6: "unsupported_certificate_type" }
|
|
class SSLv2Error(_SSLv2Handshake):
|
"""
|
SSLv2 Error.
|
"""
|
name = "SSLv2 Handshake - Error"
|
fields_desc = [ ByteEnumField("msgtype", 0, _sslv2_handshake_type),
|
ShortEnumField("code", None, _tls_error_code) ]
|
|
|
###############################################################################
|
### ClientHello ###
|
###############################################################################
|
|
class _SSLv2CipherSuitesField(_CipherSuitesField):
|
def __init__(self, name, default, dico, length_from=None):
|
_CipherSuitesField.__init__(self, name, default, dico,
|
length_from=length_from)
|
self.itemfmt = b""
|
self.itemsize = 3
|
|
def i2m(self, pkt, val):
|
if val is None:
|
val2 = []
|
val2 = [(x >> 16, x & 0x00ffff) for x in val]
|
return b"".join([struct.pack(">BH", x[0], x[1]) for x in val2])
|
|
def m2i(self, pkt, m):
|
res = []
|
while m:
|
res.append(struct.unpack("!I", b"\x00" + m[:3])[0])
|
m = m[3:]
|
return res
|
|
|
class SSLv2ClientHello(_SSLv2Handshake):
|
"""
|
SSLv2 ClientHello.
|
"""
|
name = "SSLv2 Handshake - Client Hello"
|
fields_desc = [ ByteEnumField("msgtype", 1, _sslv2_handshake_type),
|
_TLSVersionField("version", 0x0002, _tls_version),
|
|
FieldLenField("cipherslen", None, fmt="!H",
|
length_of="ciphers"),
|
FieldLenField("sidlen", None, fmt="!H",
|
length_of="sid"),
|
FieldLenField("challengelen", None, fmt="!H",
|
length_of="challenge"),
|
|
XStrLenField("sid", b"",
|
length_from=lambda pkt:pkt.sidlen),
|
_SSLv2CipherSuitesField("ciphers",
|
[SSL_CK_DES_192_EDE3_CBC_WITH_MD5],
|
_tls_cipher_suites,
|
length_from=lambda pkt: pkt.cipherslen),
|
XStrLenField("challenge", b"",
|
length_from=lambda pkt:pkt.challengelen) ]
|
|
def tls_session_update(self, msg_str):
|
super(SSLv2ClientHello, self).tls_session_update(msg_str)
|
self.tls_session.advertised_tls_version = self.version
|
self.tls_session.sslv2_common_cs = self.ciphers
|
self.tls_session.sslv2_challenge = self.challenge
|
|
|
###############################################################################
|
### ServerHello ###
|
###############################################################################
|
|
class _SSLv2CertDataField(StrLenField):
|
def getfield(self, pkt, s):
|
l = 0
|
if self.length_from is not None:
|
l = self.length_from(pkt)
|
try:
|
certdata = Cert(s[:l])
|
except:
|
certdata = s[:l]
|
return s[l:], certdata
|
|
def i2len(self, pkt, i):
|
if isinstance(i, Cert):
|
return len(i.der)
|
return len(i)
|
|
def i2m(self, pkt, i):
|
if isinstance(i, Cert):
|
return i.der
|
return i
|
|
|
class SSLv2ServerHello(_SSLv2Handshake):
|
"""
|
SSLv2 ServerHello.
|
"""
|
name = "SSLv2 Handshake - Server Hello"
|
fields_desc = [ ByteEnumField("msgtype", 4, _sslv2_handshake_type),
|
|
ByteField("sid_hit", 0),
|
ByteEnumField("certtype", 1, {1: "x509_cert"}),
|
_TLSVersionField("version", 0x0002, _tls_version),
|
|
FieldLenField("certlen", None, fmt="!H",
|
length_of="cert"),
|
FieldLenField("cipherslen", None, fmt="!H",
|
length_of="ciphers"),
|
FieldLenField("connection_idlen", None, fmt="!H",
|
length_of="connection_id"),
|
|
_SSLv2CertDataField("cert", b"",
|
length_from=lambda pkt: pkt.certlen),
|
_SSLv2CipherSuitesField("ciphers", [], _tls_cipher_suites,
|
length_from=lambda pkt: pkt.cipherslen),
|
XStrLenField("connection_id", b"",
|
length_from=lambda pkt: pkt.connection_idlen) ]
|
|
def tls_session_update(self, msg_str):
|
"""
|
XXX Something should be done about the session ID here.
|
"""
|
super(SSLv2ServerHello, self).tls_session_update(msg_str)
|
|
s = self.tls_session
|
client_cs = s.sslv2_common_cs
|
css = [cs for cs in client_cs if cs in self.ciphers]
|
s.sslv2_common_cs = css
|
s.sslv2_connection_id = self.connection_id
|
s.tls_version = self.version
|
if self.cert is not None:
|
s.server_certs = [self.cert]
|
|
|
###############################################################################
|
### ClientMasterKey ###
|
###############################################################################
|
|
class _SSLv2CipherSuiteField(EnumField):
|
def __init__(self, name, default, dico):
|
EnumField.__init__(self, name, default, dico)
|
|
def i2m(self, pkt, val):
|
if val is None:
|
return b""
|
val2 = (val >> 16, val & 0x00ffff)
|
return struct.pack(">BH", val2[0], val2[1])
|
|
def addfield(self, pkt, s, val):
|
return s + self.i2m(pkt, val)
|
|
def m2i(self, pkt, m):
|
return struct.unpack("!I", b"\x00" + m[:3])[0]
|
|
def getfield(self, pkt, s):
|
return s[3:], self.m2i(pkt, s)
|
|
class _SSLv2EncryptedKeyField(XStrLenField):
|
def i2repr(self, pkt, x):
|
s = super(_SSLv2EncryptedKeyField, self).i2repr(pkt, x)
|
if pkt.decryptedkey is not None:
|
dx = pkt.decryptedkey
|
ds = super(_SSLv2EncryptedKeyField, self).i2repr(pkt, dx)
|
s += " [decryptedkey= %s]" % ds
|
return s
|
|
class SSLv2ClientMasterKey(_SSLv2Handshake):
|
"""
|
SSLv2 ClientMasterKey.
|
"""
|
__slots__ = ["decryptedkey"]
|
name = "SSLv2 Handshake - Client Master Key"
|
fields_desc = [ ByteEnumField("msgtype", 2, _sslv2_handshake_type),
|
_SSLv2CipherSuiteField("cipher", None, _tls_cipher_suites),
|
|
FieldLenField("clearkeylen", None, fmt="!H",
|
length_of="clearkey"),
|
FieldLenField("encryptedkeylen", None, fmt="!H",
|
length_of="encryptedkey"),
|
FieldLenField("keyarglen", None, fmt="!H",
|
length_of="keyarg"),
|
|
XStrLenField("clearkey", "",
|
length_from=lambda pkt: pkt.clearkeylen),
|
_SSLv2EncryptedKeyField("encryptedkey", "",
|
length_from=lambda pkt: pkt.encryptedkeylen),
|
XStrLenField("keyarg", "",
|
length_from=lambda pkt: pkt.keyarglen) ]
|
|
def __init__(self, *args, **kargs):
|
"""
|
When post_building, the packets fields are updated (this is somewhat
|
non-standard). We might need these fields later, but calling __str__
|
on a new packet (i.e. not dissected from a raw string) applies
|
post_build to an object different from the original one... unless
|
we hackishly always set self.explicit to 1.
|
"""
|
if "decryptedkey" in kargs:
|
self.decryptedkey = kargs["decryptedkey"]
|
del kargs["decryptedkey"]
|
else:
|
self.decryptedkey = b""
|
super(SSLv2ClientMasterKey, self).__init__(*args, **kargs)
|
self.explicit = 1
|
|
def pre_dissect(self, s):
|
clearkeylen = struct.unpack("!H", s[4:6])[0]
|
encryptedkeylen = struct.unpack("!H", s[6:8])[0]
|
encryptedkeystart = 10 + clearkeylen
|
encryptedkey = s[encryptedkeystart:encryptedkeystart+encryptedkeylen]
|
if self.tls_session.server_rsa_key:
|
self.decryptedkey = \
|
self.tls_session.server_rsa_key.decrypt(encryptedkey)
|
else:
|
self.decryptedkey = None
|
return s
|
|
def post_build(self, pkt, pay):
|
cs_val = None
|
if self.cipher is None:
|
common_cs = self.tls_session.sslv2_common_cs
|
cs_vals = get_usable_ciphersuites(common_cs, "SSLv2")
|
if len(cs_vals) == 0:
|
warning("No known common cipher suite between SSLv2 Hellos.")
|
cs_val = 0x0700c0
|
cipher = b"\x07\x00\xc0"
|
else:
|
cs_val = cs_vals[0] #XXX choose the best one
|
cipher = struct.pack(">BH", cs_val >> 16, cs_val & 0x00ffff)
|
cs_cls = _tls_cipher_suites_cls[cs_val]
|
self.cipher = cs_val
|
else:
|
cipher = pkt[1:4]
|
cs_val = struct.unpack("!I", b"\x00" + cipher)[0]
|
if cs_val not in _tls_cipher_suites_cls:
|
warning("Unknown ciphersuite %d from ClientMasterKey" % cs_val)
|
cs_cls = None
|
else:
|
cs_cls = _tls_cipher_suites_cls[cs_val]
|
|
if cs_cls:
|
if (self.encryptedkey == b"" and
|
len(self.tls_session.server_certs) > 0):
|
# else, the user is responsible for export slicing & encryption
|
key = randstring(cs_cls.cipher_alg.key_len)
|
|
if self.clearkey == b"" and cs_cls.kx_alg.export:
|
self.clearkey = key[:-5]
|
|
if self.decryptedkey == b"":
|
if cs_cls.kx_alg.export:
|
self.decryptedkey = key[-5:]
|
else:
|
self.decryptedkey = key
|
|
pubkey = self.tls_session.server_certs[0].pubKey
|
self.encryptedkey = pubkey.encrypt(self.decryptedkey)
|
|
if self.keyarg == b"" and cs_cls.cipher_alg.type == "block":
|
self.keyarg = randstring(cs_cls.cipher_alg.block_size)
|
|
clearkey = self.clearkey or b""
|
if self.clearkeylen is None:
|
self.clearkeylen = len(clearkey)
|
clearkeylen = struct.pack("!H", self.clearkeylen)
|
|
encryptedkey = self.encryptedkey or b""
|
if self.encryptedkeylen is None:
|
self.encryptedkeylen = len(encryptedkey)
|
encryptedkeylen = struct.pack("!H", self.encryptedkeylen)
|
|
keyarg = self.keyarg or b""
|
if self.keyarglen is None:
|
self.keyarglen = len(keyarg)
|
keyarglen = struct.pack("!H", self.keyarglen)
|
|
s = (chb(pkt[0]) + cipher
|
+ clearkeylen + encryptedkeylen + keyarglen
|
+ clearkey + encryptedkey + keyarg)
|
return s + pay
|
|
def tls_session_update(self, msg_str):
|
super(SSLv2ClientMasterKey, self).tls_session_update(msg_str)
|
|
s = self.tls_session
|
cs_val = self.cipher
|
if cs_val not in _tls_cipher_suites_cls:
|
warning("Unknown cipher suite %d from ClientMasterKey" % cs_val)
|
cs_cls = None
|
else:
|
cs_cls = _tls_cipher_suites_cls[cs_val]
|
|
tls_version = s.tls_version or 0x0002
|
connection_end = s.connection_end
|
wcs_seq_num = s.wcs.seq_num
|
s.pwcs = writeConnState(ciphersuite=cs_cls,
|
connection_end=connection_end,
|
seq_num=wcs_seq_num,
|
tls_version=tls_version)
|
rcs_seq_num = s.rcs.seq_num
|
s.prcs = readConnState(ciphersuite=cs_cls,
|
connection_end=connection_end,
|
seq_num=rcs_seq_num,
|
tls_version=tls_version)
|
|
if self.decryptedkey is not None:
|
s.master_secret = self.clearkey + self.decryptedkey
|
s.compute_sslv2_km_and_derive_keys()
|
|
if s.pwcs.cipher.type == "block":
|
s.pwcs.cipher.iv = self.keyarg
|
if s.prcs.cipher.type == "block":
|
s.prcs.cipher.iv = self.keyarg
|
|
s.triggered_prcs_commit = True
|
s.triggered_pwcs_commit = True
|
|
|
###############################################################################
|
### ServerVerify ###
|
###############################################################################
|
|
class SSLv2ServerVerify(_SSLv2Handshake):
|
"""
|
In order to parse a ServerVerify, the exact message string should be
|
fed to the class. This is how SSLv2 defines the challenge length...
|
"""
|
name = "SSLv2 Handshake - Server Verify"
|
fields_desc = [ ByteEnumField("msgtype", 5, _sslv2_handshake_type),
|
XStrField("challenge", "") ]
|
|
def build(self, *args, **kargs):
|
fval = self.getfieldval("challenge")
|
if fval is None:
|
self.challenge = self.tls_session.sslv2_challenge
|
return super(SSLv2ServerVerify, self).build(*args, **kargs)
|
|
def post_dissection(self, pkt):
|
s = self.tls_session
|
if s.sslv2_challenge is not None:
|
if self.challenge != s.sslv2_challenge:
|
pkt_info = pkt.firstlayer().summary()
|
log_runtime.info("TLS: invalid ServerVerify received [%s]", pkt_info)
|
|
|
###############################################################################
|
### RequestCertificate ###
|
###############################################################################
|
|
class SSLv2RequestCertificate(_SSLv2Handshake):
|
"""
|
In order to parse a RequestCertificate, the exact message string should be
|
fed to the class. This is how SSLv2 defines the challenge length...
|
"""
|
name = "SSLv2 Handshake - Request Certificate"
|
fields_desc = [ ByteEnumField("msgtype", 7, _sslv2_handshake_type),
|
ByteEnumField("authtype", 1, {1: "md5_with_rsa"}),
|
XStrField("challenge", "") ]
|
|
def tls_session_update(self, msg_str):
|
super(SSLv2RequestCertificate, self).tls_session_update(msg_str)
|
self.tls_session.sslv2_challenge_clientcert = self.challenge
|
|
|
###############################################################################
|
### ClientCertificate ###
|
###############################################################################
|
|
class SSLv2ClientCertificate(_SSLv2Handshake):
|
"""
|
SSLv2 ClientCertificate.
|
"""
|
name = "SSLv2 Handshake - Client Certificate"
|
fields_desc = [ ByteEnumField("msgtype", 8, _sslv2_handshake_type),
|
|
ByteEnumField("certtype", 1, {1: "x509_cert"}),
|
FieldLenField("certlen", None, fmt="!H",
|
length_of="certdata"),
|
FieldLenField("responselen", None, fmt="!H",
|
length_of="responsedata"),
|
|
_SSLv2CertDataField("certdata", b"",
|
length_from=lambda pkt: pkt.certlen),
|
_TLSSignatureField("responsedata", None,
|
length_from=lambda pkt: pkt.responselen) ]
|
|
def build(self, *args, **kargs):
|
s = self.tls_session
|
sig = self.getfieldval("responsedata")
|
test = (sig is None and
|
s.sslv2_key_material is not None and
|
s.sslv2_challenge_clientcert is not None and
|
len(s.server_certs) > 0)
|
if test:
|
s = self.tls_session
|
m = (s.sslv2_key_material +
|
s.sslv2_challenge_clientcert +
|
s.server_certs[0].der)
|
self.responsedata = _TLSSignature(tls_session=s)
|
self.responsedata._update_sig(m, s.client_key)
|
else:
|
self.responsedata = b""
|
return super(SSLv2ClientCertificate, self).build(*args, **kargs)
|
|
def post_dissection_tls_session_update(self, msg_str):
|
self.tls_session_update(msg_str)
|
|
s = self.tls_session
|
test = (len(s.client_certs) > 0 and
|
s.sslv2_key_material is not None and
|
s.sslv2_challenge_clientcert is not None and
|
len(s.server_certs) > 0)
|
if test:
|
m = (s.sslv2_key_material +
|
s.sslv2_challenge_clientcert +
|
s.server_certs[0].der)
|
sig_test = self.responsedata._verify_sig(m, s.client_certs[0])
|
if not sig_test:
|
pkt_info = self.firstlayer().summary()
|
log_runtime.info("TLS: invalid client CertificateVerify signature [%s]", pkt_info)
|
|
def tls_session_update(self, msg_str):
|
super(SSLv2ClientCertificate, self).tls_session_update(msg_str)
|
if self.certdata:
|
self.tls_session.client_certs = [self.certdata]
|
|
|
###############################################################################
|
### Finished ###
|
###############################################################################
|
|
class SSLv2ClientFinished(_SSLv2Handshake):
|
"""
|
In order to parse a ClientFinished, the exact message string should be fed
|
to the class. SSLv2 does not offer any other way to know the c_id length.
|
"""
|
name = "SSLv2 Handshake - Client Finished"
|
fields_desc = [ ByteEnumField("msgtype", 3, _sslv2_handshake_type),
|
XStrField("connection_id", "") ]
|
|
def build(self, *args, **kargs):
|
fval = self.getfieldval("connection_id")
|
if fval == b"":
|
self.connection_id = self.tls_session.sslv2_connection_id
|
return super(SSLv2ClientFinished, self).build(*args, **kargs)
|
|
def post_dissection(self, pkt):
|
s = self.tls_session
|
if s.sslv2_connection_id is not None:
|
if self.connection_id != s.sslv2_connection_id:
|
pkt_info = pkt.firstlayer().summary()
|
log_runtime.info("TLS: invalid client Finished received [%s]", pkt_info)
|
|
|
class SSLv2ServerFinished(_SSLv2Handshake):
|
"""
|
In order to parse a ServerFinished, the exact message string should be fed
|
to the class. SSLv2 does not offer any other way to know the sid length.
|
"""
|
name = "SSLv2 Handshake - Server Finished"
|
fields_desc = [ ByteEnumField("msgtype", 6, _sslv2_handshake_type),
|
XStrField("sid", "") ]
|
|
def build(self, *args, **kargs):
|
fval = self.getfieldval("sid")
|
if fval == b"":
|
self.sid = self.tls_session.sid
|
return super(SSLv2ServerFinished, self).build(*args, **kargs)
|
|
def post_dissection_tls_session_update(self, msg_str):
|
self.tls_session_update(msg_str)
|
self.tls_session.sid = self.sid
|
|
|
###############################################################################
|
### All handshake messages defined in this module ###
|
###############################################################################
|
|
_sslv2_handshake_cls = { 0: SSLv2Error, 1: SSLv2ClientHello,
|
2: SSLv2ClientMasterKey, 3: SSLv2ClientFinished,
|
4: SSLv2ServerHello, 5: SSLv2ServerVerify,
|
6: SSLv2ServerFinished, 7: SSLv2RequestCertificate,
|
8: SSLv2ClientCertificate }
|