1+ import os
2+ import time
3+ import unittest
4+
5+ import httpx
6+
7+ import cohere
8+
9+
10+ class TestConnectionPooling (unittest .TestCase ):
11+ """Test suite for HTTP connection pooling functionality."""
12+
13+ @classmethod
14+ def setUpClass (cls ):
15+ """Set up class-level fixtures."""
16+ # Check if API key is available for integration tests
17+ cls .api_key_available = bool (os .environ .get ("CO_API_KEY" ))
18+
19+ def test_httpx_client_creation_with_limits (self ):
20+ """Test that httpx clients can be created with our connection pooling limits."""
21+ # Test creating httpx client with limits (our implementation)
22+ client_with_limits = httpx .Client (
23+ timeout = 300 ,
24+ limits = httpx .Limits (
25+ max_keepalive_connections = 20 ,
26+ max_connections = 100 ,
27+ keepalive_expiry = 30.0 ,
28+ ),
29+ )
30+
31+ # Verify the client was created successfully
32+ self .assertIsNotNone (client_with_limits )
33+ self .assertIsInstance (client_with_limits , httpx .Client )
34+
35+ # The limits are applied internally - we can't directly access them
36+ # but we verify the client works correctly with our configuration
37+
38+ client_with_limits .close ()
39+
40+ def test_cohere_client_initialization (self ):
41+ """Test that Cohere clients can be initialized with connection pooling."""
42+ # Test with dummy API key - just verifies initialization works
43+ try :
44+ sync_client = cohere .Client (api_key = "dummy-key" )
45+ v2_client = cohere .ClientV2 (api_key = "dummy-key" )
46+
47+ # Verify clients were created
48+ self .assertIsNotNone (sync_client )
49+ self .assertIsNotNone (v2_client )
50+
51+ except Exception as e :
52+ # Should not fail due to httpx configuration
53+ if "httpx" in str (e ).lower () or "limits" in str (e ).lower ():
54+ self .fail (f"Failed to create client with connection pooling: { e } " )
55+
56+ def test_custom_httpx_client_with_pooling (self ):
57+ """Test that custom httpx clients with connection pooling work correctly."""
58+ # Create custom httpx client with explicit pooling configuration
59+ custom_client = httpx .Client (
60+ timeout = 30 ,
61+ limits = httpx .Limits (
62+ max_keepalive_connections = 10 ,
63+ max_connections = 50 ,
64+ keepalive_expiry = 20.0 ,
65+ ),
66+ )
67+
68+ # Create Cohere client with custom httpx client
69+ try :
70+ client = cohere .ClientV2 (api_key = "dummy-key" , httpx_client = custom_client )
71+ self .assertIsNotNone (client )
72+ finally :
73+ custom_client .close ()
74+
75+ def test_connection_pooling_vs_no_pooling_setup (self ):
76+ """Test creating clients with and without connection pooling."""
77+ # Create httpx client without pooling
78+ no_pool_httpx = httpx .Client (
79+ timeout = 30 ,
80+ limits = httpx .Limits (
81+ max_keepalive_connections = 0 ,
82+ max_connections = 1 ,
83+ keepalive_expiry = 0 ,
84+ ),
85+ )
86+
87+ # Verify both configurations work
88+ try :
89+ pooled_client = cohere .ClientV2 (api_key = "dummy-key" )
90+ no_pool_client = cohere .ClientV2 (api_key = "dummy-key" , httpx_client = no_pool_httpx )
91+
92+ self .assertIsNotNone (pooled_client )
93+ self .assertIsNotNone (no_pool_client )
94+
95+ finally :
96+ no_pool_httpx .close ()
97+
98+ @unittest .skipIf (not os .environ .get ("CO_API_KEY" ), "API key not available" )
99+ def test_multiple_requests_performance (self ):
100+ """Test that multiple requests benefit from connection pooling."""
101+ client = cohere .ClientV2 ()
102+
103+ response_times = []
104+
105+ # Make multiple requests
106+ for i in range (3 ):
107+ start_time = time .time ()
108+ try :
109+ response = client .chat (
110+ model = "command-r-plus-08-2024" ,
111+ messages = [{"role" : "user" , "content" : f"Say the number { i + 1 } " }],
112+ )
113+ elapsed = time .time () - start_time
114+ response_times .append (elapsed )
115+
116+ # Verify response
117+ self .assertIsNotNone (response )
118+ self .assertIsNotNone (response .message )
119+
120+ # Rate limit protection
121+ if i < 2 :
122+ time .sleep (2 )
123+
124+ except Exception as e :
125+ if "429" in str (e ) or "rate" in str (e ).lower ():
126+ self .skipTest ("Rate limited" )
127+ raise
128+
129+ # Verify all requests completed
130+ self .assertEqual (len (response_times ), 3 )
131+
132+ # Generally, subsequent requests should be faster due to connection reuse
133+ # First request establishes connection, subsequent ones reuse it
134+ print (f"Response times: { response_times } " )
135+
136+ @unittest .skipIf (not os .environ .get ("CO_API_KEY" ), "API key not available" )
137+ def test_streaming_with_pooling (self ):
138+ """Test that streaming works correctly with connection pooling."""
139+ client = cohere .ClientV2 ()
140+
141+ try :
142+ response = client .chat_stream (
143+ model = "command-r-plus-08-2024" ,
144+ messages = [{"role" : "user" , "content" : "Count to 3" }],
145+ )
146+
147+ chunks = []
148+ for event in response :
149+ if event .type == "content-delta" :
150+ chunks .append (event .delta .message .content .text )
151+
152+ # Verify streaming worked
153+ self .assertGreater (len (chunks ), 0 )
154+ full_response = "" .join (chunks )
155+ self .assertGreater (len (full_response ), 0 )
156+
157+ except Exception as e :
158+ if "429" in str (e ) or "rate" in str (e ).lower ():
159+ self .skipTest ("Rate limited" )
160+ raise
161+
162+
163+ if __name__ == "__main__" :
164+ unittest .main ()
0 commit comments