Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions mssql_python/pybind/ddbc_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2439,6 +2439,30 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, const py::list& columnwise_params,
bufferLength = sizeof(SQLGUID);
break;
}
case SQL_C_DEFAULT: {
// Handle NULL parameters - all values in this column should be NULL
// The upstream Python type detection (via _compute_column_type) ensures
// SQL_C_DEFAULT is only used when all values are None
LOG("BindParameterArray: Binding SQL_C_DEFAULT (NULL) array - param_index=%d, "
"count=%zu",
paramIndex, paramSetSize);

// For NULL parameters, we need to allocate a minimal buffer and set all
// indicators to SQL_NULL_DATA Use SQL_C_CHAR as a safe default C type for NULL
// values
char* nullBuffer = AllocateParamBufferArray<char>(tempBuffers, paramSetSize);
strLenOrIndArray = AllocateParamBufferArray<SQLLEN>(tempBuffers, paramSetSize);

for (size_t i = 0; i < paramSetSize; ++i) {
nullBuffer[i] = 0;
strLenOrIndArray[i] = SQL_NULL_DATA;
}

dataPtr = nullBuffer;
bufferLength = 1;
LOG("BindParameterArray: SQL_C_DEFAULT bound - param_index=%d", paramIndex);
break;
}
default: {
LOG("BindParameterArray: Unsupported C type - "
"param_index=%d, C_type=%d",
Expand Down
253 changes: 253 additions & 0 deletions tests/test_004_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,259 @@ def test_executemany_empty_parameter_list(cursor, db_connection):
db_connection.commit()


def test_executemany_mixed_null_and_typed_values(cursor, db_connection):
"""Test executemany with randomly mixed NULL and non-NULL values across multiple columns and rows (50 rows, 10 columns)."""
try:
# Create table with 10 columns of various types
cursor.execute(
"""
CREATE TABLE #pytest_empty_params (
col1 INT,
col2 VARCHAR(50),
col3 FLOAT,
col4 BIT,
col5 DATETIME,
col6 DECIMAL(10, 2),
col7 NVARCHAR(100),
col8 BIGINT,
col9 DATE,
col10 REAL
)
"""
)

# Generate 50 rows with randomly mixed NULL and non-NULL values across 10 columns
data = []
for i in range(50):
row = (
i if i % 3 != 0 else None, # col1: NULL every 3rd row
f"text_{i}" if i % 2 == 0 else None, # col2: NULL on odd rows
float(i * 1.5) if i % 4 != 0 else None, # col3: NULL every 4th row
True if i % 5 == 0 else (False if i % 5 == 1 else None), # col4: NULL on some rows
datetime(2025, 1, 1, 12, 0, 0) if i % 6 != 0 else None, # col5: NULL every 6th row
decimal.Decimal(f"{i}.99") if i % 3 != 0 else None, # col6: NULL every 3rd row
f"desc_{i}" if i % 7 != 0 else None, # col7: NULL every 7th row
i * 100 if i % 8 != 0 else None, # col8: NULL every 8th row
date(2025, 1, 1) if i % 9 != 0 else None, # col9: NULL every 9th row
float(i / 2.0) if i % 10 != 0 else None, # col10: NULL every 10th row
)
data.append(row)

cursor.executemany(
"INSERT INTO #pytest_empty_params VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", data
)
db_connection.commit()

# Verify all 50 rows were inserted
cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params")
count = cursor.fetchone()[0]
assert count == 50, f"Expected 50 rows, got {count}"

# Verify NULL counts for specific columns
cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params WHERE col1 IS NULL")
null_count_col1 = cursor.fetchone()[0]
assert (
null_count_col1 == 17
), f"Expected 17 NULLs in col1 (every 3rd row), got {null_count_col1}"

cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params WHERE col2 IS NULL")
null_count_col2 = cursor.fetchone()[0]
assert null_count_col2 == 25, f"Expected 25 NULLs in col2 (odd rows), got {null_count_col2}"

cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params WHERE col3 IS NULL")
null_count_col3 = cursor.fetchone()[0]
assert (
null_count_col3 == 13
), f"Expected 13 NULLs in col3 (every 4th row), got {null_count_col3}"

# Verify some non-NULL values exist
cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params WHERE col1 IS NOT NULL")
non_null_count = cursor.fetchone()[0]
assert non_null_count > 0, "Expected some non-NULL values in col1"

cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params WHERE col2 IS NOT NULL")
non_null_count = cursor.fetchone()[0]
assert non_null_count > 0, "Expected some non-NULL values in col2"

finally:
cursor.execute("DROP TABLE IF EXISTS #pytest_empty_params")
db_connection.commit()


def test_executemany_multi_column_null_arrays(cursor, db_connection):
"""Test executemany with multi-column NULL arrays (50 records, 8 columns)."""
try:
# Create table with 8 columns of various types
cursor.execute(
"""
CREATE TABLE #pytest_null_arrays (
col1 INT,
col2 VARCHAR(100),
col3 FLOAT,
col4 DATETIME,
col5 DECIMAL(18, 4),
col6 NVARCHAR(200),
col7 BIGINT,
col8 DATE
)
"""
)

# Generate 50 rows with all NULL values across 8 columns
data = [(None, None, None, None, None, None, None, None) for _ in range(50)]

cursor.executemany("INSERT INTO #pytest_null_arrays VALUES (?, ?, ?, ?, ?, ?, ?, ?)", data)
db_connection.commit()

# Verify all 50 rows were inserted
cursor.execute("SELECT COUNT(*) FROM #pytest_null_arrays")
count = cursor.fetchone()[0]
assert count == 50, f"Expected 50 rows, got {count}"

# Verify all values are NULL for each column
for col_num in range(1, 9):
cursor.execute(f"SELECT COUNT(*) FROM #pytest_null_arrays WHERE col{col_num} IS NULL")
null_count = cursor.fetchone()[0]
assert null_count == 50, f"Expected 50 NULLs in col{col_num}, got {null_count}"

# Verify no non-NULL values exist
cursor.execute(
"""
SELECT COUNT(*) FROM #pytest_null_arrays
WHERE col1 IS NOT NULL OR col2 IS NOT NULL OR col3 IS NOT NULL
OR col4 IS NOT NULL OR col5 IS NOT NULL OR col6 IS NOT NULL
OR col7 IS NOT NULL OR col8 IS NOT NULL
"""
)
non_null_count = cursor.fetchone()[0]
assert non_null_count == 0, f"Expected 0 non-NULL values, got {non_null_count}"

finally:
cursor.execute("DROP TABLE IF EXISTS #pytest_null_arrays")
db_connection.commit()


def test_executemany_MIX_NONE_parameter_list(cursor, db_connection):
"""Test executemany with an NONE parameter list."""
try:
cursor.execute("CREATE TABLE #pytest_empty_params (val VARCHAR(50))")
data = [(None,), ("Test",), (None,)]
cursor.executemany("INSERT INTO #pytest_empty_params VALUES (?)", data)
db_connection.commit()

cursor.execute("SELECT COUNT(*) FROM #pytest_empty_params")
count = cursor.fetchone()[0]
assert count == 3
finally:
cursor.execute("DROP TABLE IF EXISTS #pytest_empty_params")
db_connection.commit()


def test_executemany_concurrent_null_parameters(db_connection):
"""Test executemany with NULL parameters across multiple sequential operations."""
# Note: This test uses sequential execution to ensure reliability while still
# testing the core functionality of executemany with NULL parameters.
# True concurrent testing would require separate database connections per thread.
import uuid
from datetime import datetime

# Use a regular table with unique name
table_name = f"pytest_concurrent_nulls_{uuid.uuid4().hex[:8]}"

# Create table
with db_connection.cursor() as cursor:
cursor.execute(
f"""
IF OBJECT_ID('{table_name}', 'U') IS NOT NULL
DROP TABLE {table_name}

CREATE TABLE {table_name} (
thread_id INT,
row_id INT,
col1 INT,
col2 VARCHAR(100),
col3 FLOAT,
col4 DATETIME
)
"""
)
db_connection.commit()

# Execute multiple sequential insert operations
# Use a fresh cursor for each operation
num_operations = 3

for thread_id in range(num_operations):
with db_connection.cursor() as cursor:
# Generate test data with NULLs
data = []
for i in range(20):
row = (
thread_id,
i,
i if i % 2 == 0 else None, # Mix of values and NULLs
f"thread_{thread_id}_row_{i}" if i % 3 != 0 else None,
float(i * 1.5) if i % 4 != 0 else None,
datetime(2025, 1, 1, 12, 0, 0) if i % 5 != 0 else None,
)
data.append(row)

# Execute and commit with retry logic to work around commit reliability issues
for attempt in range(3): # Retry up to 3 times
cursor.executemany(f"INSERT INTO {table_name} VALUES (?, ?, ?, ?, ?, ?)", data)
db_connection.commit()

# Verify the data was actually committed
cursor.execute(
f"SELECT COUNT(*) FROM {table_name} WHERE thread_id = ?", [thread_id]
)
if cursor.fetchone()[0] == 20:
break # Success!
elif attempt < 2:
# Commit didn't work, clean up and retry
cursor.execute(f"DELETE FROM {table_name} WHERE thread_id = ?", [thread_id])
db_connection.commit()
else:
raise AssertionError(
f"Operation {thread_id}: Failed to commit data after 3 attempts"
)

# Verify data was inserted correctly
with db_connection.cursor() as cursor:
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
total_count = cursor.fetchone()[0]
assert (
total_count == num_operations * 20
), f"Expected {num_operations * 20} rows, got {total_count}"

# Verify each operation's data
for operation_id in range(num_operations):
cursor.execute(
f"SELECT COUNT(*) FROM {table_name} WHERE thread_id = ?",
[operation_id],
)
operation_count = cursor.fetchone()[0]
assert (
operation_count == 20
), f"Operation {operation_id} expected 20 rows, got {operation_count}"

# Verify NULL counts for this operation
# Pattern: i if i % 2 == 0 else None
# i from 0 to 19: NULL when i is odd (1,3,5,7,9,11,13,15,17,19) = 10 NULLs
cursor.execute(
f"SELECT COUNT(*) FROM {table_name} WHERE thread_id = ? AND col1 IS NULL",
[operation_id],
)
null_count = cursor.fetchone()[0]
assert (
null_count == 10
), f"Operation {operation_id} expected 10 NULLs in col1, got {null_count}"

# Cleanup
cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
db_connection.commit()


def test_executemany_Decimal_list(cursor, db_connection):
"""Test executemany with an decimal parameter list."""
try:
Expand Down
Loading