Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 118 additions & 98 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
# Strip any leading/trailing '/' from each configured service base URL so the
# stored value carries no slash at either end.
app.config.update({
    url_key: app.config[url_key].strip('/')
    for url_key in (
        'UUID_API_URL',
        'INGEST_API_URL',
        'ONTOLOGY_API_URL',
        'ENTITY_API_URL',
        'SEARCH_API_URL',
    )
})

S3_settings_dict = {'large_response_threshold': app.config['LARGE_RESPONSE_THRESHOLD']
Expand Down Expand Up @@ -4094,102 +4095,6 @@ def multiple_components():
return jsonify(normalized_complete_entity_list)


"""
Bulk update the entities in the entity-api.

This function supports request throttling and retries.

Parameters
----------
entity_updates : dict
The dictionary of entity updates. The key is the uuid and the value is the
update dictionary.
token : str
The groups token for the request.
entity_api_url : str
The url of the entity-api.
total_tries : int, optional
The maximum number of retries allowed for each update (passed to the retry policy
as `total`), by default 3.
throttle : float, optional
The time in seconds to wait between consecutive updates, also used as the retry
backoff factor, by default 5.
after_each_callback : Callable[[int], None], optional
A callback function to be called after each update, by default None. The index
of the update is passed as a parameter to the callback.

Returns
-------
dict
The results of the bulk update. The key is the uuid of the entity. If
successful, the value is a dictionary with "success" as True and "data" as the
entity data. If failed, the value is a dictionary with "success" as False and
"data" as the error message.
"""
def bulk_update_entities(
    entity_updates: dict,
    token: str,
    entity_api_url: str,
    total_tries: int = 3,
    throttle: float = 5,
    after_each_callback: Optional[Callable[[int], None]] = None,
    timeout: Optional[float] = 15,  # seconds allowed for each PUT request
) -> dict:
    # Sends one PUT /entities/<uuid> per entry of `entity_updates` through a shared
    # session and returns {uuid: {"success": bool, "data": ...}}.
    # On failure "data" is the "error" field of a JSON-object body, otherwise the
    # decoded body / raw response text, or the exception message when the request
    # itself failed.
    headers = {
        "Authorization": f"Bearer {token}",
        SchemaConstants.HUBMAP_APP_HEADER: SchemaConstants.ENTITY_API_APP,
    }
    # create a session with retries
    session = requests.Session()
    session.headers = headers
    retries = Retry(
        total=total_tries,
        backoff_factor=throttle,
        status_forcelist=[500, 502, 503, 504],
    )
    session.mount(entity_api_url, HTTPAdapter(max_retries=retries))

    results = {}
    last_idx = len(entity_updates) - 1
    with session as s:
        for idx, (uuid, payload) in enumerate(entity_updates.items()):
            try:
                # https://github.com/hubmapconsortium/entity-api/issues/698#issuecomment-2260799700
                # yuanzhou: When you iterate over the target uuids make individual PUT /entities/<uuid> calls.
                # The main reason we use the PUT call rather than direct neo4j query is because the entity update
                # needs to go through the schema trigger methods and generate corresponding values programmatically
                # before sending over to neo4j.
                # The PUT call returns the response immediately while the backend updating may be still going on.
                res = s.put(
                    f"{entity_api_url}/entities/{uuid}", json=payload, timeout=timeout
                )

                # The body is not guaranteed to be JSON (e.g. an error page from a
                # proxy). Decoding failures are ValueError subclasses; fall back to
                # the raw text instead of letting the exception abort the whole batch.
                try:
                    body = res.json()
                except ValueError:
                    body = res.text

                if res.ok:
                    data = body
                elif isinstance(body, dict):
                    data = body.get("error")
                else:
                    # JSON that is not an object, or a non-JSON body
                    data = body

                results[uuid] = {"success": res.ok, "data": data}
            except requests.exceptions.RequestException as e:
                logger.error(f"Failed to update entity {uuid}: {e}")
                results[uuid] = {"success": False, "data": str(e)}

            if after_each_callback:
                after_each_callback(idx)

            # Throttle between updates, but do not sleep after the last one
            if idx < last_idx:
                time.sleep(throttle)

    return results


# For this call to work READ_ONLY_MODE = False in the app.cfg file.
#
# Sends the given dataset/upload updates to entity-api and logs every failure.
# `entity_updates` is a list of dicts, each carrying a "uuid" key plus the fields
# to update; the caller's dicts are left unmodified.
def update_datasets_uploads(entity_updates: list, token: str, entity_api_url: str) -> None:
    # Key the updates by uuid and send every other field as the update payload.
    # Build fresh dicts instead of pop()-ing "uuid" so the caller's objects are not
    # mutated by this function.
    update_payload = {
        ds["uuid"]: {field: value for field, value in ds.items() if field != "uuid"}
        for ds in entity_updates
    }

    # send the dataset/upload updates to entity-api
    update_res = bulk_update_entities(update_payload, token, entity_api_url)

    for uuid, res in update_res.items():
        if not res["success"]:
            logger.error(f"Failed to update entity {uuid}: {res['data']}")


# Field names listed as accepted for the bulk update (PUT /datasets, PUT /uploads)
ENTITY_BULK_UPDATE_FIELDS_ACCEPTED = [
    'uuid',
    'status',
    'ingest_task',
    'assigned_to_group_name',
]


"""
New endpoints (PUT /datasets and PUT /uploads) to handle the bulk updating of entities see Issue: #698
Expand All @@ -4216,6 +4121,9 @@ def update_datasets_uploads(entity_updates: list, token: str, entity_api_url: st
@app.route('/datasets', methods=['PUT'])
@app.route('/uploads', methods=['PUT'])
def entity_bulk_update():
# Only in the PUT call: `assigned_to_group_name` is allowed to use an empty string value to reset/clear existing values
ENTITY_BULK_UPDATE_FIELDS_ACCEPTED = ['uuid', 'status', 'ingest_task', 'assigned_to_group_name']

entity_type: str = 'dataset'
if request.path == "/uploads":
entity_type = "upload"
Expand All @@ -4233,6 +4141,10 @@ def entity_bulk_update():
validate_user_update_privilege(entity, user_token)

uuids = [e.get("uuid") for e in entities]

logger.debug(f"Bulk updating the following {entity_type} uuids:")
logger.debug(uuids)

if None in uuids:
bad_request_error(f"All {entity_type}s must have a 'uuid' field")
if len(set(uuids)) != len(uuids):
Expand All @@ -4256,10 +4168,11 @@ def entity_bulk_update():
if len(diff) > 0:
bad_request_error(f"No {entity_type} found with the following uuids: {', '.join(diff)}")

entity_api_url = app.config["ENTITY_API_URL"].rstrip('/')
logger.info(f"Bulk update {len(entities)} {entity_type} in a separate thread...")

thread_instance =\
threading.Thread(target=update_datasets_uploads,
args=(entities, user_token, entity_api_url))
args=(entities, user_token, app.config["ENTITY_API_URL"]))
thread_instance.start()

return jsonify(list(uuids)), 202
Expand Down Expand Up @@ -5423,6 +5336,113 @@ def validate_organ_code(organ_code):
internal_server_error(msg)


"""
Bulk update the entities in the entity-api.

This function supports request throttling and retries.

Parameters
----------
entity_updates : dict
The dictionary of entity updates. The key is the uuid and the value is the
update dictionary.
token : str
The groups token for the request.
entity_api_url : str
The url of the entity-api.
total_tries : int, optional
The maximum number of retries allowed for each update (passed to the retry policy
as `total`), by default 3.
throttle : float, optional
The time in seconds to wait between consecutive updates, also used as the retry
backoff factor, by default 5.
after_each_callback : Callable[[int], None], optional
A callback function to be called after each update, by default None. The index
of the update is passed as a parameter to the callback.

Returns
-------
dict
The results of the bulk update. The key is the uuid of the entity. If
successful, the value is a dictionary with "success" as True and "data" as the
entity data. If failed, the value is a dictionary with "success" as False and
"data" as the error message.
"""
def bulk_update_entities(
    entity_updates: dict,
    token: str,
    entity_api_url: str,
    total_tries: int = 3,
    throttle: float = 5,
    after_each_callback: Optional[Callable[[int], None]] = None,
    timeout: Optional[float] = None,  # seconds allowed for each PUT request; None sets no timeout
) -> dict:
    # Sends one PUT /entities/<uuid> per entry of `entity_updates` through a shared
    # session and returns {uuid: {"success": bool, "data": ...}}.
    # On failure "data" is the "error" field of a JSON-object body, otherwise the
    # decoded body / raw response text, or the exception message when the request
    # itself failed.
    headers = {
        "Authorization": f"Bearer {token}",
        SchemaConstants.HUBMAP_APP_HEADER: SchemaConstants.ENTITY_API_APP,
    }
    # create a session with retries
    session = requests.Session()
    session.headers = headers
    retries = Retry(
        total=total_tries,
        backoff_factor=throttle,
        status_forcelist=[500, 502, 503, 504],
    )
    session.mount(entity_api_url, HTTPAdapter(max_retries=retries))

    results = {}
    last_idx = len(entity_updates) - 1
    with session as s:
        for idx, (uuid, payload) in enumerate(entity_updates.items()):
            try:
                # https://github.com/hubmapconsortium/entity-api/issues/698#issuecomment-2260799700
                # yuanzhou: When you iterate over the target uuids make individual PUT /entities/<uuid> calls.
                # The main reason we use the PUT call rather than direct neo4j query is because the entity update
                # needs to go through the schema trigger methods and generate corresponding values programmatically
                # before sending over to neo4j.
                # The PUT call returns the response immediately while the backend updating may be still going on.
                # NOTE(review): with the default timeout=None a stalled connection can
                # block this worker indefinitely - consider passing an explicit timeout.
                res = s.put(f"{entity_api_url}/entities/{uuid}", json=payload, timeout=timeout)

                # The body is not guaranteed to be JSON (e.g. an error page from a
                # proxy). Decoding failures are ValueError subclasses; fall back to
                # the raw text instead of letting the exception abort the whole batch.
                try:
                    body = res.json()
                except ValueError:
                    body = res.text

                if res.ok:
                    data = body
                elif isinstance(body, dict):
                    data = body.get("error")
                else:
                    # JSON that is not an object, or a non-JSON body
                    data = body

                results[uuid] = {"success": res.ok, "data": data}

                # Only report success when entity-api actually accepted the update
                if res.ok:
                    logger.info(f"Successfully made No.{idx + 1} internal entity-api call to update {uuid}")
                else:
                    logger.error(
                        f"No.{idx + 1} internal entity-api call to update {uuid} "
                        f"failed with status {res.status_code}"
                    )
            except requests.exceptions.RequestException as e:
                logger.error(f"Failed to update entity {uuid}: {e}")
                results[uuid] = {"success": False, "data": str(e)}

            if after_each_callback:
                after_each_callback(idx)

            # Throttle between updates, but do not sleep after the last one
            if idx < last_idx:
                time.sleep(throttle)

    logger.debug("Returning bulk_update_entities() resulting data")
    logger.debug(results)

    return results


"""
Bulk update the entities called in a separate thread

Parameters
----------
entity_updates : list
The list of entity update dictionaries, each containing the entity's 'uuid' and the fields to update
token : str
The groups token for the request
entity_api_url : str
The url of the entity-api
"""
def update_datasets_uploads(entity_updates: list, token: str, entity_api_url: str) -> None:
    # Key the updates by uuid and send every other field as the update payload.
    # Build fresh dicts instead of pop()-ing "uuid" so the caller's objects are not
    # mutated by this function.
    update_payload = {
        ds["uuid"]: {field: value for field, value in ds.items() if field != "uuid"}
        for ds in entity_updates
    }
    update_res = bulk_update_entities(update_payload, token, entity_api_url)

    # Nothing is returned to the caller, so failures are only surfaced in the log
    for uuid, res in update_res.items():
        if not res["success"]:
            logger.error(f"Failed to update entity {uuid}: {res['data']}")


"""
Retrieve the JSON containing the normalized metadata information for a given entity appropriate for the
scope of metadata requested e.g. complete data for a another service, indexing data for an OpenSearch document, etc.
Expand Down