Skip to content

Commit 5b0f635

Browse files
authored
SNOW-1625324: improve error handling for async query (snowflakedb#2035)
1 parent 505e389 commit 5b0f635

File tree

4 files changed

+60
-32
lines changed

4 files changed

+60
-32
lines changed

DESCRIPTION.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne
88

99
# Release Notes
1010

11-
- v3.12.2 (TBD)
11+
- v3.12.2(TBD)
12+
- Enhanced error handling for asynchronous queries, providing more detailed and informative error messages when an async query fails.
1213
- Improved implementation of `snowflake.connector.util_text.random_string` to avoid collisions.
1314
- If the account specifies a region, and that region is in China, the TLD is now inferred to be snowflakecomputing.cn.
1415
- Changed loglevel to WARNING for OCSP fail-open warning messages (was: ERROR)

src/snowflake/connector/connection.py

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1817,6 +1817,39 @@ def _close_at_exit(self):
18171817
with suppress(Exception):
18181818
self.close(retry=False)
18191819

1820+
def _process_error_query_status(
1821+
self,
1822+
sf_qid: str,
1823+
status_resp: dict,
1824+
error_message: str = "",
1825+
error_cls: type[Exception] = ProgrammingError,
1826+
) -> None:
1827+
status_resp = status_resp or {}
1828+
data = status_resp.get("data", {})
1829+
queries = data.get("queries")
1830+
1831+
if sf_qid in self._async_sfqids:
1832+
self._async_sfqids.pop(sf_qid, None)
1833+
message = status_resp.get("message")
1834+
if message is None:
1835+
message = ""
1836+
code = queries[0].get("errorCode", -1) if queries else -1
1837+
sql_state = None
1838+
if "data" in status_resp:
1839+
message += queries[0].get("errorMessage", "") if queries else ""
1840+
sql_state = data.get("sqlState")
1841+
Error.errorhandler_wrapper(
1842+
self,
1843+
None,
1844+
error_cls,
1845+
{
1846+
"msg": message or error_message,
1847+
"errno": int(code),
1848+
"sqlstate": sql_state,
1849+
"sfqid": sf_qid,
1850+
},
1851+
)
1852+
18201853
def get_query_status(self, sf_qid: str) -> QueryStatus:
18211854
"""Retrieves the status of query with sf_qid.
18221855
@@ -1845,31 +1878,8 @@ def get_query_status_throw_if_error(self, sf_qid: str) -> QueryStatus:
18451878
"""
18461879
status, status_resp = self._get_query_status(sf_qid)
18471880
self._cache_query_status(sf_qid, status)
1848-
queries = status_resp["data"]["queries"]
18491881
if self.is_an_error(status):
1850-
if sf_qid in self._async_sfqids:
1851-
self._async_sfqids.pop(sf_qid, None)
1852-
message = status_resp.get("message")
1853-
if message is None:
1854-
message = ""
1855-
code = queries[0].get("errorCode", -1)
1856-
sql_state = None
1857-
if "data" in status_resp:
1858-
message += (
1859-
queries[0].get("errorMessage", "") if len(queries) > 0 else ""
1860-
)
1861-
sql_state = status_resp["data"].get("sqlState")
1862-
Error.errorhandler_wrapper(
1863-
self,
1864-
None,
1865-
ProgrammingError,
1866-
{
1867-
"msg": message,
1868-
"errno": int(code),
1869-
"sqlstate": sql_state,
1870-
"sfqid": sf_qid,
1871-
},
1872-
)
1882+
self._process_error_query_status(sf_qid, status_resp)
18731883
return status
18741884

18751885
def initialize_query_context_cache(self) -> None:

src/snowflake/connector/cursor.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1638,7 +1638,8 @@ def wait_until_ready() -> None:
16381638
no_data_counter = 0
16391639
retry_pattern_pos = 0
16401640
while True:
1641-
status = self.connection.get_query_status(sfqid)
1641+
status, status_resp = self.connection._get_query_status(sfqid)
1642+
self.connection._cache_query_status(sfqid, status)
16421643
if not self.connection.is_still_running(status):
16431644
break
16441645
if status == QueryStatus.NO_DATA: # pragma: no cover
@@ -1655,10 +1656,12 @@ def wait_until_ready() -> None:
16551656
if retry_pattern_pos < (len(ASYNC_RETRY_PATTERN) - 1):
16561657
retry_pattern_pos += 1
16571658
if status != QueryStatus.SUCCESS:
1658-
raise DatabaseError(
1659-
"Status of query '{}' is {}, results are unavailable".format(
1660-
sfqid, status.name
1661-
)
1659+
logger.info(f"Status of query '{sfqid}' is {status.name}")
1660+
self.connection._process_error_query_status(
1661+
sfqid,
1662+
status_resp,
1663+
error_message=f"Status of query '{sfqid}' is {status.name}, results are unavailable",
1664+
error_cls=DatabaseError,
16621665
)
16631666
self._inner_cursor.execute(f"select * from table(result_scan('{sfqid}'))")
16641667
self._result = self._inner_cursor._result

test/integ/test_async.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55

66
from __future__ import annotations
77

8+
import logging
89
import time
910

1011
import pytest
1112

12-
from snowflake.connector import ProgrammingError
13+
from snowflake.connector import DatabaseError, ProgrammingError
1314

1415
# Mark all tests in this file to time out after 2 minutes to prevent hanging forever
1516
pytestmark = [pytest.mark.timeout(120), pytest.mark.skipolddriver]
@@ -91,7 +92,7 @@ def test_async_exec(conn_cnx):
9192
assert len(cur.fetchall()) == 1
9293

9394

94-
def test_async_error(conn_cnx):
95+
def test_async_error(conn_cnx, caplog):
9596
"""Tests whether simple async query error retrieval works.
9697
9798
Runs a query that will fail to execute and then tests that if we tried to get results for the query
@@ -116,6 +117,19 @@ def test_async_error(conn_cnx):
116117
cur.get_results_from_sfqid(q_id)
117118
assert e1.value.errno == e2.value.errno == sync_error.value.errno
118119

120+
sfqid = cur.execute_async("SELECT SYSTEM$WAIT(2)")["queryId"]
121+
cur.get_results_from_sfqid(sfqid)
122+
with con.cursor() as cancel_cursor:
123+
# use separate cursor to cancel as execute will overwrite the previous query status
124+
cancel_cursor.execute(f"SELECT SYSTEM$CANCEL_QUERY('{sfqid}')")
125+
with pytest.raises(DatabaseError) as e3, caplog.at_level(logging.INFO):
126+
cur.fetchall()
127+
assert (
128+
"SQL execution canceled" in e3.value.msg
129+
and f"Status of query '{sfqid}' is {QueryStatus.FAILED_WITH_ERROR.name}"
130+
in caplog.text
131+
)
132+
119133

120134
def test_mix_sync_async(conn_cnx):
121135
with conn_cnx() as con:

0 commit comments

Comments
 (0)