diff --git a/cmd/sling/sling_test.go b/cmd/sling/sling_test.go index 2e666d05e..47fcb7783 100755 --- a/cmd/sling/sling_test.go +++ b/cmd/sling/sling_test.go @@ -86,7 +86,7 @@ var connMap = map[dbio.Type]connTest{ dbio.TypeDbIceberg: {name: "iceberg_r2", adjustCol: g.Bool(false)}, dbio.TypeDbMySQL: {name: "mysql", schema: "mysql"}, dbio.TypeDbOracle: {name: "oracle", schema: "oracle", useBulk: g.Bool(false)}, - dbio.Type("oracle_sqlldr"): {name: "oracle", schema: "oracle", useBulk: g.Bool(true)}, + dbio.Type("oracle_sqlldr"): {name: "oracle", schema: "oracle", useBulk: g.Bool(true), adjustCol: g.Bool(false)}, dbio.TypeDbPostgres: {name: "postgres"}, dbio.Type("postgres_adbc"): {name: "postgres_adbc", adjustCol: g.Bool(false)}, dbio.TypeDbRedshift: {name: "redshift", adjustCol: g.Bool(false)}, diff --git a/cmd/sling/tests/files/test_wide_columns.csv b/cmd/sling/tests/files/test_wide_columns.csv new file mode 100644 index 000000000..368c9478c --- /dev/null +++ b/cmd/sling/tests/files/test_wide_columns.csv @@ -0,0 +1,6 @@ +id,col_001,col_002,col_003,col_004,col_005,col_006,col_007,col_008,col_009,col_010,col_011,col_012,col_013,col_014,col_015,col_016,col_017,col_018,col_019,col_020,col_021,col_022,col_023,col_024,col_025,col_026,col_027,col_028,col_029,col_030,col_031,col_032,col_033,col_034,col_035,col_036,col_037,col_038,col_039,col_040,col_041,col_042,col_043,col_044,col_045,col_046,col_047,col_048,col_049,col_050 +1,value_1_1,value_1_2,value_1_3,value_1_4,value_1_5,value_1_6,value_1_7,value_1_8,value_1_9,value_1_10,value_1_11,value_1_12,value_1_13,value_1_14,value_1_15,value_1_16,value_1_17,value_1_18,value_1_19,value_1_20,value_1_21,value_1_22,value_1_23,value_1_24,value_1_25,value_1_26,value_1_27,value_1_28,value_1_29,value_1_30,value_1_31,value_1_32,value_1_33,value_1_34,value_1_35,value_1_36,value_1_37,value_1_38,value_1_39,value_1_40,value_1_41,value_1_42,value_1_43,value_1_44,value_1_45,value_1_46,value_1_47,value_1_48,value_1_49,value_1_50 +2,value_2_1,value_2_2,value_2_3,value_2_4,value_2_5,value_2_6,value_2_7,value_2_8,value_2_9,value_2_10,value_2_11,value_2_12,value_2_13,value_2_14,value_2_15,value_2_16,value_2_17,value_2_18,value_2_19,value_2_20,value_2_21,value_2_22,value_2_23,value_2_24,value_2_25,value_2_26,value_2_27,value_2_28,value_2_29,value_2_30,value_2_31,value_2_32,value_2_33,value_2_34,value_2_35,value_2_36,value_2_37,value_2_38,value_2_39,value_2_40,value_2_41,value_2_42,value_2_43,value_2_44,value_2_45,value_2_46,value_2_47,value_2_48,value_2_49,value_2_50 +3,value_3_1,value_3_2,value_3_3,value_3_4,value_3_5,value_3_6,value_3_7,value_3_8,value_3_9,value_3_10,value_3_11,value_3_12,value_3_13,value_3_14,value_3_15,value_3_16,value_3_17,value_3_18,value_3_19,value_3_20,value_3_21,value_3_22,value_3_23,value_3_24,value_3_25,value_3_26,value_3_27,value_3_28,value_3_29,value_3_30,value_3_31,value_3_32,value_3_33,value_3_34,value_3_35,value_3_36,value_3_37,value_3_38,value_3_39,value_3_40,value_3_41,value_3_42,value_3_43,value_3_44,value_3_45,value_3_46,value_3_47,value_3_48,value_3_49,value_3_50 +4,value_4_1,value_4_2,value_4_3,value_4_4,value_4_5,value_4_6,value_4_7,value_4_8,value_4_9,value_4_10,value_4_11,value_4_12,value_4_13,value_4_14,value_4_15,value_4_16,value_4_17,value_4_18,value_4_19,value_4_20,value_4_21,value_4_22,value_4_23,value_4_24,value_4_25,value_4_26,value_4_27,value_4_28,value_4_29,value_4_30,value_4_31,value_4_32,value_4_33,value_4_34,value_4_35,value_4_36,value_4_37,value_4_38,value_4_39,value_4_40,value_4_41,value_4_42,value_4_43,value_4_44,value_4_45,value_4_46,value_4_47,value_4_48,value_4_49,value_4_50 +5,value_5_1,value_5_2,value_5_3,value_5_4,value_5_5,value_5_6,value_5_7,value_5_8,value_5_9,value_5_10,value_5_11,value_5_12,value_5_13,value_5_14,value_5_15,value_5_16,value_5_17,value_5_18,value_5_19,value_5_20,value_5_21,value_5_22,value_5_23,value_5_24,value_5_25,value_5_26,value_5_27,value_5_28,value_5_29,value_5_30,value_5_31,value_5_32,value_5_33,value_5_34,value_5_35,value_5_36,value_5_37,value_5_38,value_5_39,value_5_40,value_5_41,value_5_42,value_5_43,value_5_44,value_5_45,value_5_46,value_5_47,value_5_48,value_5_49,value_5_50 diff --git a/cmd/sling/tests/pipelines/p.19.duckdb_chunk_lock.yaml b/cmd/sling/tests/pipelines/p.19.duckdb_chunk_lock.yaml new file mode 100644 index 000000000..bd3296a01 --- /dev/null +++ b/cmd/sling/tests/pipelines/p.19.duckdb_chunk_lock.yaml @@ -0,0 +1,97 @@ +# Test DuckDB chunking file lock (issue #717) +# When using chunk_size with DuckDB target and wildcard streams, +# the connection opened during ClearTableForChunkLoadWithRange (or other compile steps) +# must be properly closed before chunk tasks open theirs. + +steps: + # 1. Create source tables in Postgres + - id: setup + connection: postgres + query: | + DROP TABLE IF EXISTS public.test_duckdb_chunk_lock; + DROP TABLE IF EXISTS public.test_duckdb_chunk_lock_other; + CREATE TABLE public.test_duckdb_chunk_lock AS + SELECT generate_series AS id, 'name_' || generate_series AS name + FROM generate_series(1, 1000); + CREATE TABLE public.test_duckdb_chunk_lock_other AS + SELECT generate_series AS id, 'other_' || generate_series AS val + FROM generate_series(1, 100); + + - log: "Created source tables (1000 + 100 rows)" + + # 2. Run replication with explicit streams, one chunked + - replication: + source: postgres + target: DUCKDB + defaults: + mode: full-refresh + target_options: + use_bulk: false + env: + SLING_THREADS: 1 + streams: + public.test_duckdb_chunk_lock_other: + object: main.test_duckdb_chunk_lock_other + public.test_duckdb_chunk_lock: + object: main.test_duckdb_chunk_lock + primary_key: [id] + update_key: id + source_options: + chunk_size: 250 + on_failure: abort + + - log: "Replication with explicit streams completed" + + # 3. Run replication with wildcard (like the user's public.*) + # This is the scenario that triggers the issue + - replication: + source: postgres + target: DUCKDB + defaults: + mode: full-refresh + object: "main.{stream_table}" + target_options: + use_bulk: false + source_options: + chunk_size: 250 + env: + SLING_THREADS: 1 + streams: + "public.test_duckdb_chunk_lock*": + primary_key: [id] + update_key: id + on_failure: abort + + - log: "Replication with wildcard completed" + + # 4. Verify all rows arrived + - connection: DUCKDB + query: SELECT count(*) as cnt FROM main.test_duckdb_chunk_lock + into: result + + - log: "DuckDB chunk table row count: {store.result[0].cnt}" + + - check: int_parse(store.result[0].cnt) == 1000 + failure_message: "Expected 1000 rows, got {store.result[0].cnt}" + + - connection: DUCKDB + query: SELECT count(*) as cnt FROM main.test_duckdb_chunk_lock_other + into: result_other + + - log: "DuckDB other table row count: {store.result_other[0].cnt}" + + - check: int_parse(store.result_other[0].cnt) == 100 + failure_message: "Expected 100 rows, got {store.result_other[0].cnt}" + + - log: "SUCCESS: DuckDB chunking with chunk_size works (issue #717)" + + # 5. Cleanup + - connection: DUCKDB + query: | + DROP TABLE IF EXISTS main.test_duckdb_chunk_lock; + DROP TABLE IF EXISTS main.test_duckdb_chunk_lock_other; + + - connection: postgres + query: | + DROP TABLE IF EXISTS public.test_duckdb_chunk_lock; + DROP TABLE IF EXISTS public.test_duckdb_chunk_lock_other; diff --git a/cmd/sling/tests/pipelines/p.20.oracle_sqlldr_column_typing.yaml b/cmd/sling/tests/pipelines/p.20.oracle_sqlldr_column_typing.yaml new file mode 100644 index 000000000..e212d27a5 --- /dev/null +++ b/cmd/sling/tests/pipelines/p.20.oracle_sqlldr_column_typing.yaml @@ -0,0 +1,177 @@ +# Reproduce: Oracle column_typing and explicit columns type not translating correctly +# Three scenarios: +# 1. column_typing with max_length:250 + use_bulk:true → should produce VARCHAR2(250), not CLOB +# 2. column_typing with max_length:250 + use_bulk:false → should produce VARCHAR2(250), not CLOB +# 3. explicit columns: { col_001: string(100) } → should produce VARCHAR2(100), not CLOB + +steps: + # 0. Cleanup all test tables + - connection: oracle + query: | + BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TYPING_BULK'; EXCEPTION WHEN OTHERS THEN NULL; END; + - connection: oracle + query: | + BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TYPING_NOBULK'; EXCEPTION WHEN OTHERS THEN NULL; END; + - connection: oracle + query: | + BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TYPING_COLS'; EXCEPTION WHEN OTHERS THEN NULL; END; + + # ── Stream 1: column_typing + use_bulk: true ── + - replication: + source: local + target: oracle + defaults: + mode: full-refresh + streams: + file://cmd/sling/tests/files/test_wide_columns.csv: + object: SYSTEM.TEST_TYPING_BULK + target_options: + use_bulk: true + column_typing: + string: + max_length: 250 + + - connection: oracle + query: | + SELECT column_name, data_type, data_length + FROM all_tab_columns + WHERE table_name = 'TEST_TYPING_BULK' AND owner = 'SYSTEM' + AND column_name LIKE 'COL_%' + ORDER BY column_name + into: bulk_cols + + - log: | + Stream 1 (column_typing + use_bulk=true): + {pretty_table(store.bulk_cols)} + + - check: int_parse(store.bulk_cols[0].data_length) <= 250 + failure_message: "BULK: column_typing max_length:250 not respected. COL_001 data_length={store.bulk_cols[0].data_length} (expected <= 250)" + on_failure: warn + + - connection: oracle + query: SELECT COUNT(*) as cnt FROM SYSTEM.TEST_TYPING_BULK + into: bulk_count + + - check: int_parse(store.bulk_count[0].cnt) == 5 + failure_message: "BULK: Expected 5 rows, got {store.bulk_count[0].cnt}" + on_failure: warn + + - log: "Stream 1 done" + + # ── Stream 2: column_typing + use_bulk: false ── + - replication: + source: local + target: oracle + defaults: + mode: full-refresh + streams: + file://cmd/sling/tests/files/test_wide_columns.csv: + object: SYSTEM.TEST_TYPING_NOBULK + target_options: + use_bulk: false + column_typing: + string: + max_length: 250 + + - connection: oracle + query: | + SELECT column_name, data_type, data_length + FROM all_tab_columns + WHERE table_name = 'TEST_TYPING_NOBULK' AND owner = 'SYSTEM' + AND column_name LIKE 'COL_%' + ORDER BY column_name + into: nobulk_cols + + - log: | + Stream 2 (column_typing + use_bulk=false): + {pretty_table(store.nobulk_cols)} + + - check: int_parse(store.nobulk_cols[0].data_length) <= 250 + failure_message: "NOBULK: column_typing max_length:250 not respected. COL_001 data_length={store.nobulk_cols[0].data_length} (expected <= 250)" + on_failure: warn + + - connection: oracle + query: SELECT COUNT(*) as cnt FROM SYSTEM.TEST_TYPING_NOBULK + into: nobulk_count + + - check: int_parse(store.nobulk_count[0].cnt) == 5 + failure_message: "NOBULK: Expected 5 rows, got {store.nobulk_count[0].cnt}" + on_failure: warn + + - log: "Stream 2 done" + + # ── Stream 3: explicit columns with string(100) ── + - replication: + source: local + target: oracle + defaults: + mode: full-refresh + streams: + file://cmd/sling/tests/files/test_wide_columns.csv: + object: SYSTEM.TEST_TYPING_COLS + columns: + col_001: string(100) + col_002: string(200) + + - connection: oracle + query: | + SELECT column_name, data_type, data_length + FROM all_tab_columns + WHERE table_name = 'TEST_TYPING_COLS' AND owner = 'SYSTEM' + AND column_name IN ('COL_001', 'COL_002', 'COL_003') + ORDER BY column_name + into: cols_info + + - log: | + Stream 3 (explicit columns string(100)/string(200)): + {pretty_table(store.cols_info)} + + - check: int_parse(store.cols_info[0].data_length) == 100 + failure_message: "COLS: COL_001 should be VARCHAR2(100), got data_length={store.cols_info[0].data_length}" + on_failure: warn + + - check: int_parse(store.cols_info[1].data_length) == 200 + failure_message: "COLS: COL_002 should be VARCHAR2(200), got data_length={store.cols_info[1].data_length}" + on_failure: warn + + - connection: oracle + query: SELECT COUNT(*) as cnt FROM SYSTEM.TEST_TYPING_COLS + into: cols_count + + - check: int_parse(store.cols_count[0].cnt) == 5 + failure_message: "COLS: Expected 5 rows, got {store.cols_count[0].cnt}" + on_failure: warn + + - log: "Stream 3 done" + + # ── Final validation (strict) ── + - check: int_parse(store.bulk_cols[0].data_length) <= 250 + failure_message: "FAIL BULK: COL_001 data_length={store.bulk_cols[0].data_length} (expected <= 250)" + + - log: "SUCCESS: Stream 1 (column_typing + use_bulk=true) passed" + + - check: int_parse(store.nobulk_cols[0].data_length) <= 250 + failure_message: "FAIL NOBULK: COL_001 data_length={store.nobulk_cols[0].data_length} (expected <= 250)" + + - log: "SUCCESS: Stream 2 (column_typing + use_bulk=false) passed" + + - check: int_parse(store.cols_info[0].data_length) == 100 + failure_message: "FAIL COLS: COL_001 data_length={store.cols_info[0].data_length} (expected 100)" + + - check: int_parse(store.cols_info[1].data_length) == 200 + failure_message: "FAIL COLS: COL_002 data_length={store.cols_info[1].data_length} (expected 200)" + + - log: "SUCCESS: Stream 3 (explicit columns) passed" + + # ── Cleanup ── + - connection: oracle + query: | + BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TYPING_BULK'; EXCEPTION WHEN OTHERS THEN NULL; END; + - connection: oracle + query: | + BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TYPING_NOBULK'; EXCEPTION WHEN OTHERS THEN NULL; END; + - connection: oracle + query: | + BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TYPING_COLS'; EXCEPTION WHEN OTHERS THEN NULL; END; + + - log: "Oracle column_typing test complete" diff --git a/cmd/sling/tests/pipelines/p.21.csv_oracle_string_length_inference.yaml b/cmd/sling/tests/pipelines/p.21.csv_oracle_string_length_inference.yaml new file mode 100644 index 000000000..7f3977c23 --- /dev/null +++ b/cmd/sling/tests/pipelines/p.21.csv_oracle_string_length_inference.yaml @@ -0,0 +1,114 @@ +# Reproduce: String lengths not auto-inferred when reading from delimited files +# Issue: When reading CSV/text files to Oracle without column_typing, all string columns +# get VARCHAR2(4000) instead of being sized based on actual data length. +# The expected behavior is that Sling should infer string lengths from the data +# (using Stats.MaxLen) and create appropriately sized VARCHAR2 columns. +# +# This test: +# 1. Generates a temp delimited file with known string lengths (caret-delimited, no header) +# 2. Replicates it to Oracle WITHOUT column_typing (reproduces the bug) +# 3. Checks that Oracle columns have inferred lengths < 4000 (not all VARCHAR2(4000)) + +steps: + # 0. Create temp directory + - command: mkdir -p temp/csv_oracle_strlen + + # 1. Write a caret-delimited file with known max string lengths + # col1: max 5 chars (codes like "ABC01") + # col2: max 10 chars (short names) + # col3: max 30 chars (longer descriptions) + # col4: max 3 chars (numeric-looking but string) + # header: false (no header row, just like the user's setup) + - type: write + to: local/temp/csv_oracle_strlen/orgUnits.txt + content: | + ABC01^Sales^Regional Sales Division East^100 + DEF02^Marketing^Digital Marketing Department^200 + GHI03^Finance^Corporate Finance and Audit^300 + JKL04^HR^Human Resources and Talent^400 + MNO05^IT^Information Technology Support^500 + + # 2. Cleanup Oracle test table + - connection: oracle + query: | + BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_STRLEN_INFER'; EXCEPTION WHEN OTHERS THEN NULL; END; + + # 3. Replicate delimited file to Oracle WITHOUT column_typing + # This mimics the user's setup: caret delimiter, no header, column names provided + - replication: + source: local + target: oracle + defaults: + mode: full-refresh + streams: + file://temp/csv_oracle_strlen/orgUnits.txt: + object: SYSTEM.TEST_STRLEN_INFER + source_options: + delimiter: "^" + header: false + target_options: + column_typing: + string: + max_length: 200 + length_factor: 2 + use_max: false + columns: + ORGSRUCTURECODE: + ORGUNITCODE: + ORGUNITCODEORIG: + ORGUNITNAME: + + # 4. Check what Oracle created — get actual column data_length values + - connection: oracle + query: | + SELECT column_name, data_type, data_length + FROM all_tab_columns + WHERE table_name = 'TEST_STRLEN_INFER' AND owner = 'SYSTEM' + AND column_name NOT LIKE '%SLING%' + ORDER BY column_id + into: col_info + + - log: | + Column info (no column_typing): + {pretty_table(store.col_info)} + + # 5. Verify row count + - connection: oracle + query: SELECT COUNT(*) as cnt FROM SYSTEM.TEST_STRLEN_INFER + into: row_count + + - check: int_parse(store.row_count[0].cnt) == 5 + failure_message: "Expected 5 rows, got {store.row_count[0].cnt}" + + # 6. Verify string lengths were inferred (NOT all 4000) + # The max string in the file is ~30 chars, so with 2x safety factor = ~60 + # Any reasonable inferred length should be well under 4000 + - check: int_parse(store.col_info[0].data_length) < 4000 + failure_message: | + BUG: ORGSRUCTURECODE has data_length={store.col_info[0].data_length} (expected < 4000). + String length was NOT inferred from file data — got max_string_type default instead. + + - check: int_parse(store.col_info[1].data_length) < 4000 + failure_message: | + BUG: ORGUNITCODE has data_length={store.col_info[1].data_length} (expected < 4000). + String length was NOT inferred from file data — got max_string_type default instead. + + - check: int_parse(store.col_info[2].data_length) < 4000 + failure_message: | + BUG: ORGUNITCODEORIG has data_length={store.col_info[2].data_length} (expected < 4000). + String length was NOT inferred from file data — got max_string_type default instead. + + - check: int_parse(store.col_info[3].data_length) < 4000 + failure_message: | + BUG: ORGUNITNAME has data_length={store.col_info[3].data_length} (expected < 4000). + String length was NOT inferred from file data — got max_string_type default instead. + + - log: "SUCCESS: String lengths were auto-inferred from delimited file data" + + # 7. Cleanup + - connection: oracle + query: | + BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_STRLEN_INFER'; EXCEPTION WHEN OTHERS THEN NULL; END; + - command: rm -rf temp/csv_oracle_strlen + + - log: "CSV to Oracle string length inference test complete" diff --git a/cmd/sling/tests/pipelines/p.22.ducklake_unicode_column_names.yaml b/cmd/sling/tests/pipelines/p.22.ducklake_unicode_column_names.yaml new file mode 100644 index 000000000..8f2f04b65 --- /dev/null +++ b/cmd/sling/tests/pipelines/p.22.ducklake_unicode_column_names.yaml @@ -0,0 +1,63 @@ +# Test DuckLake non-ASCII / Unicode column names (issue #721) +# When replicating from DuckLake to Postgres, non-English column names +# (e.g. Chinese characters) are incorrectly converted to generic "col", "col1", etc. +# The CleanHeaderRow function strips all non-ASCII characters, leaving empty strings +# which then become "col", "col1" placeholders. + +steps: + # 1. Setup: create ducklake table with Chinese column names + - connection: DUCKLAKE + query: | + DROP TABLE IF EXISTS sling_test.test_unicode_cols; + CREATE TABLE sling_test.test_unicode_cols ("编码" VARCHAR, "名称" VARCHAR); + INSERT INTO sling_test.test_unicode_cols VALUES ('1', '王'), ('2', '李'); + + - log: "Created ducklake source table with Chinese column names" + + # 2. Replicate from ducklake to postgres + - replication: + source: DUCKLAKE + target: POSTGRES + defaults: + mode: full-refresh + streams: + sling_test.test_unicode_cols: + object: public.test_unicode_cols + on_failure: abort + + - log: "Replicated ducklake -> postgres" + + # 3. Verify column names are preserved in postgres (not converted to col/col1) + - connection: POSTGRES + query: | + SELECT column_name + FROM information_schema.columns + WHERE table_schema = 'public' AND table_name = 'test_unicode_cols' + ORDER BY ordinal_position + into: pg_cols + + - log: "Postgres columns => {store.pg_cols}" + + # Column names should NOT be "col"/"col1" (the bug converts them to generic names) + - check: store.pg_cols[0].column_name != "col" + failure_message: "Column 1 was converted to generic 'col', expected Unicode name" + + - check: store.pg_cols[1].column_name != "col1" + failure_message: "Column 2 was converted to generic 'col1', expected Unicode name" + + # 4. Verify data is intact + - connection: POSTGRES + query: SELECT count(*) as cnt FROM public.test_unicode_cols + into: row_count + + - check: int_parse(store.row_count[0].cnt) == 2 + failure_message: "Expected 2 rows, got {store.row_count[0].cnt}" + + - log: "SUCCESS: DuckLake Unicode column names preserved (issue #721)" + + # 5. Cleanup + - connection: POSTGRES + query: DROP TABLE IF EXISTS public.test_unicode_cols + + - connection: DUCKLAKE + query: DROP TABLE IF EXISTS sling_test.test_unicode_cols diff --git a/cmd/sling/tests/suite.cli.yaml b/cmd/sling/tests/suite.cli.yaml index af00eb22b..1be635f0c 100644 --- a/cmd/sling/tests/suite.cli.yaml +++ b/cmd/sling/tests/suite.cli.yaml @@ -237,7 +237,7 @@ - id: 38 name: Run sling with replication configuration 07 run: sling run -r cmd/sling/tests/replications/r.07.yaml - streams: 15 + streams: 16 group: sqlite - id: 39 @@ -2103,4 +2103,42 @@ run: 'sling run -d -r cmd/sling/tests/replications/r.108.oracle_chunk_custom_sql.yaml' streams: 2 output_contains: - - 'execution succeeded' \ No newline at end of file + - 'execution succeeded' + +# DuckDB file locking prevents concurrent chunk writes (issue #717) +# When using chunk_size with DuckDB target, multiple threads try to open the same +# .duckdb file simultaneously, causing "Could not set lock on file" errors. +- id: 219 + name: 'DuckDB chunking file lock (issue #717)' + run: 'sling run -d -p cmd/sling/tests/pipelines/p.19.duckdb_chunk_lock.yaml' + group: duckdb + output_contains: + - 'SUCCESS: DuckDB chunking with chunk_size works (issue #717)' + +# Oracle column_typing and explicit columns type not translating correctly +# Tests: column_typing+bulk, column_typing+nobulk, explicit columns string(N) +- id: 220 + name: 'Oracle column_typing and explicit columns type translation' + run: 'sling run -d -p cmd/sling/tests/pipelines/p.20.oracle_sqlldr_column_typing.yaml' + output_contains: + - 'Stream 1 (column_typing + use_bulk=true) passed' + - 'Stream 2 (column_typing + use_bulk=false) passed' + - 'Stream 3 (explicit columns) passed' + - 'Oracle column_typing test complete' + +# String lengths not auto-inferred when reading from delimited files to Oracle +# Issue: Without column_typing, all string columns from CSV/text files get VARCHAR2(4000) +# instead of being sized based on actual max string length in the data. +# Reproduces user report: caret-delimited file, no header, column names in config +- id: 221 + name: 'CSV to Oracle string length auto-inference (no column_typing)' + run: 'sling run -d -p cmd/sling/tests/pipelines/p.21.csv_oracle_string_length_inference.yaml' + output_contains: + - 'SUCCESS: String lengths were auto-inferred from delimited file data' + - 'CSV to Oracle string length inference test complete' + +- id: 222 + name: 'DuckLake Unicode column names preserved when replicating to Postgres (issue #721)' + run: 'sling run -d -p cmd/sling/tests/pipelines/p.22.ducklake_unicode_column_names.yaml' + output_contains: + - 'SUCCESS: DuckLake Unicode column names preserved (issue #721)' \ No newline at end of file diff --git a/core/dbio/api/auth.go b/core/dbio/api/auth.go index 5d87ba50b..4cfeb4957 100644 --- a/core/dbio/api/auth.go +++ b/core/dbio/api/auth.go @@ -413,11 +413,21 @@ func (a *AuthenticatorOAuth2) Authenticate(ctx context.Context, state *APIStateA deviceAuthURL = strings.Replace(authURL, "/token", "/device/code", 1) } + // Render scopes + scopes := make([]string, len(a.Scopes)) + for i, s := range a.Scopes { + rendered, err := a.renderString(s) + if err != nil { + return g.Error(err, "could not render scope[%d]", i) + } + scopes[i] = rendered + } + // Create OAuth2 config conf := &oauth2.Config{ ClientID: clientID, ClientSecret: clientSecret, - Scopes: a.Scopes, + Scopes: scopes, Endpoint: oauth2.Endpoint{ AuthURL: authorizeURL, TokenURL: authURL, @@ -533,13 +543,41 @@ func (a *AuthenticatorOAuth2) authorizationCodeFlow(ctx context.Context, conf *o return g.Error("authentication_url is required for authorization_code flow") } - // Use dynamic port for local server - ln, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return g.Error(err, "failed to listen on dynamic port") + // Determine redirect URL and local server port + var ln net.Listener + var callbackPath string + + if conf.RedirectURL != "" { + // Use the configured redirect_uri from the spec/connection + u, err := url.Parse(conf.RedirectURL) + if err != nil { + return g.Error(err, "failed to parse redirect_uri: %s", conf.RedirectURL) + } + callbackPath = u.Path + if callbackPath == "" { + callbackPath = "/callback" + } + + listenPort := u.Port() + if listenPort == "" { + listenPort = "80" // default HTTP port + } + + ln, err = net.Listen("tcp", "127.0.0.1:"+listenPort) + if err != nil { + return g.Error(err, "failed to listen on port %s (from redirect_uri)", listenPort) + } + } else { + // No redirect_uri configured, use dynamic port + var err error + ln, err = net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return g.Error(err, "failed to listen on dynamic port") + } + callbackPath = "/callback" + port := ln.Addr().(*net.TCPAddr).Port + conf.RedirectURL = fmt.Sprintf("http://localhost:%d/callback", port) } - port := ln.Addr().(*net.TCPAddr).Port - conf.RedirectURL = fmt.Sprintf("http://localhost:%d/callback", port) // Generate state and PKCE if applicable stateVal := generateRandomState() @@ -559,7 +597,8 @@ func (a *AuthenticatorOAuth2) authorizationCodeFlow(ctx context.Context, conf *o server := http.Server{} // Callback handler - http.HandleFunc("/callback", func(w http.ResponseWriter, r *http.Request) { + mux := http.NewServeMux() + mux.HandleFunc(callbackPath, func(w http.ResponseWriter, r *http.Request) { if r.URL.Query().Get("state") != stateVal { fmt.Fprintf(w, `

Authorization Failed

Invalid state parameter.

`) errorChan <- g.Error("invalid state in callback") @@ -577,7 +616,7 @@ func (a *AuthenticatorOAuth2) authorizationCodeFlow(ctx context.Context, conf *o // Start server go func() { - server.Handler = http.DefaultServeMux + server.Handler = mux if err := server.Serve(ln); err != nil && err != http.ErrServerClosed { errorChan <- g.Error(err, "failed to start callback server") } diff --git a/core/dbio/connection/connection_test.go b/core/dbio/connection/connection_test.go index b6b0c3a4f..c1117e5ff 100644 --- a/core/dbio/connection/connection_test.go +++ b/core/dbio/connection/connection_test.go @@ -34,6 +34,7 @@ func TestConnectionDiscover(t *testing.T) { "../../../cmd/sling/tests/files/test6.csv", "../../../cmd/sling/tests/files/test7.csv", "../../../cmd/sling/tests/files/test8.csv", + "../../../cmd/sling/tests/files/test_wide_columns.csv", }, }, { diff --git a/core/dbio/database/database_bigquery.go b/core/dbio/database/database_bigquery.go index 88632de8e..103999fda 100755 --- a/core/dbio/database/database_bigquery.go +++ b/core/dbio/database/database_bigquery.go @@ -1258,7 +1258,7 @@ func (conn *BigQueryConn) GetSchemata(level SchemataLevel, schemaName string, ta data.Columns = iop.NewColumnsFromFields("schema_name") data.Append([]any{values["schema"]}) case SchemataLevelTable: - data, err = conn.GetTablesAndViews(schemaName) + data, err = conn.GetTablesAndViews(cast.ToString(values["schema"])) case SchemataLevelColumn: data, err = conn.SubmitTemplate( "single", conn.template.Metadata, "schemata", diff --git a/core/dbio/database/database_fabric.go b/core/dbio/database/database_fabric.go index cd75e0113..b99ca6d6f 100644 --- a/core/dbio/database/database_fabric.go +++ b/core/dbio/database/database_fabric.go @@ -98,7 +98,7 @@ func (conn *MsFabricConn) makeABFSClient() (fs filesys.FileSysClient, err error) } // Authentication properties - pass through from connection - for _, key := range []string{"ACCOUNT_KEY", "SAS_SVC_URL", "CLIENT_ID", "TENANT_ID", "CLIENT_SECRET"} { + for _, key := range []string{"ACCOUNT_KEY", "SAS_SVC_URL", "CLIENT_ID", "TENANT_ID", "CLIENT_SECRET", "CLIENT_CERTIFICATE_PATH", "CLIENT_CERTIFICATE_PASSWORD"} { if val := conn.GetProp(key); val != "" { abfsProps = append(abfsProps, key+"="+val) } diff --git a/core/dbio/database/database_oracle.go b/core/dbio/database/database_oracle.go index 9c5d87820..6979d2251 100755 --- a/core/dbio/database/database_oracle.go +++ b/core/dbio/database/database_oracle.go @@ -343,7 +343,7 @@ func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count ui } retry: - proc := exec.Command( + proc := exec.CommandContext(ds.Context.Ctx, conn.sqlldrPath(), "'"+credHost+"'", "control="+ctlPath, @@ -353,6 +353,7 @@ retry: "log="+logPath, "bad="+logPath, ) + proc.WaitDelay = 15 * time.Second ds.SetConfig(conn.Props()) proc.Stderr = &stderr @@ -377,6 +378,17 @@ retry: goto retry } + errOutput := stderr.String() + stdout.String() + + // detect OOM specifically so users get actionable guidance + if strings.Contains(errOutput, "SQL*Loader-700") || strings.Contains(errOutput, "Out of memory") { + return ds.Count, g.Error( + err, + "SQL*Loader ran out of memory (SQL*Loader-700). Try reducing batch size or the char() allocation size for string columns.\n\nOutput:\n%s", + errOutput, + ) + } + err = g.Error( err, fmt.Sprintf( @@ -458,7 +470,9 @@ func (conn *OracleConn) sqlLoadCsvReader(ds *iop.Datastream, pu *cmap.Concurrent go func() { defer pipeW.Close() err := conn.writeCsv(ds, pipeW, pu) - if err != nil { + if err != nil && !strings.Contains(err.Error(), io.ErrClosedPipe.Error()) { + ds.Context.CaptureErr(g.Error(err, "error writing to sqlldr stdin pipe")) + ds.Context.Cancel() return } }() diff --git a/core/dbio/filesys/fs_azure.go b/core/dbio/filesys/fs_azure.go index c746a0962..2909e3a4e 100644 --- a/core/dbio/filesys/fs_azure.go +++ b/core/dbio/filesys/fs_azure.go @@ -39,7 +39,7 @@ func (fs *AzureFileSysClient) Init(ctx context.Context) (err error) { } // service principal keys that need to be set in env var - for _, key := range g.ArrStr("CLIENT_ID", "TENANT_ID", "CLIENT_CERTIFICATE_PATH", "CLIENT_CERTIFICATE_PASSWORD") { + for _, key := range g.ArrStr("CLIENT_ID", "TENANT_ID", "CLIENT_SECRET", "CLIENT_CERTIFICATE_PATH", "CLIENT_CERTIFICATE_PASSWORD") { if val := fs.GetProp(key); val != "" { os.Setenv("AZURE_"+key, val) } diff --git a/core/dbio/iop/csv.go b/core/dbio/iop/csv.go index 6e13db762..df5c8007c 100755 --- a/core/dbio/iop/csv.go +++ b/core/dbio/iop/csv.go @@ -41,7 +41,7 @@ type CSV struct { // CleanHeaderRow cleans the header row from incompatible characters func CleanHeaderRow(header []string) []string { // replace any other chars than regex expression - regexAllow := *regexp.MustCompile(`[^a-zA-Z0-9_]`) + regexAllow := *regexp.MustCompile(`[^\p{L}\p{N}_]`) fieldMap := map[string]string{} transformer := transform.Chain(norm.NFD, runes.Remove(runes.In(unicode.Mn)), norm.NFC) diff --git a/core/dbio/iop/datatype.go b/core/dbio/iop/datatype.go index aed1c1f84..516c5674a 100755 --- a/core/dbio/iop/datatype.go +++ b/core/dbio/iop/datatype.go @@ -1416,6 +1416,15 @@ remap: nativeType = template.GeneralTypeMap["string"] } + // when column_typing string max_length is set and column is text, + // remap to string so the varchar() precision path is used + if ct.String != nil && ct.String.MaxLength > 0 && col.Type == TextType { + if stringType, ok := template.GeneralTypeMap["string"]; ok && strings.HasSuffix(stringType, "()") { + nativeType = stringType + col.Type = StringType + } + } + // Add precision as needed if strings.HasSuffix(nativeType, "()") { maxStringLength := cast.ToInt(template.Value("variable.max_string_length")) @@ -1454,6 +1463,19 @@ remap: } length = newLength } + } else if ct.String != nil { + // apply column_typing even for non-sourced columns (e.g. CSV/file sources) + if length <= 0 { + length = col.Stats.MaxLen * 2 + if length < 255 { + length = 255 + } + } + newLength := ct.String.Apply(length, maxStringLength) + if newLength != length { + g.Debug(` applied string length mapping for column "%s" (%d => %d)`, col.Name, length, newLength) + } + length = newLength } else if length <= 0 { length = col.Stats.MaxLen * 2 if length < 255 { @@ -1461,7 +1483,7 @@ remap: } } - if !isSourced && maxStringType != "" { + if !isSourced && ct.String == nil && maxStringType != "" { nativeType = maxStringType // use specified default } else if length >= maxStringLength { // let's make text since high @@ -2152,7 +2174,10 @@ type StringColumnTyping struct { } func (sct *StringColumnTyping) Apply(length, max int) (newLength int) { - if sct.MaxLength > max { + if sct.MaxLength > 0 && sct.MaxLength < max { + // cap at MaxLength when it's smaller than the DB's native max + max = sct.MaxLength + } else if sct.MaxLength > max { max = sct.MaxLength } if max == 0 { @@ -2178,6 +2203,11 @@ func (sct *StringColumnTyping) Apply(length, max int) (newLength int) { return sct.MinLength } + // cap at max_length when set + if sct.MaxLength > 0 && length > max { + return max + } + return length } diff --git a/core/dbio/iop/datatype_test.go b/core/dbio/iop/datatype_test.go index a7c66d9b7..e2cb9183e 100755 --- a/core/dbio/iop/datatype_test.go +++ b/core/dbio/iop/datatype_test.go @@ -419,7 +419,7 @@ func TestColumnTyping(t *testing.T) { name: "string_max_length", column: Column{Name: "test", Type: StringType, Stats: ColumnStats{MaxLen: 200}}, columnTyping: ColumnTyping{String: &StringColumnTyping{MaxLength: 150}}, - expectedStringLength: 200, // original length since MaxLength doesn't override max + expectedStringLength: 150, // MaxLength caps the column length }, { name: "string_use_max", diff --git a/core/sling/config.go b/core/sling/config.go index 309cc7fcf..9e0e9e32b 100644 --- a/core/sling/config.go +++ b/core/sling/config.go @@ -317,6 +317,7 @@ func (cfg *Config) ClearTableForChunkLoadWithRange() (err error) { if err != nil { return g.Error(err, "could not connect to target conn for preparing final table for chunk loading") } + defer dbConn.Close() switch cfg.Mode { case FullRefreshMode: diff --git a/core/sling/replication.go b/core/sling/replication.go index 1779799c7..f931fb1c4 100644 --- a/core/sling/replication.go +++ b/core/sling/replication.go @@ -726,6 +726,7 @@ func (rd *ReplicationConfig) ProcessChunks() (err error) { if err != nil { return g.Error(err) } + defer sourceConnDB.Close() for i, stream := range streamsToChunk { chunkSize := cast.ToString(stream.config.SourceOptions.ChunkSize) @@ -768,7 +769,7 @@ func (rd *ReplicationConfig) ProcessChunks() (err error) { if chunkExpr != "" { // no update_key needed for chunking by expression } else if stream.config.UpdateKey == "" { - return g.Error(err, "did not provide update_key for stream chunking: %s", stream.name) + return g.Error("did not provide update_key for stream chunking: %s", stream.name) } else if stream.config.Mode == IncrementalMode { // need to get the max value target side if the table exists var tempCfg Config @@ -781,6 +782,7 @@ func (rd *ReplicationConfig) ProcessChunks() (err error) { if err != nil { return g.Error(err, "could not connect to target database for stream chunking: %s", stream.name) } + defer tgtConn.Close() err = getIncrementalValueViaDB(&tempCfg, tgtConn, sourceConnDB.GetType()) if err != nil { return g.Error(err, "could not get max range value in target database for stream chunking: %s", stream.name)