58 changes: 58 additions & 0 deletions dags/sample_sdk.py
Contributor
Could you please also add a short note in the README with a screenshot of the DAG structure?

@@ -0,0 +1,58 @@
import pathlib

import doublecloud
from doublecloud.clickhouse.v1.cluster_service_pb2 import ListClustersRequest
from doublecloud.clickhouse.v1.cluster_service_pb2_grpc import ClusterServiceStub

from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook


@dag(
    dag_id=pathlib.Path(__file__).stem,
    dag_display_name="List ClickHouse clusters using SDK and passed service account",
    tags=["sample", "clickhouse", "sdk", "service_account"],
    schedule=None,
    catchup=False,
)
def sample_list_ch_clusters():
    @task
    def get_project_id():
Contributor @cra, Sep 30, 2024

I suggest changing this to a dag parameter instead, see how it is used in https://github.com/doublecloud/sample-airflow-dags/blob/trunk/dags/sample_ch_insert.py#L25-L28

When using the decorator, dag params are accessible through context variables; a sketch of this approach follows the task below.

        '''
        What project to use?
        '''
        return "cloud"


    @task
    def display_clusters(project_id):
        '''
        Lists CH clusters using the SDK
        '''
        # Fetch the connection using Airflow's connection management system
        # To use the functionality, go to Cluster Settings and specify a Service Account
        connection = BaseHook.get_connection('doublecloud_api_private_key')

        # Setup SDK using data from the connection
        sdk = doublecloud.SDK(service_account_key={
            "id": connection.extra_dejson.get('kid'),
            "service_account_id": connection.login,
            "private_key": connection.password,
        })

        cluster_service = sdk.client(ClusterServiceStub)
        response = cluster_service.List(ListClustersRequest(
            project_id=project_id,
        ))
        print("Your CH clusters are:")
        for cluster in response.clusters:
            print(cluster)
Contributor

This is good, but I was thinking: can we go even further and create the connections for the user?

Here's how that is done with the GCP Cloud SQL query example:
https://github.com/apache/airflow/blob/8f7616c47d5a89da83dc0a565d86fd3f618f5612/tests/system/providers/google/cloud/cloud_sql/example_cloud_sql_query.py#L442

If we can do something similar, it would make this super useful.

Alternatively, if we just want to keep this as a "print everything we have" sample, I'd suggest adding more similar tasks to report clusters and other entities in the account: Kafka, ClickHouse, Transfer endpoints.
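
A rough sketch of the "create the connection for the user" idea, assuming the connection is registered through Airflow's AIRFLOW_CONN_* environment-variable mechanism (the same idea the linked GCP example relies on); the placeholder credential values are illustrative:

import json
import os

from airflow.models.connection import Connection

# Hypothetical bootstrap: build the connection the DAG expects so the sample
# works without creating it manually in the Airflow UI.
conn = Connection(
    conn_id="doublecloud_api_private_key",
    conn_type="generic",
    login="<service_account_id>",            # service account id
    password="<private_key_pem>",            # private key
    extra=json.dumps({"kid": "<key_id>"}),   # key id, read via extra_dejson in the DAG
)

# Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment variables.
os.environ[f"AIRFLOW_CONN_{conn.conn_id.upper()}"] = conn.get_uri()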


    display_clusters(project_id=get_project_id())


my_dag = sample_list_ch_clusters()


if __name__ == '__main__':
    my_dag.test()