Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/sling/sling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ var connMap = map[dbio.Type]connTest{
dbio.TypeDbIceberg: {name: "iceberg_r2", adjustCol: g.Bool(false)},
dbio.TypeDbMySQL: {name: "mysql", schema: "mysql"},
dbio.TypeDbOracle: {name: "oracle", schema: "oracle", useBulk: g.Bool(false)},
dbio.Type("oracle_sqlldr"): {name: "oracle", schema: "oracle", useBulk: g.Bool(true)},
dbio.Type("oracle_sqlldr"): {name: "oracle", schema: "oracle", useBulk: g.Bool(true), adjustCol: g.Bool(false)},
dbio.TypeDbPostgres: {name: "postgres"},
dbio.Type("postgres_adbc"): {name: "postgres_adbc", adjustCol: g.Bool(false)},
dbio.TypeDbRedshift: {name: "redshift", adjustCol: g.Bool(false)},
Expand Down
6 changes: 6 additions & 0 deletions cmd/sling/tests/files/test_wide_columns.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
id,col_001,col_002,col_003,col_004,col_005,col_006,col_007,col_008,col_009,col_010,col_011,col_012,col_013,col_014,col_015,col_016,col_017,col_018,col_019,col_020,col_021,col_022,col_023,col_024,col_025,col_026,col_027,col_028,col_029,col_030,col_031,col_032,col_033,col_034,col_035,col_036,col_037,col_038,col_039,col_040,col_041,col_042,col_043,col_044,col_045,col_046,col_047,col_048,col_049,col_050
1,value_1_1,value_1_2,value_1_3,value_1_4,value_1_5,value_1_6,value_1_7,value_1_8,value_1_9,value_1_10,value_1_11,value_1_12,value_1_13,value_1_14,value_1_15,value_1_16,value_1_17,value_1_18,value_1_19,value_1_20,value_1_21,value_1_22,value_1_23,value_1_24,value_1_25,value_1_26,value_1_27,value_1_28,value_1_29,value_1_30,value_1_31,value_1_32,value_1_33,value_1_34,value_1_35,value_1_36,value_1_37,value_1_38,value_1_39,value_1_40,value_1_41,value_1_42,value_1_43,value_1_44,value_1_45,value_1_46,value_1_47,value_1_48,value_1_49,value_1_50
2,value_2_1,value_2_2,value_2_3,value_2_4,value_2_5,value_2_6,value_2_7,value_2_8,value_2_9,value_2_10,value_2_11,value_2_12,value_2_13,value_2_14,value_2_15,value_2_16,value_2_17,value_2_18,value_2_19,value_2_20,value_2_21,value_2_22,value_2_23,value_2_24,value_2_25,value_2_26,value_2_27,value_2_28,value_2_29,value_2_30,value_2_31,value_2_32,value_2_33,value_2_34,value_2_35,value_2_36,value_2_37,value_2_38,value_2_39,value_2_40,value_2_41,value_2_42,value_2_43,value_2_44,value_2_45,value_2_46,value_2_47,value_2_48,value_2_49,value_2_50
3,value_3_1,value_3_2,value_3_3,value_3_4,value_3_5,value_3_6,value_3_7,value_3_8,value_3_9,value_3_10,value_3_11,value_3_12,value_3_13,value_3_14,value_3_15,value_3_16,value_3_17,value_3_18,value_3_19,value_3_20,value_3_21,value_3_22,value_3_23,value_3_24,value_3_25,value_3_26,value_3_27,value_3_28,value_3_29,value_3_30,value_3_31,value_3_32,value_3_33,value_3_34,value_3_35,value_3_36,value_3_37,value_3_38,value_3_39,value_3_40,value_3_41,value_3_42,value_3_43,value_3_44,value_3_45,value_3_46,value_3_47,value_3_48,value_3_49,value_3_50
4,value_4_1,value_4_2,value_4_3,value_4_4,value_4_5,value_4_6,value_4_7,value_4_8,value_4_9,value_4_10,value_4_11,value_4_12,value_4_13,value_4_14,value_4_15,value_4_16,value_4_17,value_4_18,value_4_19,value_4_20,value_4_21,value_4_22,value_4_23,value_4_24,value_4_25,value_4_26,value_4_27,value_4_28,value_4_29,value_4_30,value_4_31,value_4_32,value_4_33,value_4_34,value_4_35,value_4_36,value_4_37,value_4_38,value_4_39,value_4_40,value_4_41,value_4_42,value_4_43,value_4_44,value_4_45,value_4_46,value_4_47,value_4_48,value_4_49,value_4_50
5,value_5_1,value_5_2,value_5_3,value_5_4,value_5_5,value_5_6,value_5_7,value_5_8,value_5_9,value_5_10,value_5_11,value_5_12,value_5_13,value_5_14,value_5_15,value_5_16,value_5_17,value_5_18,value_5_19,value_5_20,value_5_21,value_5_22,value_5_23,value_5_24,value_5_25,value_5_26,value_5_27,value_5_28,value_5_29,value_5_30,value_5_31,value_5_32,value_5_33,value_5_34,value_5_35,value_5_36,value_5_37,value_5_38,value_5_39,value_5_40,value_5_41,value_5_42,value_5_43,value_5_44,value_5_45,value_5_46,value_5_47,value_5_48,value_5_49,value_5_50
97 changes: 97 additions & 0 deletions cmd/sling/tests/pipelines/p.19.duckdb_chunk_lock.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Test DuckDB chunking file lock (issue #717)
# When using chunk_size with DuckDB target and wildcard streams,
# the connection opened during ClearTableForChunkLoadWithRange (or other compile steps)
# must be properly closed before chunk tasks open theirs.

steps:
# 1. Create source tables in Postgres
- id: setup
connection: postgres
query: |
DROP TABLE IF EXISTS public.test_duckdb_chunk_lock;
DROP TABLE IF EXISTS public.test_duckdb_chunk_lock_other;
CREATE TABLE public.test_duckdb_chunk_lock AS
SELECT generate_series AS id, 'name_' || generate_series AS name
FROM generate_series(1, 1000);
CREATE TABLE public.test_duckdb_chunk_lock_other AS
SELECT generate_series AS id, 'other_' || generate_series AS val
FROM generate_series(1, 100);

- log: "Created source tables (1000 + 100 rows)"

# 2. Run replication with explicit streams, one chunked
- replication:
source: postgres
target: DUCKDB
defaults:
mode: full-refresh
target_options:
use_bulk: false
env:
SLING_THREADS: 1
streams:
public.test_duckdb_chunk_lock_other:
object: main.test_duckdb_chunk_lock_other
public.test_duckdb_chunk_lock:
object: main.test_duckdb_chunk_lock
primary_key: [id]
update_key: id
source_options:
chunk_size: 250
on_failure: abort

- log: "Replication with explicit streams completed"

# 3. Run replication with wildcard (like the user's public.*)
# This is the scenario that triggers the issue
- replication:
source: postgres
target: DUCKDB
defaults:
mode: full-refresh
object: "main.{stream_table}"
target_options:
use_bulk: false
source_options:
chunk_size: 250
env:
SLING_THREADS: 1
streams:
"public.test_duckdb_chunk_lock*":
primary_key: [id]
update_key: id
on_failure: abort

- log: "Replication with wildcard completed"

# 4. Verify all rows arrived
- connection: DUCKDB
query: SELECT count(*) as cnt FROM main.test_duckdb_chunk_lock
into: result

- log: "DuckDB chunk table row count: {store.result[0].cnt}"

- check: int_parse(store.result[0].cnt) == 1000
failure_message: "Expected 1000 rows, got {store.result[0].cnt}"

- connection: DUCKDB
query: SELECT count(*) as cnt FROM main.test_duckdb_chunk_lock_other
into: result_other

- log: "DuckDB other table row count: {store.result_other[0].cnt}"

- check: int_parse(store.result_other[0].cnt) == 100
failure_message: "Expected 100 rows, got {store.result_other[0].cnt}"

- log: "SUCCESS: DuckDB chunking with chunk_size works (issue #717)"

# 5. Cleanup
- connection: DUCKDB
query: |
DROP TABLE IF EXISTS main.test_duckdb_chunk_lock;
DROP TABLE IF EXISTS main.test_duckdb_chunk_lock_other;

- connection: postgres
query: |
DROP TABLE IF EXISTS public.test_duckdb_chunk_lock;
DROP TABLE IF EXISTS public.test_duckdb_chunk_lock_other;
177 changes: 177 additions & 0 deletions cmd/sling/tests/pipelines/p.20.oracle_sqlldr_column_typing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
# Reproduce: Oracle column_typing and explicit column types not translating correctly
# Three scenarios:
# 1. column_typing with max_length:250 + use_bulk:true → should produce VARCHAR2(250), not CLOB
# 2. column_typing with max_length:250 + use_bulk:false → should produce VARCHAR2(250), not CLOB
# 3. explicit columns: { col_001: string(100) } → should produce VARCHAR2(100), not CLOB

steps:
# 0. Cleanup all test tables
- connection: oracle
query: |
BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TYPING_BULK'; EXCEPTION WHEN OTHERS THEN NULL; END;
- connection: oracle
query: |
BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TYPING_NOBULK'; EXCEPTION WHEN OTHERS THEN NULL; END;
- connection: oracle
query: |
BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TYPING_COLS'; EXCEPTION WHEN OTHERS THEN NULL; END;

# ── Stream 1: column_typing + use_bulk: true ──
- replication:
source: local
target: oracle
defaults:
mode: full-refresh
streams:
file://cmd/sling/tests/files/test_wide_columns.csv:
object: SYSTEM.TEST_TYPING_BULK
target_options:
use_bulk: true
column_typing:
string:
max_length: 250

- connection: oracle
query: |
SELECT column_name, data_type, data_length
FROM all_tab_columns
WHERE table_name = 'TEST_TYPING_BULK' AND owner = 'SYSTEM'
AND column_name LIKE 'COL_%'
ORDER BY column_name
into: bulk_cols

- log: |
Stream 1 (column_typing + use_bulk=true):
{pretty_table(store.bulk_cols)}

- check: int_parse(store.bulk_cols[0].data_length) <= 250
failure_message: "BULK: column_typing max_length:250 not respected. COL_001 data_length={store.bulk_cols[0].data_length} (expected <= 250)"
on_failure: warn

- connection: oracle
query: SELECT COUNT(*) as cnt FROM SYSTEM.TEST_TYPING_BULK
into: bulk_count

- check: int_parse(store.bulk_count[0].cnt) == 5
failure_message: "BULK: Expected 5 rows, got {store.bulk_count[0].cnt}"
on_failure: warn

- log: "Stream 1 done"

# ── Stream 2: column_typing + use_bulk: false ──
- replication:
source: local
target: oracle
defaults:
mode: full-refresh
streams:
file://cmd/sling/tests/files/test_wide_columns.csv:
object: SYSTEM.TEST_TYPING_NOBULK
target_options:
use_bulk: false
column_typing:
string:
max_length: 250

- connection: oracle
query: |
SELECT column_name, data_type, data_length
FROM all_tab_columns
WHERE table_name = 'TEST_TYPING_NOBULK' AND owner = 'SYSTEM'
AND column_name LIKE 'COL_%'
ORDER BY column_name
into: nobulk_cols

- log: |
Stream 2 (column_typing + use_bulk=false):
{pretty_table(store.nobulk_cols)}

- check: int_parse(store.nobulk_cols[0].data_length) <= 250
failure_message: "NOBULK: column_typing max_length:250 not respected. COL_001 data_length={store.nobulk_cols[0].data_length} (expected <= 250)"
on_failure: warn

- connection: oracle
query: SELECT COUNT(*) as cnt FROM SYSTEM.TEST_TYPING_NOBULK
into: nobulk_count

- check: int_parse(store.nobulk_count[0].cnt) == 5
failure_message: "NOBULK: Expected 5 rows, got {store.nobulk_count[0].cnt}"
on_failure: warn

- log: "Stream 2 done"

# ── Stream 3: explicit columns with string(100) ──
- replication:
source: local
target: oracle
defaults:
mode: full-refresh
streams:
file://cmd/sling/tests/files/test_wide_columns.csv:
object: SYSTEM.TEST_TYPING_COLS
columns:
col_001: string(100)
col_002: string(200)

- connection: oracle
query: |
SELECT column_name, data_type, data_length
FROM all_tab_columns
WHERE table_name = 'TEST_TYPING_COLS' AND owner = 'SYSTEM'
AND column_name IN ('COL_001', 'COL_002', 'COL_003')
ORDER BY column_name
into: cols_info

- log: |
Stream 3 (explicit columns string(100)/string(200)):
{pretty_table(store.cols_info)}

- check: int_parse(store.cols_info[0].data_length) == 100
failure_message: "COLS: COL_001 should be VARCHAR2(100), got data_length={store.cols_info[0].data_length}"
on_failure: warn

- check: int_parse(store.cols_info[1].data_length) == 200
failure_message: "COLS: COL_002 should be VARCHAR2(200), got data_length={store.cols_info[1].data_length}"
on_failure: warn

- connection: oracle
query: SELECT COUNT(*) as cnt FROM SYSTEM.TEST_TYPING_COLS
into: cols_count

- check: int_parse(store.cols_count[0].cnt) == 5
failure_message: "COLS: Expected 5 rows, got {store.cols_count[0].cnt}"
on_failure: warn

- log: "Stream 3 done"

# ── Final validation (strict) ──
- check: int_parse(store.bulk_cols[0].data_length) <= 250
failure_message: "FAIL BULK: COL_001 data_length={store.bulk_cols[0].data_length} (expected <= 250)"

- log: "SUCCESS: Stream 1 (column_typing + use_bulk=true) passed"

- check: int_parse(store.nobulk_cols[0].data_length) <= 250
failure_message: "FAIL NOBULK: COL_001 data_length={store.nobulk_cols[0].data_length} (expected <= 250)"

- log: "SUCCESS: Stream 2 (column_typing + use_bulk=false) passed"

- check: int_parse(store.cols_info[0].data_length) == 100
failure_message: "FAIL COLS: COL_001 data_length={store.cols_info[0].data_length} (expected 100)"

- check: int_parse(store.cols_info[1].data_length) == 200
failure_message: "FAIL COLS: COL_002 data_length={store.cols_info[1].data_length} (expected 200)"

- log: "SUCCESS: Stream 3 (explicit columns) passed"

# ── Cleanup ──
- connection: oracle
query: |
BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TYPING_BULK'; EXCEPTION WHEN OTHERS THEN NULL; END;
- connection: oracle
query: |
BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TYPING_NOBULK'; EXCEPTION WHEN OTHERS THEN NULL; END;
- connection: oracle
query: |
BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TYPING_COLS'; EXCEPTION WHEN OTHERS THEN NULL; END;

- log: "Oracle column_typing test complete"
Loading