From 2ad1dc049ba864527042e0abfead750dda7e6bcd Mon Sep 17 00:00:00 2001 From: yuanzhou Date: Tue, 26 Aug 2025 19:25:03 -0400 Subject: [PATCH 1/3] Add logging --- src/app.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/app.py b/src/app.py index 36bff7ab..e4a24ee6 100644 --- a/src/app.py +++ b/src/app.py @@ -4163,6 +4163,8 @@ def bulk_update_entities( "success": res.ok, "data": res.json() if res.ok else res.json().get("error"), } + + logger.info(f"Successfully made {idx + 1} internal entity-api call to update {uuid}") except requests.exceptions.RequestException as e: logger.error(f"Failed to update entity {uuid}: {e}") results[uuid] = {"success": False, "data": str(e)} @@ -4173,6 +4175,9 @@ def bulk_update_entities( if idx < len(entity_updates) - 1: time.sleep(throttle) + logger.info("Returning bulk_update_entities() resulting data") + logger.info(results) + return results @@ -4233,6 +4238,10 @@ def entity_bulk_update(): validate_user_update_privilege(entity, user_token) uuids = [e.get("uuid") for e in entities] + + logger.info(f"Bulk updating the following {entity_type} uuids:") + logger.info(uuids) + if None in uuids: bad_request_error(f"All {entity_type}s must have a 'uuid' field") if len(set(uuids)) != len(uuids): @@ -4257,6 +4266,9 @@ def entity_bulk_update(): bad_request_error(f"No {entity_type} found with the following uuids: {', '.join(diff)}") entity_api_url = app.config["ENTITY_API_URL"].rstrip('/') + + logger.info(f"About to bulk update {len(entities)} {entity_type} in a separate thread...") + thread_instance =\ threading.Thread(target=update_datasets_uploads, args=(entities, user_token, entity_api_url)) From 2a07a3dd5a6fe79be61a3ea1600526ebbf2a5d73 Mon Sep 17 00:00:00 2001 From: yuanzhou Date: Tue, 26 Aug 2025 21:29:53 -0400 Subject: [PATCH 2/3] Bulk update tweaks --- src/app.py | 217 +++++++++++++++++++++++++++-------------------------- 1 file changed, 112 insertions(+), 105 deletions(-) diff --git a/src/app.py b/src/app.py index e4a24ee6..c681abf1 100644 --- a/src/app.py +++ b/src/app.py @@ -68,6 +68,7 @@ app.config['UUID_API_URL'] = app.config['UUID_API_URL'].strip('/') app.config['INGEST_API_URL'] = app.config['INGEST_API_URL'].strip('/') app.config['ONTOLOGY_API_URL'] = app.config['ONTOLOGY_API_URL'].strip('/') +app.config['ENTITY_API_URL'] = app.config['ENTITY_API_URL'].strip('/') app.config['SEARCH_API_URL'] = app.config['SEARCH_API_URL'].strip('/') S3_settings_dict = {'large_response_threshold': app.config['LARGE_RESPONSE_THRESHOLD'] @@ -4094,107 +4095,6 @@ def multiple_components(): return jsonify(normalized_complete_entity_list) -""" -Bulk update the entities in the entity-api. - -This function supports request throttling and retries. - -Parameters ----------- -entity_updates : dict - The dictionary of entity updates. The key is the uuid and the value is the - update dictionary. -token : str - The groups token for the request. -entity_api_url : str - The url of the entity-api. -total_tries : int, optional - The number of total requests to be made for each update, by default 3. -throttle : float, optional - The time to wait between requests and retries, by default 5. -after_each_callback : Callable[[int], None], optional - A callback function to be called after each update, by default None. The index - of the update is passed as a parameter to the callback. - -Returns -------- -dict - The results of the bulk update. The key is the uuid of the entity. If - successful, the value is a dictionary with "success" as True and "data" as the - entity data. If failed, the value is a dictionary with "success" as False and - "data" as the error message. -""" -def bulk_update_entities( - entity_updates: dict, - token: str, - entity_api_url: str, - total_tries: int = 3, - throttle: float = 5, - after_each_callback: Optional[Callable[[int], None]] = None, -) -> dict: - headers = { - "Authorization": f"Bearer {token}", - SchemaConstants.HUBMAP_APP_HEADER: SchemaConstants.ENTITY_API_APP, - } - # create a session with retries - session = requests.Session() - session.headers = headers - retries = Retry( - total=total_tries, - backoff_factor=throttle, - status_forcelist=[500, 502, 503, 504], - ) - session.mount(entity_api_url, HTTPAdapter(max_retries=retries)) - - results = {} - with session as s: - for idx, (uuid, payload) in enumerate(entity_updates.items()): - try: - # https://github.com/hubmapconsortium/entity-api/issues/698#issuecomment-2260799700 - # yuanzhou: When you iterate over the target uuids make individual PUT /entities/ calls. - # The main reason we use the PUT call rather than direct neo4j query is because the entity update - # needs to go through the schema trigger methods and generate corresponding values programmatically - # before sending over to neo4j. - # The PUT call returns the response immediately while the backend updating may be still going on. - res = s.put( - f"{entity_api_url}/entities/{uuid}", json=payload, timeout=15 - ) - results[uuid] = { - "success": res.ok, - "data": res.json() if res.ok else res.json().get("error"), - } - - logger.info(f"Successfully made {idx + 1} internal entity-api call to update {uuid}") - except requests.exceptions.RequestException as e: - logger.error(f"Failed to update entity {uuid}: {e}") - results[uuid] = {"success": False, "data": str(e)} - - if after_each_callback: - after_each_callback(idx) - - if idx < len(entity_updates) - 1: - time.sleep(throttle) - - logger.info("Returning bulk_update_entities() resulting data") - logger.info(results) - - return results - - -# For this call to work READ_ONLY_MODE = False in the app.cfg file. -def update_datasets_uploads(entity_updates: list, token: str, entity_api_url: str) -> None: - update_payload = {ds.pop("uuid"): ds for ds in entity_updates} - - # send the dataset/upload updates to entity-api - update_res = bulk_update_entities(update_payload, token, entity_api_url) - - for uuid, res in update_res.items(): - if not res["success"]: - logger.error(f"Failed to update entity {uuid}: {res['data']}") - - -ENTITY_BULK_UPDATE_FIELDS_ACCEPTED = ['uuid', 'status', 'ingest_task', 'assigned_to_group_name'] - """ New endpoints (PUT /datasets and PUT /uploads) to handle the bulk updating of entities see Issue: #698 @@ -4221,6 +4121,8 @@ def update_datasets_uploads(entity_updates: list, token: str, entity_api_url: st @app.route('/datasets', methods=['PUT']) @app.route('/uploads', methods=['PUT']) def entity_bulk_update(): + ENTITY_BULK_UPDATE_FIELDS_ACCEPTED = ['uuid', 'status', 'ingest_task', 'assigned_to_group_name'] + entity_type: str = 'dataset' if request.path == "/uploads": entity_type = "upload" @@ -4265,13 +4167,11 @@ def entity_bulk_update(): if len(diff) > 0: bad_request_error(f"No {entity_type} found with the following uuids: {', '.join(diff)}") - entity_api_url = app.config["ENTITY_API_URL"].rstrip('/') - - logger.info(f"About to bulk update {len(entities)} {entity_type} in a separate thread...") + logger.info(f"Bulk update {len(entities)} {entity_type} in a separate thread...") thread_instance =\ threading.Thread(target=update_datasets_uploads, - args=(entities, user_token, entity_api_url)) + args=(entities, user_token, app.config["ENTITY_API_URL"])) thread_instance.start() return jsonify(list(uuids)), 202 @@ -5435,6 +5335,113 @@ def validate_organ_code(organ_code): internal_server_error(msg) +""" +Bulk update the entities in the entity-api. + +This function supports request throttling and retries. + +Parameters +---------- +entity_updates : dict + The dictionary of entity updates. The key is the uuid and the value is the + update dictionary. +token : str + The groups token for the request. +entity_api_url : str + The url of the entity-api. +total_tries : int, optional + The number of total requests to be made for each update, by default 3. +throttle : float, optional + The time to wait between requests and retries, by default 5. +after_each_callback : Callable[[int], None], optional + A callback function to be called after each update, by default None. The index + of the update is passed as a parameter to the callback. + +Returns +------- +dict + The results of the bulk update. The key is the uuid of the entity. If + successful, the value is a dictionary with "success" as True and "data" as the + entity data. If failed, the value is a dictionary with "success" as False and + "data" as the error message. +""" +def bulk_update_entities( + entity_updates: dict, + token: str, + entity_api_url: str, + total_tries: int = 3, + throttle: float = 5, + after_each_callback: Optional[Callable[[int], None]] = None, +) -> dict: + headers = { + "Authorization": f"Bearer {token}", + SchemaConstants.HUBMAP_APP_HEADER: SchemaConstants.ENTITY_API_APP, + } + # create a session with retries + session = requests.Session() + session.headers = headers + retries = Retry( + total=total_tries, + backoff_factor=throttle, + status_forcelist=[500, 502, 503, 504], + ) + session.mount(entity_api_url, HTTPAdapter(max_retries=retries)) + + results = {} + with session as s: + for idx, (uuid, payload) in enumerate(entity_updates.items()): + try: + # https://github.com/hubmapconsortium/entity-api/issues/698#issuecomment-2260799700 + # yuanzhou: When you iterate over the target uuids make individual PUT /entities/ calls. + # The main reason we use the PUT call rather than direct neo4j query is because the entity update + # needs to go through the schema trigger methods and generate corresponding values programmatically + # before sending over to neo4j. + # The PUT call returns the response immediately while the backend updating may be still going on. + res = s.put(f"{entity_api_url}/entities/{uuid}", json=payload) + + results[uuid] = { + "success": res.ok, + "data": res.json() if res.ok else res.json().get("error"), + } + + logger.info(f"Successfully made No.{idx + 1} internal entity-api call to update {uuid}") + except requests.exceptions.RequestException as e: + logger.error(f"Failed to update entity {uuid}: {e}") + results[uuid] = {"success": False, "data": str(e)} + + if after_each_callback: + after_each_callback(idx) + + if idx < len(entity_updates) - 1: + time.sleep(throttle) + + logger.debug("Returning bulk_update_entities() resulting data") + logger.debug(results) + + return results + + +""" +Bulk update the entities called in a separate thread + +Parameters +---------- +entity_updates : dict + The dictionary of entity updates +token : str + The groups token for the request +entity_api_url : str + The url of the entity-api +""" +def update_datasets_uploads(entity_updates: list, token: str, entity_api_url: str) -> None: + update_payload = {ds.pop("uuid"): ds for ds in entity_updates} + update_res = bulk_update_entities(update_payload, token, entity_api_url) + + for uuid, res in update_res.items(): + if not res["success"]: + logger.error(f"Failed to update entity {uuid}: {res['data']}") + + """ Retrieve the JSON containing the normalized metadata information for a given entity appropriate for the scope of metadata requested e.g. complete data for a another service, indexing data for an OpenSearch document, etc. From a5c4c8d8e6a7bec341044a8962f6cc0908d7314e Mon Sep 17 00:00:00 2001 From: yuanzhou Date: Tue, 26 Aug 2025 21:37:48 -0400 Subject: [PATCH 3/3] Log and comment tweaks --- src/app.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/app.py b/src/app.py index c681abf1..d8d0c354 100644 --- a/src/app.py +++ b/src/app.py @@ -4121,6 +4121,7 @@ def multiple_components(): @app.route('/datasets', methods=['PUT']) @app.route('/uploads', methods=['PUT']) def entity_bulk_update(): + # Only in the PUT call: `assigned_to_group_name` is allowed to use an empty string value to reser/clear existing values ENTITY_BULK_UPDATE_FIELDS_ACCEPTED = ['uuid', 'status', 'ingest_task', 'assigned_to_group_name'] entity_type: str = 'dataset' @@ -4141,8 +4142,8 @@ def entity_bulk_update(): uuids = [e.get("uuid") for e in entities] - logger.info(f"Bulk updating the following {entity_type} uuids:") - logger.info(uuids) + logger.debug(f"Bulk updating the following {entity_type} uuids:") + logger.debug(uuids) if None in uuids: bad_request_error(f"All {entity_type}s must have a 'uuid' field")