Skip to content

Commit 53e0165

Browse files
Support WIF Impersonation on GCP workloads (snowflakedb#2496)
Co-authored-by: Peter Mansour <peter.mansour@snowflake.com>
1 parent 8c32f5e commit 53e0165

File tree

8 files changed

+240
-11
lines changed

8 files changed

+240
-11
lines changed

DESCRIPTION.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ https://docs.snowflake.com/
77
Source code is also available at: https://github.com/snowflakedb/snowflake-connector-python
88

99
# Release Notes
10+
- v3.18.0(TBD)
11+
- Added the `workload_identity_impersonation_path` parameter to support service account impersonation for Workload Identity Federation on GCP workloads only
12+
1013
- v3.17.3(September 02,2025)
1114
- Enhanced configuration file permission warning messages.
1215
- Improved warning messages for readable permission issues to include clear instructions on how to skip warnings using the `SF_SKIP_WARNING_FOR_READ_PERMISSIONS_ON_CONFIG_FILE` environment variable.

src/snowflake/connector/auth/workload_identity.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,14 @@ def __init__(
5555
provider: AttestationProvider | None = None,
5656
token: str | None = None,
5757
entra_resource: str | None = None,
58+
impersonation_path: list[str] | None = None,
5859
**kwargs,
5960
) -> None:
6061
super().__init__(**kwargs)
6162
self.provider = provider
6263
self.token = token
6364
self.entra_resource = entra_resource
65+
self.impersonation_path = impersonation_path
6466

6567
self.attestation: WorkloadIdentityAttestation | None = None
6668

@@ -85,6 +87,7 @@ def prepare(
8587
self.provider,
8688
self.entra_resource,
8789
self.token,
90+
self.impersonation_path,
8891
session_manager=(
8992
conn._session_manager.clone(max_retries=0) if conn else None
9093
),

src/snowflake/connector/connection.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ def _get_private_bytes_from_file(
214214
"authenticator": (DEFAULT_AUTHENTICATOR, (type(None), str)),
215215
"workload_identity_provider": (None, (type(None), AttestationProvider)),
216216
"workload_identity_entra_resource": (None, (type(None), str)),
217+
"workload_identity_impersonation_path": (None, (type(None), list[str])),
217218
"mfa_callback": (None, (type(None), Callable)),
218219
"password_callback": (None, (type(None), Callable)),
219220
"auth_class": (None, (type(None), AuthByPlugin)),
@@ -1355,10 +1356,24 @@ def __open_connection(self):
13551356
"errno": ER_INVALID_WIF_SETTINGS,
13561357
},
13571358
)
1359+
if (
1360+
self._workload_identity_impersonation_path
1361+
and self._workload_identity_provider != AttestationProvider.GCP
1362+
):
1363+
Error.errorhandler_wrapper(
1364+
self,
1365+
None,
1366+
ProgrammingError,
1367+
{
1368+
"msg": "workload_identity_impersonation_path is currently only supported for GCP.",
1369+
"errno": ER_INVALID_WIF_SETTINGS,
1370+
},
1371+
)
13581372
self.auth_class = AuthByWorkloadIdentity(
13591373
provider=self._workload_identity_provider,
13601374
token=self._token,
13611375
entra_resource=self._workload_identity_entra_resource,
1376+
impersonation_path=self._workload_identity_impersonation_path,
13621377
)
13631378
else:
13641379
# okta URL, e.g., https://<account>.okta.com/
@@ -1531,6 +1546,7 @@ def __config(self, **kwargs):
15311546
workload_identity_dependent_options = [
15321547
"workload_identity_provider",
15331548
"workload_identity_entra_resource",
1549+
"workload_identity_impersonation_path",
15341550
]
15351551
for dependent_option in workload_identity_dependent_options:
15361552
if (

src/snowflake/connector/wif_util.py

Lines changed: 86 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
logger = logging.getLogger(__name__)
2121
SNOWFLAKE_AUDIENCE = "snowflakecomputing.com"
2222
DEFAULT_ENTRA_SNOWFLAKE_RESOURCE = "api://fd3f753b-eed3-462c-b6a7-a4b5bb650aad"
23+
GCP_METADATA_SERVICE_ACCOUNT_BASE_URL = (
24+
"http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default"
25+
)
2326

2427

2528
@unique
@@ -184,29 +187,103 @@ def create_aws_attestation(
184187
)
185188

186189

187-
def create_gcp_attestation(
188-
session_manager: SessionManager | None = None,
189-
) -> WorkloadIdentityAttestation:
190-
"""Tries to create a workload identity attestation for GCP.
190+
def get_gcp_access_token(session_manager: SessionManager) -> str:
191+
"""Gets a GCP access token from the metadata server.
192+
193+
If the application isn't running on GCP or no credentials were found, raises an error.
194+
"""
195+
try:
196+
res = session_manager.request(
197+
method="GET",
198+
url=f"{GCP_METADATA_SERVICE_ACCOUNT_BASE_URL}/token",
199+
headers={
200+
"Metadata-Flavor": "Google",
201+
},
202+
)
203+
res.raise_for_status()
204+
return res.json()["access_token"]
205+
except Exception as e:
206+
raise ProgrammingError(
207+
msg=f"Error fetching GCP access token: {e}. Ensure the application is running on GCP.",
208+
errno=ER_WIF_CREDENTIALS_NOT_FOUND,
209+
)
210+
211+
212+
def get_gcp_identity_token_via_impersonation(
213+
impersonation_path: list[str], session_manager: SessionManager
214+
) -> str:
215+
"""Gets a GCP identity token from the metadata server.
216+
217+
If the application isn't running on GCP or no credentials were found, raises an error.
218+
"""
219+
if not impersonation_path:
220+
raise ProgrammingError(
221+
msg="Error: impersonation_path cannot be empty.",
222+
errno=ER_WIF_CREDENTIALS_NOT_FOUND,
223+
)
224+
225+
current_sa_token = get_gcp_access_token(session_manager)
226+
impersonation_path = [
227+
f"projects/-/serviceAccounts/{client_id}" for client_id in impersonation_path
228+
]
229+
try:
230+
res = session_manager.post(
231+
url=f"https://iamcredentials.googleapis.com/v1/{impersonation_path[-1]}:generateIdToken",
232+
headers={
233+
"Authorization": f"Bearer {current_sa_token}",
234+
"Content-Type": "application/json",
235+
},
236+
json={
237+
"delegates": impersonation_path[:-1],
238+
"audience": SNOWFLAKE_AUDIENCE,
239+
},
240+
)
241+
res.raise_for_status()
242+
return res.json()["token"]
243+
except Exception as e:
244+
raise ProgrammingError(
245+
msg=f"Error fetching GCP identity token for impersonated GCP service account '{impersonation_path[-1]}': {e}. Ensure the application is running on GCP.",
246+
errno=ER_WIF_CREDENTIALS_NOT_FOUND,
247+
)
248+
249+
250+
def get_gcp_identity_token(session_manager: SessionManager) -> str:
251+
"""Gets a GCP identity token from the metadata server.
191252
192253
If the application isn't running on GCP or no credentials were found, raises an error.
193254
"""
194255
try:
195256
res = session_manager.request(
196257
method="GET",
197-
url=f"http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/identity?audience={SNOWFLAKE_AUDIENCE}",
258+
url=f"{GCP_METADATA_SERVICE_ACCOUNT_BASE_URL}/identity?audience={SNOWFLAKE_AUDIENCE}",
198259
headers={
199260
"Metadata-Flavor": "Google",
200261
},
201262
)
202263
res.raise_for_status()
264+
return res.content.decode("utf-8")
203265
except Exception as e:
204266
raise ProgrammingError(
205-
msg=f"Error fetching GCP metadata: {e}. Ensure the application is running on GCP.",
267+
msg=f"Error fetching GCP identity token: {e}. Ensure the application is running on GCP.",
206268
errno=ER_WIF_CREDENTIALS_NOT_FOUND,
207269
)
208270

209-
jwt_str = res.content.decode("utf-8")
271+
272+
def create_gcp_attestation(
273+
session_manager: SessionManager,
274+
impersonation_path: list[str] | None = None,
275+
) -> WorkloadIdentityAttestation:
276+
"""Tries to create a workload identity attestation for GCP.
277+
278+
If the application isn't running on GCP or no credentials were found, raises an error.
279+
"""
280+
if impersonation_path:
281+
jwt_str = get_gcp_identity_token_via_impersonation(
282+
impersonation_path, session_manager
283+
)
284+
else:
285+
jwt_str = get_gcp_identity_token(session_manager)
286+
210287
_, subject = extract_iss_and_sub_without_signature_verification(jwt_str)
211288
return WorkloadIdentityAttestation(
212289
AttestationProvider.GCP, jwt_str, {"sub": subject}
@@ -295,6 +372,7 @@ def create_attestation(
295372
provider: AttestationProvider,
296373
entra_resource: str | None = None,
297374
token: str | None = None,
375+
impersonation_path: list[str] | None = None,
298376
session_manager: SessionManager | None = None,
299377
) -> WorkloadIdentityAttestation:
300378
"""Entry point to create an attestation using the given provider.
@@ -313,7 +391,7 @@ def create_attestation(
313391
elif provider == AttestationProvider.AZURE:
314392
return create_azure_attestation(entra_resource, session_manager)
315393
elif provider == AttestationProvider.GCP:
316-
return create_gcp_attestation(session_manager)
394+
return create_gcp_attestation(session_manager, impersonation_path)
317395
elif provider == AttestationProvider.OIDC:
318396
return create_oidc_attestation(token)
319397
else:

test/csp_helpers.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ def gen_dummy_id_token(
4040
)
4141

4242

43+
def gen_dummy_access_token(sub="test-subject") -> str:
44+
"""Generates a dummy access token using the given subject."""
45+
key = "secret"
46+
logger.debug(f"Generating dummy access token for subject {sub}")
47+
return (sub + key).encode("utf-8").hex()
48+
49+
4350
def build_response(content: bytes, status_code: int = 200, headers=None) -> Response:
4451
"""Builds a requests.Response object with the given status code and content."""
4552
response = Response()
@@ -285,6 +292,19 @@ def handle_request(self, method, parsed_url, headers, timeout):
285292
audience = query_string["audience"][0]
286293
self.token = gen_dummy_id_token(sub=self.sub, iss=self.iss, aud=audience)
287294
return build_response(self.token.encode("utf-8"))
295+
elif (
296+
method == "GET"
297+
and parsed_url.path
298+
== "/computeMetadata/v1/instance/service-accounts/default/token"
299+
and headers.get("Metadata-Flavor") == "Google"
300+
):
301+
self.token = gen_dummy_access_token(sub=self.sub)
302+
ret = {
303+
"access_token": self.token,
304+
"expires_in": 3599,
305+
"token_type": "Bearer",
306+
}
307+
return build_response(json.dumps(ret).encode("utf-8"))
288308
else:
289309
# Reject malformed requests.
290310
raise HTTPError()

test/unit/test_auth_workload_identity.py

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,13 @@
1616
)
1717
from snowflake.connector.wif_util import AttestationProvider, get_aws_sts_hostname
1818

19-
from ..csp_helpers import FakeAwsEnvironment, FakeGceMetadataService, gen_dummy_id_token
19+
from ..csp_helpers import (
20+
FakeAwsEnvironment,
21+
FakeGceMetadataService,
22+
build_response,
23+
gen_dummy_access_token,
24+
gen_dummy_id_token,
25+
)
2026

2127
logger = logging.getLogger(__name__)
2228

@@ -288,7 +294,7 @@ def test_explicit_gcp_metadata_server_error_bubbles_up(exception):
288294
with pytest.raises(ProgrammingError) as excinfo:
289295
auth_class.prepare(conn=None)
290296

291-
assert "Error fetching GCP metadata:" in str(excinfo.value)
297+
assert "Error fetching GCP identity token:" in str(excinfo.value)
292298
assert "Ensure the application is running on GCP." in str(excinfo.value)
293299

294300

@@ -316,6 +322,44 @@ def test_explicit_gcp_generates_unique_assertion_content(
316322
assert auth_class.assertion_content == '{"_provider":"GCP","sub":"123456"}'
317323

318324

325+
@mock.patch("snowflake.connector.session_manager.SessionManager.post")
326+
def test_gcp_calls_correct_apis_and_populates_auth_data_for_final_sa(
327+
mock_post_request, fake_gce_metadata_service: FakeGceMetadataService
328+
):
329+
fake_gce_metadata_service.sub = "sa1"
330+
impersonation_path = ["sa2", "sa3"]
331+
sa1_access_token = gen_dummy_access_token("sa1")
332+
sa3_id_token = gen_dummy_id_token("sa3")
333+
334+
mock_post_request.return_value = build_response(
335+
json.dumps({"token": sa3_id_token}).encode("utf-8")
336+
)
337+
338+
auth_class = AuthByWorkloadIdentity(
339+
provider=AttestationProvider.GCP, impersonation_path=impersonation_path
340+
)
341+
auth_class.prepare(conn=None)
342+
343+
mock_post_request.assert_called_once_with(
344+
url="https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/sa3:generateIdToken",
345+
headers={
346+
"Authorization": f"Bearer {sa1_access_token}",
347+
"Content-Type": "application/json",
348+
},
349+
json={
350+
"delegates": ["projects/-/serviceAccounts/sa2"],
351+
"audience": "snowflakecomputing.com",
352+
},
353+
)
354+
355+
assert auth_class.assertion_content == '{"_provider":"GCP","sub":"sa3"}'
356+
assert extract_api_data(auth_class) == {
357+
"AUTHENTICATOR": "WORKLOAD_IDENTITY",
358+
"PROVIDER": "GCP",
359+
"TOKEN": sa3_id_token,
360+
}
361+
362+
319363
# -- Azure Tests --
320364

321365

test/unit/test_connection.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,7 @@ def test_otel_error_message(caplog, mock_post_requests):
631631
"workload_identity_entra_resource",
632632
"api://0b2f151f-09a2-46eb-ad5a-39d5ebef917b",
633633
),
634+
("workload_identity_impersonation_path", ["subject-b", "subject-c"]),
634635
],
635636
)
636637
def test_cannot_set_dependent_params_without_wlid_authenticator(
@@ -677,6 +678,71 @@ def test_workload_identity_provider_is_required_for_wif_authenticator(
677678
)
678679

679680

681+
@pytest.mark.parametrize(
682+
"provider_param",
683+
[
684+
# Strongly-typed values.
685+
AttestationProvider.AWS,
686+
AttestationProvider.AZURE,
687+
AttestationProvider.OIDC,
688+
# String values.
689+
"AWS",
690+
"AZURE",
691+
"OIDC",
692+
],
693+
)
694+
def test_workload_identity_impersonation_path_unsupported_for_non_gcp_providers(
695+
monkeypatch, provider_param
696+
):
697+
with monkeypatch.context() as m:
698+
m.setattr(
699+
"snowflake.connector.SnowflakeConnection._authenticate", lambda *_: None
700+
)
701+
702+
with pytest.raises(ProgrammingError) as excinfo:
703+
snowflake.connector.connect(
704+
account="account",
705+
authenticator="WORKLOAD_IDENTITY",
706+
workload_identity_provider=provider_param,
707+
workload_identity_impersonation_path=[
708+
"sa2@project.iam.gserviceaccount.com"
709+
],
710+
)
711+
assert (
712+
"workload_identity_impersonation_path is currently only supported for GCP."
713+
in str(excinfo.value)
714+
)
715+
716+
717+
@pytest.mark.parametrize(
718+
"provider_param",
719+
[
720+
AttestationProvider.GCP,
721+
"GCP",
722+
],
723+
)
724+
def test_workload_identity_impersonation_path_supported_for_gcp_provider(
725+
monkeypatch, provider_param
726+
):
727+
with monkeypatch.context() as m:
728+
m.setattr(
729+
"snowflake.connector.SnowflakeConnection._authenticate", lambda *_: None
730+
)
731+
732+
conn = snowflake.connector.connect(
733+
account="account",
734+
authenticator="WORKLOAD_IDENTITY",
735+
workload_identity_provider=provider_param,
736+
workload_identity_impersonation_path=[
737+
"sa2@project.iam.gserviceaccount.com"
738+
],
739+
)
740+
assert conn.auth_class.provider == AttestationProvider.GCP
741+
assert conn.auth_class.impersonation_path == [
742+
"sa2@project.iam.gserviceaccount.com"
743+
]
744+
745+
680746
@pytest.mark.parametrize(
681747
"provider_param, parsed_provider",
682748
[

0 commit comments

Comments
 (0)