Sample DAG with SDK #2
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| import pathlib | ||
|
|
||
| import doublecloud | ||
| from doublecloud.clickhouse.v1.cluster_service_pb2 import ListClustersRequest | ||
| from doublecloud.clickhouse.v1.cluster_service_pb2_grpc import ClusterServiceStub | ||
|
|
||
| from airflow.decorators import dag, task | ||
| from airflow.hooks.base import BaseHook | ||
|
|
||
|
|
||
| @dag( | ||
| dag_id=pathlib.Path(__file__).stem, | ||
| dag_display_name="List ClickHouse clusters using SDK and passed service account", | ||
| tags=["sample", "clickhouse", "sdk", "service_account"], | ||
| schedule=None, | ||
| catchup=False, | ||
| ) | ||
| def sample_list_ch_clusters(): | ||
| @task | ||
| def get_project_id(): | ||
|
Contributor
I suggest changing this to a DAG parameter instead. See https://github.com/doublecloud/sample-airflow-dags/blob/trunk/dags/sample_ch_insert.py#L25-L28 for how it is used. When using the decorator, DAG params are accessible through context variables.
||
| ''' | ||
| What project to use? | ||
| ''' | ||
| return "cloud" | ||
|
|
||
|
|
||
| @task | ||
| def display_clusters(project_id): | ||
| ''' | ||
| Lists CH clusters using the SDK | ||
| ''' | ||
| # Fetch the connection using Airflow's connection management system | ||
| # To use the functionality, go to Cluster Settings and specify a Service Account | ||
| connection = BaseHook.get_connection('doublecloud_api_private_key') | ||
|
|
||
| # Setup SDK using data from the connection | ||
| sdk = doublecloud.SDK(service_account_key={ | ||
| "id": connection.extra_dejson.get('kid'), | ||
| "service_account_id": connection.login, | ||
| "private_key": connection.password, | ||
| }) | ||
|
|
||
| cluster_service = sdk.client(ClusterServiceStub) | ||
| response = cluster_service.List(ListClustersRequest( | ||
| project_id=project_id, | ||
| )) | ||
| print("Your CH clusters are:") | ||
| for cluster in response.clusters: | ||
| print(cluster) | ||
|
Contributor
This is good, but I was wondering whether we can go even further and create connections for the user. Here's how that is done with gcp sql query. If we can do something similar, it would make this super-useful. Alternatively, if we just want to make this a "print all the stuff we have" sample, I'd suggest adding more similar tasks to report clusters and other entities in the account: Kafka, ClickHouse, transfer endpoints.
||
|
|
||
| display_clusters(project_id=get_project_id()) | ||
|
|
||
|
|
||
| my_dag = sample_list_ch_clusters() | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| my_dag.test() | ||
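The connection-to-SDK mapping in `display_clusters` can be checked in isolation. The sketch below is one possible way to do that, not part of the PR: `build_service_account_key` is a hypothetical helper name, and the `SimpleNamespace` object is only a stand-in for what `BaseHook.get_connection('doublecloud_api_private_key')` returns. The field mapping (`kid` from the connection extras, `login`, `password`) is taken from the diff above.

```python
from types import SimpleNamespace


def build_service_account_key(connection):
    """Map an Airflow-style connection onto the key dict passed to doublecloud.SDK.

    Hypothetical helper: the name and the validation step are assumptions,
    only the field mapping comes from the DAG in the diff.
    """
    key = {
        "id": connection.extra_dejson.get("kid"),
        "service_account_id": connection.login,
        "private_key": connection.password,
    }
    # Fail early with a readable message instead of a gRPC auth error later
    missing = sorted(name for name, value in key.items() if not value)
    if missing:
        raise ValueError(f"connection is missing fields: {', '.join(missing)}")
    return key


# Stand-in for BaseHook.get_connection(...); all values are placeholders
fake_connection = SimpleNamespace(
    login="sa-123",
    password="-----BEGIN PRIVATE KEY-----...",
    extra_dejson={"kid": "key-456"},
)
service_account_key = build_service_account_key(fake_connection)
```

Inside the task, the result would be passed on as `doublecloud.SDK(service_account_key=service_account_key)`, as the diff already does with an inline dict.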
Could you please also add a short note to the README with a screenshot of the DAG structure?
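The first review suggestion, replacing the `get_project_id` task with a DAG parameter, could look roughly like the sketch below. It assumes the DAG is declared with something like `params={"project_id": "cloud"}` in `@dag(...)` and that the task obtains its context via `get_current_context()` from `airflow.operators.python`; `resolve_project_id` is a hypothetical helper name. The context is modelled here as a plain dict so the lookup logic can be run without Airflow.

```python
def resolve_project_id(context, default=None):
    """Read project_id from the DAG params carried in a task context.

    Hypothetical helper: `context` is expected to behave like the mapping
    Airflow hands to a task, with DAG-level params under the "params" key.
    """
    params = context.get("params") or {}
    project_id = params.get("project_id", default)
    if not project_id:
        raise ValueError("project_id was not provided as a DAG param")
    return project_id


# Stand-in for the task context; in a real @task this would be
# get_current_context() rather than a hand-built dict
context = {"params": {"project_id": "cloud"}}
project_id = resolve_project_id(context)
```

With this in place, `display_clusters` would no longer need a `project_id` argument from an upstream task, and the value could be overridden per run when the DAG is triggered with custom params.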