## This file is part of Scapy
|
## Copyright (C) 2007, 2008, 2009 Arnaud Ebalard
|
## 2015, 2016, 2017 Maxence Tury
|
## This program is published under a GPLv2 license
|
|
"""
|
TLS client automaton. This makes for a primitive TLS stack.
|
Obviously you need rights for network access.
|
|
We support versions SSLv2 to TLS 1.2, along with many features.
|
There is no session resumption mechanism for now.
|
|
In order to run a client to tcp/50000 with one cipher suite of your choice:
|
> from scapy.all import *
|
> ch = TLSClientHello(ciphers=<int code of the cipher suite>)
|
> t = TLSClientAutomaton(dport=50000, client_hello=ch)
|
> t.run()
|
"""
|
|
from __future__ import print_function
|
import socket
|
|
from scapy.pton_ntop import inet_pton
|
from scapy.utils import randstring
|
from scapy.automaton import ATMT
|
from scapy.layers.tls.automaton import _TLSAutomaton
|
from scapy.layers.tls.basefields import _tls_version, _tls_version_options
|
from scapy.layers.tls.session import tlsSession
|
from scapy.layers.tls.extensions import (TLS_Ext_SupportedGroups,
|
TLS_Ext_SupportedVersions,
|
TLS_Ext_SignatureAlgorithms,
|
TLS_Ext_ServerName, ServerName)
|
from scapy.layers.tls.handshake import *
|
from scapy.layers.tls.handshake_sslv2 import *
|
from scapy.layers.tls.keyexchange_tls13 import (TLS_Ext_KeyShare_CH,
|
KeyShareEntry)
|
from scapy.layers.tls.record import (TLS, TLSAlert, TLSChangeCipherSpec,
|
TLSApplicationData)
|
from scapy.modules import six
|
|
|
class TLSClientAutomaton(_TLSAutomaton):
    """
    A simple TLS test client automaton. Try to overload some states or
    conditions and see what happens on the other side.

    Rather than with an interruption, the best way to stop this client is by
    typing 'quit'. This won't be a message sent to the server.

    _'mycert' and 'mykey' may be provided as filenames. They will be used in
    the handshake, should the server ask for client authentication.
    _'server_name' does not need to be set.
    _'client_hello' may hold a TLSClientHello or SSLv2ClientHello to be sent
    to the server. This is particularly useful for extensions tweaking.
    _'version' is a quicker way to advertise a protocol version ("sslv2",
    "tls1", "tls12", etc.) It may be overridden by the previous 'client_hello'.
    _'data' is a list of raw data to be sent to the server once the handshake
    has been completed. Both 'stop_server' and 'quit' will work this way.
    """

    def parse_args(self, server="127.0.0.1", dport=4433, server_name=None,
                   mycert=None, mykey=None,
                   client_hello=None, version=None,
                   data=None,
                   **kargs):
        """
        Store the client configuration on the automaton instance.

        server       -- IP address or host name of the peer.
        dport        -- TCP port to connect to.
        server_name  -- if set, overrides the name stored in remote_name.
        mycert/mykey -- forwarded to the parent parse_args().
        client_hello -- optional pre-built hello message to send as is.
        version      -- key looked up in _tls_version_options; an unknown
                        key is reported through vprint() and ignored.
        data         -- bytes, text or a list of those; queued so that they
                        are sent in the given order once the handshake is
                        done. Anything else results in an empty queue.
        """
        super(TLSClientAutomaton, self).parse_args(mycert=mycert,
                                                   mykey=mykey,
                                                   **kargs)
        addrinfo = socket.getaddrinfo(server, dport)
        self.remote_name = None
        try:
            # inet_pton() is only used as a validity check here: if 'server'
            # is not a literal IP address, it is treated as a host name.
            if ':' in server:
                inet_pton(socket.AF_INET6, server)
            else:
                inet_pton(socket.AF_INET, server)
        except Exception:
            self.remote_name = socket.getfqdn(server)
            if self.remote_name != server:
                addrinfo = socket.getaddrinfo(self.remote_name, dport)

        if server_name:
            self.remote_name = server_name
        self.remote_family = addrinfo[0][0]
        self.remote_ip = addrinfo[0][4][0]
        self.remote_port = dport
        self.local_ip = None
        self.local_port = None
        self.socket = None

        self.client_hello = client_hello
        self.advertised_tls_version = None
        if version:
            v = _tls_version_options.get(version, None)
            if not v:
                self.vprint("Unrecognized TLS version option.")
            else:
                self.advertised_tls_version = v

        self.linebreak = False
        if isinstance(data, bytes):
            self.data_to_send = [data]
        elif isinstance(data, six.string_types):
            self.data_to_send = [raw(data)]
        elif isinstance(data, list):
            # Stored reversed because items are consumed with list.pop(),
            # which takes from the end.
            self.data_to_send = [raw(d) for d in reversed(data)]
        else:
            self.data_to_send = []

    def vprint_sessioninfo(self):
        """Print a summary of the current session when in verbose mode."""
        if self.verbose:
            s = self.cur_session
            v = _tls_version[s.tls_version]
            self.vprint("Version : %s" % v)
            cs = s.wcs.ciphersuite.name
            self.vprint("Cipher suite : %s" % cs)
            if s.tls_version >= 0x0304:
                ms = s.tls13_master_secret
            else:
                ms = s.master_secret
            self.vprint("Master secret : %s" % repr_hex(ms))
            if s.server_certs:
                self.vprint("Server certificate chain: %r" % s.server_certs)
            self.vprint()

    @ATMT.state(initial=True)
    def INITIAL(self):
        """Entry point of the automaton."""
        self.vprint("Starting TLS client automaton.")
        raise self.INIT_TLS_SESSION()

    @ATMT.state()
    def INIT_TLS_SESSION(self):
        """Create the client-side tlsSession and settle the version."""
        self.cur_session = tlsSession(connection_end="client")
        self.cur_session.client_certs = self.mycert
        self.cur_session.client_key = self.mykey
        v = self.advertised_tls_version
        if v:
            self.cur_session.advertised_tls_version = v
        else:
            # No version requested: adopt the session's own default.
            default_version = self.cur_session.advertised_tls_version
            self.advertised_tls_version = default_version
        raise self.CONNECT()

    @ATMT.state()
    def CONNECT(self):
        """Open the TCP connection and pick the handshake flavour."""
        s = socket.socket(self.remote_family, socket.SOCK_STREAM)
        self.vprint()
        self.vprint("Trying to connect on %s:%d" % (self.remote_ip,
                                                    self.remote_port))
        s.connect((self.remote_ip, self.remote_port))
        self.socket = s
        self.local_ip, self.local_port = self.socket.getsockname()[:2]
        self.vprint()
        if self.cur_session.advertised_tls_version in [0x0200, 0x0002]:
            raise self.SSLv2_PREPARE_CLIENTHELLO()
        elif self.cur_session.advertised_tls_version >= 0x0304:
            raise self.TLS13_START()
        else:
            raise self.PREPARE_CLIENTFLIGHT1()

    ########################### TLS handshake #################################

    @ATMT.state()
    def PREPARE_CLIENTFLIGHT1(self):
        """Start a new record for the first client flight."""
        self.add_record()

    @ATMT.condition(PREPARE_CLIENTFLIGHT1)
    def should_add_ClientHello(self):
        """Queue the user-provided ClientHello, or a default one."""
        self.add_msg(self.client_hello or TLSClientHello())
        raise self.ADDED_CLIENTHELLO()

    @ATMT.state()
    def ADDED_CLIENTHELLO(self):
        pass

    @ATMT.condition(ADDED_CLIENTHELLO)
    def should_send_ClientFlight1(self):
        """Flush the queued records, then move to SENT_CLIENTFLIGHT1."""
        self.flush_records()
        raise self.SENT_CLIENTFLIGHT1()

    @ATMT.state()
    def SENT_CLIENTFLIGHT1(self):
        raise self.WAITING_SERVERFLIGHT1()

    @ATMT.state()
    def WAITING_SERVERFLIGHT1(self):
        self.get_next_msg()
        raise self.RECEIVED_SERVERFLIGHT1()

    @ATMT.state()
    def RECEIVED_SERVERFLIGHT1(self):
        pass

    @ATMT.condition(RECEIVED_SERVERFLIGHT1, prio=1)
    def should_handle_ServerHello(self):
        """
        XXX We should check the ServerHello attributes for discrepancies with
        our own ClientHello.
        """
        self.raise_on_packet(TLSServerHello,
                             self.HANDLED_SERVERHELLO)

    @ATMT.state()
    def HANDLED_SERVERHELLO(self):
        pass

    @ATMT.condition(RECEIVED_SERVERFLIGHT1, prio=2)
    def missing_ServerHello(self):
        """Fallback (lower priority) when no ServerHello was handled."""
        raise self.MISSING_SERVERHELLO()

    @ATMT.state()
    def MISSING_SERVERHELLO(self):
        self.vprint("Missing TLS ServerHello message!")
        raise self.CLOSE_NOTIFY()

    @ATMT.condition(HANDLED_SERVERHELLO, prio=1)
    def should_handle_ServerCertificate(self):
        """
        Look for a server Certificate unless the pending key exchange is
        anonymous, then proceed to HANDLED_SERVERCERTIFICATE either way.
        """
        # NOTE(review): this condition always raises, so the prio=2
        # missing_ServerCertificate condition looks unreachable (assuming
        # raise_on_packet simply returns when nothing matches) -- confirm.
        if not self.cur_session.prcs.key_exchange.anonymous:
            self.raise_on_packet(TLSCertificate,
                                 self.HANDLED_SERVERCERTIFICATE)
        raise self.HANDLED_SERVERCERTIFICATE()

    @ATMT.state()
    def HANDLED_SERVERCERTIFICATE(self):
        pass

    @ATMT.condition(HANDLED_SERVERHELLO, prio=2)
    def missing_ServerCertificate(self):
        raise self.MISSING_SERVERCERTIFICATE()

    @ATMT.state()
    def MISSING_SERVERCERTIFICATE(self):
        self.vprint("Missing TLS Certificate message!")
        raise self.CLOSE_NOTIFY()

    @ATMT.state()
    def HANDLED_CERTIFICATEREQUEST(self):
        """Report the certificate request; warn if we have nothing to send."""
        self.vprint("Server asked for a certificate...")
        if not self.mykey or not self.mycert:
            self.vprint("No client certificate to send!")
            self.vprint("Will try and send an empty Certificate message...")

    @ATMT.condition(HANDLED_SERVERCERTIFICATE, prio=1)
    def should_handle_ServerKeyExchange_from_ServerCertificate(self):
        """
        XXX We should check the ServerKeyExchange attributes for discrepancies
        with our own ClientHello, along with the ServerHello and Certificate.
        """
        self.raise_on_packet(TLSServerKeyExchange,
                             self.HANDLED_SERVERKEYEXCHANGE)

    @ATMT.state(final=True)
    def MISSING_SERVERKEYEXCHANGE(self):
        pass

    @ATMT.condition(HANDLED_SERVERCERTIFICATE, prio=2)
    def missing_ServerKeyExchange(self):
        """Stop unless the pending key exchange is flagged 'no_ske'."""
        if not self.cur_session.prcs.key_exchange.no_ske:
            raise self.MISSING_SERVERKEYEXCHANGE()

    @ATMT.state()
    def HANDLED_SERVERKEYEXCHANGE(self):
        pass

    def should_handle_CertificateRequest(self):
        """
        XXX We should check the CertificateRequest attributes for discrepancies
        with the cipher suite, etc.
        """
        self.raise_on_packet(TLSCertificateRequest,
                             self.HANDLED_CERTIFICATEREQUEST)

    @ATMT.condition(HANDLED_SERVERKEYEXCHANGE, prio=2)
    def should_handle_CertificateRequest_from_ServerKeyExchange(self):
        self.should_handle_CertificateRequest()

    @ATMT.condition(HANDLED_SERVERCERTIFICATE, prio=3)
    def should_handle_CertificateRequest_from_ServerCertificate(self):
        self.should_handle_CertificateRequest()

    def should_handle_ServerHelloDone(self):
        """Shared body of the three ServerHelloDone conditions below."""
        self.raise_on_packet(TLSServerHelloDone,
                             self.HANDLED_SERVERHELLODONE)

    @ATMT.condition(HANDLED_SERVERKEYEXCHANGE, prio=1)
    def should_handle_ServerHelloDone_from_ServerKeyExchange(self):
        return self.should_handle_ServerHelloDone()

    @ATMT.condition(HANDLED_CERTIFICATEREQUEST, prio=4)
    def should_handle_ServerHelloDone_from_CertificateRequest(self):
        return self.should_handle_ServerHelloDone()

    @ATMT.condition(HANDLED_SERVERCERTIFICATE, prio=4)
    def should_handle_ServerHelloDone_from_ServerCertificate(self):
        return self.should_handle_ServerHelloDone()

    @ATMT.state()
    def HANDLED_SERVERHELLODONE(self):
        raise self.PREPARE_CLIENTFLIGHT2()

    @ATMT.state()
    def PREPARE_CLIENTFLIGHT2(self):
        """Start a new record for the second client flight."""
        self.add_record()

    @ATMT.condition(PREPARE_CLIENTFLIGHT2, prio=1)
    def should_add_ClientCertificate(self):
        """
        If the server sent a CertificateRequest, we send a Certificate message.
        If no certificate is available, an empty Certificate message is sent:
        - this is a SHOULD in RFC 4346 (Section 7.4.6)
        - this is a MUST in RFC 5246 (Section 7.4.6)

        XXX We may want to add a complete chain.
        """
        hs_msg = [type(m) for m in self.cur_session.handshake_messages_parsed]
        if TLSCertificateRequest not in hs_msg:
            return
        certs = []
        if self.mycert:
            certs = [self.mycert]
        self.add_msg(TLSCertificate(certs=certs))
        raise self.ADDED_CLIENTCERTIFICATE()

    @ATMT.state()
    def ADDED_CLIENTCERTIFICATE(self):
        pass

    def should_add_ClientKeyExchange(self):
        """Shared body of the two ClientKeyExchange conditions below."""
        self.add_msg(TLSClientKeyExchange())
        raise self.ADDED_CLIENTKEYEXCHANGE()

    @ATMT.condition(PREPARE_CLIENTFLIGHT2, prio=2)
    def should_add_ClientKeyExchange_from_ClientFlight2(self):
        return self.should_add_ClientKeyExchange()

    @ATMT.condition(ADDED_CLIENTCERTIFICATE)
    def should_add_ClientKeyExchange_from_ClientCertificate(self):
        return self.should_add_ClientKeyExchange()

    @ATMT.state()
    def ADDED_CLIENTKEYEXCHANGE(self):
        pass

    @ATMT.condition(ADDED_CLIENTKEYEXCHANGE, prio=1)
    def should_add_ClientVerify(self):
        """
        XXX Section 7.4.7.1 of RFC 5246 states that the CertificateVerify
        message is only sent following a client certificate that has signing
        capability (i.e. not those containing fixed DH params).
        We should verify that before adding the message. We should also handle
        the case when the Certificate message was empty.
        """
        hs_msg = [type(m) for m in self.cur_session.handshake_messages_parsed]
        if (TLSCertificateRequest not in hs_msg or
                self.mycert is None or
                self.mykey is None):
            return
        self.add_msg(TLSCertificateVerify())
        raise self.ADDED_CERTIFICATEVERIFY()

    @ATMT.state()
    def ADDED_CERTIFICATEVERIFY(self):
        pass

    @ATMT.condition(ADDED_CERTIFICATEVERIFY)
    def should_add_ChangeCipherSpec_from_CertificateVerify(self):
        """Queue a ChangeCipherSpec in a record of its own."""
        self.add_record()
        self.add_msg(TLSChangeCipherSpec())
        raise self.ADDED_CHANGECIPHERSPEC()

    @ATMT.condition(ADDED_CLIENTKEYEXCHANGE, prio=2)
    def should_add_ChangeCipherSpec_from_ClientKeyExchange(self):
        """Queue a ChangeCipherSpec in a record of its own."""
        self.add_record()
        self.add_msg(TLSChangeCipherSpec())
        raise self.ADDED_CHANGECIPHERSPEC()

    @ATMT.state()
    def ADDED_CHANGECIPHERSPEC(self):
        pass

    @ATMT.condition(ADDED_CHANGECIPHERSPEC)
    def should_add_ClientFinished(self):
        """Queue the client Finished in a new record."""
        self.add_record()
        self.add_msg(TLSFinished())
        raise self.ADDED_CLIENTFINISHED()

    @ATMT.state()
    def ADDED_CLIENTFINISHED(self):
        pass

    @ATMT.condition(ADDED_CLIENTFINISHED)
    def should_send_ClientFlight2(self):
        """Flush the queued records, then move to SENT_CLIENTFLIGHT2."""
        self.flush_records()
        raise self.SENT_CLIENTFLIGHT2()

    @ATMT.state()
    def SENT_CLIENTFLIGHT2(self):
        raise self.WAITING_SERVERFLIGHT2()

    @ATMT.state()
    def WAITING_SERVERFLIGHT2(self):
        self.get_next_msg()
        raise self.RECEIVED_SERVERFLIGHT2()

    @ATMT.state()
    def RECEIVED_SERVERFLIGHT2(self):
        pass

    @ATMT.condition(RECEIVED_SERVERFLIGHT2)
    def should_handle_ChangeCipherSpec(self):
        self.raise_on_packet(TLSChangeCipherSpec,
                             self.HANDLED_CHANGECIPHERSPEC)

    @ATMT.state()
    def HANDLED_CHANGECIPHERSPEC(self):
        pass

    @ATMT.condition(HANDLED_CHANGECIPHERSPEC)
    def should_handle_Finished(self):
        self.raise_on_packet(TLSFinished,
                             self.HANDLED_SERVERFINISHED)

    @ATMT.state()
    def HANDLED_SERVERFINISHED(self):
        """Handshake done: print the session summary and the user hint."""
        self.vprint("TLS handshake completed!")
        self.vprint_sessioninfo()
        self.vprint("You may send data or use 'quit'.")

    ####################### end of TLS handshake ##############################

    @ATMT.condition(HANDLED_SERVERFINISHED)
    def should_wait_ClientData(self):
        raise self.WAIT_CLIENTDATA()

    @ATMT.state()
    def WAIT_CLIENTDATA(self):
        pass

    @ATMT.condition(WAIT_CLIENTDATA, prio=1)
    def add_ClientData(self):
        """
        The user may type in:
        GET / HTTP/1.1\\r\\nHost: testserver.com\\r\\n\\r\\n
        Special characters are handled so that it becomes a valid HTTP request.

        Queued 'data' items are sent first; the prompt is only used once the
        queue is empty. The payload b"quit" is not sent: the condition
        returns without a transition, leaving it to no_more_ClientData.
        """
        if not self.data_to_send:
            data = six.moves.input().replace('\\r', '\r').replace('\\n', '\n').encode()  # noqa: E501
        else:
            data = self.data_to_send.pop()
        if data == b"quit":
            return
        if self.linebreak:
            data += b"\n"
        self.add_record()
        self.add_msg(TLSApplicationData(data=data))
        raise self.ADDED_CLIENTDATA()

    @ATMT.condition(WAIT_CLIENTDATA, prio=2)
    def no_more_ClientData(self):
        raise self.CLOSE_NOTIFY()

    @ATMT.state()
    def ADDED_CLIENTDATA(self):
        pass

    @ATMT.condition(ADDED_CLIENTDATA)
    def should_send_ClientData(self):
        self.flush_records()
        raise self.SENT_CLIENTDATA()

    @ATMT.state()
    def SENT_CLIENTDATA(self):
        raise self.WAITING_SERVERDATA()

    @ATMT.state()
    def WAITING_SERVERDATA(self):
        self.get_next_msg(0.3, 1)
        raise self.RECEIVED_SERVERDATA()

    @ATMT.state()
    def RECEIVED_SERVERDATA(self):
        pass

    @ATMT.condition(RECEIVED_SERVERDATA, prio=1)
    def should_handle_ServerData(self):
        """
        Print the first buffered message, if any. A TLSAlert ends the
        session (and is left in the buffer); anything else is consumed.
        """
        if not self.buffer_in:
            raise self.WAIT_CLIENTDATA()
        p = self.buffer_in[0]
        if isinstance(p, TLSApplicationData):
            print("> Received: %r" % p.data)
        elif isinstance(p, TLSAlert):
            print("> Received: %r" % p)
            raise self.CLOSE_NOTIFY()
        else:
            print("> Received: %r" % p)
        self.buffer_in = self.buffer_in[1:]
        raise self.HANDLED_SERVERDATA()

    @ATMT.state()
    def HANDLED_SERVERDATA(self):
        raise self.WAIT_CLIENTDATA()

    @ATMT.state()
    def CLOSE_NOTIFY(self):
        self.vprint()
        self.vprint("Trying to send a TLSAlert to the server...")

    @ATMT.condition(CLOSE_NOTIFY)
    def close_session(self):
        """Best-effort TLSAlert(level=1, descr=0), then go to FINAL."""
        self.add_record()
        self.add_msg(TLSAlert(level=1, descr=0))
        try:
            self.flush_records()
        except Exception:
            # Deliberately best-effort: the peer may already be gone.
            self.vprint("Could not send termination Alert, maybe the server stopped?")  # noqa: E501
        raise self.FINAL()

    ########################## SSLv2 handshake ################################

    @ATMT.state()
    def SSLv2_PREPARE_CLIENTHELLO(self):
        pass

    @ATMT.condition(SSLv2_PREPARE_CLIENTHELLO)
    def sslv2_should_add_ClientHello(self):
        """Queue the user-provided hello, or a default SSLv2ClientHello."""
        self.add_record(is_sslv2=True)
        p = self.client_hello or SSLv2ClientHello(challenge=randstring(16))
        self.add_msg(p)
        raise self.SSLv2_ADDED_CLIENTHELLO()

    @ATMT.state()
    def SSLv2_ADDED_CLIENTHELLO(self):
        pass

    @ATMT.condition(SSLv2_ADDED_CLIENTHELLO)
    def sslv2_should_send_ClientHello(self):
        self.flush_records()
        raise self.SSLv2_SENT_CLIENTHELLO()

    @ATMT.state()
    def SSLv2_SENT_CLIENTHELLO(self):
        raise self.SSLv2_WAITING_SERVERHELLO()

    @ATMT.state()
    def SSLv2_WAITING_SERVERHELLO(self):
        self.get_next_msg()
        raise self.SSLv2_RECEIVED_SERVERHELLO()

    @ATMT.state()
    def SSLv2_RECEIVED_SERVERHELLO(self):
        pass

    @ATMT.condition(SSLv2_RECEIVED_SERVERHELLO, prio=1)
    def sslv2_should_handle_ServerHello(self):
        self.raise_on_packet(SSLv2ServerHello,
                             self.SSLv2_HANDLED_SERVERHELLO)

    @ATMT.state()
    def SSLv2_HANDLED_SERVERHELLO(self):
        pass

    @ATMT.condition(SSLv2_RECEIVED_SERVERHELLO, prio=2)
    def sslv2_missing_ServerHello(self):
        raise self.SSLv2_MISSING_SERVERHELLO()

    @ATMT.state()
    def SSLv2_MISSING_SERVERHELLO(self):
        self.vprint("Missing SSLv2 ServerHello message!")
        raise self.SSLv2_CLOSE_NOTIFY()

    @ATMT.condition(SSLv2_HANDLED_SERVERHELLO)
    def sslv2_should_add_ClientMasterKey(self):
        self.add_record(is_sslv2=True)
        self.add_msg(SSLv2ClientMasterKey())
        raise self.SSLv2_ADDED_CLIENTMASTERKEY()

    @ATMT.state()
    def SSLv2_ADDED_CLIENTMASTERKEY(self):
        pass

    @ATMT.condition(SSLv2_ADDED_CLIENTMASTERKEY)
    def sslv2_should_send_ClientMasterKey(self):
        self.flush_records()
        raise self.SSLv2_SENT_CLIENTMASTERKEY()

    @ATMT.state()
    def SSLv2_SENT_CLIENTMASTERKEY(self):
        raise self.SSLv2_WAITING_SERVERVERIFY()

    @ATMT.state()
    def SSLv2_WAITING_SERVERVERIFY(self):
        # We give the server 0.5 second to send its ServerVerify.
        # Otherwise we assume it is waiting for our ClientFinished.
        self.get_next_msg(0.5, 0)
        raise self.SSLv2_RECEIVED_SERVERVERIFY()

    @ATMT.state()
    def SSLv2_RECEIVED_SERVERVERIFY(self):
        pass

    @ATMT.condition(SSLv2_RECEIVED_SERVERVERIFY, prio=1)
    def sslv2_should_handle_ServerVerify(self):
        self.raise_on_packet(SSLv2ServerVerify,
                             self.SSLv2_HANDLED_SERVERVERIFY,
                             get_next_msg=False)

    @ATMT.state()
    def SSLv2_HANDLED_SERVERVERIFY(self):
        pass

    def sslv2_should_add_ClientFinished(self):
        """
        Queue an SSLv2ClientFinished unless one is already listed in the
        session's handshake_messages_parsed. Shared by two conditions.
        """
        hs_msg = [type(m) for m in self.cur_session.handshake_messages_parsed]
        if SSLv2ClientFinished in hs_msg:
            return
        self.add_record(is_sslv2=True)
        self.add_msg(SSLv2ClientFinished())
        raise self.SSLv2_ADDED_CLIENTFINISHED()

    @ATMT.condition(SSLv2_HANDLED_SERVERVERIFY, prio=1)
    def sslv2_should_add_ClientFinished_from_ServerVerify(self):
        return self.sslv2_should_add_ClientFinished()

    @ATMT.condition(SSLv2_HANDLED_SERVERVERIFY, prio=2)
    def sslv2_should_wait_ServerFinished_from_ServerVerify(self):
        raise self.SSLv2_WAITING_SERVERFINISHED()

    @ATMT.condition(SSLv2_RECEIVED_SERVERVERIFY, prio=2)
    def sslv2_should_add_ClientFinished_from_NoServerVerify(self):
        return self.sslv2_should_add_ClientFinished()

    @ATMT.condition(SSLv2_RECEIVED_SERVERVERIFY, prio=3)
    def sslv2_missing_ServerVerify(self):
        raise self.SSLv2_MISSING_SERVERVERIFY()

    @ATMT.state(final=True)
    def SSLv2_MISSING_SERVERVERIFY(self):
        self.vprint("Missing SSLv2 ServerVerify message!")
        raise self.SSLv2_CLOSE_NOTIFY()

    @ATMT.state()
    def SSLv2_ADDED_CLIENTFINISHED(self):
        pass

    @ATMT.condition(SSLv2_ADDED_CLIENTFINISHED)
    def sslv2_should_send_ClientFinished(self):
        self.flush_records()
        raise self.SSLv2_SENT_CLIENTFINISHED()

    @ATMT.state()
    def SSLv2_SENT_CLIENTFINISHED(self):
        """
        Wait for the ServerFinished if a ServerVerify was already parsed;
        otherwise fetch more data and look for the ServerVerify again.
        """
        hs_msg = [type(m) for m in self.cur_session.handshake_messages_parsed]
        if SSLv2ServerVerify in hs_msg:
            raise self.SSLv2_WAITING_SERVERFINISHED()
        else:
            self.get_next_msg()
            raise self.SSLv2_RECEIVED_SERVERVERIFY()

    @ATMT.state()
    def SSLv2_WAITING_SERVERFINISHED(self):
        self.get_next_msg()
        raise self.SSLv2_RECEIVED_SERVERFINISHED()

    @ATMT.state()
    def SSLv2_RECEIVED_SERVERFINISHED(self):
        pass

    @ATMT.condition(SSLv2_RECEIVED_SERVERFINISHED, prio=1)
    def sslv2_should_handle_ServerFinished(self):
        self.raise_on_packet(SSLv2ServerFinished,
                             self.SSLv2_HANDLED_SERVERFINISHED)

    ####################### SSLv2 client authentication #######################

    @ATMT.condition(SSLv2_RECEIVED_SERVERFINISHED, prio=2)
    def sslv2_should_handle_RequestCertificate(self):
        self.raise_on_packet(SSLv2RequestCertificate,
                             self.SSLv2_HANDLED_REQUESTCERTIFICATE)

    @ATMT.state()
    def SSLv2_HANDLED_REQUESTCERTIFICATE(self):
        """End the session if there is no client certificate/key to offer."""
        self.vprint("Server asked for a certificate...")
        if not self.mykey or not self.mycert:
            self.vprint("No client certificate to send!")
            raise self.SSLv2_CLOSE_NOTIFY()

    @ATMT.condition(SSLv2_HANDLED_REQUESTCERTIFICATE)
    def sslv2_should_add_ClientCertificate(self):
        self.add_record(is_sslv2=True)
        self.add_msg(SSLv2ClientCertificate(certdata=self.mycert))
        raise self.SSLv2_ADDED_CLIENTCERTIFICATE()

    @ATMT.state()
    def SSLv2_ADDED_CLIENTCERTIFICATE(self):
        pass

    @ATMT.condition(SSLv2_ADDED_CLIENTCERTIFICATE)
    def sslv2_should_send_ClientCertificate(self):
        self.flush_records()
        raise self.SSLv2_SENT_CLIENTCERTIFICATE()

    @ATMT.state()
    def SSLv2_SENT_CLIENTCERTIFICATE(self):
        raise self.SSLv2_WAITING_SERVERFINISHED()

    ################### end of SSLv2 client authentication ####################

    @ATMT.state()
    def SSLv2_HANDLED_SERVERFINISHED(self):
        """Handshake done: print the session summary and the user hint."""
        self.vprint("SSLv2 handshake completed!")
        self.vprint_sessioninfo()
        self.vprint("You may send data or use 'quit'.")

    @ATMT.condition(SSLv2_RECEIVED_SERVERFINISHED, prio=3)
    def sslv2_missing_ServerFinished(self):
        raise self.SSLv2_MISSING_SERVERFINISHED()

    @ATMT.state()
    def SSLv2_MISSING_SERVERFINISHED(self):
        self.vprint("Missing SSLv2 ServerFinished message!")
        raise self.SSLv2_CLOSE_NOTIFY()

    ######################## end of SSLv2 handshake ###########################

    @ATMT.condition(SSLv2_HANDLED_SERVERFINISHED)
    def sslv2_should_wait_ClientData(self):
        raise self.SSLv2_WAITING_CLIENTDATA()

    @ATMT.state()
    def SSLv2_WAITING_CLIENTDATA(self):
        pass

    @ATMT.condition(SSLv2_WAITING_CLIENTDATA, prio=1)
    def sslv2_add_ClientData(self):
        """
        SSLv2 counterpart of add_ClientData: take the next payload from the
        queue (or prompt the user once it is empty) and queue it as Raw.

        The payload is bytes at this point (queued items were passed
        through raw(), typed input is encoded), so the 'quit' sentinel and
        the optional line break must be bytes as well.
        """
        if not self.data_to_send:
            data = six.moves.input().replace('\\r', '\r').replace('\\n', '\n').encode()  # noqa: E501
        else:
            data = self.data_to_send.pop()
            self.vprint("> Read from list: %s" % data)
        if data == b"quit":
            return
        if self.linebreak:
            data += b"\n"
        self.add_record(is_sslv2=True)
        self.add_msg(Raw(data))
        raise self.SSLv2_ADDED_CLIENTDATA()

    @ATMT.condition(SSLv2_WAITING_CLIENTDATA, prio=2)
    def sslv2_no_more_ClientData(self):
        raise self.SSLv2_CLOSE_NOTIFY()

    @ATMT.state()
    def SSLv2_ADDED_CLIENTDATA(self):
        pass

    @ATMT.condition(SSLv2_ADDED_CLIENTDATA)
    def sslv2_should_send_ClientData(self):
        self.flush_records()
        raise self.SSLv2_SENT_CLIENTDATA()

    @ATMT.state()
    def SSLv2_SENT_CLIENTDATA(self):
        raise self.SSLv2_WAITING_SERVERDATA()

    @ATMT.state()
    def SSLv2_WAITING_SERVERDATA(self):
        self.get_next_msg(0.3, 1)
        raise self.SSLv2_RECEIVED_SERVERDATA()

    @ATMT.state()
    def SSLv2_RECEIVED_SERVERDATA(self):
        pass

    @ATMT.condition(SSLv2_RECEIVED_SERVERDATA)
    def sslv2_should_handle_ServerData(self):
        """
        Print the first buffered message, if any. A payload starting with
        b"goodbye" ends the session; anything else is consumed.
        """
        if not self.buffer_in:
            raise self.SSLv2_WAITING_CLIENTDATA()
        p = self.buffer_in[0]
        print("> Received: %r" % p.load)
        if p.load.startswith(b"goodbye"):
            raise self.SSLv2_CLOSE_NOTIFY()
        self.buffer_in = self.buffer_in[1:]
        raise self.SSLv2_HANDLED_SERVERDATA()

    @ATMT.state()
    def SSLv2_HANDLED_SERVERDATA(self):
        raise self.SSLv2_WAITING_CLIENTDATA()

    @ATMT.state()
    def SSLv2_CLOSE_NOTIFY(self):
        """
        There is no proper way to end an SSLv2 session.
        We try and send a 'goodbye' message as a substitute.
        """
        self.vprint()
        self.vprint("Trying to send a 'goodbye' to the server...")

    @ATMT.condition(SSLv2_CLOSE_NOTIFY)
    def sslv2_close_session(self):
        """Best-effort 'goodbye', close the socket, then go to FINAL."""
        self.add_record()
        self.add_msg(Raw('goodbye'))
        try:
            self.flush_records()
        except Exception:
            # Deliberately best-effort: the peer may already be gone.
            self.vprint("Could not send our goodbye. The server probably stopped.")  # noqa: E501
        self.socket.close()
        raise self.FINAL()

    ######################### TLS 1.3 handshake ###############################

    @ATMT.state()
    def TLS13_START(self):
        pass

    @ATMT.condition(TLS13_START)
    def tls13_should_add_ClientHello(self):
        """Queue the user-provided ClientHello, or a default TLS 1.3 one."""
        # we have to use the legacy, plaintext TLS record here
        self.add_record(is_tls13=False)
        if self.client_hello:
            p = self.client_hello
        else:
            # When trying to connect to a public TLS 1.3 server,
            # you will most likely need to provide an SNI extension.
            #sn = ServerName(servername="<put server name here>")
            ext = [TLS_Ext_SupportedGroups(groups=["secp256r1"]),
                   #TLS_Ext_ServerName(servernames=[sn]),
                   TLS_Ext_KeyShare_CH(client_shares=[KeyShareEntry(group=23)]),  # noqa: E501
                   TLS_Ext_SupportedVersions(versions=["TLS 1.3-d18"]),
                   TLS_Ext_SignatureAlgorithms(sig_algs=["sha256+rsapss",
                                                         "sha256+rsa"])]
            p = TLSClientHello(ciphers=0x1301, ext=ext)
        self.add_msg(p)
        raise self.TLS13_ADDED_CLIENTHELLO()

    @ATMT.state()
    def TLS13_ADDED_CLIENTHELLO(self):
        pass

    @ATMT.condition(TLS13_ADDED_CLIENTHELLO)
    def tls13_should_send_ClientHello(self):
        self.flush_records()
        raise self.TLS13_SENT_CLIENTHELLO()

    @ATMT.state()
    def TLS13_SENT_CLIENTHELLO(self):
        raise self.TLS13_WAITING_SERVERHELLO()

    @ATMT.state()
    def TLS13_WAITING_SERVERHELLO(self):
        self.get_next_msg()

    @ATMT.condition(TLS13_WAITING_SERVERHELLO)
    def tls13_should_handle_ServerHello(self):
        self.raise_on_packet(TLS13ServerHello,
                             self.TLS13_WAITING_ENCRYPTEDEXTENSIONS)

    @ATMT.state()
    def TLS13_WAITING_ENCRYPTEDEXTENSIONS(self):
        self.get_next_msg()

    @ATMT.condition(TLS13_WAITING_ENCRYPTEDEXTENSIONS)
    def tls13_should_handle_EncryptedExtensions(self):
        self.raise_on_packet(TLSEncryptedExtensions,
                             self.TLS13_WAITING_CERTIFICATE)

    @ATMT.state()
    def TLS13_WAITING_CERTIFICATE(self):
        self.get_next_msg()

    @ATMT.condition(TLS13_WAITING_CERTIFICATE, prio=1)
    def tls13_should_handle_Certificate(self):
        self.raise_on_packet(TLS13Certificate,
                             self.TLS13_WAITING_CERTIFICATEVERIFY)

    @ATMT.condition(TLS13_WAITING_CERTIFICATE, prio=2)
    def tls13_should_handle_CertificateRequest(self):
        """
        Look for a CertificateRequest and keep waiting for the Certificate.
        A request that was already parsed is only reported, not rejected.
        """
        hs_msg = [type(m) for m in self.cur_session.handshake_messages_parsed]
        if TLSCertificateRequest in hs_msg:
            self.vprint("TLSCertificateRequest already received!")
        self.raise_on_packet(TLSCertificateRequest,
                             self.TLS13_WAITING_CERTIFICATE)

    @ATMT.condition(TLS13_WAITING_CERTIFICATE, prio=3)
    def tls13_should_handle_ServerFinished_from_EncryptedExtensions(self):
        # NOTE(review): no TLS13_CONNECTED state is defined in this class;
        # unless the base automaton provides one, evaluating
        # self.TLS13_CONNECTED raises AttributeError -- confirm.
        self.raise_on_packet(TLSFinished,
                             self.TLS13_CONNECTED)

    @ATMT.condition(TLS13_WAITING_CERTIFICATE, prio=4)
    def tls13_missing_Certificate(self):
        self.vprint("Missing TLS 1.3 message after EncryptedExtensions!")
        raise self.FINAL()

    @ATMT.state()
    def TLS13_WAITING_CERTIFICATEVERIFY(self):
        self.get_next_msg()

    @ATMT.condition(TLS13_WAITING_CERTIFICATEVERIFY)
    def tls13_should_handle_CertificateVerify(self):
        self.raise_on_packet(TLSCertificateVerify,
                             self.TLS13_WAITING_SERVERFINISHED)

    @ATMT.state()
    def TLS13_WAITING_SERVERFINISHED(self):
        self.get_next_msg()

    @ATMT.condition(TLS13_WAITING_SERVERFINISHED)
    def tls13_should_handle_ServerFinished_from_CertificateVerify(self):
        self.raise_on_packet(TLSFinished,
                             self.TLS13_PREPARE_CLIENTFLIGHT2)

    @ATMT.state()
    def TLS13_PREPARE_CLIENTFLIGHT2(self):
        """Start a TLS 1.3 record for the second client flight."""
        self.add_record(is_tls13=True)
        #raise self.FINAL()

    @ATMT.condition(TLS13_PREPARE_CLIENTFLIGHT2)
    def tls13_should_add_ClientFinished(self):
        self.add_msg(TLSFinished())
        raise self.TLS13_ADDED_CLIENTFINISHED()

    @ATMT.state()
    def TLS13_ADDED_CLIENTFINISHED(self):
        pass

    @ATMT.condition(TLS13_ADDED_CLIENTFINISHED)
    def tls13_should_send_ClientFlight2(self):
        self.flush_records()
        raise self.TLS13_SENT_CLIENTFLIGHT2()

    @ATMT.state()
    def TLS13_SENT_CLIENTFLIGHT2(self):
        # From here on the TLS 1.3 path shares the application-data states
        # with the TLS <= 1.2 path.
        raise self.HANDLED_SERVERFINISHED()

    @ATMT.state(final=True)
    def FINAL(self):
        """Close the client socket and end the automaton."""
        # We might call shutdown, but it may happen that the server
        # did not wait for us to shutdown after answering our data query.
        #self.socket.shutdown(1)
        self.vprint("Closing client socket...")
        self.socket.close()
        self.vprint("Ending TLS client automaton.")