From 60ebb620b93eda6ca57cdba4794a3350ccd80ec0 Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Wed, 24 Sep 2025 11:24:55 +0530 Subject: [PATCH 1/2] fixing bug --- mssql_python/cursor.py | 35 +---------------------------------- 1 file changed, 1 insertion(+), 34 deletions(-) diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 6365d559..c676ec7d 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -1474,35 +1474,6 @@ def columns(self, table=None, catalog=None, schema=None, column=None): # Use the helper method to prepare the result set return self._prepare_metadata_result_set(fallback_description=fallback_description) - @staticmethod - def _select_best_sample_value(column): - """ - Selects the most representative non-null value from a column for type inference. - - This is used during executemany() to infer SQL/C types based on actual data, - preferring a non-null value that is not the first row to avoid bias from placeholder defaults. - - Args: - column: List of values in the column. - """ - non_nulls = [v for v in column if v is not None] - if not non_nulls: - return None - if all(isinstance(v, int) for v in non_nulls): - # Pick the value with the widest range (min/max) - return max(non_nulls, key=lambda v: abs(v)) - if all(isinstance(v, float) for v in non_nulls): - return 0.0 - if all(isinstance(v, decimal.Decimal) for v in non_nulls): - return max(non_nulls, key=lambda d: len(d.as_tuple().digits)) - if all(isinstance(v, str) for v in non_nulls): - return max(non_nulls, key=lambda s: len(str(s))) - if all(isinstance(v, datetime.datetime) for v in non_nulls): - return datetime.datetime.now() - if all(isinstance(v, datetime.date) for v in non_nulls): - return datetime.date.today() - return non_nulls[0] # fallback - def _transpose_rowwise_to_columnwise(self, seq_of_parameters: list) -> tuple[list, int]: """ Convert sequence of rows (row-wise) into list of columns (column-wise), @@ -1675,11 +1646,7 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None: else: # Use auto-detection for columns without explicit types column = [row[col_index] for row in seq_of_parameters] if hasattr(seq_of_parameters, '__getitem__') else [] - if not column: - # For generators, use the sample row for inference - sample_value = sample_row[col_index] - else: - sample_value = self._select_best_sample_value(column) + sample_value, min_val, max_val = self._compute_column_type(column) dummy_row = list(sample_row) paraminfo = self._create_parameter_types_list( From 04a7842b60efccee5545258105e98bea6b629f56 Mon Sep 17 00:00:00 2001 From: gargsaumya Date: Mon, 29 Sep 2025 19:14:58 +0530 Subject: [PATCH 2/2] review commnets --- tests/test_004_cursor.py | 112 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 0b53d449..adf75c00 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -1262,6 +1262,118 @@ def test_executemany_binary_data_edge_cases(cursor, db_connection): cursor.execute("DROP TABLE IF EXISTS #pytest_binary_test") db_connection.commit() +def test_executemany_mixed_ints(cursor, db_connection): + """Test executemany with mixed positive and negative integers.""" + try: + cursor.execute("CREATE TABLE #pytest_mixed_ints (val INT)") + data = [(1,), (-5,), (3,)] + cursor.executemany("INSERT INTO #pytest_mixed_ints VALUES (?)", data) + db_connection.commit() + + cursor.execute("SELECT val FROM #pytest_mixed_ints ORDER BY val") + results = [row[0] for row in cursor.fetchall()] + assert sorted(results) == [-5, 1, 3] + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_mixed_ints") + db_connection.commit() + + +def test_executemany_int_edge_cases(cursor, db_connection): + """Test executemany with very large and very small integers.""" + try: + cursor.execute("CREATE TABLE #pytest_int_edges (val BIGINT)") + data = [(0,), (2**31-1,), (-2**31,), (2**63-1,), (-2**63,)] + cursor.executemany("INSERT INTO #pytest_int_edges VALUES (?)", data) + db_connection.commit() + + cursor.execute("SELECT val FROM #pytest_int_edges ORDER BY val") + results = [row[0] for row in cursor.fetchall()] + assert results == sorted([0, 2**31-1, -2**31, 2**63-1, -2**63]) + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_int_edges") + db_connection.commit() + + +def test_executemany_bools_and_ints(cursor, db_connection): + """Test executemany with mix of booleans and integers.""" + try: + cursor.execute("CREATE TABLE #pytest_bool_int (val INT)") + data = [(True,), (False,), (2,)] + cursor.executemany("INSERT INTO #pytest_bool_int VALUES (?)", data) + db_connection.commit() + + cursor.execute("SELECT val FROM #pytest_bool_int ORDER BY val") + results = [row[0] for row in cursor.fetchall()] + # True -> 1, False -> 0 + assert results == [0, 1, 2] + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_bool_int") + db_connection.commit() + + +def test_executemany_ints_with_none(cursor, db_connection): + """Test executemany with integers and None values.""" + try: + cursor.execute("CREATE TABLE #pytest_int_none (val INT)") + data = [(1,), (None,), (3,)] + cursor.executemany("INSERT INTO #pytest_int_none VALUES (?)", data) + db_connection.commit() + + cursor.execute("SELECT val FROM #pytest_int_none ORDER BY val") + results = [row[0] for row in cursor.fetchall()] + assert results.count(None) == 1 + assert results.count(1) == 1 + assert results.count(3) == 1 + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_int_none") + db_connection.commit() + + +def test_executemany_strings_of_various_lengths(cursor, db_connection): + """Test executemany with strings of different lengths.""" + try: + cursor.execute("CREATE TABLE #pytest_varied_strings (val NVARCHAR(50))") + data = [("a",), ("abcd",), ("abc",)] + cursor.executemany("INSERT INTO #pytest_varied_strings VALUES (?)", data) + db_connection.commit() + + cursor.execute("SELECT val FROM #pytest_varied_strings ORDER BY val") + results = [row[0] for row in cursor.fetchall()] + assert sorted(results) == ["a", "abc", "abcd"] + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_varied_strings") + db_connection.commit() + + +def test_executemany_bytes_values(cursor, db_connection): + """Test executemany with bytes values.""" + try: + cursor.execute("CREATE TABLE #pytest_bytes (val VARBINARY(50))") + data = [(b"a",), (b"abcdef",)] + cursor.executemany("INSERT INTO #pytest_bytes VALUES (?)", data) + db_connection.commit() + + cursor.execute("SELECT val FROM #pytest_bytes ORDER BY val") + results = [row[0] for row in cursor.fetchall()] + assert results == [b"a", b"abcdef"] + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_bytes") + db_connection.commit() + +def test_executemany_empty_parameter_list(cursor, db_connection): + """Test executemany with an empty parameter list.""" + try: + cursor.execute("CREATE TABLE #pytest_empty_params (val INT)") + data = [] + cursor.executemany("INSERT INTO #pytest_empty_params VALUES (?)", data) + db_connection.commit() + + cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params") + count = cursor.fetchone()[0] + assert count == 0 + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_empty_params") + db_connection.commit() def test_nextset(cursor): """Test nextset"""