2 changes: 1 addition & 1 deletion .gitignore
@@ -9,6 +9,6 @@ dev/
 .idea/
 *.backup
 
-**/variables.tfvars
+**/variables*.tfvars
 
 /tmp
45 changes: 45 additions & 0 deletions examples/clickpipe/bigquery_snapshot/clickpipe.tf
@@ -0,0 +1,45 @@
resource "random_id" "clickpipes_suffix" {
byte_length = 4
}

locals {
snapshot_staging_path = "gs://${google_storage_bucket.clickpipes_staging_bucket.name}/${random_id.clickpipes_suffix.hex}/"
}

resource "clickhouse_clickpipe" "bigquery_snapshot" {
name = "BigQuery Snapshot ClickPipe"

service_id = var.service_id

source = {
bigquery = {
snapshot_staging_path = local.snapshot_staging_path

credentials = {
service_account_file = google_service_account_key.clickpipes_key.private_key
}

settings = {
replication_mode = "snapshot"
}

table_mappings = [for table_name in var.bigquery_table_names : {
source_dataset_name = var.bigquery_dataset_id
source_table = table_name
target_table = "${table_name}_${random_id.clickpipes_suffix.hex}"
}]
}
}

destination = {
database = "default"
}
}

output "clickpipe_id" {
value = clickhouse_clickpipe.bigquery_snapshot.id
}

output "clickpipe_state" {
value = clickhouse_clickpipe.bigquery_snapshot.state
}
107 changes: 107 additions & 0 deletions examples/clickpipe/bigquery_snapshot/gcp.tf
@@ -0,0 +1,107 @@
resource "random_id" "suffix" {
byte_length = 4
}

locals {
dataset_name = split("/", data.google_bigquery_dataset.dataset.id)[length(split("/", data.google_bigquery_dataset.dataset.id)) - 1]
staging_bucket_name = "${var.gcp_project_id}-clickpipe-staging-${random_id.suffix.hex}"
sa_name = "clickpipe-bigquery-${random_id.suffix.hex}"
sa_display_name = "ClickPipe BigQuery Service Account"
}

// Ensures the BigQuery dataset and tables exist
data "google_bigquery_dataset" "dataset" {
project = var.gcp_project_id
dataset_id = var.bigquery_dataset_id
}

data "google_bigquery_table" "table" {
for_each = toset(var.bigquery_table_names)

dataset_id = var.bigquery_dataset_id
table_id = each.value
}

// This bucket is used by ClickPipe to stage data during BigQuery exports
resource "google_storage_bucket" "clickpipes_staging_bucket" {
name = local.staging_bucket_name
location = var.gcp_region
project = var.gcp_project_id
force_destroy = true // do not use in production

uniform_bucket_level_access = true

lifecycle_rule {
condition {
age = 1
}
action {
type = "Delete"
}
}
}

// Service account for ClickPipe to access BigQuery and GCS
resource "google_service_account" "clickpipes" {
project = var.gcp_project_id
account_id = local.sa_name
display_name = local.sa_display_name
description = "Service account for ClickPipe to access BigQuery and GCS"
}

// Service account key for ClickPipe
resource "google_service_account_key" "clickpipes_key" {
service_account_id = google_service_account.clickpipes.name
public_key_type = "TYPE_X509_PEM_FILE"
private_key_type = "TYPE_GOOGLE_CREDENTIALS_FILE"
}

// Allows to view BigQuery datasets and tables with dataset-level condition
resource "google_project_iam_member" "bigquery_data_viewer" {
project = var.gcp_project_id
role = "roles/bigquery.dataViewer"
member = "serviceAccount:${google_service_account.clickpipes.email}"

condition {
title = "Restrict access to specific dataset"
description = "Allow access only to the designated BigQuery dataset"
expression = "resource.name.startsWith(\"projects/${var.gcp_project_id}/datasets/${local.dataset_name}\")"
}
}

// This allows ClickPipes to run BigQuery export jobs
resource "google_project_iam_member" "bigquery_job_user" {
project = var.gcp_project_id
role = "roles/bigquery.jobUser"
member = "serviceAccount:${google_service_account.clickpipes.email}"
}

// GCS Object Admin role with bucket-level condition
resource "google_project_iam_member" "storage_object_admin" {
project = var.gcp_project_id
role = "roles/storage.objectAdmin"
member = "serviceAccount:${google_service_account.clickpipes.email}"

condition {
title = "Restrict access to staging bucket"
description = "Allow access only to the ClickPipe staging bucket"
expression = "resource.name.startsWith(\"projects/_/buckets/${local.staging_bucket_name}\")"
}
}

// GCS Bucket Viewer role with bucket-level condition
resource "google_project_iam_member" "storage_bucket_viewer" {
project = var.gcp_project_id
role = "roles/storage.bucketViewer"
member = "serviceAccount:${google_service_account.clickpipes.email}"

condition {
title = "Restrict access to staging bucket"
description = "Allow access only to the ClickPipe staging bucket"
expression = "resource.name.startsWith(\"projects/_/buckets/${local.staging_bucket_name}\")"
}
}

output "clickpipes_bigquery_service_account_email" {
value = google_service_account.clickpipes.email
}
20 changes: 20 additions & 0 deletions examples/clickpipe/bigquery_snapshot/provider.tf
@@ -0,0 +1,20 @@
# This file is generated automatically; please do not edit.
terraform {
  required_providers {
    clickhouse = {
      version = "3.8.1-alpha1"
      source  = "ClickHouse/clickhouse"
    }
  }
}

provider "clickhouse" {
  organization_id = var.organization_id
  token_key       = var.token_key
  token_secret    = var.token_secret
}

provider "google" {
  project = var.gcp_project_id
  region  = var.gcp_region
}
19 changes: 19 additions & 0 deletions examples/clickpipe/bigquery_snapshot/provider.tf.template.alpha
@@ -0,0 +1,19 @@
terraform {
  required_providers {
    clickhouse = {
      version = "${CLICKHOUSE_TERRAFORM_PROVIDER_VERSION}"
      source  = "ClickHouse/clickhouse"
    }
  }
}

provider "clickhouse" {
  organization_id = var.organization_id
  token_key       = var.token_key
  token_secret    = var.token_secret
}

provider "google" {
  project = var.gcp_project_id
  region  = var.gcp_region
}
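Both provider files pin the clickhouse provider but leave the google provider undeclared in required_providers, so Terraform falls back to the implied hashicorp/google source with no version constraint. A sketch of the template with the google provider pinned as well; the version constraint is illustrative, not part of this change:

terraform {
  required_providers {
    clickhouse = {
      version = "${CLICKHOUSE_TERRAFORM_PROVIDER_VERSION}"
      source  = "ClickHouse/clickhouse"
    }
    google = {
      source  = "hashicorp/google"
      version = ">= 5.0" # illustrative constraint
    }
  }
}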
10 changes: 10 additions & 0 deletions examples/clickpipe/bigquery_snapshot/variables.tfvars.sample
@@ -0,0 +1,10 @@
# These keys are examples only and won't work against a deployed ClickHouse OpenAPI server
organization_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71"
token_key       = "avhj1U5QCdWAE9CA9"
token_secret    = "4b1dROiHQEuSXJHlV8zHFd0S7WQj7CGxz5kGJeJnca"
service_id      = "aee076c1-3f83-4637-95b1-ad5a0a825b71"

gcp_project_id       = "your-project"
gcp_region           = "us-central1"
bigquery_dataset_id  = "test_dataset"
bigquery_table_names = ["test_table_1", "test_table_2"]
30 changes: 30 additions & 0 deletions examples/clickpipe/bigquery_snapshot/vars.tf
@@ -0,0 +1,30 @@
variable "organization_id" {
description = "ClickHouse Cloud organization ID"
}
variable "token_key" {
description = "ClickHouse Cloud API token key"
}
variable "token_secret" {
description = "ClickHouse Cloud API token secret"
}

variable "service_id" {
description = "ClickHouse ClickPipe service ID"
}

variable "gcp_project_id" {
description = "GCP project ID where the BigQuery dataset is located"
}

variable "gcp_region" {
description = "GCP region for the BigQuery dataset"
}

variable "bigquery_dataset_id" {
description = "Source BigQuery dataset ID"
}

variable "bigquery_table_names" {
description = "Source BigQuery table names"
type = list(string)
}
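The declarations above omit types and leave the API credentials unmarked, so their values can surface in plan output. A hardened sketch for the secret variable (Terraform has supported sensitive on variables since 0.14):

variable "token_secret" {
  description = "ClickHouse Cloud API token secret"
  type        = string
  sensitive   = true # redacts the value from plan/apply output
}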
@@ -0,0 +1,30 @@
resource "clickhouse_clickpipe" "bigquery_snapshot_clickpipe" {
name = "BigQuery Snapshot ClickPipe"

service_id = "dc189652-b621-4bee-9088-b5b4c3f88626"

source = {
bigquery = {
snapshot_staging_path = "gs://my-staging-bucket/"

credentials = {
# Base64-encoded service account JSON key
service_account_file = "ewogICJuYW1lIjogInByb2plY3RzL1BST0pFQ1RfSUQvc2VydmljZUFjY291bnRzL1NFUlZJQ0VfQUNDT1VOVF9FTUFJTC9rZXlzL0tFWV9JRCIsCiAgInByaXZhdGVLZXlUeXBlIjogIlRZUEVfR09PR0xFX0NSRURFTlRJQUxTX0ZJTEUiLAogICJwcml2YXRlS2V5RGF0YSI6ICJFTkNPREVEX1BSSVZBVEVfS0VZIiwKICAidmFsaWRBZnRlclRpbWUiOiAiREFURSIsCiAgInZhbGlkQmVmb3JlVGltZSI6ICJEQVRFIiwKICAia2V5QWxnb3JpdGhtIjogIktFWV9BTEdfUlNBXzIwNDgiCn0="
}

settings = {
replication_mode = "snapshot"
}

table_mappings = [{
source_dataset_name = "test_dataset"
source_table = "test_table"
target_table = "test_table_snapshot"
}]
}
}

destination = {
database = "default"
}
}
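Here the credential is an inline Base64 string. When the key lives on disk instead, Terraform's built-in filebase64 function produces the same encoding; a sketch with a hypothetical key path:

credentials = {
  # filebase64 reads a file and returns its contents Base64-encoded;
  # "service-account-key.json" is a hypothetical local path.
  service_account_file = filebase64("${path.module}/service-account-key.json")
}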
39 changes: 24 additions & 15 deletions pkg/internal/api/clickpipe.go
@@ -149,28 +149,37 @@ var ClickPipeObjectStorageCompressions = []string{
 
 const (
 	// Postgres replication modes
-	ClickPipePostgresReplicationModeCDC      = "cdc"
-	ClickPipePostgresReplicationModeSnapshot = "snapshot"
-	ClickPipePostgresReplicationModeCDCOnly  = "cdc_only"
+	ClickPipeReplicationModeCDC      = "cdc"
+	ClickPipeReplicationModeSnapshot = "snapshot"
+	ClickPipeReplicationModeCDCOnly  = "cdc_only"
 )
 
+const (
+	ClickPipeTableEngineMergeTree          = "MergeTree"
+	ClickPipeTableEngineReplacingMergeTree = "ReplacingMergeTree"
+	ClickPipeTableEngineNull               = "Null"
+)
+
 var ClickPipePostgresReplicationModes = []string{
-	ClickPipePostgresReplicationModeCDC,
-	ClickPipePostgresReplicationModeSnapshot,
-	ClickPipePostgresReplicationModeCDCOnly,
+	ClickPipeReplicationModeCDC,
+	ClickPipeReplicationModeSnapshot,
+	ClickPipeReplicationModeCDCOnly,
 }
 
-const (
-	// Postgres table engines
-	ClickPipePostgresTableEngineMergeTree          = "MergeTree"
-	ClickPipePostgresTableEngineReplacingMergeTree = "ReplacingMergeTree"
-	ClickPipePostgresTableEngineNull               = "Null"
-)
+var ClickPipeBigQueryReplicationModes = []string{
+	ClickPipeReplicationModeSnapshot,
+}
 
 var ClickPipePostgresTableEngines = []string{
-	ClickPipePostgresTableEngineMergeTree,
-	ClickPipePostgresTableEngineReplacingMergeTree,
-	ClickPipePostgresTableEngineNull,
+	ClickPipeTableEngineMergeTree,
+	ClickPipeTableEngineReplacingMergeTree,
+	ClickPipeTableEngineNull,
 }
+
+var ClickPipeBigQueryTableEngines = []string{
+	ClickPipeTableEngineMergeTree,
+	ClickPipeTableEngineReplacingMergeTree,
+	ClickPipeTableEngineNull,
+}
 
 const (
32 changes: 32 additions & 0 deletions pkg/internal/api/clickpipe_models.go
@@ -177,11 +177,43 @@ type ClickPipePostgresTableMapping struct {
 	TableEngine *string `json:"tableEngine,omitempty"`
 }
 
+type ClickPipeServiceAccount struct {
+	ServiceAccountFile string `json:"serviceAccountFile,omitempty"`
+}
+
+type ClickPipeBigQuerySettings struct {
+	ReplicationMode                string `json:"replicationMode,omitempty"`
+	AllowNullableColumns           *bool  `json:"allowNullableColumns,omitempty"`
+	InitialLoadParallelism         *int   `json:"initialLoadParallelism,omitempty"`
+	SnapshotNumRowsPerPartition    *int   `json:"snapshotNumRowsPerPartition,omitempty"`
+	SnapshotNumberOfParallelTables *int   `json:"snapshotNumberOfParallelTables,omitempty"`
+}
+
+type ClickPipeBigQueryTableMapping struct {
+	SourceDatasetName   string   `json:"sourceDatasetName"`
+	SourceTable         string   `json:"sourceTable"`
+	TargetTable         string   `json:"targetTable"`
+	ExcludedColumns     []string `json:"excludedColumns,omitempty"`
+	UseCustomSortingKey *bool    `json:"useCustomSortingKey,omitempty"`
+	SortingKeys         []string `json:"sortingKeys,omitempty"`
+	TableEngine         *string  `json:"tableEngine,omitempty"`
+}
+
+type ClickPipeBigQuerySource struct {
+	SnapshotStagingPath   string                          `json:"snapshotStagingPath,omitempty"`
+	Settings              ClickPipeBigQuerySettings       `json:"settings"`
+	Mappings              []ClickPipeBigQueryTableMapping `json:"tableMappings"`
+	TableMappingsToRemove []ClickPipeBigQueryTableMapping `json:"tableMappingsToRemove,omitempty"`
+	TableMappingsToAdd    []ClickPipeBigQueryTableMapping `json:"tableMappingsToAdd,omitempty"`
+	Credentials           *ClickPipeServiceAccount        `json:"credentials,omitempty"`
+}
+
 type ClickPipeSource struct {
 	Kafka           *ClickPipeKafkaSource         `json:"kafka,omitempty"`
 	ObjectStorage   *ClickPipeObjectStorageSource `json:"objectStorage,omitempty"`
 	Kinesis         *ClickPipeKinesisSource       `json:"kinesis,omitempty"`
 	Postgres        *ClickPipePostgresSource      `json:"postgres,omitempty"`
+	BigQuery        *ClickPipeBigQuerySource      `json:"bigquery,omitempty"`
 	ValidateSamples bool                          `json:"validateSamples,omitempty"`
 }
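ClickPipeBigQuerySettings exposes optional tuning fields beyond replicationMode. Assuming the Terraform schema surfaces them under snake_case names matching these JSON fields, a sketch of a tuned settings block:

settings = {
  replication_mode                   = "snapshot" # the only value in ClickPipeBigQueryReplicationModes
  allow_nullable_columns             = true       # assumed mapping of allowNullableColumns
  initial_load_parallelism           = 4          # assumed mapping of initialLoadParallelism
  snapshot_num_rows_per_partition    = 100000     # assumed mapping of snapshotNumRowsPerPartition
  snapshot_number_of_parallel_tables = 2          # assumed mapping of snapshotNumberOfParallelTables
}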
