1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15- import asyncio
16- from quart import Quart , request
15+ from quart import Quart
16+ import quart
1717from google .auth import default
18- from google .cloud . sql . connector . instance_connection_manager import IPTypes
18+ from google .auth . transport . requests import Request
1919import logging
2020import google .cloud .logging
21- from iam_groups_authn .sync import (
22- get_credentials ,
23- get_users_with_roles ,
24- revoke_iam_group_role ,
25- grant_iam_group_role ,
26- UserService ,
27- )
28- from iam_groups_authn .sql_admin import (
29- get_instance_users ,
30- add_missing_db_users ,
31- InstanceConnectionName ,
32- )
33- from iam_groups_authn .iam_admin import get_iam_users
34- from iam_groups_authn .utils import DatabaseVersion
35- from iam_groups_authn .mysql import (
36- init_mysql_connection_engine ,
37- MysqlRoleService ,
38- mysql_username ,
39- )
40- from iam_groups_authn .postgres import (
41- init_postgres_connection_engine ,
42- PostgresRoleService ,
43- )
21+ from iam_groups_authn .sync import groups_sync
4422
45- # define scopes
23+ # define OAuth2 scopes
4624SCOPES = [
4725 "https://www.googleapis.com/auth/admin.directory.group.member.readonly" ,
4826 "https://www.googleapis.com/auth/sqlservice.admin" ,
6038 "ERROR" : logging .ERROR ,
6139}
6240
41+ # grab default creds from cloud run service account
42+ creds , project = default (scopes = SCOPES )
43+
6344
6445@app .route ("/" , methods = ["GET" ])
6546def health_check ():
@@ -68,7 +49,7 @@ def health_check():
6849
6950@app .route ("/run" , methods = ["PUT" ])
7051async def run_groups_authn ():
71- body = await request .get_json (force = True )
52+ body = await quart . request .get_json (force = True )
7253 # try reading in required request parameters and verify type, otherwise throw custom error
7354 sql_instances = body .get ("sql_instances" )
7455 if sql_instances is None or type (sql_instances ) is not list :
@@ -97,136 +78,12 @@ async def run_groups_authn():
9778 if type (log_level ) is str and log_level .upper () in log_levels :
9879 logging .getLogger ().setLevel (log_levels [log_level .upper ()])
9980
100- # set ip_type to proper type for connector
101- ip_type = IPTypes .PRIVATE if private_ip else IPTypes .PUBLIC
102-
103- # grab default creds from cloud run service account
104- creds , project = default ()
105- # update default credentials with IAM and SQL admin scopes
106- updated_creds = get_credentials (creds , SCOPES )
107-
108- # create UserService object for API calls
109- user_service = UserService (updated_creds )
110-
111- # keep track of IAM group and database instance tasks
112- group_tasks = {}
113- instance_tasks = {}
114-
115- # loop iam_groups and sql_instances creating async tasks
116- for group in iam_groups :
117- group_task = asyncio .create_task (get_iam_users (user_service , group ))
118- group_tasks [group ] = group_task
119-
120- for instance in sql_instances :
121- instance_task = asyncio .create_task (get_instance_users (user_service , instance ))
122- database_version_task = asyncio .create_task (
123- user_service .get_database_version (
124- InstanceConnectionName (* instance .split (":" ))
125- )
126- )
127- instance_tasks [instance ] = (instance_task , database_version_task )
128-
129- # create pairings of iam groups and instances
130- for group in iam_groups :
131- for instance in sql_instances :
132-
133- # get database version of instance and check if supported
134- database_version = await instance_tasks [instance ][1 ]
135- try :
136- database_version = DatabaseVersion (database_version )
137- except ValueError as e :
138- raise ValueError (
139- f"Unsupported database version for instance `{ instance } `. Current supported versions are: { list (DatabaseVersion .__members__ .keys ())} "
140- ) from e
141-
142- # add missing IAM group members to database
143- add_users_task = asyncio .create_task (
144- add_missing_db_users (
145- user_service ,
146- group_tasks [group ],
147- instance_tasks [instance ][0 ],
148- instance ,
149- database_version ,
150- )
151- )
152-
153- # initialize database connection pool
154- if database_version .is_mysql ():
155- db = init_mysql_connection_engine (instance , updated_creds , ip_type )
156- role_service = MysqlRoleService (db )
157- else :
158- db = init_postgres_connection_engine (instance , updated_creds , ip_type )
159- role_service = PostgresRoleService (db )
160- logging .debug (
161- "[%s][%s] Initialized a %s connection pool."
162- % (instance , group , database_version .value )
163- )
164-
165- # verify role for IAM group exists on database, create if does not exist
166- role = mysql_username (group )
167- verify_role_task = asyncio .create_task (role_service .create_group_role (role ))
168-
169- # get database users who have group role
170- users_with_roles_task = asyncio .create_task (
171- get_users_with_roles (role_service , role )
172- )
173-
174- # await dependent tasks
175- results = await asyncio .gather (
176- add_users_task , verify_role_task , return_exceptions = True
177- )
178- # raise exception if found
179- for result in results :
180- if issubclass (type (result ), Exception ):
181- raise result
182-
183- # log IAM users added as database users
184- added_users = results [0 ]
185- logging .debug (
186- "[%s][%s] Users added to database: %s."
187- % (instance , group , list (added_users ))
188- )
189-
190- # revoke group role from users no longer in IAM group
191- revoke_role_task = asyncio .create_task (
192- revoke_iam_group_role (
193- role_service ,
194- role ,
195- users_with_roles_task ,
196- group_tasks [group ],
197- database_version ,
198- )
199- )
200-
201- # grant group role to IAM users who are missing it on database
202- grant_role_task = asyncio .create_task (
203- grant_iam_group_role (
204- role_service ,
205- role ,
206- users_with_roles_task ,
207- group_tasks [group ],
208- database_version ,
209- )
210- )
211- results = await asyncio .gather (
212- revoke_role_task , grant_role_task , return_exceptions = True
213- )
214- # raise exception if found
215- for result in results :
216- if issubclass (type (result ), Exception ):
217- raise result
81+ # check if credentials are expired
82+ if not creds .valid :
83+ request = Request ()
84+ creds .refresh (request )
21885
219- # log sync info
220- revoked_users , granted_users = results
221- logging .info (
222- "[%s][%s] Sync successful: %s users were revoked group role, %s users were granted group role."
223- % (instance , group , len (revoked_users ), len (granted_users ))
224- )
225- logging .debug (
226- "[%s][%s] Users revoked role: %s." % (instance , group , revoked_users )
227- )
228- logging .debug (
229- "[%s][%s] Users granted role: %s." % (instance , group , granted_users )
230- )
86+ # sync IAM groups to Cloud SQL instances
87+ await groups_sync (iam_groups , sql_instances , creds , private_ip )
23188
23289 return "Sync successful." , 200
0 commit comments