Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions cmd/sling/sling_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ var cliRunFlags = []g.Flag{
Type: "bool",
Description: "Shows some examples.",
},
{
Name: "home-dir",
Type: "string",
Description: "Specify the sling home directory to use.",
},
}

var cliRun = &g.CliSC{
Expand Down Expand Up @@ -270,11 +275,23 @@ var cliConns = &g.CliSC{
Type: "bool",
Description: "Set logging level to TRACE (do not use in production).",
},
{
Name: "home-dir",
Type: "string",
Description: "Specify the sling home directory to use.",
},
},
},
{
Name: "list",
Description: "list local connections detected",
Flags: []g.Flag{
{
Name: "home-dir",
Type: "string",
Description: "Specify the sling home directory to use.",
},
},
},
{
Name: "test",
Expand Down Expand Up @@ -304,6 +321,11 @@ var cliConns = &g.CliSC{
Type: "string",
Description: "The endpoint(s) to test, for API connections only",
},
{
Name: "home-dir",
Type: "string",
Description: "Specify the sling home directory to use.",
},
},
},
{
Expand All @@ -317,6 +339,13 @@ var cliConns = &g.CliSC{
Description: "The name of the connection to remove",
},
},
Flags: []g.Flag{
{
Name: "home-dir",
Type: "string",
Description: "Specify the sling home directory to use.",
},
},
},
{
Name: "set",
Expand All @@ -335,6 +364,13 @@ var cliConns = &g.CliSC{
Description: "The key=value properties to set. See https://docs.slingdata.io/sling-cli/environment#set-connections",
},
},
Flags: []g.Flag{
{
Name: "home-dir",
Type: "string",
Description: "Specify the sling home directory to use.",
},
},
},
{
Name: "exec",
Expand Down Expand Up @@ -365,6 +401,11 @@ var cliConns = &g.CliSC{
Type: "bool",
Description: "Set logging level to TRACE (do not use in production).",
},
{
Name: "home-dir",
Type: "string",
Description: "Specify the sling home directory to use.",
},
},
},
},
Expand Down Expand Up @@ -520,6 +561,11 @@ func cliInit(done chan struct{}) int {
flaggy.AttachSubcommand(cli.Sc, 1)
}

// patch 'sling mcp' into 'sling serve mcp' for backwards compatibility
if len(os.Args) >= 2 && os.Args[1] == "mcp" {
os.Args = append([]string{os.Args[0], "serve"}, os.Args[1:]...)
}

flaggy.ShowHelpOnUnexpectedDisable()
flaggy.Parse()

Expand Down
7 changes: 6 additions & 1 deletion cmd/sling/sling_conns.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@ var (
func processConns(c *g.CliSC) (ok bool, err error) {
ok = true

if homeDir := cast.ToString(c.Vals["home-dir"]); homeDir != "" {
os.Setenv("SLING_HOME_DIR", homeDir)
env.LoadHomeDir()
}

ef := env.LoadSlingEnvFile()
ec := connection.EnvFileConns{EnvFile: &ef}
asJSON := os.Getenv("SLING_OUTPUT") == "json"

entries := connection.GetLocalConns()
entries := connection.GetLocalConns(true)
defer connection.CloseAll()

env.SetTelVal("task_start_time", time.Now())
Expand Down
17 changes: 12 additions & 5 deletions cmd/sling/sling_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ func processRun(c *g.CliSC) (ok bool, err error) {
selectStreams = strings.Split(cast.ToString(v), ",")
case "examples":
showExamples = cast.ToBool(v)
case "home-dir":
if homeDir := cast.ToString(v); homeDir != "" {
os.Setenv("SLING_HOME_DIR", homeDir)
env.LoadHomeDir()
connection.GetLocalConns(true) // force reload connections
}
case "cdc-options":
payload := cast.ToString(v)
options, err := parsePayload(payload, true)
Expand Down Expand Up @@ -439,11 +445,6 @@ func runTask(cfg *sling.Config, replication *sling.ReplicationConfig) (err error
cfg.Env["SLING_WORK_PATH"], _ = os.Getwd()
}

// set logging
if val := cfg.Env["SLING_LOGGING"]; val != "" {
os.Setenv("SLING_LOGGING", val)
}

task = sling.NewTask(env.ExecID, cfg)
task.Replication = replication

Expand Down Expand Up @@ -548,6 +549,12 @@ func replicationRun(cfgPath string, cfgOverwrite *sling.Config, selectStreams ..
return
}

// set logging
if val := cast.ToString(replication.Env["SLING_LOGGING"]); val != "" {
os.Setenv("SLING_LOGGING", val)
env.InitLogger()
}

isThreadChild := cast.ToBool(os.Getenv("SLING_THREAD_CHILD"))

eG := g.ErrorGroup{}
Expand Down
91 changes: 91 additions & 0 deletions cmd/sling/tests/pipelines/p.17.direct_insert_state_datetime.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Pipeline test: incremental replication (MSSQL -> ClickHouse) using
# direct_insert + column_casing, with incremental state persisted to Postgres
# via SLING_STATE. Verifies both the row counts and the stored state record.
env:
  SOURCE: mssql2022
  TARGET: clickhouse

steps:
  # Setup source table in MSSQL with a DATETIME update key
  - connection: '{env.SOURCE}'
    query: |
      IF OBJECT_ID('dbo.test_direct_state', 'U') IS NOT NULL DROP TABLE dbo.test_direct_state;
      CREATE TABLE dbo.test_direct_state (
        id INT PRIMARY KEY,
        name NVARCHAR(100),
        updated DATETIME
      );
      INSERT INTO dbo.test_direct_state VALUES
        (1, 'alice', '2024-01-15 10:30:00'),
        (2, 'bob', '2024-02-20 14:45:00'),
        (3, 'carol', '2024-03-10 09:00:00'),
        (4, 'dave', '2024-04-05 16:20:00');

  # Clean up target table in ClickHouse (best-effort)
  - connection: '{env.TARGET}'
    on_failure: warn
    query: DROP TABLE IF EXISTS default.test_direct_state

  # Clean up any previous state so the incremental run starts fresh
  - connection: POSTGRES
    on_failure: warn
    query: DELETE FROM sling_state._sling_state WHERE source_stream like '%test_direct_state%'

  # Run incremental replication with direct_insert + column_casing + SLING_STATE
  - replication:
      source: '{env.SOURCE}'
      target: '{env.TARGET}'

      streams:
        dbo.test_direct_state:
          object: default.test_direct_state
          mode: incremental
          update_key: updated
          target_options:
            column_casing: snake
            direct_insert: true

      env:
        SLING_STATE: POSTGRES/sling_state
        SLING_RETRIES: 1

  # Verify all 4 rows landed in target
  - type: query
    connection: '{env.TARGET}'
    query: SELECT count(*) as cnt FROM default.test_direct_state
    into: result

  - type: log
    message: 'row count => {store.result[0].cnt}'

  - type: check
    check: int_parse(store.result[0].cnt) == 4

  # Verify incremental state was written with the expected column type
  - type: query
    connection: POSTGRES
    query: |
      SELECT value, column_type FROM sling_state._sling_state
      WHERE source_stream like '%test_direct_state%'
    into: state_result

  - type: log
    message: 'state_result => {store.state_result}'

  - type: check
    check: length(store.state_result) > 0
    message: 'Expected state record but found none'

  - type: check
    check: store.state_result[0].column_type == "datetime"
    message: 'Expected column_type=datetime, got {store.state_result[0].column_type}'

  # Clean up source, target, and state
  - connection: '{env.SOURCE}'
    query: |
      IF OBJECT_ID('dbo.test_direct_state', 'U') IS NOT NULL DROP TABLE dbo.test_direct_state;

  - connection: '{env.TARGET}'
    on_failure: warn
    query: DROP TABLE IF EXISTS default.test_direct_state

  - connection: POSTGRES
    on_failure: warn
    query: DELETE FROM sling_state._sling_state WHERE source_stream like '%test_direct_state%'
115 changes: 115 additions & 0 deletions cmd/sling/tests/pipelines/p.18.store_in_replication.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Test that pipeline {store.xxx} values are accessible in replication where: and sql: fields
# This validates the user-reported feature of passing store variables into replications

steps:
  # 1. Create source table in Postgres
  - id: setup
    connection: postgres
    query: |
      DROP TABLE IF EXISTS public.store_repl_source;
      DROP TABLE IF EXISTS public.store_repl_target_where;
      DROP TABLE IF EXISTS public.store_repl_target_sql;
      CREATE TABLE public.store_repl_source (
        id INT PRIMARY KEY,
        name VARCHAR(50),
        category VARCHAR(20)
      );
      INSERT INTO public.store_repl_source (id, name, category) VALUES
        (1, 'Alice', 'A'),
        (2, 'Bob', 'B'),
        (3, 'Charlie', 'A'),
        (4, 'Diana', 'B'),
        (5, 'Eve', 'A');

  - log: "Created source table with 5 rows"

  # 2. Store filter values used by the replications below
  - type: store
    key: filter_category
    value: A

  - type: store
    key: max_id
    value: "4"

  - log: "store.filter_category = {store.filter_category}, store.max_id = {store.max_id}"

  # 3. Run inline replication using {store.xxx} in where: field (table stream)
  - replication:
      source: postgres
      target: postgres
      defaults:
        mode: full-refresh
      streams:
        public.store_repl_source:
          object: public.store_repl_target_where
          where: "category = '{store.filter_category}'"
    on_failure: abort

  - log: "Replication with where clause completed"

  # 4. Run inline replication using {store.xxx} in sql: field (custom-SQL stream)
  - replication:
      source: postgres
      target: postgres
      defaults:
        mode: full-refresh
      streams:
        store_repl_sql:
          sql: "select * from public.store_repl_source where category = '{store.filter_category}' and id <= {store.max_id}"
          object: public.store_repl_target_sql
    on_failure: abort

  - log: "Replication with sql field completed"

  # 5. Verify where: results (should have 3 rows: Alice, Charlie, Eve with category A)
  - connection: postgres
    query: SELECT id, name, category FROM public.store_repl_target_where ORDER BY id
    into: result_where

  - log: |
      where: results:
      {pretty_table(store.result_where)}

  - check: length(store.result_where) == 3
    failure_message: "Expected 3 rows from where filter, got {length(store.result_where)}"

  - check: store.result_where[0].name == "Alice"
    failure_message: "Expected first row to be Alice, got {store.result_where[0].name}"

  - check: store.result_where[1].name == "Charlie"
    failure_message: "Expected second row to be Charlie, got {store.result_where[1].name}"

  - check: store.result_where[2].name == "Eve"
    failure_message: "Expected third row to be Eve, got {store.result_where[2].name}"

  - log: "SUCCESS: store variables work in where: field"

  # 6. Verify sql: results (should have 2 rows: Alice and Charlie with category A and id <= 4)
  - connection: postgres
    query: SELECT id, name, category FROM public.store_repl_target_sql ORDER BY id
    into: result_sql

  - log: |
      sql: results:
      {pretty_table(store.result_sql)}

  - check: length(store.result_sql) == 2
    failure_message: "Expected 2 rows from sql filter, got {length(store.result_sql)}"

  - check: store.result_sql[0].name == "Alice"
    failure_message: "Expected first row to be Alice, got {store.result_sql[0].name}"

  - check: store.result_sql[1].name == "Charlie"
    failure_message: "Expected second row to be Charlie, got {store.result_sql[1].name}"

  - log: "SUCCESS: store variables work in sql: field"

  # 7. Cleanup
  - connection: postgres
    query: |
      DROP TABLE IF EXISTS public.store_repl_source;
      DROP TABLE IF EXISTS public.store_repl_target_where;
      DROP TABLE IF EXISTS public.store_repl_target_sql;

  - log: "Pipeline completed successfully"
Loading