diff --git a/cmd/sling/sling_test.go b/cmd/sling/sling_test.go index 47fcb7783..d8e045e94 100755 --- a/cmd/sling/sling_test.go +++ b/cmd/sling/sling_test.go @@ -859,9 +859,6 @@ func runOneTask(t *testing.T, ctx context.Context, file g.FileItem, connType dbi if srcType == dbio.TypeDbMariaDB && strings.EqualFold(colName, "json_data") { correctType = iop.TextType // mariadb's `json` type is `longtext` } - if srcType == dbio.TypeDbStarRocks && strings.EqualFold(colName, "json_data") { - correctType = iop.TextType // starrocks's `json` type is `varchar(65500)` - } case tgtType.IsMySQLLike(): if g.In(correctType, iop.TimestampType, iop.TimestampzType) { correctType = iop.DatetimeType // mysql/mariadb uses datetime diff --git a/cmd/sling/tests/pipelines/p.23.ternary_length_mixed_types.yaml b/cmd/sling/tests/pipelines/p.23.ternary_length_mixed_types.yaml new file mode 100644 index 000000000..eb1e309d9 --- /dev/null +++ b/cmd/sling/tests/pipelines/p.23.ternary_length_mixed_types.yaml @@ -0,0 +1,139 @@ +# Reproduce: Wildcard transform with type_of + length fails on non-string columns +# Issue: goval ternary operator does NOT short-circuit, so both branches are always +# evaluated. When using a wildcard transform like: +# type_of(value) == "string" ? (length(value) > 509 ? substring(value,0,505) + "..." : value) : value +# length(value) is called on ALL columns regardless of type, causing: +# "function error: 'length' - cannot get length of type int64" +# +# User report: Oracle -> Postgres, wants to truncate long strings via wildcard transform +# without having to define per-column transforms. + +steps: + # 0. Cleanup + - connection: oracle + query: | + BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TERNARY_LENGTH'; EXCEPTION WHEN OTHERS THEN NULL; END; + + - connection: POSTGRES + query: DROP TABLE IF EXISTS public.test_ternary_length + + # 1. 
Create Oracle source table with mixed types (string, integer, date, number) + - connection: oracle + query: | + CREATE TABLE SYSTEM.TEST_TERNARY_LENGTH ( + id NUMBER(10), + short_name VARCHAR2(50), + long_description VARCHAR2(4000), + amount NUMBER(12,2), + created_at DATE + ) + + - connection: oracle + query: | + INSERT INTO SYSTEM.TEST_TERNARY_LENGTH VALUES ( + 1, + 'Alice', + 'This is a short description', + 123.45, + TO_DATE('2025-01-15', 'YYYY-MM-DD') + ) + + - connection: oracle + query: | + INSERT INTO SYSTEM.TEST_TERNARY_LENGTH VALUES ( + 2, + 'Bob', + RPAD('Very long text that exceeds 509 characters. ', 600, 'ABCDEFGHIJ'), + 99999.99, + TO_DATE('2025-06-20', 'YYYY-MM-DD') + ) + + - connection: oracle + query: | + INSERT INTO SYSTEM.TEST_TERNARY_LENGTH VALUES ( + 3, + 'Charlie', + NULL, + 0.00, + NULL + ) + + - connection: oracle + query: COMMIT + + - log: "Created Oracle source table with mixed types (integer, string, number, date)" + + # 2. Replicate Oracle -> Postgres with wildcard transform using type_of + length + # This should fail because goval evaluates length(value) even on non-string columns + - replication: + source: oracle + target: POSTGRES + defaults: + mode: full-refresh + streams: + SYSTEM.TEST_TERNARY_LENGTH: + object: public.test_ternary_length + transforms: + - '*': 'type_of(value) == "string" ? (length(value) > 509 ? substring(value,0,505) + "..." : value) : value' + on_failure: warn + + # 3. Verify data landed in Postgres + - connection: POSTGRES + query: SELECT count(*) as cnt FROM public.test_ternary_length + into: row_count + + - log: "Row count => {store.row_count[0].cnt}" + + - check: int_parse(store.row_count[0].cnt) == 3 + failure_message: "Expected 3 rows, got {store.row_count[0].cnt}" + + # 4. Verify long string was truncated (505 chars + "..." = 508 chars) + # Note: wildcard transform converts all columns to string type, + # so we use string comparisons for the id column. 
+ - connection: POSTGRES + query: | + SELECT id, length(long_description) as desc_len + FROM public.test_ternary_length + WHERE id = '2' + into: long_row + + - log: "Long string length for id=2 => {store.long_row[0].desc_len}" + + - check: int_parse(store.long_row[0].desc_len) == 508 + failure_message: "Expected truncated length 508, got {store.long_row[0].desc_len}" + + # 5. Verify short string was NOT truncated + - connection: POSTGRES + query: | + SELECT long_description + FROM public.test_ternary_length + WHERE id = '1' + into: short_row + + - check: store.short_row[0].long_description == "This is a short description" + failure_message: "Short string was incorrectly modified" + + # 6. Verify integer and numeric columns survived the transform + - connection: POSTGRES + query: | + SELECT id, amount + FROM public.test_ternary_length + WHERE id = '1' + into: numeric_row + + - log: "Numeric values => id={store.numeric_row[0].id}, amount={store.numeric_row[0].amount}" + + - check: store.numeric_row[0].id == "1" + failure_message: "Integer column 'id' value incorrect, got {store.numeric_row[0].id}" + + - log: "SUCCESS: Wildcard transform with type_of + length works on mixed-type columns" + + # 7. 
Cleanup + - connection: oracle + query: | + BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_TERNARY_LENGTH'; EXCEPTION WHEN OTHERS THEN NULL; END; + + - connection: POSTGRES + query: DROP TABLE IF EXISTS public.test_ternary_length + + - log: "Ternary length mixed types test complete" diff --git a/cmd/sling/tests/pipelines/p.25.oracle_sqlldr_char_sizing.yaml b/cmd/sling/tests/pipelines/p.25.oracle_sqlldr_char_sizing.yaml new file mode 100644 index 000000000..dd99d3cd9 --- /dev/null +++ b/cmd/sling/tests/pipelines/p.25.oracle_sqlldr_char_sizing.yaml @@ -0,0 +1,75 @@ +# Reproduce: sqlldr ctl file uses char(400000) even when column_typing.string.max_length is set +# Issue: When replicating CSV to Oracle with use_bulk=true and column_typing.string.max_length=255, +# the Oracle table columns get correct VARCHAR2 sizing, but the sqlldr control file still +# hardcodes char(400000) for all string columns. This causes sqlldr to pre-allocate massive +# memory (10s of GB for moderately sized tables). +# +# The bug is in getColumnsString() in database_oracle.go: +# char(400000) is always used unless col.DbPrecision > 400000 +# It should respect col.DbPrecision when it's set (e.g. from column_typing). +# +# This test: +# 1. Replicates a CSV file to Oracle with use_bulk=true and column_typing max_length=255 +# 2. Verifies the table was created with correct column sizing (< 4000) +# 3. Checks debug output does NOT contain char(400000) in the sqlldr ctl + +steps: + # 0. Cleanup + - connection: oracle + query: | + BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_SQLLDR_CHAR'; EXCEPTION WHEN OTHERS THEN NULL; END; + + # 1. Replicate CSV to Oracle with column_typing + use_bulk=true + - replication: + source: local + target: oracle + defaults: + mode: full-refresh + streams: + file://cmd/sling/tests/files/test_wide_columns.csv: + object: SYSTEM.TEST_SQLLDR_CHAR + target_options: + use_bulk: true + column_typing: + string: + max_length: 255 + min_length: 50 + length_factor: 1 + + # 2. 
Verify table columns are correctly sized (not VARCHAR2(4000)) + - connection: oracle + query: | + SELECT column_name, data_type, data_length + FROM all_tab_columns + WHERE table_name = 'TEST_SQLLDR_CHAR' AND owner = 'SYSTEM' + AND column_name LIKE 'COL_%' + ORDER BY column_name + into: col_info + + - log: | + Oracle column metadata: + {pretty_table(store.col_info)} + + # 3. Verify row count + - connection: oracle + query: SELECT COUNT(*) as cnt FROM SYSTEM.TEST_SQLLDR_CHAR + into: row_count + + - check: int_parse(store.row_count[0].cnt) == 5 + failure_message: "Expected 5 rows, got {store.row_count[0].cnt}" + + # 4. Verify table columns respect column_typing (should be <= 255) + - check: int_parse(store.col_info[0].data_length) <= 255 + failure_message: "COL_001 data_length={store.col_info[0].data_length} (expected <= 255 from column_typing)" + + - log: "SUCCESS: Oracle table columns correctly sized from column_typing" + + # 5. The real bug check happens via output_does_not_contain in suite.cli.yaml: + # The debug output prints "sqlldr ctl file content" which should NOT contain char(400000) + # when column_typing.string.max_length=255 is set. + - log: "Oracle sqlldr char sizing test complete" + + # 6. 
Cleanup + - connection: oracle + query: | + BEGIN EXECUTE IMMEDIATE 'DROP TABLE SYSTEM.TEST_SQLLDR_CHAR'; EXCEPTION WHEN OTHERS THEN NULL; END; diff --git a/cmd/sling/tests/pipelines/p.26.duckdb_arrow_ipc_output.yaml b/cmd/sling/tests/pipelines/p.26.duckdb_arrow_ipc_output.yaml new file mode 100644 index 000000000..b2a4b4569 --- /dev/null +++ b/cmd/sling/tests/pipelines/p.26.duckdb_arrow_ipc_output.yaml @@ -0,0 +1,26 @@ +steps: + # Read local parquet via Arrow IPC mode and write to temp CSV + - replication: + source: LOCAL + target: LOCAL + defaults: + mode: full-refresh + streams: + cmd/sling/tests/files/test1.parquet: + object: file:///tmp/sling/arrow_test_output.csv + + # Verify the output file has data rows (header + data rows) + # test1.parquet has 1000 data rows, CSV output has 1 header + 1000 rows + - type: command + command: | + data_lines=$(tail -n +2 /tmp/sling/arrow_test_output.csv | grep -c .) + echo "Arrow IPC output data row count: $data_lines" + if [ "$data_lines" -ge 999 ]; then + echo "SUCCESS: DuckDB Arrow IPC output produced correct row count" + else + echo "FAIL: Expected at least 999 data rows, got $data_lines" + exit 1 + fi + + - type: log + message: "DuckDB Arrow IPC output test complete" diff --git a/cmd/sling/tests/replications/r.109.mysql_snowflake_tinyint_boolean.yaml b/cmd/sling/tests/replications/r.109.mysql_snowflake_tinyint_boolean.yaml new file mode 100644 index 000000000..3c6124ab1 --- /dev/null +++ b/cmd/sling/tests/replications/r.109.mysql_snowflake_tinyint_boolean.yaml @@ -0,0 +1,131 @@ +# Test for MySQL TINYINT vs TINYINT(1) type mapping to Snowflake +# Issue: MySQL TINYINT columns with values beyond 0/1 (e.g. 1, 2, 4) were mapped +# to BOOLEAN in Snowflake, causing all non-zero values to become TRUE. +# Fix: TINYINT maps to SMALLINT (integer), TINYINT(1) maps to BOOLEAN. 
+source: mysql +target: snowflake + +defaults: + mode: full-refresh + +hooks: + start: + # Create source table in MySQL with both TINYINT and TINYINT(1) columns + - type: query + connection: '{source.name}' + query: | + DROP TABLE IF EXISTS mysql.tinyint_bool_test; + CREATE TABLE mysql.tinyint_bool_test ( + id INT PRIMARY KEY, + status TINYINT, + flag TINYINT(1) + ); + INSERT INTO mysql.tinyint_bool_test (id, status, flag) VALUES + (1, 1, 1), + (2, 2, 0), + (3, 4, 1), + (4, 0, 0), + (5, NULL, NULL); + + - type: query + connection: '{source.name}' + query: SELECT id, status, flag FROM mysql.tinyint_bool_test ORDER BY id + into: source_data + + - type: log + message: | + Source data (MySQL): + {pretty_table(store.source_data)} + + end: + # Check execution succeeded + - type: check + check: execution.status.error == 0 + on_failure: break + + # ===== Verify values in Snowflake ===== + + - type: query + connection: '{target.name}' + query: SELECT id, status, flag FROM public.tinyint_bool_test ORDER BY id + into: result + + - type: log + message: | + Snowflake result data: + {pretty_table(store.result)} + + # Check column types in Snowflake + - type: query + connection: '{target.name}' + query: | + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_schema = 'PUBLIC' + AND table_name = 'TINYINT_BOOL_TEST' + AND column_name IN ('STATUS', 'FLAG') + ORDER BY column_name + into: col_types + + - type: log + message: | + Snowflake column types: + {pretty_table(store.col_types)} + + # Verify row count + - type: check + check: length(store.result) == 5 + failure_message: "Expected 5 rows but found {length(store.result)}" + + # ===== Verify STATUS column (TINYINT -> SMALLINT, integer values preserved) ===== + + - type: check + check: int_parse(store.result[0].status) == 1 + failure_message: "Row 1 status should be 1, got {store.result[0].status}" + + # Key test: value=2 must NOT become TRUE/1 + - type: check + check: int_parse(store.result[1].status) == 
2 + failure_message: "Row 2 status should be 2, got {store.result[1].status} (TINYINT value lost due to BOOLEAN mapping)" + + # Key test: value=4 must NOT become TRUE/1 + - type: check + check: int_parse(store.result[2].status) == 4 + failure_message: "Row 3 status should be 4, got {store.result[2].status} (TINYINT value lost due to BOOLEAN mapping)" + + - type: check + check: int_parse(store.result[3].status) == 0 + failure_message: "Row 4 status should be 0, got {store.result[3].status}" + + - type: check + check: store.result[4].status == nil + failure_message: "Row 5 status should be NULL, got {store.result[4].status}" + + # ===== Verify FLAG column (TINYINT(1) -> BOOLEAN) ===== + + - type: check + check: store.result[0].flag == true || store.result[0].flag == "true" || store.result[0].flag == "1" + failure_message: "Row 1 flag should be true, got {store.result[0].flag}" + + - type: check + check: store.result[1].flag == false || store.result[1].flag == "false" || store.result[1].flag == "0" + failure_message: "Row 2 flag should be false, got {store.result[1].flag}" + + - type: log + message: "SUCCESS: MySQL TINYINT values correctly preserved in Snowflake (not collapsed to BOOLEAN)" + - type: log + message: "SUCCESS: MySQL TINYINT(1) correctly mapped to BOOLEAN in Snowflake" + + # Cleanup + - type: query + connection: '{source.name}' + query: DROP TABLE IF EXISTS mysql.tinyint_bool_test + + - type: query + connection: '{target.name}' + query: DROP TABLE IF EXISTS public.tinyint_bool_test + +streams: + mysql.tinyint_bool_test: + object: public.tinyint_bool_test + mode: full-refresh diff --git a/cmd/sling/tests/replications/r.110.mysql_starrocks_tinyint_boolean.yaml b/cmd/sling/tests/replications/r.110.mysql_starrocks_tinyint_boolean.yaml new file mode 100644 index 000000000..e0b118007 --- /dev/null +++ b/cmd/sling/tests/replications/r.110.mysql_starrocks_tinyint_boolean.yaml @@ -0,0 +1,137 @@ +# Test MySQL TINYINT vs TINYINT(1) type mapping to StarRocks +# 
StarRocks has native BOOLEAN type, plus TINYINT(1) alias. +# Verify: TINYINT -> SMALLINT (integer), TINYINT(1) -> BOOLEAN, native BOOLEAN -> BOOLEAN +source: mysql +target: starrocks + +defaults: + mode: full-refresh + +hooks: + start: + # Create source table in MySQL with BOOLEAN, TINYINT(1), and plain TINYINT columns + - type: query + connection: '{source.name}' + query: | + DROP TABLE IF EXISTS mysql.tinyint_bool_sr_test; + CREATE TABLE mysql.tinyint_bool_sr_test ( + id INT PRIMARY KEY, + bool_col BOOLEAN, + flag TINYINT(1), + status TINYINT + ); + INSERT INTO mysql.tinyint_bool_sr_test (id, bool_col, flag, status) VALUES + (1, true, 1, 1), + (2, false, 0, 2), + (3, true, 1, 4), + (4, false, 0, 0), + (5, NULL, NULL, NULL); + + - type: query + connection: '{source.name}' + query: SELECT id, bool_col, flag, status FROM mysql.tinyint_bool_sr_test ORDER BY id + into: source_data + + - type: log + message: | + Source data (MySQL): + {pretty_table(store.source_data)} + + end: + # Check execution succeeded + - type: check + check: execution.status.error == 0 + on_failure: break + + # ===== Verify values in StarRocks ===== + + - type: query + connection: '{target.name}' + query: SELECT id, bool_col, flag, status FROM public.tinyint_bool_sr_test ORDER BY id + into: result + + - type: log + message: | + StarRocks result data: + {pretty_table(store.result)} + + # Check column types in StarRocks + - type: query + connection: '{target.name}' + query: | + SELECT column_name, data_type, column_type + FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = 'tinyint_bool_sr_test' + AND column_name IN ('bool_col', 'flag', 'status') + ORDER BY column_name + into: col_types + + - type: log + message: | + StarRocks column types: + {pretty_table(store.col_types)} + + # Verify row count + - type: check + check: length(store.result) == 5 + failure_message: "Expected 5 rows but found {length(store.result)}" + + # ===== Verify STATUS column (TINYINT -> 
SMALLINT, integer values preserved) ===== + + - type: check + check: int_parse(store.result[0].status) == 1 + failure_message: "Row 1 status should be 1, got {store.result[0].status}" + + - type: check + check: int_parse(store.result[1].status) == 2 + failure_message: "Row 2 status should be 2, got {store.result[1].status} (TINYINT value lost due to BOOLEAN mapping)" + + - type: check + check: int_parse(store.result[2].status) == 4 + failure_message: "Row 3 status should be 4, got {store.result[2].status} (TINYINT value lost due to BOOLEAN mapping)" + + - type: check + check: int_parse(store.result[3].status) == 0 + failure_message: "Row 4 status should be 0, got {store.result[3].status}" + + - type: check + check: store.result[4].status == nil + failure_message: "Row 5 status should be NULL, got {store.result[4].status}" + + # ===== Verify BOOL_COL and FLAG (TINYINT(1)/BOOLEAN -> BOOLEAN) ===== + + - type: check + check: int_parse(store.result[0].bool_col) == 1 + failure_message: "Row 1 bool_col should be truthy (1), got {store.result[0].bool_col}" + + - type: check + check: int_parse(store.result[1].bool_col) == 0 + failure_message: "Row 2 bool_col should be falsy (0), got {store.result[1].bool_col}" + + - type: check + check: int_parse(store.result[0].flag) == 1 + failure_message: "Row 1 flag should be truthy (1), got {store.result[0].flag}" + + - type: check + check: int_parse(store.result[1].flag) == 0 + failure_message: "Row 2 flag should be falsy (0), got {store.result[1].flag}" + + - type: log + message: "SUCCESS: MySQL TINYINT values correctly preserved as integers in StarRocks" + - type: log + message: "SUCCESS: MySQL TINYINT(1)/BOOLEAN correctly mapped to BOOLEAN in StarRocks" + + # Cleanup + - type: query + connection: '{source.name}' + query: DROP TABLE IF EXISTS mysql.tinyint_bool_sr_test + + - type: query + connection: '{target.name}' + query: DROP TABLE IF EXISTS public.tinyint_bool_sr_test + +streams: + mysql.tinyint_bool_sr_test: + object: 
public.tinyint_bool_sr_test + mode: full-refresh diff --git a/cmd/sling/tests/suite.cli.yaml b/cmd/sling/tests/suite.cli.yaml index 1be635f0c..722295893 100644 --- a/cmd/sling/tests/suite.cli.yaml +++ b/cmd/sling/tests/suite.cli.yaml @@ -2141,4 +2141,56 @@ name: 'DuckLake Unicode column names preserved when replicating to Postgres (issue #721)' run: 'sling run -d -p cmd/sling/tests/pipelines/p.22.ducklake_unicode_column_names.yaml' output_contains: - - 'SUCCESS: DuckLake Unicode column names preserved (issue #721)' \ No newline at end of file + - 'SUCCESS: DuckLake Unicode column names preserved (issue #721)' + +# Wildcard transform with type_of + length fails on non-string columns +# Issue: goval ternary does not short-circuit, so length(value) is called on +# all column types (int, date, etc.) even when guarded by type_of(value) == "string" +# User report: Oracle -> Postgres, truncate long strings via wildcard transform +- id: 223 + name: 'Wildcard transform type_of + length on mixed types (Oracle -> Postgres)' + run: 'sling run -d -p cmd/sling/tests/pipelines/p.23.ternary_length_mixed_types.yaml' + output_contains: + - 'SUCCESS: Wildcard transform with type_of + length works on mixed-type columns' + - 'Ternary length mixed types test complete' + +# MySQL TINYINT columns with values > 1 (e.g. 1, 2, 4) are mapped to BOOLEAN +# in Snowflake, causing all non-zero values to collapse to TRUE. +# User report: full-refresh recreates table with BOOLEAN column, losing integer values. 
+- id: 224 + name: 'MySQL TINYINT vs TINYINT(1) type mapping to Snowflake' + run: 'sling run -d -r cmd/sling/tests/replications/r.109.mysql_snowflake_tinyint_boolean.yaml' + output_contains: + - 'SUCCESS: MySQL TINYINT values correctly preserved in Snowflake (not collapsed to BOOLEAN)' + - 'SUCCESS: MySQL TINYINT(1) correctly mapped to BOOLEAN in Snowflake' + +- id: 225 + name: 'MySQL TINYINT vs TINYINT(1) type mapping to StarRocks' + run: 'sling run -d -r cmd/sling/tests/replications/r.110.mysql_starrocks_tinyint_boolean.yaml' + output_contains: + - 'SUCCESS: MySQL TINYINT values correctly preserved as integers in StarRocks' + - 'SUCCESS: MySQL TINYINT(1)/BOOLEAN correctly mapped to BOOLEAN in StarRocks' + +# sqlldr ctl file hardcodes char(400000) even when column_typing.string.max_length is set. +# This causes sqlldr to pre-allocate massive memory for every string column. +# The table DDL gets correct VARCHAR2 sizing but the bulk load ctl does not. +# Ref: getColumnsString() in database_oracle.go always defaults to char(400000). +- id: 227 + name: 'Oracle sqlldr ctl respects column_typing max_length' + run: 'sling run -d -p cmd/sling/tests/pipelines/p.25.oracle_sqlldr_char_sizing.yaml' + output_contains: + - 'SUCCESS: Oracle table columns correctly sized from column_typing' + - 'Oracle sqlldr char sizing test complete' + output_does_not_contain: + - 'char(400000)' + +# DuckDB Arrow IPC output: DUCKDB_USE_ARROW=true enables binary Arrow IPC streaming +# from DuckDB instead of CSV text, for better performance on large datasets. 
+- id: 228 + name: 'DuckDB Arrow IPC output for parquet reads' + env: + DUCKDB_USE_ARROW: 'true' + run: 'sling run -d -p cmd/sling/tests/pipelines/p.26.duckdb_arrow_ipc_output.yaml' + output_contains: + - 'SUCCESS: DuckDB Arrow IPC output produced correct row count' + - 'DuckDB Arrow IPC output test complete' \ No newline at end of file diff --git a/core/dbio/api/auth.go b/core/dbio/api/auth.go index 4cfeb4957..55a65025e 100644 --- a/core/dbio/api/auth.go +++ b/core/dbio/api/auth.go @@ -451,7 +451,7 @@ func (a *AuthenticatorOAuth2) Authenticate(ctx context.Context, state *APIStateA if err == nil { state.Token = newTok.AccessToken state.Headers = map[string]string{"Authorization": g.F("Bearer %s", newTok.AccessToken)} - if newTok.RefreshToken != storedTok.RefreshToken && !env.IsAgentMode { + if newTok.RefreshToken != storedTok.RefreshToken { a.SaveToken(newTok) // Save if rotated } g.Debug("Used refreshed token from storage") @@ -672,8 +672,14 @@ func generateRandomState() string { return base64.URLEncoding.EncodeToString(b) } -// openBrowser opens the default browser to the given URL (unchanged from original). +// openBrowser opens the default browser to the given URL. +// If the BROWSER environment variable is set, it is used as the browser command. 
func (a *AuthenticatorOAuth2) openBrowser(url string) error {
+	// Respect BROWSER env var for session-level browser override
+	if browserEnv := os.Getenv("BROWSER"); browserEnv != "" {
+		return exec.Command(browserEnv, url).Start()
+	}
+
 	var cmd string
 	var args []string
diff --git a/core/dbio/database/database.go b/core/dbio/database/database.go
index 6cdac011a..cb60de3d7 100755
--- a/core/dbio/database/database.go
+++ b/core/dbio/database/database.go
@@ -1554,6 +1554,16 @@ func SQLColumns(colTypes []ColumnType, conn Connection) (columns iop.Columns) {
 			col.Type = fc.Type
 			col.Sourced = fc.Sourced
 		}
+
+		// MySQL/MariaDB/StarRocks: the Go driver reports TINYINT without display
+		// width, so it can't distinguish TINYINT(1) (boolean) from TINYINT (integer).
+		// The metadata query uses COLUMN_TYPE which preserves this distinction.
+		// Use the fetched column type when it differs from the driver-derived type.
+		if g.In(conn.Self().GetType(), dbio.TypeDbMySQL, dbio.TypeDbMariaDB, dbio.TypeDbStarRocks) && fc.Type != "" && fc.Type != col.Type {
+			col.Type = fc.Type
+			col.Sourced = fc.Sourced
+		}
+
 		col.Constraint = fc.Constraint
 
 		// fetch decimal info
diff --git a/core/dbio/database/database_duckdb.go b/core/dbio/database/database_duckdb.go
index dca9dfc3b..117bc8d47 100644
--- a/core/dbio/database/database_duckdb.go
+++ b/core/dbio/database/database_duckdb.go
@@ -131,6 +131,11 @@ func (conn *DuckDbConn) Connect(timeOut ...int) (err error) {
 		}
 	}
 
+	// set copy_method
+	if conn.GetProp("copy_method") == "" && cast.ToBool(os.Getenv("DUCKDB_USE_ARROW")) {
+		conn.SetProp("copy_method", "arrow_http")
+	}
+
 	// add extensions
 	if conn.GetProp("copy_method") == "arrow_http" {
 		conn.duck.AddExtension("arrow from community")
diff --git a/core/dbio/database/database_oracle.go b/core/dbio/database/database_oracle.go
index 6979d2251..9661f7a32 100755
--- a/core/dbio/database/database_oracle.go
+++ b/core/dbio/database/database_oracle.go
@@ -259,7 +259,7 @@ func (conn *OracleConn) 
BulkImportStream(tableFName string, ds *iop.Datastream) // logic to insert rows with values containing new line chars // addFilePath is additional rows to be inserted - countTot, err := conn.SQLLoad(tableFName, ds) + countTot, err := conn.SQLLoad(tableFName, ds, columns) if err != nil { return 0, g.Error(err, "Error with SQLLoad") } @@ -270,7 +270,7 @@ func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) // SQLLoad uses sqlldr to Bulk Import // cat test1.csv | sqlldr system/oracle@oracle.host:1521/xe control=sqlldr.ctl log=/dev/stdout bad=/dev/stderr // cannot import when newline in value. Need to scan for new lines. -func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error) { +func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream, tgtColumns ...iop.Columns) (count uint64, err error) { var stderr, stdout bytes.Buffer connURL := conn.ConnString() @@ -289,7 +289,7 @@ func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count ui ctlStr := g.R( conn.BaseConn.GetTemplateValue("core.sqlldr"), "table", tableFName, - "columns", conn.getColumnsString(ds), + "columns", conn.getColumnsString(ds, tgtColumns...), ) err = os.WriteFile( ctlPath, @@ -430,7 +430,19 @@ retry: return ds.Count, err } -func (conn *OracleConn) getColumnsString(ds *iop.Datastream) string { +func (conn *OracleConn) getColumnsString(ds *iop.Datastream, tgtColumns ...iop.Columns) string { + // build lookup of target column length by name (from GetColumns/metadata) + tgtLength := map[string]int{} + if len(tgtColumns) > 0 { + for _, col := range tgtColumns[0] { + if col.DbPrecision > 0 { + tgtLength[strings.ToLower(col.Name)] = col.DbPrecision + } else if col.Stats.MaxLen > 0 { + tgtLength[strings.ToLower(col.Name)] = col.Stats.MaxLen + } + } + } + columnsString := "" for _, col := range ds.Columns { expr := "" @@ -452,10 +464,15 @@ func (conn *OracleConn) getColumnsString(ds *iop.Datastream) string { 
colNameEscaped, ) } else if col.IsString() { - expr = g.F("char(400000) NULLIF %s=BLANKS", colName) - if col.DbPrecision > 400000 { - expr = g.F("char(%d) NULLIF %s=BLANKS", col.DbPrecision, colName) + charSize := 400000 // default fallback when precision is unknown + if p, ok := tgtLength[strings.ToLower(col.Name)]; ok && p > 0 { + charSize = p + } else if col.DbPrecision > 0 { + charSize = col.DbPrecision + } else if col.Stats.MaxLen > 0 { + charSize = col.Stats.MaxLen } + expr = g.F("char(%d) NULLIF %s=BLANKS", charSize, colName) } columnsString += fmt.Sprintf(" %s %s,\n", colName, expr) } diff --git a/core/dbio/database/database_sqlserver.go b/core/dbio/database/database_sqlserver.go index c36760eea..bdaf7352e 100755 --- a/core/dbio/database/database_sqlserver.go +++ b/core/dbio/database/database_sqlserver.go @@ -15,6 +15,8 @@ import ( "cloud.google.com/go/cloudsqlconn" cloudsqlmssql "cloud.google.com/go/cloudsqlconn/sqlserver/mssql" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/jmoiron/sqlx" "github.com/samber/lo" "github.com/slingdata-io/sling-cli/core/dbio" @@ -573,13 +575,17 @@ func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastr return conn.BaseConn.InsertBatchStream(tableFName, ds) } - if fedAuth := conn.GetProp("fed_auth"); g.In(fedAuth, azuread.ActiveDirectoryAzCli) { - // need to az cli tool - _, err = exec.LookPath(conn.azCliPath()) - if err != nil { - g.Warn("unable to use bcp since the Azure CLI tool is not found in path. bcp needs an Access Token obtained via the Azure CLI tool for fed_auth=%s. Using cursor...", fedAuth) - return conn.BaseConn.InsertBatchStream(tableFName, ds) + if fedAuth := conn.FedAuth(); fedAuth != "" { + if g.In(fedAuth, azuread.ActiveDirectoryAzCli) { + // need az cli tool for AzCli auth + _, err = exec.LookPath(conn.azCliPath()) + if err != nil { + g.Warn("unable to use bcp since the Azure CLI tool is not found in path. 
bcp needs an Access Token obtained via the Azure CLI tool for fed_auth=%s. Using cursor...", fedAuth) + return conn.BaseConn.InsertBatchStream(tableFName, ds) + } } + // other Entra ID methods (ActiveDirectoryDefault, ActiveDirectoryManagedIdentity, etc.) + // will obtain tokens via azidentity in BcpImportFile } // needs to get columns to shape stream @@ -820,6 +826,59 @@ func (conn *MsSQLServerConn) azCliPath() string { return "az" } +// bcpEntraIDMethods returns the fed_auth methods that can obtain tokens via azidentity for BCP +func bcpEntraIDMethods() []string { + return []string{ + azuread.ActiveDirectoryDefault, + azuread.ActiveDirectoryManagedIdentity, + azuread.ActiveDirectoryMSI, + azuread.ActiveDirectoryServicePrincipal, + azuread.ActiveDirectoryApplication, + } +} + +// getEntraIDToken obtains an access token using azidentity (DefaultAzureCredential). +// This supports Workload Identity, Managed Identity, environment credentials, and more +// without requiring the Azure CLI. +func (conn *MsSQLServerConn) getEntraIDToken() (string, error) { + resource := conn.GetProp("bcp_azure_token_resource") + if resource == "" { + resource = "https://database.windows.net" + } + scope := resource + "/.default" + + cred, err := azidentity.NewDefaultAzureCredential(nil) + if err != nil { + return "", g.Error(err, "could not create Azure credential for BCP token") + } + + token, err := cred.GetToken(context.Background(), policy.TokenRequestOptions{ + Scopes: []string{scope}, + }) + if err != nil { + return "", g.Error(err, "could not obtain Azure access token for BCP") + } + + return token.Token, nil +} + +// writeBcpTokenFile writes an access token as UTF-16LE to a temp file for BCP -P flag +func (conn *MsSQLServerConn) writeBcpTokenFile(token string) (string, error) { + utf16Token := utf16.Encode([]rune(token)) + var leBytes []byte + for _, u := range utf16Token { + leBytes = append(leBytes, byte(u), byte(u>>8)) + } + + tokenFilePath := path.Join(env.GetTempFolder(), 
g.NewTsID("sqlserver.token")+".txt") + err := os.WriteFile(tokenFilePath, leBytes, 0600) + if err != nil { + return "", g.Error(err, "could not write token to temp file") + } + + return tokenFilePath, nil +} + // BcpImportFile Import using bcp tool // https://docs.microsoft.com/en-us/sql/tools/bcp-utility?view=sql-server-ver15 // bcp dbo.test1 in '/tmp/LargeDataset.csv' -S tcp:sqlserver.host,51433 -d master -U sa -P 'password' -c -t ',' -b 5000 @@ -896,20 +955,15 @@ func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count u return } bcpArgs = append(bcpArgs, bcpAuthParts...) - } else if fedAuth := conn.GetProp("fed_auth"); g.In(fedAuth, azuread.ActiveDirectoryAzCli) { + } else if fedAuth := conn.FedAuth(); g.In(fedAuth, azuread.ActiveDirectoryAzCli) { + // obtain token via Azure CLI tool azCliTokenResource := conn.GetProp("bcp_azure_token_resource") if azCliTokenResource == "" { azCliTokenResource = "https://database.windows.net" } azCliArgs := []string{"account", "get-access-token", "--resource", azCliTokenResource, "--query", "accessToken", "--output", "tsv"} - // need to run - // az account get-access-token --resource https://database.windows.net --query accessToken --output tsv azCmd := exec.Command(conn.azCliPath(), azCliArgs...) 
- if err != nil { - return 0, g.Error(err, "could not create az cli command") - } - g.Debug(conn.azCliPath() + " " + strings.Join(azCliArgs, ` `)) output, err := azCmd.Output() if err != nil { @@ -918,23 +972,27 @@ func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count u } token := strings.TrimSpace(string(output)) + tokenFilePath, err := conn.writeBcpTokenFile(token) + if err != nil { + return 0, g.Error(err, "could not write BCP token file") + } + defer os.Remove(tokenFilePath) - // convert token to UTF-16LE bytes - utf16Token := utf16.Encode([]rune(token)) - var leBytes []byte - for _, u := range utf16Token { - leBytes = append(leBytes, byte(u), byte(u>>8)) + bcpArgs = append(bcpArgs, "-G", "-P", tokenFilePath) + } else if fedAuth := conn.FedAuth(); g.In(fedAuth, bcpEntraIDMethods()...) { + // obtain token via azidentity (supports Workload Identity, Managed Identity, env credentials, etc.) + g.Debug("obtaining Entra ID token via azidentity for BCP (fed_auth=%s)", fedAuth) + token, err := conn.getEntraIDToken() + if err != nil { + return 0, g.Error(err, "could not obtain Entra ID token for BCP via azidentity (fed_auth=%s)", fedAuth) } - // now write the UTF-16LE token bytes to a temp file - tokenFilePath := path.Join(env.GetTempFolder(), g.NewTsID("sqlserver.token")+".txt") - err = os.WriteFile(tokenFilePath, leBytes, 0600) + tokenFilePath, err := conn.writeBcpTokenFile(token) if err != nil { - return 0, g.Error(err, "could not write token to temp file") + return 0, g.Error(err, "could not write BCP token file") } defer os.Remove(tokenFilePath) - // add BCP authentication args for Azure AD with token file bcpArgs = append(bcpArgs, "-G", "-P", tokenFilePath) } else { if g.In(conn.GetProp("authenticator"), "winsspi") || conn.isTrusted() { diff --git a/core/dbio/iop/datatype.go b/core/dbio/iop/datatype.go index 516c5674a..9b552019b 100755 --- a/core/dbio/iop/datatype.go +++ b/core/dbio/iop/datatype.go @@ -1612,10 +1612,22 @@ func 
NativeTypeToGeneral(name, dbType string, connType dbio.Type) (colType Colum } } - dbType = strings.Split(strings.ToLower(dbType), "(")[0] - dbType = strings.Split(dbType, "<")[0] + dbType = strings.ToLower(dbType) template, _ := connType.Template() + + // Try matching with parenthesized type first (e.g. "tinyint(1)") + // before stripping parentheses. This allows MySQL/MariaDB/StarRocks + // to distinguish tinyint(1) (boolean) from tinyint (small integer). + dbTypeFull := strings.Split(dbType, "<")[0] + dbTypeFull = strings.TrimRight(dbTypeFull, " ") + if matchedType, ok := template.NativeTypeMap[dbTypeFull]; ok { + return ColumnType(matchedType) + } + + dbType = strings.Split(dbType, "(")[0] + dbType = strings.Split(dbType, "<")[0] + if matchedType, ok := template.NativeTypeMap[dbType]; ok { colType = ColumnType(matchedType) } else { diff --git a/core/dbio/iop/duckdb.go b/core/dbio/iop/duckdb.go index c9811015f..e7ec13c50 100644 --- a/core/dbio/iop/duckdb.go +++ b/core/dbio/iop/duckdb.go @@ -789,7 +789,65 @@ func (duck *DuckDb) StreamContext(ctx context.Context, sql string, options ...ma columns, describeErr := duck.Describe(sql) - // one query at a time + // add any specified transforms + transforms := []map[string]string{} + fsProps := map[string]string{} + g.Unmarshal(duck.GetProp("fs_props"), &fsProps) + if transformsPayload, ok := fsProps["transforms"]; ok { + if err := g.Unmarshal(transformsPayload, &transforms); err != nil { + g.Warn("could not unmarshal transform payload: %s", err.Error()) + } + } + + // Arrow IPC output mode: uses a separate DuckDB process to pipe binary Arrow data. + // Only applies to SELECT/WITH queries — describe, pragma, etc. must use CSV mode. 
+ sqlStripped, _ := StripSQLComments(sql) + sqlLower := strings.TrimSpace(strings.ToLower(sqlStripped)) + isSelectQuery := strings.HasPrefix(sqlLower, "select") || strings.HasPrefix(sqlLower, "with") + useArrow := cast.ToBool(os.Getenv("DUCKDB_USE_ARROW")) && isSelectQuery + if useArrow { + duck.AddExtension("arrow from community") + + arrowReader, arrowCleanup, err := duck.StreamArrow(queryCtx.Ctx, sql) + if err != nil { + return nil, g.Error(err, "Failed to start Arrow stream") + } + + ds = NewDatastreamContext(queryCtx.Ctx, columns) + ds.Defer(func() { arrowCleanup() }) + + if cds, ok := opts["datastream"]; ok { + ds = cds.(*Datastream) + ds.Columns = columns + } + + ds.Inferred = true + ds.NoDebug = strings.Contains(sql, env.NoDebugKey) + ds.SetConfig(duck.Props()) + if len(transforms) > 0 { + ds.SetConfig(map[string]string{"transforms": g.Marshal(transforms)}) + } + + err = ds.ConsumeArrowReaderStream(arrowReader) + if err != nil { + ds.Close() + return ds, g.Error(err, "could not read Arrow output stream") + } + + // handle filename, always last column (after Arrow columns are set) + if cast.ToBool(opts["filename"]) { + ds.Columns[len(ds.Columns)-1].Name = ds.Metadata.StreamURL.Key + ds.Metadata.StreamURL.Key = "" // so it is not added again + } + + if describeErr != nil { + g.LogError(describeErr) + } + + return ds, nil + } + + // CSV mode: one query at a time on the interactive process duck.Context.Lock() // new datastream @@ -805,16 +863,6 @@ func (duck *DuckDb) StreamContext(ctx context.Context, sql string, options ...ma return nil, g.Error(err, "Failed to submit SQL") } - // add any specified transforms - transforms := []map[string]string{} - fsProps := map[string]string{} - g.Unmarshal(duck.GetProp("fs_props"), &fsProps) - if transformsPayload, ok := fsProps["transforms"]; ok { - if err := g.Unmarshal(transformsPayload, &transforms); err != nil { - g.Warn("could not unmarshal transform payload: %s", err.Error()) - } - } - if cds, ok := 
opts["datastream"]; ok { // if provided, use it ds = cds.(*Datastream) @@ -838,9 +886,6 @@ func (duck *DuckDb) StreamContext(ctx context.Context, sql string, options ...ma duck.Context.Unlock() // release lock }) - // TODO: Add Arrows output - // COPY (SELECT extension_name, loaded, installed FROM duckdb_extensions()) TO '/dev/stdout' (FORMAT ARROWS, BATCH_SIZE 100); - err = ds.ConsumeCsvReader(dq.reader) if err != nil { ds.Close() @@ -858,6 +903,159 @@ func (duck *DuckDb) StreamContext(ctx context.Context, sql string, options ...ma return } +// StreamArrow launches a separate DuckDB CLI process that outputs Arrow IPC binary data to stdout. +// This bypasses the interactive CSV process entirely, avoiding line-based scanning issues with binary data. +func (duck *DuckDb) StreamArrow(ctx context.Context, sql string) (reader io.ReadCloser, cleanup func(), err error) { + bin, err := duck.EnsureBinDuckDB(duck.GetProp("duckdb_version")) + if err != nil { + return nil, nil, g.Error(err, "could not get duckdb binary") + } + + // Build args (no -csv or -nullvalue flags for Arrow mode) + args := []string{} + instance := duck.GetProp("instance") + if instance != "" { + // Always open file-based instances read-only to avoid lock conflicts + // with the interactive process that already holds a write lock + args = append(args, "-readonly") + args = append(args, instance) + } else if cast.ToBool(duck.GetProp("read_only")) { + args = append(args, "-readonly") + } + + if motherduckToken := duck.GetProp("motherduck_token"); motherduckToken != "" { + dsn := "md:" + duck.GetProp("database") + if motherduckAttachMode := duck.GetProp("motherduck_attach_mode"); motherduckAttachMode != "" { + dsn = g.F("%s?attach_mode=%s", dsn, motherduckAttachMode) + } + args = append(args, dsn) + } + + // Build SQL script + scriptParts := []string{} + if extSQL := duck.getLoadExtensionSQL(); extSQL != "" { + scriptParts = append(scriptParts, extSQL) + } + if secretSQL := duck.getCreateSecretSQL(); 
secretSQL != "" { + scriptParts = append(scriptParts, secretSQL) + } + scriptParts = append(scriptParts, "SET preserve_insertion_order = false;") + + if runtime.GOOS == "windows" { + // Windows: use temp file since /dev/stdout doesn't exist + tmpFile, tmpErr := os.CreateTemp("", "sling-arrow-*.ipc") + if tmpErr != nil { + return nil, nil, g.Error(tmpErr, "could not create temp file for Arrow output") + } + tmpPath := tmpFile.Name() + tmpFile.Close() + + scriptParts = append(scriptParts, g.F("COPY (%s) TO '%s' (FORMAT ARROWS);", sql, tmpPath)) + script := strings.Join(scriptParts, "\n") + + cmd := exec.CommandContext(ctx, bin, args...) + cmd.Stdin = strings.NewReader(script) + if duck.Proc != nil { + cmd.Dir = duck.Proc.WorkDir + cmd.Env = g.MapToKVArr(duck.Proc.Env) + } else if workDir := duck.GetProp("working_dir"); workDir != "" { + cmd.Dir = workDir + cmd.Env = os.Environ() + } + + // MotherDuck token + if motherduckToken := duck.GetProp("motherduck_token"); motherduckToken != "" { + cmd.Env = append(cmd.Env, "motherduck_token="+motherduckToken) + } + + var stderrBuf strings.Builder + cmd.Stderr = &stderrBuf + + if runErr := cmd.Run(); runErr != nil { + os.Remove(tmpPath) + errMsg := stderrBuf.String() + if errMsg != "" { + return nil, nil, g.Error("Arrow DuckDB process failed: %s\n%s", runErr, errMsg) + } + return nil, nil, g.Error(runErr, "Arrow DuckDB process failed") + } + + file, openErr := os.Open(tmpPath) + if openErr != nil { + os.Remove(tmpPath) + return nil, nil, g.Error(openErr, "could not open Arrow temp file") + } + + cleanup = func() { + file.Close() + os.Remove(tmpPath) + } + return file, cleanup, nil + } + + // Unix: pipe Arrow IPC directly through /dev/stdout + scriptParts = append(scriptParts, g.F("COPY (%s) TO '/dev/stdout' (FORMAT ARROWS);", sql)) + script := strings.Join(scriptParts, "\n") + + cmd := exec.CommandContext(ctx, bin, args...) 
+ cmd.Stdin = strings.NewReader(script) + if duck.Proc != nil { + cmd.Dir = duck.Proc.WorkDir + cmd.Env = g.MapToKVArr(duck.Proc.Env) + } else if workDir := duck.GetProp("working_dir"); workDir != "" { + cmd.Dir = workDir + cmd.Env = os.Environ() + } + + // MotherDuck token + if motherduckToken := duck.GetProp("motherduck_token"); motherduckToken != "" { + cmd.Env = append(cmd.Env, "motherduck_token="+motherduckToken) + } + + stdoutPipe, err := cmd.StdoutPipe() + if err != nil { + return nil, nil, g.Error(err, "could not get stdout pipe for Arrow DuckDB process") + } + + var stderrBuf strings.Builder + stderrPipe, err := cmd.StderrPipe() + if err != nil { + return nil, nil, g.Error(err, "could not get stderr pipe for Arrow DuckDB process") + } + + // capture stderr in background + go func() { + buf := make([]byte, 4096) + for { + n, readErr := stderrPipe.Read(buf) + if n > 0 { + stderrBuf.Write(buf[:n]) + } + if readErr != nil { + break + } + } + }() + + if err = cmd.Start(); err != nil { + return nil, nil, g.Error(err, "could not start Arrow DuckDB process") + } + + cleanup = func() { + waitErr := cmd.Wait() + if waitErr != nil { + errMsg := stderrBuf.String() + if errMsg != "" { + g.Warn("Arrow DuckDB process error: %s\n%s", waitErr, errMsg) + } else { + g.Warn("Arrow DuckDB process error: %s", waitErr) + } + } + } + + return stdoutPipe, cleanup, nil +} + // initScanner is set only once func (duck *DuckDb) initScanner() { diff --git a/core/dbio/iop/duckdb_test.go b/core/dbio/iop/duckdb_test.go index 88e353db6..da2df8635 100644 --- a/core/dbio/iop/duckdb_test.go +++ b/core/dbio/iop/duckdb_test.go @@ -131,6 +131,133 @@ func TestDuckDb(t *testing.T) { }) } +func TestDuckDbStreamArrow(t *testing.T) { + t.Run("StreamArrow basic query", func(t *testing.T) { + duck := NewDuckDb(context.Background()) + + // Ensure arrow extension is added and connection is open + duck.AddExtension("arrow from community") + err := duck.Open() + if !assert.NoError(t, err) { + return + } + 
defer duck.Close() + + // Use inline VALUES — the Arrow process is separate and has no access to in-memory tables + sql := "SELECT * FROM (VALUES (1, 'Alice', 10.5, true), (2, 'Bob', 20.7, false), (3, 'Charlie', 30.9, true)) AS t(id, name, value, flag) ORDER BY id" + + reader, cleanup, err := duck.StreamArrow(context.Background(), sql) + if !assert.NoError(t, err) { + return + } + defer cleanup() + + // Consume the Arrow stream into a Datastream + ds := NewDatastreamContext(context.Background(), nil) + err = ds.ConsumeArrowReaderStream(reader) + if !assert.NoError(t, err) { + return + } + + data, err := ds.Collect(0) + if !assert.NoError(t, err) { + return + } + + records := data.Records() + if !assert.Equal(t, 3, len(records)) { + return + } + + // Verify data (Arrow may return different Go types depending on DuckDB inference) + assert.EqualValues(t, 1, records[0]["id"]) + assert.Equal(t, "Alice", records[0]["name"]) + assert.EqualValues(t, 3, records[2]["id"]) + assert.Equal(t, "Charlie", records[2]["name"]) + }) + + t.Run("StreamContext with DUCKDB_USE_ARROW", func(t *testing.T) { + t.Setenv("DUCKDB_USE_ARROW", "true") + + duck := NewDuckDb(context.Background()) + + sql := "SELECT * FROM (VALUES (1, 'Alice', 30), (2, 'Bob', 25), (3, 'Charlie', 35)) AS t(id, name, age) ORDER BY id" + + // StreamContext should use Arrow path + ds, err := duck.StreamContext(context.Background(), sql) + if !assert.NoError(t, err) { + return + } + + data, err := ds.Collect(0) + if !assert.NoError(t, err) { + return + } + + records := data.Records() + if !assert.Equal(t, 3, len(records)) { + return + } + + assert.EqualValues(t, 1, records[0]["id"]) + assert.Equal(t, "Alice", records[0]["name"]) + assert.EqualValues(t, 30, records[0]["age"]) + + assert.EqualValues(t, 3, records[2]["id"]) + assert.Equal(t, "Charlie", records[2]["name"]) + assert.EqualValues(t, 35, records[2]["age"]) + + ds.Close() + }) + + t.Run("StreamArrow with file-based instance", func(t *testing.T) { + tmpDir := 
t.TempDir() + instancePath := tmpDir + "/test_arrow.duckdb" + + // Create and populate a file-based database, then close to release lock + setupDuck := NewDuckDb(context.Background(), "instance="+instancePath) + _, err := setupDuck.ExecMultiContext( + context.Background(), + "CREATE TABLE arrow_file_test (id INT, name VARCHAR, amount DECIMAL(10,2))", + "INSERT INTO arrow_file_test VALUES (1, 'Alice', 100.50),(2, 'Bob', 200.75),(3, 'Charlie', 300.25)", + ) + if !assert.NoError(t, err) { + return + } + setupDuck.Close() + time.Sleep(200 * time.Millisecond) // ensure lock is fully released + + // StreamArrow on the file-based instance (no interactive process needed) + duck := NewDuckDb(context.Background(), "instance="+instancePath) + duck.AddExtension("arrow from community") + + reader, cleanup, err := duck.StreamArrow(context.Background(), "SELECT * FROM arrow_file_test ORDER BY id") + if !assert.NoError(t, err) { + return + } + defer cleanup() + + ds := NewDatastreamContext(context.Background(), nil) + err = ds.ConsumeArrowReaderStream(reader) + if !assert.NoError(t, err) { + return + } + + data, err := ds.Collect(0) + if !assert.NoError(t, err) { + return + } + + records := data.Records() + if !assert.Equal(t, 3, len(records)) { + return + } + + assert.EqualValues(t, 1, records[0]["id"]) + assert.Equal(t, "Alice", records[0]["name"]) + }) +} + func TestDuckDbDataflowToHttpStream(t *testing.T) { t.Run("CSV streaming - verifies streaming without io.ReadAll", func(t *testing.T) { // This test confirms that DataflowToHttpStream now streams data diff --git a/core/dbio/iop/parquet_arrow_test.go b/core/dbio/iop/parquet_arrow_test.go index feefc38ed..af109386f 100644 --- a/core/dbio/iop/parquet_arrow_test.go +++ b/core/dbio/iop/parquet_arrow_test.go @@ -39,7 +39,7 @@ func TestDecimal(t *testing.T) { func TestNewParquetReader(t *testing.T) { // Test file paths - you may need to adjust these testFiles := []string{ - 
"/Users/fritz/__/Git/sling-cli/core/dbio/filesys/test/test1/parquet/test1.1.parquet", + "./core/dbio/filesys/test/test1/parquet/test1.1.parquet", "/tmp/test.parquet", // Will use the file created by TestNewParquetWriter } diff --git a/core/dbio/templates/mariadb.yaml b/core/dbio/templates/mariadb.yaml index ad0922c9b..33996ba76 100644 --- a/core/dbio/templates/mariadb.yaml +++ b/core/dbio/templates/mariadb.yaml @@ -109,7 +109,12 @@ metadata: order by table_schema, table_name columns: | - select column_name, data_type + select column_name, + case + when data_type = 'tinyint' and column_type like '%unsigned%' then 'unsigned tinyint' + when data_type = 'tinyint' then column_type + else data_type + end as data_type from information_schema.columns where table_schema = '{schema}' and table_name = '{table}' @@ -243,7 +248,11 @@ metadata: cols.table_name as table_name, tables.is_view as is_view, cols.column_name as column_name, - cols.data_type as data_type, + case + when cols.data_type = 'tinyint' and cols.column_type like '%unsigned%' then 'unsigned tinyint' + when cols.data_type = 'tinyint' then cols.column_type + else cols.data_type + end as data_type, cols.ordinal_position as position from information_schema.columns cols join tables @@ -564,7 +573,8 @@ native_type_map: time: time timestamp: timestamp tinyblob: text - tinyint: bool + "tinyint(1)": bool + tinyint: smallint tinytext: text "unsigned bigint": "decimal(28,0)" "unsigned int": bigint @@ -578,7 +588,7 @@ native_type_map: general_type_map: bigint: bigint binary: varbinary - bool: tinyint + bool: "tinyint(1)" date: date datetime: "datetime(6)" decimal: "decimal(,)" diff --git a/core/dbio/templates/mysql.yaml b/core/dbio/templates/mysql.yaml index e002186b9..f688c283a 100755 --- a/core/dbio/templates/mysql.yaml +++ b/core/dbio/templates/mysql.yaml @@ -109,7 +109,12 @@ metadata: order by table_schema, table_name columns: | - select column_name, data_type + select column_name, + case + when data_type = 'tinyint' 
and column_type like '%unsigned%' then 'unsigned tinyint' + when data_type = 'tinyint' then column_type + else data_type + end as data_type from information_schema.columns where table_schema = '{schema}' and table_name = '{table}' @@ -246,7 +251,11 @@ metadata: cols.table_name as table_name, tables.is_view as is_view, cols.column_name as column_name, - cols.data_type as data_type, + case + when cols.data_type = 'tinyint' and cols.column_type like '%unsigned%' then 'unsigned tinyint' + when cols.data_type = 'tinyint' then cols.column_type + else cols.data_type + end as data_type, cols.ordinal_position as position from information_schema.columns cols join tables @@ -567,7 +576,8 @@ native_type_map: time: time timestamp: timestamp tinyblob: text - tinyint: bool + "tinyint(1)": bool + tinyint: smallint tinytext: text "unsigned bigint": "decimal(28,0)" "unsigned int": bigint @@ -581,7 +591,7 @@ native_type_map: general_type_map: bigint: bigint binary: varbinary - bool: tinyint + bool: "tinyint(1)" date: date datetime: "datetime(6)" decimal: "decimal(,)" diff --git a/core/dbio/templates/oracle.yaml b/core/dbio/templates/oracle.yaml index 4601859c4..ffdb56824 100755 --- a/core/dbio/templates/oracle.yaml +++ b/core/dbio/templates/oracle.yaml @@ -189,7 +189,7 @@ metadata: order by owner, view_name columns: | - select column_name, data_type, coalesce(data_precision, data_length) as precision, data_scale as scale + select column_name, data_type, coalesce(data_precision, data_length) as precision, data_scale as scale, data_length as maximum_length from sys.all_tab_columns where owner = '{schema}' and table_name = '{table}' @@ -201,7 +201,8 @@ metadata: col.column_name, col.data_type, coalesce(col.data_precision, col.data_length) as precision, - col.data_scale as scale + col.data_scale as scale, + col.data_length as maximum_length from sys.all_tab_columns col inner join sys.all_synonyms syn on syn.table_owner = col.owner diff --git a/core/dbio/templates/starrocks.yaml 
b/core/dbio/templates/starrocks.yaml index fbd85c261..713b63bde 100644 --- a/core/dbio/templates/starrocks.yaml +++ b/core/dbio/templates/starrocks.yaml @@ -129,7 +129,12 @@ metadata: order by table_schema, table_name columns: | - select column_name, data_type + select column_name, + case + when data_type = 'tinyint' and column_type like '%unsigned%' then 'unsigned tinyint' + when data_type = 'tinyint' then column_type + else data_type + end as data_type from information_schema.columns where table_schema = '{schema}' and table_name = '{table}' @@ -207,7 +212,11 @@ metadata: cols.table_name as table_name, tables.is_view as is_view, cols.column_name as column_name, - cols.data_type as data_type, + case + when cols.data_type = 'tinyint' and cols.column_type like '%unsigned%' then 'unsigned tinyint' + when cols.data_type = 'tinyint' then cols.column_type + else cols.data_type + end as data_type, cols.ordinal_position as position from information_schema.columns cols join tables @@ -500,6 +509,7 @@ native_type_map: binary: binary bit: smallint blob: text + boolean: bool char: string date: date datetime: datetime @@ -530,8 +540,10 @@ native_type_map: time: time timestamp: timestamp tinyblob: text - tinyint: bool + "tinyint(1)": bool + tinyint: smallint tinytext: text + "unsigned tinyint": integer varbinary: binary varchar: text year: string diff --git a/core/env/env.go b/core/env/env.go index 61583f9be..36b49fe98 100755 --- a/core/env/env.go +++ b/core/env/env.go @@ -4,7 +4,7 @@ import ( "embed" "fmt" "os" - "path" + "path/filepath" "sort" "strings" "sync" @@ -51,15 +51,15 @@ var ( GetOAuthMap = func() map[string]map[string]any { return map[string]map[string]any{} } - ExecFolder = func() string { return path.Join(HomeDir, "executions", ExecID) } - QueueFolder = func() string { return path.Join(ExecFolder(), "queues") } - RuntimeFolder = func() string { return path.Join(ExecFolder(), "runtime") } + ExecFolder = func() string { return filepath.Join(HomeDir, "executions", 
ExecID) } + QueueFolder = func() string { return filepath.Join(ExecFolder(), "queues") } + RuntimeFolder = func() string { return filepath.Join(ExecFolder(), "runtime") } RuntimeFilePath = func(name string) string { name = strings.ReplaceAll(name, "\\", "_") name = strings.ReplaceAll(name, "/", "_") name = strings.ReplaceAll(name, ":", "_") os.MkdirAll(RuntimeFolder(), 0755) // make folder - return path.Join(RuntimeFolder(), g.F("%s.json", name)) + return filepath.Join(RuntimeFolder(), g.F("%s.json", name)) } setupOtel = func() {} @@ -130,7 +130,7 @@ func LoadHomeDir() { envKey := "SLING_HOME_DIR" HomeDir = CleanWindowsPath(os.Getenv(envKey)) if HomeDir == "" { - HomeDir = CleanWindowsPath(path.Join(g.UserHomeDir(), ".sling")) + HomeDir = CleanWindowsPath(filepath.Join(g.UserHomeDir(), ".sling")) os.Setenv(envKey, HomeDir) } @@ -145,7 +145,7 @@ func LoadHomeDir() { } func HomeBinDir() string { - return path.Join(HomeDir, "bin") + return filepath.Join(HomeDir, "bin") } // IsInteractiveTerminal checks if the current process is running in an interactive terminal @@ -285,13 +285,13 @@ func setupFileLogging() { if logDir := os.Getenv("SLING_LOG_DIR"); logDir != "" && debugLogFile == nil { // Expand ~ to home directory if strings.HasPrefix(logDir, "~/") { - logDir = path.Join(g.UserHomeDir(), logDir[2:]) + logDir = filepath.Join(g.UserHomeDir(), logDir[2:]) } if err := os.MkdirAll(logDir, 0755); err != nil { g.Warn("could not create log directory: %s", err.Error()) } else { logFileName := "sling_debug_" + time.Now().Format("2006_01_02") + ".log" - logPath := path.Join(logDir, logFileName) + logPath := filepath.Join(logDir, logFileName) f, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { @@ -349,7 +349,7 @@ func cleanupOldLogFiles(dir string, keep int) { if len(logFiles) > keep { for _, name := range logFiles[:len(logFiles)-keep] { - if err := os.Remove(path.Join(dir, name)); err != nil { + if err := os.Remove(filepath.Join(dir, 
name)); err != nil { g.Warn("could not remove old log file %s: %s", name, err.Error()) } } @@ -704,7 +704,7 @@ func RemoveAllLocalTempFile(localPath string) { } func WriteTempSQL(sql string, filePrefix ...string) (sqlPath string, err error) { - sqlPath = path.Join(GetTempFolder(), g.NewTsID(filePrefix...)+".sql") + sqlPath = filepath.Join(GetTempFolder(), g.NewTsID(filePrefix...)+".sql") err = os.WriteFile(sqlPath, []byte(sql), 0777) if err != nil { diff --git a/core/env/envfile.go b/core/env/envfile.go index 8ed53ae60..c053b26cc 100644 --- a/core/env/envfile.go +++ b/core/env/envfile.go @@ -23,7 +23,8 @@ type EnvFile struct { Body string `json:"-" yaml:"-"` } -func (ef *EnvFile) WriteEnvFile() (err error) { +// marshalEnvFileBytes marshals the EnvFile into formatted YAML bytes +func (ef *EnvFile) marshalEnvFileBytes() ([]byte, error) { connsMap := yaml.MapSlice{} // order connections names @@ -58,14 +59,22 @@ func (ef *EnvFile) WriteEnvFile() (err error) { envBytes, err := yaml.Marshal(efMap) if err != nil { - return g.Error(err, "could not marshal into YAML") + return nil, g.Error(err, "could not marshal into YAML") } - output := []byte(ef.TopComment + string(envBytes)) + output := formatYAML([]byte(ef.TopComment + string(envBytes))) + return output, nil +} + +func (ef *EnvFile) WriteEnvFile() (err error) { + output, err := ef.marshalEnvFileBytes() + if err != nil { + return err + } // fix windows path ef.Path = strings.ReplaceAll(ef.Path, `\`, `/`) - err = os.WriteFile(ef.Path, formatYAML(output), 0644) + err = os.WriteFile(ef.Path, output, 0644) if err != nil { return g.Error(err, "could not write YAML file") } @@ -73,6 +82,15 @@ func (ef *EnvFile) WriteEnvFile() (err error) { return } +// MarshalBody returns the EnvFile as a formatted YAML string +func (ef *EnvFile) MarshalBody() (string, error) { + output, err := ef.marshalEnvFileBytes() + if err != nil { + return "", err + } + return string(output), nil +} + func formatYAML(input []byte) []byte { newOutput 
:= []byte{} pIndent := 0 diff --git a/core/sling/task.go b/core/sling/task.go index 639b064f0..129dce839 100644 --- a/core/sling/task.go +++ b/core/sling/task.go @@ -461,6 +461,11 @@ func (t *TaskExecution) shouldWriteViaDuckDB(uri string) bool { return g.In(t.Config.Target.ObjectFileFormat(), dbio.FileTypeParquet) } +// isFullRefreshCDC means we should use CDC snapshot mode +func (t *TaskExecution) isFullRefreshCDC() bool { + return t.Config.Mode == FullRefreshMode && t.Config.ReplicationStream != nil && t.Config.ReplicationStream.CDCOptions != nil && os.Getenv("SLING_STATE") != "" +} + // isIncrementalWithUpdateKey means it has an update_key and is incremental mode func (t *TaskExecution) isIncrementalWithUpdateKey() bool { return t.Config.Source.HasUpdateKey() && t.Config.Mode == IncrementalMode diff --git a/core/sling/task_run.go b/core/sling/task_run.go index de4a6feb3..8b9355aa0 100644 --- a/core/sling/task_run.go +++ b/core/sling/task_run.go @@ -796,7 +796,7 @@ func (t *TaskExecution) runDbToDb() (err error) { } // CDC mode has its own execution path - if t.Config.Mode == ChangeCaptureMode { + if t.Config.Mode == ChangeCaptureMode || t.isFullRefreshCDC() { return executeCDC(t) }