diff --git a/vertica_python/tests/integration_tests/test_tls.py b/vertica_python/tests/integration_tests/test_tls.py index f975ae52..4e1e772f 100644 --- a/vertica_python/tests/integration_tests/test_tls.py +++ b/vertica_python/tests/integration_tests/test_tls.py @@ -298,6 +298,59 @@ def test_sslcontext_verify_full(self): res = self._query_and_fetchone(self.SSL_STATE_SQL) self.assertEqual(res[0], 'Server') + def _get_tls_version(self, conn): + sock = getattr(conn, '_socket', None) + if not sock: + return None + + if hasattr(sock, 'version') and callable(sock.version): + return sock.version() + + ssl_obj = getattr(sock, '_sslobj', None) + if ssl_obj and hasattr(ssl_obj, 'version'): + return ssl_obj.version() + + return None + + def test_tls13_support_auto_negotiation(self): + """ + Verify that the client supports TLS 1.3 negotiation. + If the server supports TLS 1.3, the connection should establish using it. + If the server supports only TLS 1.2, the connection should still succeed. + """ + + # Set up server certificates and enable TLS + try: + CA_cert = self._generate_and_set_certificates() + except Exception: + self.skipTest("Failed to generate CA certificates; skipping TLS test") + + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ssl_context.verify_mode = ssl.CERT_REQUIRED + ssl_context.check_hostname = True + ssl_context.load_verify_locations(cadata=CA_cert) + + self._conn_info['ssl'] = ssl_context + self._conn_info['tlsmode'] = 'require' + + with self._connect() as conn: + # First ensure TLS really got enabled on server + res = self._query_and_fetchone(self.SSL_STATE_SQL) + if res[0] != 'Server': + self.skipTest("TLS is not configured on server") + + # Prefer public API, fall back only if needed + tls_version = self._get_tls_version(conn) + + if tls_version is None: + self.skipTest("Could not determine negotiated TLS version.") + + self.assertIn( + tls_version, + ("TLSv1.2", "TLSv1.3"), + msg=f"Unexpected TLS version negotiated: {tls_version}" + ) + def test_sslcontext_mutual_TLS(self): # Setting certificates with TLS configuration CA_cert = self._generate_and_set_certificates(mutual_mode=True)