Skip to content

Commit 5852070

Browse files
authored
Add postgres CDC terraform provider for ClickPipes (#403)
1 parent 7040424 commit 5852070

File tree

7 files changed

+1496
-105
lines changed

7 files changed

+1496
-105
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
resource "clickhouse_clickpipe" "postgres_cdc_clickpipe" {
2+
name = "My Postgres CDC ClickPipe"
3+
service_id = "e9465b4b-f7e5-4937-8e21-8d508b02843d"
4+
5+
source {
6+
postgres {
7+
host = "postgres.example.com"
8+
port = 5432
9+
database = "mydb"
10+
11+
credentials {
12+
username = "postgres_user"
13+
password = "***"
14+
}
15+
16+
settings {
17+
replication_mode = "cdc"
18+
19+
# Optional: Sync interval for polling changes (seconds)
20+
sync_interval_seconds = 60
21+
22+
# Optional: Number of rows to pull per batch
23+
pull_batch_size = 100000
24+
25+
# Optional: Allow nullable columns in destination tables
26+
allow_nullable_columns = true
27+
28+
# Optional: Number of parallel workers for initial snapshot load
29+
initial_load_parallelism = 4
30+
31+
# Optional: Number of rows per partition during snapshot
32+
snapshot_num_rows_per_partition = 100000
33+
34+
# Optional: Number of tables to snapshot in parallel
35+
snapshot_number_of_parallel_tables = 1
36+
37+
# Optional: Publication name (system-managed if not specified)
38+
# publication_name = "my_publication"
39+
40+
# Optional: Replication slot name (system-managed if not specified)
41+
# replication_slot_name = "my_replication_slot"
42+
43+
# Optional: Enable failover slots for high availability
44+
# enable_failover_slots = true
45+
}
46+
47+
table_mappings {
48+
source_schema_name = "public"
49+
source_table = "users"
50+
target_table = "public_users"
51+
52+
# Optional: Columns to exclude from replication
53+
# excluded_columns = ["password_hash", "internal_notes"]
54+
55+
# Optional: Use custom sorting key
56+
# use_custom_sorting_key = true
57+
# sorting_keys = ["id", "created_at"]
58+
59+
# Optional: Specify table engine (default: ReplacingMergeTree)
60+
# table_engine = "ReplacingMergeTree"
61+
}
62+
63+
table_mappings {
64+
source_schema_name = "public"
65+
source_table = "orders"
66+
target_table = "public_orders"
67+
}
68+
}
69+
}
70+
71+
destination {
72+
database = "default"
73+
74+
# Note: For Postgres CDC, table and columns are automatically created
75+
# based on the source schema and table_mappings configuration.
76+
# The destination.table and destination.columns fields are not used.
77+
}
78+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ require (
1010
github.com/hashicorp/terraform-plugin-framework v1.16.1
1111
github.com/hashicorp/terraform-plugin-framework-validators v0.19.0
1212
github.com/hashicorp/terraform-plugin-log v0.10.0
13+
github.com/stretchr/testify v1.10.0
1314
k8s.io/apimachinery v0.34.1
1415
)
1516

pkg/internal/api/clickpipe.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ const (
1919
ClickPipeStoppedState = "Stopped"
2020
ClickPipeFailedState = "Failed"
2121
ClickPipeCompletedState = "Completed"
22+
ClickPipeSnapShotState = "Snapshot"
2223
ClickPipeInternalErrorState = "InternalError"
2324
)
2425

2526
const (
26-
ClickPipeStateStart = "start"
27-
ClickPipeStateStop = "stop"
27+
ClickPipeStateStart = "start"
28+
ClickPipeStateStop = "stop"
29+
ClickPipeStateResync = "resync"
2830
)
2931

3032
const (
@@ -145,6 +147,32 @@ var ClickPipeObjectStorageCompressions = []string{
145147
ClickPipeObjectStorageCompressionZstd,
146148
}
147149

150+
const (
151+
// Postgres replication modes
152+
ClickPipePostgresReplicationModeCDC = "cdc"
153+
ClickPipePostgresReplicationModeSnapshot = "snapshot"
154+
ClickPipePostgresReplicationModeCDCOnly = "cdc_only"
155+
)
156+
157+
var ClickPipePostgresReplicationModes = []string{
158+
ClickPipePostgresReplicationModeCDC,
159+
ClickPipePostgresReplicationModeSnapshot,
160+
ClickPipePostgresReplicationModeCDCOnly,
161+
}
162+
163+
const (
164+
// Postgres table engines
165+
ClickPipePostgresTableEngineMergeTree = "MergeTree"
166+
ClickPipePostgresTableEngineReplacingMergeTree = "ReplacingMergeTree"
167+
ClickPipePostgresTableEngineNull = "Null"
168+
)
169+
170+
var ClickPipePostgresTableEngines = []string{
171+
ClickPipePostgresTableEngineMergeTree,
172+
ClickPipePostgresTableEngineReplacingMergeTree,
173+
ClickPipePostgresTableEngineNull,
174+
}
175+
148176
const (
149177
ClickPipeKafkaOffsetFromBeginningStrategy = "from_beginning"
150178
ClickPipeKafkaOffsetFromLatestStrategy = "from_latest"

pkg/internal/api/clickpipe_models.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,45 @@ type ClickPipeKinesisSource struct {
143143
IAMRole *string `json:"iamRole,omitempty"`
144144
}
145145

146+
type ClickPipePostgresSource struct {
147+
Host string `json:"host"`
148+
Port int `json:"port"`
149+
Database string `json:"database"`
150+
Credentials *ClickPipeSourceCredentials `json:"credentials,omitempty"`
151+
Settings ClickPipePostgresSettings `json:"settings"`
152+
Mappings []ClickPipePostgresTableMapping `json:"tableMappings"`
153+
TableMappingsToRemove []ClickPipePostgresTableMapping `json:"tableMappingsToRemove,omitempty"`
154+
TableMappingsToAdd []ClickPipePostgresTableMapping `json:"tableMappingsToAdd,omitempty"`
155+
}
156+
157+
type ClickPipePostgresSettings struct {
158+
SyncIntervalSeconds *int `json:"syncIntervalSeconds,omitempty"`
159+
PullBatchSize *int `json:"pullBatchSize,omitempty"`
160+
PublicationName *string `json:"publicationName,omitempty"`
161+
ReplicationMode string `json:"replicationMode"`
162+
ReplicationSlotName *string `json:"replicationSlotName,omitempty"`
163+
AllowNullableColumns *bool `json:"allowNullableColumns,omitempty"`
164+
InitialLoadParallelism *int `json:"initialLoadParallelism,omitempty"`
165+
SnapshotNumRowsPerPartition *int `json:"snapshotNumRowsPerPartition,omitempty"`
166+
SnapshotNumberOfParallelTables *int `json:"snapshotNumberOfParallelTables,omitempty"`
167+
EnableFailoverSlots *bool `json:"enableFailoverSlots,omitempty"`
168+
}
169+
170+
type ClickPipePostgresTableMapping struct {
171+
SourceSchemaName string `json:"sourceSchemaName"`
172+
SourceTable string `json:"sourceTable"`
173+
TargetTable string `json:"targetTable"`
174+
ExcludedColumns []string `json:"excludedColumns,omitempty"`
175+
UseCustomSortingKey *bool `json:"useCustomSortingKey,omitempty"`
176+
SortingKeys []string `json:"sortingKeys,omitempty"`
177+
TableEngine *string `json:"tableEngine,omitempty"`
178+
}
179+
146180
type ClickPipeSource struct {
147181
Kafka *ClickPipeKafkaSource `json:"kafka,omitempty"`
148182
ObjectStorage *ClickPipeObjectStorageSource `json:"objectStorage,omitempty"`
149183
Kinesis *ClickPipeKinesisSource `json:"kinesis,omitempty"`
184+
Postgres *ClickPipePostgresSource `json:"postgres,omitempty"`
150185
ValidateSamples bool `json:"validateSamples,omitempty"`
151186
}
152187

@@ -170,10 +205,10 @@ type ClickPipeDestinationTableDefinition struct {
170205

171206
type ClickPipeDestination struct {
172207
Database string `json:"database"`
173-
Table string `json:"table"`
174-
ManagedTable bool `json:"managedTable"`
208+
Table *string `json:"table,omitempty"`
209+
ManagedTable *bool `json:"managedTable,omitempty"`
175210
TableDefinition *ClickPipeDestinationTableDefinition `json:"tableDefinition,omitempty"`
176-
Columns []ClickPipeDestinationColumn `json:"columns"`
211+
Columns []ClickPipeDestinationColumn `json:"columns,omitempty"`
177212
Roles []string `json:"roles,omitempty"`
178213
}
179214

@@ -193,7 +228,7 @@ type ClickPipe struct {
193228
State string `json:"state,omitempty"`
194229
Source ClickPipeSource `json:"source"`
195230
Destination ClickPipeDestination `json:"destination"`
196-
FieldMappings []ClickPipeFieldMapping `json:"fieldMappings"`
231+
FieldMappings []ClickPipeFieldMapping `json:"fieldMappings,omitempty"`
197232
Settings map[string]any `json:"settings,omitempty"`
198233
CreatedAt *time.Time `json:"createdAt,omitempty"`
199234
UpdatedAt *time.Time `json:"updatedAt,omitempty"`

0 commit comments

Comments
 (0)