2 changes: 1 addition & 1 deletion .gitignore
@@ -9,6 +9,6 @@ dev/
.idea/
*.backup

-**/variables.tfvars
+**/variables*.tfvars

/tmp
16 changes: 16 additions & 0 deletions examples/clickpipe/bigquery_snapshot/README.md
@@ -0,0 +1,16 @@
## ClickPipe BigQuery snapshot example

This example demonstrates how to deploy a BigQuery snapshot ClickPipe using Terraform.

It provisions all necessary GCP prerequisites, including:
- GCS staging bucket
- IAM service account with required permissions
- IAM service account key

The BigQuery dataset and tables must already exist.
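
If they do not exist yet, they can be created with the Google provider. The sketch below is illustrative only and is not part of this example; all names are placeholders:

```hcl
resource "google_bigquery_dataset" "example" {
  project    = "your-project"
  dataset_id = "test_dataset"
  location   = "us-central1"
}

resource "google_bigquery_table" "example" {
  project             = "your-project"
  dataset_id          = google_bigquery_dataset.example.dataset_id
  table_id            = "test_table_1"
  deletion_protection = false # convenient for throwaway examples only

  schema = jsonencode([
    { name = "id", type = "INT64", mode = "REQUIRED" }
  ])
}
```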

## How to run

- Rename `variables.tfvars.sample` to `variables.tfvars` and fill in the required values.
- Run `terraform init`
- Run `terraform <plan|apply> -var-file=variables.tfvars`
45 changes: 45 additions & 0 deletions examples/clickpipe/bigquery_snapshot/clickpipe.tf
@@ -0,0 +1,45 @@
resource "random_id" "clickpipes_suffix" {
byte_length = 4
}

locals {
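// Example result: "gs://my-project-clickpipe-staging-1a2b3c4d/9f8e7d6c/" (illustrative values).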
snapshot_staging_path = "gs://${google_storage_bucket.clickpipes_staging_bucket.name}/${random_id.clickpipes_suffix.hex}/"
}

resource "clickhouse_clickpipe" "bigquery_snapshot" {
name = "BigQuery Snapshot ClickPipe"

service_id = var.service_id

source = {
bigquery = {
snapshot_staging_path = local.snapshot_staging_path

credentials = {
service_account_file = google_service_account_key.clickpipes_key.private_key
}

settings = {
replication_mode = "snapshot"
}

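// One mapping per source table; each target table name carries the random suffix.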
table_mappings = [for table_name in var.bigquery_table_names : {
source_dataset_name = var.bigquery_dataset_id
source_table = table_name
target_table = "${table_name}_${random_id.clickpipes_suffix.hex}"
}]
}
}

destination = {
database = "default"
}
}

output "clickpipe_id" {
value = clickhouse_clickpipe.bigquery_snapshot.id
}

output "clickpipe_state" {
value = clickhouse_clickpipe.bigquery_snapshot.state
}
107 changes: 107 additions & 0 deletions examples/clickpipe/bigquery_snapshot/gcp.tf
@@ -0,0 +1,107 @@
resource "random_id" "suffix" {
byte_length = 4
}

locals {
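// data.google_bigquery_dataset.dataset.id has the form "projects/<project>/datasets/<dataset>"; keep the last segment.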
dataset_id_parts = split("/", data.google_bigquery_dataset.dataset.id)
dataset_name     = local.dataset_id_parts[length(local.dataset_id_parts) - 1]
staging_bucket_name = "${var.gcp_project_id}-clickpipe-staging-${random_id.suffix.hex}"
sa_name = "clickpipe-bigquery-${random_id.suffix.hex}"
sa_display_name = "ClickPipe BigQuery Service Account"
}

// Ensures the BigQuery dataset and tables exist
data "google_bigquery_dataset" "dataset" {
project = var.gcp_project_id
dataset_id = var.bigquery_dataset_id
}

data "google_bigquery_table" "table" {
for_each = toset(var.bigquery_table_names)

dataset_id = var.bigquery_dataset_id
table_id = each.value
}

// This bucket is used by ClickPipe to stage data during BigQuery exports
resource "google_storage_bucket" "clickpipes_staging_bucket" {
name = local.staging_bucket_name
location = var.gcp_region
project = var.gcp_project_id
force_destroy = true // do not use in production

uniform_bucket_level_access = true

lifecycle_rule {
condition {
age = 1
}
action {
type = "Delete"
}
}
}

// Service account for ClickPipe to access BigQuery and GCS
resource "google_service_account" "clickpipes" {
project = var.gcp_project_id
account_id = local.sa_name
display_name = local.sa_display_name
description = "Service account for ClickPipe to access BigQuery and GCS"
}

// Service account key for ClickPipe
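// Its private_key attribute is the base64-encoded JSON credentials file that the ClickPipe credentials block expects.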
resource "google_service_account_key" "clickpipes_key" {
service_account_id = google_service_account.clickpipes.name
public_key_type = "TYPE_X509_PEM_FILE"
private_key_type = "TYPE_GOOGLE_CREDENTIALS_FILE"
}

// Allows viewing BigQuery datasets and tables, limited to the target dataset by an IAM condition
resource "google_project_iam_member" "bigquery_data_viewer" {
project = var.gcp_project_id
role = "roles/bigquery.dataViewer"
member = "serviceAccount:${google_service_account.clickpipes.email}"

condition {
title = "Restrict access to specific dataset"
description = "Allow access only to the designated BigQuery dataset"
expression = "resource.name.startsWith(\"projects/${var.gcp_project_id}/datasets/${local.dataset_name}\")"
}
}

// This allows ClickPipes to run BigQuery export jobs
resource "google_project_iam_member" "bigquery_job_user" {
project = var.gcp_project_id
role = "roles/bigquery.jobUser"
member = "serviceAccount:${google_service_account.clickpipes.email}"
}

// GCS Object Admin role with bucket-level condition
resource "google_project_iam_member" "storage_object_admin" {
project = var.gcp_project_id
role = "roles/storage.objectAdmin"
member = "serviceAccount:${google_service_account.clickpipes.email}"

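// Cloud Storage IAM resource names use the project-agnostic form "projects/_/buckets/<bucket>".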
condition {
title = "Restrict access to staging bucket"
description = "Allow access only to the ClickPipe staging bucket"
expression = "resource.name.startsWith(\"projects/_/buckets/${local.staging_bucket_name}\")"
}
}

// GCS Bucket Viewer role with bucket-level condition
resource "google_project_iam_member" "storage_bucket_viewer" {
project = var.gcp_project_id
role = "roles/storage.bucketViewer"
member = "serviceAccount:${google_service_account.clickpipes.email}"

condition {
title = "Restrict access to staging bucket"
description = "Allow access only to the ClickPipe staging bucket"
expression = "resource.name.startsWith(\"projects/_/buckets/${local.staging_bucket_name}\")"
}
}

output "clickpipes_bigquery_service_account_email" {
value = google_service_account.clickpipes.email
}
20 changes: 20 additions & 0 deletions examples/clickpipe/bigquery_snapshot/provider.tf
@@ -0,0 +1,20 @@
# This file is generated automatically; please do not edit
terraform {
required_providers {
clickhouse = {
version = "3.8.1-alpha1"
source = "ClickHouse/clickhouse"
}
}
}

provider "clickhouse" {
organization_id = var.organization_id
token_key = var.token_key
token_secret = var.token_secret
}

provider "google" {
project = var.gcp_project_id
region = var.gcp_region
}
19 changes: 19 additions & 0 deletions examples/clickpipe/bigquery_snapshot/provider.tf.template.alpha
@@ -0,0 +1,19 @@
terraform {
required_providers {
clickhouse = {
version = "${CLICKHOUSE_TERRAFORM_PROVIDER_VERSION}"
source = "ClickHouse/clickhouse"
}
}
}

provider "clickhouse" {
organization_id = var.organization_id
token_key = var.token_key
token_secret = var.token_secret
}

provider "google" {
project = var.gcp_project_id
region = var.gcp_region
}
10 changes: 10 additions & 0 deletions examples/clickpipe/bigquery_snapshot/variables.tfvars.sample
@@ -0,0 +1,10 @@
# These keys are examples only and will not work against a deployed ClickHouse OpenAPI server.
organization_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71"
token_key = "avhj1U5QCdWAE9CA9"
token_secret = "4b1dROiHQEuSXJHlV8zHFd0S7WQj7CGxz5kGJeJnca"
service_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71"

gcp_project_id = "your-project"
gcp_region = "us-central1"
bigquery_dataset_id = "test_dataset"
bigquery_table_names = ["test_table_1", "test_table_2"]
30 changes: 30 additions & 0 deletions examples/clickpipe/bigquery_snapshot/vars.tf
@@ -0,0 +1,30 @@
variable "organization_id" {
description = "ClickHouse Cloud organization ID"
}
variable "token_key" {
description = "ClickHouse Cloud API token key"
}
variable "token_secret" {
description = "ClickHouse Cloud API token secret"
}

variable "service_id" {
description = "ClickHouse ClickPipe service ID"
}

variable "gcp_project_id" {
description = "GCP project ID where the BigQuery dataset is located"
}

variable "gcp_region" {
description = "GCP region for the BigQuery dataset"
}

variable "bigquery_dataset_id" {
description = "Source BigQuery dataset ID"
}

variable "bigquery_table_names" {
description = "Source BigQuery table names"
type = list(string)
}
@@ -0,0 +1,30 @@
resource "clickhouse_clickpipe" "bigquery_snapshot_clickpipe" {
name = "BigQuery Snapshot ClickPipe"

service_id = "dc189652-b621-4bee-9088-b5b4c3f88626"

source = {
bigquery = {
snapshot_staging_path = "gs://my-staging-bucket/"

credentials = {
# Base64-encoded service account JSON key
service_account_file = "ewogICJuYW1lIjogInByb2plY3RzL1BST0pFQ1RfSUQvc2VydmljZUFjY291bnRzL1NFUlZJQ0VfQUNDT1VOVF9FTUFJTC9rZXlzL0tFWV9JRCIsCiAgInByaXZhdGVLZXlUeXBlIjogIlRZUEVfR09PR0xFX0NSRURFTlRJQUxTX0ZJTEUiLAogICJwcml2YXRlS2V5RGF0YSI6ICJFTkNPREVEX1BSSVZBVEVfS0VZIiwKICAidmFsaWRBZnRlclRpbWUiOiAiREFURSIsCiAgInZhbGlkQmVmb3JlVGltZSI6ICJEQVRFIiwKICAia2V5QWxnb3JpdGhtIjogIktFWV9BTEdfUlNBXzIwNDgiCn0="
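# Decodes to a google_service_account_key-style JSON document; all fields in it are placeholders (PROJECT_ID, KEY_ID, ENCODED_PRIVATE_KEY, ...).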
}

settings = {
replication_mode = "snapshot"
}

table_mappings = [{
source_dataset_name = "test_dataset"
source_table = "test_table"
target_table = "test_table_snapshot"
}]
}
}

destination = {
database = "default"
}
}
39 changes: 24 additions & 15 deletions pkg/internal/api/clickpipe.go
@@ -149,28 +149,37 @@ var ClickPipeObjectStorageCompressions = []string{

const (
	// Postgres replication modes
-	ClickPipePostgresReplicationModeCDC      = "cdc"
-	ClickPipePostgresReplicationModeSnapshot = "snapshot"
-	ClickPipePostgresReplicationModeCDCOnly  = "cdc_only"
+	ClickPipeReplicationModeCDC      = "cdc"
+	ClickPipeReplicationModeSnapshot = "snapshot"
+	ClickPipeReplicationModeCDCOnly  = "cdc_only"
)

+const (
+	ClickPipeTableEngineMergeTree          = "MergeTree"
+	ClickPipeTableEngineReplacingMergeTree = "ReplacingMergeTree"
+	ClickPipeTableEngineNull               = "Null"
+)
+
var ClickPipePostgresReplicationModes = []string{
-	ClickPipePostgresReplicationModeCDC,
-	ClickPipePostgresReplicationModeSnapshot,
-	ClickPipePostgresReplicationModeCDCOnly,
+	ClickPipeReplicationModeCDC,
+	ClickPipeReplicationModeSnapshot,
+	ClickPipeReplicationModeCDCOnly,
}

-const (
-	// Postgres table engines
-	ClickPipePostgresTableEngineMergeTree          = "MergeTree"
-	ClickPipePostgresTableEngineReplacingMergeTree = "ReplacingMergeTree"
-	ClickPipePostgresTableEngineNull               = "Null"
-)
+var ClickPipeBigQueryReplicationModes = []string{
+	ClickPipeReplicationModeSnapshot,
+}

var ClickPipePostgresTableEngines = []string{
-	ClickPipePostgresTableEngineMergeTree,
-	ClickPipePostgresTableEngineReplacingMergeTree,
-	ClickPipePostgresTableEngineNull,
+	ClickPipeTableEngineMergeTree,
+	ClickPipeTableEngineReplacingMergeTree,
+	ClickPipeTableEngineNull,
}

+var ClickPipeBigQueryTableEngines = []string{
+	ClickPipeTableEngineMergeTree,
+	ClickPipeTableEngineReplacingMergeTree,
+	ClickPipeTableEngineNull,
+}
+
const (
32 changes: 32 additions & 0 deletions pkg/internal/api/clickpipe_models.go
@@ -177,11 +177,43 @@ type ClickPipePostgresTableMapping struct {
TableEngine *string `json:"tableEngine,omitempty"`
}

type ClickPipeServiceAccount struct {
ServiceAccountFile string `json:"serviceAccountFile,omitempty"`
}

type ClickPipeBigQuerySettings struct {
ReplicationMode string `json:"replicationMode,omitempty"`
AllowNullableColumns *bool `json:"allowNullableColumns,omitempty"`
InitialLoadParallelism *int `json:"initialLoadParallelism,omitempty"`
SnapshotNumRowsPerPartition *int `json:"snapshotNumRowsPerPartition,omitempty"`
SnapshotNumberOfParallelTables *int `json:"snapshotNumberOfParallelTables,omitempty"`
}

type ClickPipeBigQueryTableMapping struct {
SourceDatasetName string `json:"sourceDatasetName"`
SourceTable string `json:"sourceTable"`
TargetTable string `json:"targetTable"`
ExcludedColumns []string `json:"excludedColumns,omitempty"`
UseCustomSortingKey *bool `json:"useCustomSortingKey,omitempty"`
SortingKeys []string `json:"sortingKeys,omitempty"`
TableEngine *string `json:"tableEngine,omitempty"`
}

type ClickPipeBigQuerySource struct {
SnapshotStagingPath string `json:"snapshotStagingPath,omitempty"`
Settings ClickPipeBigQuerySettings `json:"settings"`
Mappings []ClickPipeBigQueryTableMapping `json:"tableMappings"`
TableMappingsToRemove []ClickPipeBigQueryTableMapping `json:"tableMappingsToRemove,omitempty"`
TableMappingsToAdd []ClickPipeBigQueryTableMapping `json:"tableMappingsToAdd,omitempty"`
Credentials *ClickPipeServiceAccount `json:"credentials,omitempty"`
}

type ClickPipeSource struct {
Kafka *ClickPipeKafkaSource `json:"kafka,omitempty"`
ObjectStorage *ClickPipeObjectStorageSource `json:"objectStorage,omitempty"`
Kinesis *ClickPipeKinesisSource `json:"kinesis,omitempty"`
Postgres *ClickPipePostgresSource `json:"postgres,omitempty"`
BigQuery *ClickPipeBigQuerySource `json:"bigquery,omitempty"`
ValidateSamples bool `json:"validateSamples,omitempty"`
}
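
The BigQuery models above expose optional tuning fields (nullable-column handling, snapshot parallelism, per-table excluded columns, sorting keys, and table engine) that the examples in this PR leave unset. As a sketch only: assuming the provider maps these JSON fields to like-named snake_case attributes, as it does for `replication_mode` and `snapshot_staging_path`, a fuller source block could look like the following. The optional attribute names are inferred from the JSON tags and are not confirmed:

```hcl
source = {
  bigquery = {
    snapshot_staging_path = "gs://my-staging-bucket/"

    settings = {
      replication_mode = "snapshot"
      # Inferred from ClickPipeBigQuerySettings JSON tags; not confirmed attribute names.
      allow_nullable_columns             = true
      initial_load_parallelism           = 4
      snapshot_num_rows_per_partition    = 100000
      snapshot_number_of_parallel_tables = 2
    }

    table_mappings = [{
      source_dataset_name = "test_dataset"
      source_table        = "test_table"
      target_table        = "test_table_snapshot"
      # Inferred from ClickPipeBigQueryTableMapping JSON tags; not confirmed attribute names.
      excluded_columns       = ["debug_payload"]
      use_custom_sorting_key = true
      sorting_keys           = ["id", "created_at"]
      table_engine           = "ReplacingMergeTree"
    }]
  }
}
```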
