@@ -7362,126 +7362,36 @@ def test_datetimeoffset_read_write(cursor, db_connection):
73627362 including timezone information.
73637363 """
73647364 try :
7365- # Define the test cases for DATETIMEOFFSET.
7366- # Each tuple contains the expected SQL string representation and the Python datetime object.
73677365 datetimeoffset_test_cases = [
7368- # Positive offset, no microseconds
73697366 (
73707367 "2023-10-26 10:30:00.0000000 +05:30" ,
73717368 datetime (2023 , 10 , 26 , 10 , 30 , 0 , 0 ,
73727369 tzinfo = timezone (timedelta (hours = 5 , minutes = 30 )))
73737370 ),
7374- # Negative offset, with microseconds
73757371 (
73767372 "2023-10-27 15:45:10.1234567 -08:00" ,
73777373 datetime (2023 , 10 , 27 , 15 , 45 , 10 , 123456 ,
73787374 tzinfo = timezone (timedelta (hours = - 8 )))
73797375 ),
7380- # Zero offset, with microseconds
73817376 (
73827377 "2023-10-28 20:00:05.9876543 +00:00" ,
73837378 datetime (2023 , 10 , 28 , 20 , 0 , 5 , 987654 ,
73847379 tzinfo = timezone (timedelta (hours = 0 )))
73857380 ),
7386- # Naive datetime.datetime should be rejected as a parameter
73877381 (
73887382 "invalid" , # Placeholder for the SQL string
73897383 datetime (2023 , 10 , 29 , 10 , 0 )
73907384 )
73917385 ]
7392-
7393- cursor .execute ("IF OBJECT_ID('tempdb..#pytest_dto', 'U') IS NOT NULL DROP TABLE #pytest_dto;" )
7394- cursor .execute ("CREATE TABLE #pytest_dto (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);" )
7395- db_connection .commit ()
7396-
7397- insert_statement = "INSERT INTO #pytest_dto (id, dto_column) VALUES (?, ?);"
7398-
7399- for i , (sql_str , python_dt ) in enumerate (datetimeoffset_test_cases ):
7400- # Insert timezone-aware datetime objects
7401- cursor .execute (insert_statement , i , python_dt )
7402- db_connection .commit ()
7403-
7404- # Step 3: Fetch the data and verify
7405- cursor .execute ("SELECT id, dto_column FROM #pytest_dto ORDER BY id;" )
7406-
7407- for i , (sql_str , python_dt ) in enumerate (datetimeoffset_test_cases ):
7408- if sql_str == "invalid" :
7409- continue
7410-
7411- row = cursor .fetchone ()
7412- assert row is not None , f"No row fetched for test case { i } ."
7413-
7414- fetched_id , fetched_dto = row
7415-
7416- # Validation 1: The fetched datetime object must be timezone-aware.
7417- assert fetched_dto .tzinfo is not None , "Fetched datetime object is naive."
7418-
7419- # Validation 2: Compare the values by converting both to UTC.
7420- expected_utc = python_dt .astimezone (timezone .utc ).replace (tzinfo = None )
7421- fetched_utc = fetched_dto .astimezone (timezone .utc ).replace (tzinfo = None )
7422-
7423- # Since SQL Server's DATETIMEOFFSET has a scale up to 7, but Python's
7424- # datetime only supports microseconds (6 digits), truncate the last digit.
7425- expected_utc = expected_utc .replace (microsecond = int (expected_utc .microsecond / 1000 ) * 1000 )
7426- fetched_utc = fetched_utc .replace (microsecond = int (fetched_utc .microsecond / 1000 ) * 1000 )
7427-
7428- assert fetched_utc == expected_utc , (
7429- f"Value mismatch for test case { i } . "
7430- f"Expected UTC: { expected_utc } , Got UTC: { fetched_utc } "
7431- )
7432-
7433- finally :
7434- # Cleanup: Drop the temporary table
7435- cursor .execute ("IF OBJECT_ID('tempdb..#pytest_dto', 'U') IS NOT NULL DROP TABLE #pytest_dto;" )
7436- db_connection .commit ()
7437-
7438-
7439- def test_datetimeoffset_read_write (cursor , db_connection ):
7440- """
7441- Test the driver's ability to correctly read and write DATETIMEOFFSET data,
7442- including timezone information.
7443- """
7444- try :
7445- # Define the test cases for DATETIMEOFFSET.
7446- # Each tuple contains the expected SQL string representation and the Python datetime object.
7447- datetimeoffset_test_cases = [
7448- # Positive offset, no microseconds
7449- (
7450- "2023-10-26 10:30:00.0000000 +05:30" ,
7451- datetime (2023 , 10 , 26 , 10 , 30 , 0 , 0 ,
7452- tzinfo = timezone (timedelta (hours = 5 , minutes = 30 )))
7453- ),
7454- # Negative offset, with microseconds
7455- (
7456- "2023-10-27 15:45:10.1234567 -08:00" ,
7457- datetime (2023 , 10 , 27 , 15 , 45 , 10 , 123456 ,
7458- tzinfo = timezone (timedelta (hours = - 8 )))
7459- ),
7460- # Zero offset, with microseconds
7461- (
7462- "2023-10-28 20:00:05.9876543 +00:00" ,
7463- datetime (2023 , 10 , 28 , 20 , 0 , 5 , 987654 ,
7464- tzinfo = timezone (timedelta (hours = 0 )))
7465- ),
7466- # Naive datetime.datetime should be rejected as a parameter
7467- (
7468- "invalid" , # Placeholder for the SQL string
7469- datetime (2023 , 10 , 29 , 10 , 0 )
7470- )
7471- ]
7472-
74737386 cursor .execute ("IF OBJECT_ID('tempdb..#pytest_dto', 'U') IS NOT NULL DROP TABLE #pytest_dto;" )
74747387 cursor .execute ("CREATE TABLE #pytest_dto (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);" )
74757388 db_connection .commit ()
7476-
74777389 insert_statement = "INSERT INTO #pytest_dto (id, dto_column) VALUES (?, ?);"
7478-
74797390 for i , (sql_str , python_dt ) in enumerate (datetimeoffset_test_cases ):
74807391 # Insert timezone-aware datetime objects
74817392 cursor .execute (insert_statement , i , python_dt )
74827393 db_connection .commit ()
74837394
7484- # Step 3: Fetch the data and verify
74857395 cursor .execute ("SELECT id, dto_column FROM #pytest_dto ORDER BY id;" )
74867396
74877397 for i , (sql_str , python_dt ) in enumerate (datetimeoffset_test_cases ):
@@ -7492,26 +7402,16 @@ def test_datetimeoffset_read_write(cursor, db_connection):
74927402 assert row is not None , f"No row fetched for test case { i } ."
74937403
74947404 fetched_id , fetched_dto = row
7495-
7496- # Validation 1: The fetched datetime object must be timezone-aware.
74977405 assert fetched_dto .tzinfo is not None , "Fetched datetime object is naive."
7498-
7499- # Validation 2: Compare the values by converting both to UTC.
75007406 expected_utc = python_dt .astimezone (timezone .utc ).replace (tzinfo = None )
75017407 fetched_utc = fetched_dto .astimezone (timezone .utc ).replace (tzinfo = None )
7502-
7503- # Since SQL Server's DATETIMEOFFSET has a scale up to 7, but Python's
7504- # datetime only supports microseconds (6 digits), truncate the last digit.
75057408 expected_utc = expected_utc .replace (microsecond = int (expected_utc .microsecond / 1000 ) * 1000 )
75067409 fetched_utc = fetched_utc .replace (microsecond = int (fetched_utc .microsecond / 1000 ) * 1000 )
7507-
75087410 assert fetched_utc == expected_utc , (
75097411 f"Value mismatch for test case { i } . "
75107412 f"Expected UTC: { expected_utc } , Got UTC: { fetched_utc } "
75117413 )
7512-
75137414 finally :
7514- # Cleanup: Drop the temporary table
75157415 cursor .execute ("IF OBJECT_ID('tempdb..#pytest_dto', 'U') IS NOT NULL DROP TABLE #pytest_dto;" )
75167416 db_connection .commit ()
75177417
0 commit comments