Skip to content
53 changes: 53 additions & 0 deletions vertica_python/tests/integration_tests/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,59 @@ def test_sslcontext_verify_full(self):
res = self._query_and_fetchone(self.SSL_STATE_SQL)
self.assertEqual(res[0], 'Server')

def _get_tls_version(self, conn):
sock = getattr(conn, '_socket', None)
if not sock:
return None

if hasattr(sock, 'version') and callable(sock.version):
return sock.version()

ssl_obj = getattr(sock, '_sslobj', None)
if ssl_obj and hasattr(ssl_obj, 'version'):
return ssl_obj.version()

return None

def test_tls13_support_auto_negotiation(self):
"""
Verify that the client supports TLS 1.3 negotiation.
If the server supports TLS 1.3, the connection should establish using it.
If the server supports only TLS 1.2, the connection should still succeed.
"""

# Set up server certificates and enable TLS
try:
CA_cert = self._generate_and_set_certificates()
except Exception:
self.skipTest("Failed to generate CA certificates; skipping TLS test")

ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.check_hostname = True
ssl_context.load_verify_locations(cadata=CA_cert)

self._conn_info['ssl'] = ssl_context
self._conn_info['tlsmode'] = 'require'

with self._connect() as conn:
# First ensure TLS really got enabled on server
res = self._query_and_fetchone(self.SSL_STATE_SQL)
if res[0] != 'Server':
self.skipTest("TLS is not configured on server")

# Prefer public API, fall back only if needed
tls_version = self._get_tls_version(conn)

if tls_version is None:
self.skipTest("Could not determine negotiated TLS version.")

self.assertIn(
tls_version,
("TLSv1.2", "TLSv1.3"),
msg=f"Unexpected TLS version negotiated: {tls_version}"
)

def test_sslcontext_mutual_TLS(self):
# Setting certificates with TLS configuration
CA_cert = self._generate_and_set_certificates(mutual_mode=True)
Expand Down