| from __future__ import annotations |
| |
| import hmac |
| import os |
| import socket |
| import sys |
| import typing |
| import warnings |
| from binascii import unhexlify |
| from hashlib import md5, sha1, sha256 |
| |
| from ..exceptions import ProxySchemeUnsupported, SSLError |
| from .url import _BRACELESS_IPV6_ADDRZ_RE, _IPV4_RE |
| |
| SSLContext = None |
| SSLTransport = None |
| HAS_NEVER_CHECK_COMMON_NAME = False |
| IS_PYOPENSSL = False |
| IS_SECURETRANSPORT = False |
| ALPN_PROTOCOLS = ["http/1.1"] |
| |
| _TYPE_VERSION_INFO = typing.Tuple[int, int, int, str, int] |
| |
| # Maps the length of a digest to a possible hash function producing this digest |
| HASHFUNC_MAP = {32: md5, 40: sha1, 64: sha256} |
| |
| |
| def _is_bpo_43522_fixed( |
| implementation_name: str, |
| version_info: _TYPE_VERSION_INFO, |
| pypy_version_info: _TYPE_VERSION_INFO | None, |
| ) -> bool: |
| """Return True for CPython 3.8.9+, 3.9.3+ or 3.10+ and PyPy 7.3.8+ where |
| setting SSLContext.hostname_checks_common_name to False works. |
| |
| Outside of CPython and PyPy we don't know which implementations work |
| or not so we conservatively use our hostname matching as we know that works |
| on all implementations. |
| |
| https://github.com/urllib3/urllib3/issues/2192#issuecomment-821832963 |
| https://foss.heptapod.net/pypy/pypy/-/issues/3539 |
| """ |
| if implementation_name == "pypy": |
| # https://foss.heptapod.net/pypy/pypy/-/issues/3129 |
| return pypy_version_info >= (7, 3, 8) and version_info >= (3, 8) # type: ignore[operator] |
| elif implementation_name == "cpython": |
| major_minor = version_info[:2] |
| micro = version_info[2] |
| return ( |
| (major_minor == (3, 8) and micro >= 9) |
| or (major_minor == (3, 9) and micro >= 3) |
| or major_minor >= (3, 10) |
| ) |
| else: # Defensive: |
| return False |
| |
| |
| def _is_has_never_check_common_name_reliable( |
| openssl_version: str, |
| openssl_version_number: int, |
| implementation_name: str, |
| version_info: _TYPE_VERSION_INFO, |
| pypy_version_info: _TYPE_VERSION_INFO | None, |
| ) -> bool: |
| # As of May 2023, all released versions of LibreSSL fail to reject certificates with |
| # only common names, see https://github.com/urllib3/urllib3/pull/3024 |
| is_openssl = openssl_version.startswith("OpenSSL ") |
| # Before fixing OpenSSL issue #14579, the SSL_new() API was not copying hostflags |
| # like X509_CHECK_FLAG_NEVER_CHECK_SUBJECT, which tripped up CPython. |
| # https://github.com/openssl/openssl/issues/14579 |
| # This was released in OpenSSL 1.1.1l+ (>=0x101010cf) |
| is_openssl_issue_14579_fixed = openssl_version_number >= 0x101010CF |
| |
| return is_openssl and ( |
| is_openssl_issue_14579_fixed |
| or _is_bpo_43522_fixed(implementation_name, version_info, pypy_version_info) |
| ) |
| |
| |
| if typing.TYPE_CHECKING: |
| from ssl import VerifyMode |
| |
| from typing_extensions import Literal, TypedDict |
| |
| from .ssltransport import SSLTransport as SSLTransportType |
| |
| class _TYPE_PEER_CERT_RET_DICT(TypedDict, total=False): |
| subjectAltName: tuple[tuple[str, str], ...] |
| subject: tuple[tuple[tuple[str, str], ...], ...] |
| serialNumber: str |
| |
| |
| # Mapping from 'ssl.PROTOCOL_TLSX' to 'TLSVersion.X' |
| _SSL_VERSION_TO_TLS_VERSION: dict[int, int] = {} |
| |
| try: # Do we have ssl at all? |
| import ssl |
| from ssl import ( # type: ignore[assignment] |
| CERT_REQUIRED, |
| HAS_NEVER_CHECK_COMMON_NAME, |
| OP_NO_COMPRESSION, |
| OP_NO_TICKET, |
| OPENSSL_VERSION, |
| OPENSSL_VERSION_NUMBER, |
| PROTOCOL_TLS, |
| PROTOCOL_TLS_CLIENT, |
| OP_NO_SSLv2, |
| OP_NO_SSLv3, |
| SSLContext, |
| TLSVersion, |
| ) |
| |
| PROTOCOL_SSLv23 = PROTOCOL_TLS |
| |
| # Setting SSLContext.hostname_checks_common_name = False didn't work before CPython |
| # 3.8.9, 3.9.3, and 3.10 (but OK on PyPy) or OpenSSL 1.1.1l+ |
| if HAS_NEVER_CHECK_COMMON_NAME and not _is_has_never_check_common_name_reliable( |
| OPENSSL_VERSION, |
| OPENSSL_VERSION_NUMBER, |
| sys.implementation.name, |
| sys.version_info, |
| sys.pypy_version_info if sys.implementation.name == "pypy" else None, # type: ignore[attr-defined] |
| ): |
| HAS_NEVER_CHECK_COMMON_NAME = False |
| |
| # Need to be careful here in case old TLS versions get |
| # removed in future 'ssl' module implementations. |
| for attr in ("TLSv1", "TLSv1_1", "TLSv1_2"): |
| try: |
| _SSL_VERSION_TO_TLS_VERSION[getattr(ssl, f"PROTOCOL_{attr}")] = getattr( |
| TLSVersion, attr |
| ) |
| except AttributeError: # Defensive: |
| continue |
| |
| from .ssltransport import SSLTransport # type: ignore[assignment] |
| except ImportError: |
| OP_NO_COMPRESSION = 0x20000 # type: ignore[assignment] |
| OP_NO_TICKET = 0x4000 # type: ignore[assignment] |
| OP_NO_SSLv2 = 0x1000000 # type: ignore[assignment] |
| OP_NO_SSLv3 = 0x2000000 # type: ignore[assignment] |
| PROTOCOL_SSLv23 = PROTOCOL_TLS = 2 # type: ignore[assignment] |
| PROTOCOL_TLS_CLIENT = 16 # type: ignore[assignment] |
| |
| |
| _TYPE_PEER_CERT_RET = typing.Union["_TYPE_PEER_CERT_RET_DICT", bytes, None] |
| |
| |
| def assert_fingerprint(cert: bytes | None, fingerprint: str) -> None: |
| """ |
| Checks if given fingerprint matches the supplied certificate. |
| |
| :param cert: |
| Certificate as bytes object. |
| :param fingerprint: |
| Fingerprint as string of hexdigits, can be interspersed by colons. |
| """ |
| |
| if cert is None: |
| raise SSLError("No certificate for the peer.") |
| |
| fingerprint = fingerprint.replace(":", "").lower() |
| digest_length = len(fingerprint) |
| hashfunc = HASHFUNC_MAP.get(digest_length) |
| if not hashfunc: |
| raise SSLError(f"Fingerprint of invalid length: {fingerprint}") |
| |
| # We need encode() here for py32; works on py2 and p33. |
| fingerprint_bytes = unhexlify(fingerprint.encode()) |
| |
| cert_digest = hashfunc(cert).digest() |
| |
| if not hmac.compare_digest(cert_digest, fingerprint_bytes): |
| raise SSLError( |
| f'Fingerprints did not match. Expected "{fingerprint}", got "{cert_digest.hex()}"' |
| ) |
| |
| |
| def resolve_cert_reqs(candidate: None | int | str) -> VerifyMode: |
| """ |
| Resolves the argument to a numeric constant, which can be passed to |
| the wrap_socket function/method from the ssl module. |
| Defaults to :data:`ssl.CERT_REQUIRED`. |
| If given a string it is assumed to be the name of the constant in the |
| :mod:`ssl` module or its abbreviation. |
| (So you can specify `REQUIRED` instead of `CERT_REQUIRED`. |
| If it's neither `None` nor a string we assume it is already the numeric |
| constant which can directly be passed to wrap_socket. |
| """ |
| if candidate is None: |
| return CERT_REQUIRED |
| |
| if isinstance(candidate, str): |
| res = getattr(ssl, candidate, None) |
| if res is None: |
| res = getattr(ssl, "CERT_" + candidate) |
| return res # type: ignore[no-any-return] |
| |
| return candidate # type: ignore[return-value] |
| |
| |
| def resolve_ssl_version(candidate: None | int | str) -> int: |
| """ |
| like resolve_cert_reqs |
| """ |
| if candidate is None: |
| return PROTOCOL_TLS |
| |
| if isinstance(candidate, str): |
| res = getattr(ssl, candidate, None) |
| if res is None: |
| res = getattr(ssl, "PROTOCOL_" + candidate) |
| return typing.cast(int, res) |
| |
| return candidate |
| |
| |
| def create_urllib3_context( |
| ssl_version: int | None = None, |
| cert_reqs: int | None = None, |
| options: int | None = None, |
| ciphers: str | None = None, |
| ssl_minimum_version: int | None = None, |
| ssl_maximum_version: int | None = None, |
| ) -> ssl.SSLContext: |
| """Creates and configures an :class:`ssl.SSLContext` instance for use with urllib3. |
| |
| :param ssl_version: |
| The desired protocol version to use. This will default to |
| PROTOCOL_SSLv23 which will negotiate the highest protocol that both |
| the server and your installation of OpenSSL support. |
| |
| This parameter is deprecated instead use 'ssl_minimum_version'. |
| :param ssl_minimum_version: |
| The minimum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value. |
| :param ssl_maximum_version: |
| The maximum version of TLS to be used. Use the 'ssl.TLSVersion' enum for specifying the value. |
| Not recommended to set to anything other than 'ssl.TLSVersion.MAXIMUM_SUPPORTED' which is the |
| default value. |
| :param cert_reqs: |
| Whether to require the certificate verification. This defaults to |
| ``ssl.CERT_REQUIRED``. |
| :param options: |
| Specific OpenSSL options. These default to ``ssl.OP_NO_SSLv2``, |
| ``ssl.OP_NO_SSLv3``, ``ssl.OP_NO_COMPRESSION``, and ``ssl.OP_NO_TICKET``. |
| :param ciphers: |
| Which cipher suites to allow the server to select. Defaults to either system configured |
| ciphers if OpenSSL 1.1.1+, otherwise uses a secure default set of ciphers. |
| :returns: |
| Constructed SSLContext object with specified options |
| :rtype: SSLContext |
| """ |
| if SSLContext is None: |
| raise TypeError("Can't create an SSLContext object without an ssl module") |
| |
| # This means 'ssl_version' was specified as an exact value. |
| if ssl_version not in (None, PROTOCOL_TLS, PROTOCOL_TLS_CLIENT): |
| # Disallow setting 'ssl_version' and 'ssl_minimum|maximum_version' |
| # to avoid conflicts. |
| if ssl_minimum_version is not None or ssl_maximum_version is not None: |
| raise ValueError( |
| "Can't specify both 'ssl_version' and either " |
| "'ssl_minimum_version' or 'ssl_maximum_version'" |
| ) |
| |
| # 'ssl_version' is deprecated and will be removed in the future. |
| else: |
| # Use 'ssl_minimum_version' and 'ssl_maximum_version' instead. |
| ssl_minimum_version = _SSL_VERSION_TO_TLS_VERSION.get( |
| ssl_version, TLSVersion.MINIMUM_SUPPORTED |
| ) |
| ssl_maximum_version = _SSL_VERSION_TO_TLS_VERSION.get( |
| ssl_version, TLSVersion.MAXIMUM_SUPPORTED |
| ) |
| |
| # This warning message is pushing users to use 'ssl_minimum_version' |
| # instead of both min/max. Best practice is to only set the minimum version and |
| # keep the maximum version to be it's default value: 'TLSVersion.MAXIMUM_SUPPORTED' |
| warnings.warn( |
| "'ssl_version' option is deprecated and will be " |
| "removed in urllib3 v2.1.0. Instead use 'ssl_minimum_version'", |
| category=DeprecationWarning, |
| stacklevel=2, |
| ) |
| |
| # PROTOCOL_TLS is deprecated in Python 3.10 so we always use PROTOCOL_TLS_CLIENT |
| context = SSLContext(PROTOCOL_TLS_CLIENT) |
| |
| if ssl_minimum_version is not None: |
| context.minimum_version = ssl_minimum_version |
| else: # Python <3.10 defaults to 'MINIMUM_SUPPORTED' so explicitly set TLSv1.2 here |
| context.minimum_version = TLSVersion.TLSv1_2 |
| |
| if ssl_maximum_version is not None: |
| context.maximum_version = ssl_maximum_version |
| |
| # Unless we're given ciphers defer to either system ciphers in |
| # the case of OpenSSL 1.1.1+ or use our own secure default ciphers. |
| if ciphers: |
| context.set_ciphers(ciphers) |
| |
| # Setting the default here, as we may have no ssl module on import |
| cert_reqs = ssl.CERT_REQUIRED if cert_reqs is None else cert_reqs |
| |
| if options is None: |
| options = 0 |
| # SSLv2 is easily broken and is considered harmful and dangerous |
| options |= OP_NO_SSLv2 |
| # SSLv3 has several problems and is now dangerous |
| options |= OP_NO_SSLv3 |
| # Disable compression to prevent CRIME attacks for OpenSSL 1.0+ |
| # (issue #309) |
| options |= OP_NO_COMPRESSION |
| # TLSv1.2 only. Unless set explicitly, do not request tickets. |
| # This may save some bandwidth on wire, and although the ticket is encrypted, |
| # there is a risk associated with it being on wire, |
| # if the server is not rotating its ticketing keys properly. |
| options |= OP_NO_TICKET |
| |
| context.options |= options |
| |
| # Enable post-handshake authentication for TLS 1.3, see GH #1634. PHA is |
| # necessary for conditional client cert authentication with TLS 1.3. |
| # The attribute is None for OpenSSL <= 1.1.0 or does not exist in older |
| # versions of Python. We only enable on Python 3.7.4+ or if certificate |
| # verification is enabled to work around Python issue #37428 |
| # See: https://bugs.python.org/issue37428 |
| if (cert_reqs == ssl.CERT_REQUIRED or sys.version_info >= (3, 7, 4)) and getattr( |
| context, "post_handshake_auth", None |
| ) is not None: |
| context.post_handshake_auth = True |
| |
| # The order of the below lines setting verify_mode and check_hostname |
| # matter due to safe-guards SSLContext has to prevent an SSLContext with |
| # check_hostname=True, verify_mode=NONE/OPTIONAL. |
| # We always set 'check_hostname=False' for pyOpenSSL so we rely on our own |
| # 'ssl.match_hostname()' implementation. |
| if cert_reqs == ssl.CERT_REQUIRED and not IS_PYOPENSSL: |
| context.verify_mode = cert_reqs |
| context.check_hostname = True |
| else: |
| context.check_hostname = False |
| context.verify_mode = cert_reqs |
| |
| try: |
| context.hostname_checks_common_name = False |
| except AttributeError: # Defensive: for CPython < 3.8.9 and 3.9.3; for PyPy < 7.3.8 |
| pass |
| |
| # Enable logging of TLS session keys via defacto standard environment variable |
| # 'SSLKEYLOGFILE', if the feature is available (Python 3.8+). Skip empty values. |
| if hasattr(context, "keylog_filename"): |
| sslkeylogfile = os.environ.get("SSLKEYLOGFILE") |
| if sslkeylogfile: |
| context.keylog_filename = sslkeylogfile |
| |
| return context |
| |
| |
| @typing.overload |
| def ssl_wrap_socket( |
| sock: socket.socket, |
| keyfile: str | None = ..., |
| certfile: str | None = ..., |
| cert_reqs: int | None = ..., |
| ca_certs: str | None = ..., |
| server_hostname: str | None = ..., |
| ssl_version: int | None = ..., |
| ciphers: str | None = ..., |
| ssl_context: ssl.SSLContext | None = ..., |
| ca_cert_dir: str | None = ..., |
| key_password: str | None = ..., |
| ca_cert_data: None | str | bytes = ..., |
| tls_in_tls: Literal[False] = ..., |
| ) -> ssl.SSLSocket: |
| ... |
| |
| |
| @typing.overload |
| def ssl_wrap_socket( |
| sock: socket.socket, |
| keyfile: str | None = ..., |
| certfile: str | None = ..., |
| cert_reqs: int | None = ..., |
| ca_certs: str | None = ..., |
| server_hostname: str | None = ..., |
| ssl_version: int | None = ..., |
| ciphers: str | None = ..., |
| ssl_context: ssl.SSLContext | None = ..., |
| ca_cert_dir: str | None = ..., |
| key_password: str | None = ..., |
| ca_cert_data: None | str | bytes = ..., |
| tls_in_tls: bool = ..., |
| ) -> ssl.SSLSocket | SSLTransportType: |
| ... |
| |
| |
| def ssl_wrap_socket( |
| sock: socket.socket, |
| keyfile: str | None = None, |
| certfile: str | None = None, |
| cert_reqs: int | None = None, |
| ca_certs: str | None = None, |
| server_hostname: str | None = None, |
| ssl_version: int | None = None, |
| ciphers: str | None = None, |
| ssl_context: ssl.SSLContext | None = None, |
| ca_cert_dir: str | None = None, |
| key_password: str | None = None, |
| ca_cert_data: None | str | bytes = None, |
| tls_in_tls: bool = False, |
| ) -> ssl.SSLSocket | SSLTransportType: |
| """ |
| All arguments except for server_hostname, ssl_context, and ca_cert_dir have |
| the same meaning as they do when using :func:`ssl.wrap_socket`. |
| |
| :param server_hostname: |
| When SNI is supported, the expected hostname of the certificate |
| :param ssl_context: |
| A pre-made :class:`SSLContext` object. If none is provided, one will |
| be created using :func:`create_urllib3_context`. |
| :param ciphers: |
| A string of ciphers we wish the client to support. |
| :param ca_cert_dir: |
| A directory containing CA certificates in multiple separate files, as |
| supported by OpenSSL's -CApath flag or the capath argument to |
| SSLContext.load_verify_locations(). |
| :param key_password: |
| Optional password if the keyfile is encrypted. |
| :param ca_cert_data: |
| Optional string containing CA certificates in PEM format suitable for |
| passing as the cadata parameter to SSLContext.load_verify_locations() |
| :param tls_in_tls: |
| Use SSLTransport to wrap the existing socket. |
| """ |
| context = ssl_context |
| if context is None: |
| # Note: This branch of code and all the variables in it are only used in tests. |
| # We should consider deprecating and removing this code. |
| context = create_urllib3_context(ssl_version, cert_reqs, ciphers=ciphers) |
| |
| if ca_certs or ca_cert_dir or ca_cert_data: |
| try: |
| context.load_verify_locations(ca_certs, ca_cert_dir, ca_cert_data) |
| except OSError as e: |
| raise SSLError(e) from e |
| |
| elif ssl_context is None and hasattr(context, "load_default_certs"): |
| # try to load OS default certs; works well on Windows. |
| context.load_default_certs() |
| |
| # Attempt to detect if we get the goofy behavior of the |
| # keyfile being encrypted and OpenSSL asking for the |
| # passphrase via the terminal and instead error out. |
| if keyfile and key_password is None and _is_key_file_encrypted(keyfile): |
| raise SSLError("Client private key is encrypted, password is required") |
| |
| if certfile: |
| if key_password is None: |
| context.load_cert_chain(certfile, keyfile) |
| else: |
| context.load_cert_chain(certfile, keyfile, key_password) |
| |
| try: |
| context.set_alpn_protocols(ALPN_PROTOCOLS) |
| except NotImplementedError: # Defensive: in CI, we always have set_alpn_protocols |
| pass |
| |
| ssl_sock = _ssl_wrap_socket_impl(sock, context, tls_in_tls, server_hostname) |
| return ssl_sock |
| |
| |
| def is_ipaddress(hostname: str | bytes) -> bool: |
| """Detects whether the hostname given is an IPv4 or IPv6 address. |
| Also detects IPv6 addresses with Zone IDs. |
| |
| :param str hostname: Hostname to examine. |
| :return: True if the hostname is an IP address, False otherwise. |
| """ |
| if isinstance(hostname, bytes): |
| # IDN A-label bytes are ASCII compatible. |
| hostname = hostname.decode("ascii") |
| return bool(_IPV4_RE.match(hostname) or _BRACELESS_IPV6_ADDRZ_RE.match(hostname)) |
| |
| |
| def _is_key_file_encrypted(key_file: str) -> bool: |
| """Detects if a key file is encrypted or not.""" |
| with open(key_file) as f: |
| for line in f: |
| # Look for Proc-Type: 4,ENCRYPTED |
| if "ENCRYPTED" in line: |
| return True |
| |
| return False |
| |
| |
| def _ssl_wrap_socket_impl( |
| sock: socket.socket, |
| ssl_context: ssl.SSLContext, |
| tls_in_tls: bool, |
| server_hostname: str | None = None, |
| ) -> ssl.SSLSocket | SSLTransportType: |
| if tls_in_tls: |
| if not SSLTransport: |
| # Import error, ssl is not available. |
| raise ProxySchemeUnsupported( |
| "TLS in TLS requires support for the 'ssl' module" |
| ) |
| |
| SSLTransport._validate_ssl_context_for_tls_in_tls(ssl_context) |
| return SSLTransport(sock, ssl_context, server_hostname) |
| |
| return ssl_context.wrap_socket(sock, server_hostname=server_hostname) |