mirror of
https://github.com/scrapy/scrapy.git
synced 2025-03-14 16:58:20 +00:00
94 lines
3.9 KiB
Python
94 lines
3.9 KiB
Python
import logging
|
|
|
|
from OpenSSL import SSL
|
|
from service_identity.exceptions import CertificateError
|
|
from twisted.internet._sslverify import ClientTLSOptions, verifyHostname, VerificationError
|
|
from twisted.internet.ssl import AcceptableCiphers
|
|
|
|
from scrapy import twisted_version
|
|
from scrapy.utils.ssl import x509name_to_string, get_temp_key_info
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
METHOD_SSLv3 = 'SSLv3'
|
|
METHOD_TLS = 'TLS'
|
|
METHOD_TLSv10 = 'TLSv1.0'
|
|
METHOD_TLSv11 = 'TLSv1.1'
|
|
METHOD_TLSv12 = 'TLSv1.2'
|
|
|
|
|
|
openssl_methods = {
|
|
METHOD_TLS: SSL.SSLv23_METHOD, # protocol negotiation (recommended)
|
|
METHOD_SSLv3: SSL.SSLv3_METHOD, # SSL 3 (NOT recommended)
|
|
METHOD_TLSv10: SSL.TLSv1_METHOD, # TLS 1.0 only
|
|
METHOD_TLSv11: getattr(SSL, 'TLSv1_1_METHOD', 5), # TLS 1.1 only
|
|
METHOD_TLSv12: getattr(SSL, 'TLSv1_2_METHOD', 6), # TLS 1.2 only
|
|
}
|
|
|
|
|
|
if twisted_version < (17, 0, 0):
|
|
from twisted.internet._sslverify import _maybeSetHostNameIndication as set_tlsext_host_name
|
|
else:
|
|
def set_tlsext_host_name(connection, hostNameBytes):
|
|
connection.set_tlsext_host_name(hostNameBytes)
|
|
|
|
|
|
class ScrapyClientTLSOptions(ClientTLSOptions):
|
|
"""
|
|
SSL Client connection creator ignoring certificate verification errors
|
|
(for genuinely invalid certificates or bugs in verification code).
|
|
|
|
Same as Twisted's private _sslverify.ClientTLSOptions,
|
|
except that VerificationError, CertificateError and ValueError
|
|
exceptions are caught, so that the connection is not closed, only
|
|
logging warnings. Also, HTTPS connection parameters logging is added.
|
|
"""
|
|
|
|
def __init__(self, hostname, ctx, verbose_logging=False):
|
|
super(ScrapyClientTLSOptions, self).__init__(hostname, ctx)
|
|
self.verbose_logging = verbose_logging
|
|
|
|
def _identityVerifyingInfoCallback(self, connection, where, ret):
|
|
if where & SSL.SSL_CB_HANDSHAKE_START:
|
|
set_tlsext_host_name(connection, self._hostnameBytes)
|
|
elif where & SSL.SSL_CB_HANDSHAKE_DONE:
|
|
if self.verbose_logging:
|
|
if hasattr(connection, 'get_cipher_name'): # requires pyOPenSSL 0.15
|
|
if hasattr(connection, 'get_protocol_version_name'): # requires pyOPenSSL 16.0.0
|
|
logger.debug('SSL connection to %s using protocol %s, cipher %s',
|
|
self._hostnameASCII,
|
|
connection.get_protocol_version_name(),
|
|
connection.get_cipher_name(),
|
|
)
|
|
else:
|
|
logger.debug('SSL connection to %s using cipher %s',
|
|
self._hostnameASCII,
|
|
connection.get_cipher_name(),
|
|
)
|
|
server_cert = connection.get_peer_certificate()
|
|
logger.debug('SSL connection certificate: issuer "%s", subject "%s"',
|
|
x509name_to_string(server_cert.get_issuer()),
|
|
x509name_to_string(server_cert.get_subject()),
|
|
)
|
|
key_info = get_temp_key_info(connection._ssl)
|
|
if key_info:
|
|
logger.debug('SSL temp key: %s', key_info)
|
|
|
|
try:
|
|
verifyHostname(connection, self._hostnameASCII)
|
|
except (CertificateError, VerificationError) as e:
|
|
logger.warning(
|
|
'Remote certificate is not valid for hostname "{}"; {}'.format(
|
|
self._hostnameASCII, e))
|
|
|
|
except ValueError as e:
|
|
logger.warning(
|
|
'Ignoring error while verifying certificate '
|
|
'from host "{}" (exception: {})'.format(
|
|
self._hostnameASCII, repr(e)))
|
|
|
|
|
|
DEFAULT_CIPHERS = AcceptableCiphers.fromOpenSSLCipherString('DEFAULT')
|