Skip to content

Commit c40f64f

Browse files
committed
feat(clickpipes): BigQuery source support
This PR enables creating a BigQuery ClickPipe. This feature is in private preview.
1 parent 5852070 commit c40f64f

File tree

9 files changed

+766
-31
lines changed

9 files changed

+766
-31
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Random hex suffix used to namespace the staging path and target table names.
resource "random_id" "clickpipes_suffix" {
  byte_length = 4
}
4+
5+
# GCS prefix where BigQuery snapshot exports are staged before ingestion.
locals {
  snapshot_staging_path = "gs://${google_storage_bucket.clickpipes_staging_bucket.name}/${random_id.clickpipes_suffix.hex}/"
}
8+
9+
# Snapshot-only BigQuery ClickPipe: exports the selected tables through the
# GCS staging bucket and loads them into the `default` database.
resource "clickhouse_clickpipe" "bigquery_snapshot" {
  name       = "BigQuery Snapshot ClickPipe"
  service_id = var.service_id

  source = {
    bigquery = {
      snapshot_staging_path = local.snapshot_staging_path

      credentials = {
        # NOTE(review): the Google provider returns `private_key` base64-encoded;
        # confirm whether the ClickPipe API expects the decoded JSON
        # (base64decode(...)) or the encoded form as passed here.
        service_account_file = google_service_account_key.clickpipes_key.private_key
      }

      settings = {
        replication_mode = "snapshot"
      }

      # One mapping per requested source table; target tables carry the random
      # suffix so repeated applies do not collide on table names.
      table_mappings = [for table_name in var.bigquery_table_names : {
        source_dataset_name = var.bigquery_dataset_id
        source_table        = table_name
        target_table        = "${table_name}_${random_id.clickpipes_suffix.hex}"
      }]
    }
  }

  destination = {
    database = "default"
  }
}
38+
39+
# Expose the pipe's ID and lifecycle state for scripting and monitoring.
output "clickpipe_id" {
  value = clickhouse_clickpipe.bigquery_snapshot.id
}

output "clickpipe_state" {
  value = clickhouse_clickpipe.bigquery_snapshot.state
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# Random suffix keeping the bucket and service-account names globally unique.
resource "random_id" "suffix" {
  byte_length = 4
}
4+
5+
locals {
  # The dataset name is the last path segment of the fully-qualified dataset ID
  # (e.g. "projects/<project>/datasets/<dataset>").
  dataset_name        = element(split("/", data.google_bigquery_dataset.dataset.id), length(split("/", data.google_bigquery_dataset.dataset.id)) - 1)
  staging_bucket_name = "${var.gcp_project_id}-clickpipe-staging-${random_id.suffix.hex}"
  sa_name             = "clickpipe-bigquery-${random_id.suffix.hex}"
  sa_display_name     = "ClickPipe BigQuery Service Account"
}
11+
12+
// Ensures the BigQuery dataset and tables exist
13+
// Ensures the source BigQuery dataset exists before the pipe is created.
data "google_bigquery_dataset" "dataset" {
  project    = var.gcp_project_id
  dataset_id = var.bigquery_dataset_id
}
17+
18+
// Validates that every requested source table exists in the dataset.
data "google_bigquery_table" "table" {
  for_each = toset(var.bigquery_table_names)

  dataset_id = var.bigquery_dataset_id
  table_id   = each.value
}
24+
25+
// This bucket is used by ClickPipe to stage data during BigQuery exports
26+
// Staging bucket holding transient BigQuery export files for the ClickPipe.
resource "google_storage_bucket" "clickpipes_staging_bucket" {
  name          = local.staging_bucket_name
  location      = var.gcp_region
  project       = var.gcp_project_id
  force_destroy = true // do not use in production

  uniform_bucket_level_access = true

  // Staged exports are short-lived; purge any object older than one day.
  lifecycle_rule {
    condition {
      age = 1
    }
    action {
      type = "Delete"
    }
  }
}
43+
44+
// Service account for ClickPipe to access BigQuery and GCS
45+
// Identity the ClickPipe uses to read BigQuery and the staging bucket.
resource "google_service_account" "clickpipes" {
  project      = var.gcp_project_id
  account_id   = local.sa_name
  display_name = local.sa_display_name
  description  = "Service account for ClickPipe to access BigQuery and GCS"
}
51+
52+
// Service account key for ClickPipe
53+
// Key material handed to the ClickPipe as a Google credentials (JSON key) file.
resource "google_service_account_key" "clickpipes_key" {
  service_account_id = google_service_account.clickpipes.name
  public_key_type    = "TYPE_X509_PEM_FILE"
  private_key_type   = "TYPE_GOOGLE_CREDENTIALS_FILE"
}
58+
59+
// Allows to view BigQuery datasets and tables with dataset-level condition
60+
// Grants BigQuery read access, conditionally scoped to the source dataset
// and the resources (tables) underneath it.
resource "google_project_iam_member" "bigquery_data_viewer" {
  project = var.gcp_project_id
  role    = "roles/bigquery.dataViewer"
  member  = "serviceAccount:${google_service_account.clickpipes.email}"

  condition {
    title       = "Restrict access to specific dataset"
    description = "Allow access only to the designated BigQuery dataset"
    # Match the dataset exactly, or anything under it. A bare startsWith on the
    # dataset name would also match sibling datasets sharing the prefix
    # (e.g. "sales" vs "sales_archive"), silently over-granting access.
    expression = "resource.name == \"projects/${var.gcp_project_id}/datasets/${local.dataset_name}\" || resource.name.startsWith(\"projects/${var.gcp_project_id}/datasets/${local.dataset_name}/\")"
  }
}
71+
72+
// This allows ClickPipes to run BigQuery export jobs
73+
// Allows the ClickPipe identity to create and run BigQuery export jobs.
resource "google_project_iam_member" "bigquery_job_user" {
  project = var.gcp_project_id
  role    = "roles/bigquery.jobUser"
  member  = "serviceAccount:${google_service_account.clickpipes.email}"
}
78+
79+
// GCS Object Admin role with bucket-level condition
80+
// Object read/write, conditionally scoped to the staging bucket only.
resource "google_project_iam_member" "storage_object_admin" {
  project = var.gcp_project_id
  role    = "roles/storage.objectAdmin"
  member  = "serviceAccount:${google_service_account.clickpipes.email}"

  condition {
    title       = "Restrict access to staging bucket"
    description = "Allow access only to the ClickPipe staging bucket"
    # Anchor the match to the exact bucket or objects under it; a bare
    # startsWith would also cover any bucket whose name shares this prefix.
    expression = "resource.name == \"projects/_/buckets/${local.staging_bucket_name}\" || resource.name.startsWith(\"projects/_/buckets/${local.staging_bucket_name}/\")"
  }
}
91+
92+
// GCS Bucket Viewer role with bucket-level condition
93+
// Bucket metadata read access, conditionally scoped to the staging bucket.
resource "google_project_iam_member" "storage_bucket_viewer" {
  project = var.gcp_project_id
  role    = "roles/storage.bucketViewer"
  member  = "serviceAccount:${google_service_account.clickpipes.email}"

  condition {
    title       = "Restrict access to staging bucket"
    description = "Allow access only to the ClickPipe staging bucket"
    # Anchor the match to the exact bucket or objects under it; a bare
    # startsWith would also cover any bucket whose name shares this prefix.
    expression = "resource.name == \"projects/_/buckets/${local.staging_bucket_name}\" || resource.name.startsWith(\"projects/_/buckets/${local.staging_bucket_name}/\")"
  }
}
104+
105+
# Exposed so operators can verify which GCP identity the pipe authenticates as.
output "clickpipes_bigquery_service_account_email" {
  value = google_service_account.clickpipes.email
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# This file is generated automatically; please do not edit.
2+
terraform {
  required_providers {
    clickhouse = {
      version = "3.8.1-alpha1"
      source  = "ClickHouse/clickhouse"
    }
  }
}

# ClickHouse Cloud API credentials are supplied via input variables.
provider "clickhouse" {
  organization_id = var.organization_id
  token_key       = var.token_key
  token_secret    = var.token_secret
}

provider "google" {
  project = var.gcp_project_id
  region  = var.gcp_region
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
terraform {
  required_providers {
    clickhouse = {
      # Placeholder substituted when the example is generated — presumably by a
      # release script; do not hardcode a provider version here.
      version = "${CLICKHOUSE_TERRAFORM_PROVIDER_VERSION}"
      source  = "ClickHouse/clickhouse"
    }
  }
}

# ClickHouse Cloud API credentials are supplied via input variables.
provider "clickhouse" {
  organization_id = var.organization_id
  token_key       = var.token_key
  token_secret    = var.token_secret
}

provider "google" {
  project = var.gcp_project_id
  region  = var.gcp_region
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Input variables. All are typed; API credentials are marked sensitive so
# Terraform redacts them from plan/apply output and state diffs.

variable "organization_id" {
  description = "ClickHouse Cloud organization ID"
  type        = string
}

variable "token_key" {
  description = "ClickHouse Cloud API token key"
  type        = string
  sensitive   = true
}

variable "token_secret" {
  description = "ClickHouse Cloud API token secret"
  type        = string
  sensitive   = true
}

variable "service_id" {
  description = "ClickHouse ClickPipe service ID"
  type        = string
}

variable "gcp_project_id" {
  description = "GCP project ID where the BigQuery dataset is located"
  type        = string
}

variable "gcp_region" {
  description = "GCP region for the BigQuery dataset"
  type        = string
}

variable "bigquery_dataset_id" {
  description = "Source BigQuery dataset ID"
  type        = string
}

variable "bigquery_table_names" {
  description = "Source BigQuery table names"
  type        = list(string)
}

pkg/internal/api/clickpipe.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -149,28 +149,37 @@ var ClickPipeObjectStorageCompressions = []string{
149149

150150
const (
151151
// Postgres replication modes
152-
ClickPipePostgresReplicationModeCDC = "cdc"
153-
ClickPipePostgresReplicationModeSnapshot = "snapshot"
154-
ClickPipePostgresReplicationModeCDCOnly = "cdc_only"
152+
ClickPipeReplicationModeCDC = "cdc"
153+
ClickPipeReplicationModeSnapshot = "snapshot"
154+
ClickPipeReplicationModeCDCOnly = "cdc_only"
155+
)
156+
157+
const (
158+
ClickPipeTableEngineMergeTree = "MergeTree"
159+
ClickPipeTableEngineReplacingMergeTree = "ReplacingMergeTree"
160+
ClickPipeTableEngineNull = "Null"
155161
)
156162

157163
var ClickPipePostgresReplicationModes = []string{
158-
ClickPipePostgresReplicationModeCDC,
159-
ClickPipePostgresReplicationModeSnapshot,
160-
ClickPipePostgresReplicationModeCDCOnly,
164+
ClickPipeReplicationModeCDC,
165+
ClickPipeReplicationModeSnapshot,
166+
ClickPipeReplicationModeCDCOnly,
161167
}
162168

163-
const (
164-
// Postgres table engines
165-
ClickPipePostgresTableEngineMergeTree = "MergeTree"
166-
ClickPipePostgresTableEngineReplacingMergeTree = "ReplacingMergeTree"
167-
ClickPipePostgresTableEngineNull = "Null"
168-
)
169+
var ClickPipeBigQueryReplicationModes = []string{
170+
ClickPipeReplicationModeSnapshot,
171+
}
169172

170173
var ClickPipePostgresTableEngines = []string{
171-
ClickPipePostgresTableEngineMergeTree,
172-
ClickPipePostgresTableEngineReplacingMergeTree,
173-
ClickPipePostgresTableEngineNull,
174+
ClickPipeTableEngineMergeTree,
175+
ClickPipeTableEngineReplacingMergeTree,
176+
ClickPipeTableEngineNull,
177+
}
178+
179+
var ClickPipeBigQueryTableEngines = []string{
180+
ClickPipeTableEngineMergeTree,
181+
ClickPipeTableEngineReplacingMergeTree,
182+
ClickPipeTableEngineNull,
174183
}
175184

176185
const (

pkg/internal/api/clickpipe_models.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,11 +177,43 @@ type ClickPipePostgresTableMapping struct {
177177
TableEngine *string `json:"tableEngine,omitempty"`
178178
}
179179

180+
// ClickPipeServiceAccount wraps a Google service-account credentials file
// (JSON key) used by a ClickPipe to authenticate against GCP.
type ClickPipeServiceAccount struct {
181+
ServiceAccountFile string `json:"serviceAccountFile,omitempty"`
182+
}
183+
184+
// ClickPipeBigQuerySettings configures how a BigQuery ClickPipe replicates
// data. Optional tuning knobs are pointers so that unset values are omitted
// from the serialized request.
type ClickPipeBigQuerySettings struct {
185+
ReplicationMode string `json:"replicationMode,omitempty"`
186+
AllowNullableColumns *bool `json:"allowNullableColumns,omitempty"`
187+
InitialLoadParallelism *int `json:"initialLoadParallelism,omitempty"`
188+
SnapshotNumRowsPerPartition *int `json:"snapshotNumRowsPerPartition,omitempty"`
189+
SnapshotNumberOfParallelTables *int `json:"snapshotNumberOfParallelTables,omitempty"`
190+
}
191+
192+
// ClickPipeBigQueryTableMapping maps one source BigQuery table to a target
// ClickHouse table, with optional column exclusions, sorting-key override,
// and table-engine selection.
type ClickPipeBigQueryTableMapping struct {
193+
SourceDatasetName string `json:"sourceDatasetName"`
194+
SourceTable string `json:"sourceTable"`
195+
TargetTable string `json:"targetTable"`
196+
ExcludedColumns []string `json:"excludedColumns,omitempty"`
197+
UseCustomSortingKey *bool `json:"useCustomSortingKey,omitempty"`
198+
SortingKeys []string `json:"sortingKeys,omitempty"`
199+
TableEngine *string `json:"tableEngine,omitempty"`
200+
}
201+
202+
// ClickPipeBigQuerySource describes a BigQuery source for a ClickPipe.
// NOTE(review): TableMappingsToRemove/TableMappingsToAdd look like update-only
// payload fields (delta edits to the mapping set) — confirm against the API
// methods that serialize this struct.
type ClickPipeBigQuerySource struct {
203+
SnapshotStagingPath string `json:"snapshotStagingPath,omitempty"`
204+
Settings ClickPipeBigQuerySettings `json:"settings"`
205+
Mappings []ClickPipeBigQueryTableMapping `json:"tableMappings"`
206+
TableMappingsToRemove []ClickPipeBigQueryTableMapping `json:"tableMappingsToRemove,omitempty"`
207+
TableMappingsToAdd []ClickPipeBigQueryTableMapping `json:"tableMappingsToAdd,omitempty"`
208+
Credentials *ClickPipeServiceAccount `json:"credentials,omitempty"`
209+
}
210+
180211
type ClickPipeSource struct {
181212
Kafka *ClickPipeKafkaSource `json:"kafka,omitempty"`
182213
ObjectStorage *ClickPipeObjectStorageSource `json:"objectStorage,omitempty"`
183214
Kinesis *ClickPipeKinesisSource `json:"kinesis,omitempty"`
184215
Postgres *ClickPipePostgresSource `json:"postgres,omitempty"`
216+
BigQuery *ClickPipeBigQuerySource `json:"bigquery,omitempty"`
185217
ValidateSamples bool `json:"validateSamples,omitempty"`
186218
}
187219

0 commit comments

Comments
 (0)