Checks genomicsdb_query output directory and retries once with a new genomicsdb instance when Azure access tokens expire #81
Changes from all commits: a4596a0, 9e27565, 14c6fb4, 22771a8, f6076a1, 40ce340, 8f20e32
```diff
@@ -540,7 +540,7 @@ def __str__(self):
         else:
             filter_str = ""
         bypass_str = f" bypass_intersecting_intervals_phase={self.bypass_intersecting_intervals_phase}"
-        return f"workspace={self.workspace} attributes={self.attributes}{filter_str}{bypass_str}"
+        return f"workspace={self.workspace} vidmap={self.vidmap_file} callset={self.callset_file} attributes={self.attributes}{filter_str}{bypass_str}"  # noqa


 class GenomicsDBQueryConfig(NamedTuple):
```
```diff
@@ -607,6 +607,13 @@ def configure_query(config: GenomicsDBQueryConfig):
     return query_config


+def instantiate_genomicsdb(pb_config, msg):
+    logging.info("Instantiating genomicsdb to process " + msg + "...")
+    gdb = genomicsdb.connect_with_protobuf(pb_config)
+    logging.info("Instantiating genomicsdb to process " + msg + " DONE")
+    return gdb
+
+
 def process(config):
     export_config = config.export_config
     query_config = config.query_config
```
```diff
@@ -618,52 +625,90 @@ def process(config):
         logging.error(msg + f" not imported into workspace({export_config.workspace})")
         return -1
     global gdb
-    try:
-        if gdb:
-            logging.info("Found gdb to process " + msg)
-        else:
-            logging.error("Something wrong, gdb seems to be None")
-            return -1
-    except NameError:
-        logging.info("Instantiating genomicsdb to process " + msg + "...")
-        gdb = genomicsdb.connect_with_protobuf(configure_export(export_config))
-        logging.info("Instantiating genomicsdb to process " + msg + " DONE")
-    query_protobuf = configure_query(query_config)
-    try:
-        if output_config.type == "csv":
-            df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True)
-            df.to_csv(output_config.filename, index=False)
-        elif output_config.type == "json":
-            json_output = gdb.query_variant_calls(query_protobuf=query_protobuf, json_output=output_config.json_type)
-            with open(output_config.filename, "wb") as f:
-                f.write(json_output)
-        elif output_config.type == "arrow":
-            nbytes = 0
-            writer = None
-            i = 0
-            for out in gdb.query_variant_calls(query_protobuf=query_protobuf, arrow_output=True, batching=True):
-                reader = pa.ipc.open_stream(out)
-                for batch in reader:
-                    if nbytes > output_config.max_arrow_bytes:
-                        i += 1
-                        nbytes = 0
-                        if writer:
-                            writer.close()
-                            writer = None
-                    if not writer:
-                        writer = pq.ParquetWriter(f"{output_config.filename}__{i}.parquet", batch.schema)
-                    nbytes += batch.nbytes
-                    writer.write_batch(batch)
-            if writer:
-                writer.close()
-                writer = None
-    except Exception as e:
-        logging.critical(f"Unexpected exception : {e}")
-        gdb = None
-        return -1
-
-    logging.info(f"Processed {msg}")
-    return 0
+    # Allow one retry to account for expired access tokens for azure URLs
+    if export_config.workspace.startswith("az://"):
+        allow_retry = True
+    else:
+        allow_retry = False
+    while True:
+        try:
+            if gdb:
+                logging.info("Found gdb to process " + msg)
+            else:
+                logging.info("Starting new gdb to process " + msg)
+                gdb = instantiate_genomicsdb(configure_export(export_config), msg)
+        except NameError:
+            gdb = instantiate_genomicsdb(configure_export(export_config), msg)
+
+        query_protobuf = configure_query(query_config)
+
+        try:
+            if output_config.type == "csv":
+                df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True)
+                df.to_csv(output_config.filename, index=False)
+            elif output_config.type == "json":
+                json_output = gdb.query_variant_calls(
+                    query_protobuf=query_protobuf, json_output=output_config.json_type
+                )
+                with open(output_config.filename, "wb") as f:
+                    f.write(json_output)
+            elif output_config.type == "arrow":
+                nbytes = 0
+                writer = None
+                i = 0
+                for out in gdb.query_variant_calls(query_protobuf=query_protobuf, arrow_output=True, batching=True):
+                    reader = pa.ipc.open_stream(out)
+                    for batch in reader:
+                        if nbytes > output_config.max_arrow_bytes:
+                            i += 1
+                            nbytes = 0
+                            if writer:
+                                writer.close()
+                                writer = None
+                        if not writer:
+                            writer = pq.ParquetWriter(f"{output_config.filename}__{i}.parquet", batch.schema)
+                        nbytes += batch.nbytes
+                        writer.write_batch(batch)
+                if writer:
+                    writer.close()
+                    writer = None
+            logging.info(f"Processed {msg}")
+            # exit out of the loop as the query has completed
+            return 0
+        except Exception as e:
+            # Try to handle read errors from TileDB storage for azure URLs,
+            # e.g. GenomicsDBIteratorException : Error while reading from TileDB array
+            if allow_retry and "GenomicsDB" in str(type(e)) and "TileDB" in str(e):
+                # Check for the possibility of an expired access token
+                allow_retry = False
+                try:
+                    # If the check for the workspace under consideration succeeds and the workspace
+                    # exists as it should, the genomicsdb instance is functional! Probably not an
+                    # expired token, so re-raise the outer exception
+                    if not gdb.workspace_exists(export_config.workspace):
+                        logging.info(f"Retrying query with a new instance of gdb for {msg}...")
```
Member: Could you explain what we're trying to do with this check?

Member (Author): I'm trying to check whether the gdb instance is still functional. If it is, the failure is probably not an expired token. I will document that better in the code, and maybe improve the logging message. Comments?
```diff
+                        gdb = None
+                        continue
+                except Exception as ex:
+                    logging.info(f"Exception({ex}) encountered. Retrying query with a new instance of gdb for {msg}...")
+                    gdb = None
+                    continue
+            logging.critical(f"Unexpected exception while processing {msg} : {e}")
+            raise e
+
+
+def check_output(output):
+    parent_dir = os.path.dirname(output)
+    if parent_dir and not os.path.isdir(parent_dir):
+        if os.path.isfile(os.path.dirname(output)):
+            raise RuntimeError(f"Cannot proceed as output's parent directory({parent_dir}) is a file")
+        raise RuntimeError(
+            f"Cannot proceed as output's parent directory({parent_dir}) does not exist. Create dir({parent_dir}) before restarting query"  # noqa
+        )
+    if output.endswith("/") or os.path.isdir(output):
+        return genomicsdb_common.join_paths(output, "query_output")
+    else:
+        return output
+
+
 def main():
```
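For reviewers following the control flow: the change above is a probe-and-retry-once pattern. Below is a minimal sketch of that pattern, not the PR's code; `connect` and `run_query` are hypothetical stand-ins for the genomicsdb instantiation and query calls, and only `workspace_exists` and the string matching are taken from the diff.

```python
import logging


def query_with_one_retry(connect, run_query, workspace):
    """Illustrative probe-and-retry-once wrapper (not the PR's code).

    connect: callable returning a fresh genomicsdb handle
    run_query: callable executing the query against a handle
    workspace: workspace URL; a retry is only allowed for Azure URLs
    """
    allow_retry = workspace.startswith("az://")
    gdb = connect()
    while True:
        try:
            return run_query(gdb)
        except Exception as e:
            # Only consider retrying for what looks like a TileDB read error
            if allow_retry and "GenomicsDB" in str(type(e)) and "TileDB" in str(e):
                allow_retry = False  # at most one retry
                try:
                    # If the handle can still see the workspace it is functional,
                    # so this was probably not an expired token: re-raise below.
                    functional = gdb.workspace_exists(workspace)
                except Exception:
                    # The probe itself failed: assume a stale handle.
                    functional = False
                if not functional:
                    logging.info("Retrying query with a fresh genomicsdb instance...")
                    gdb = connect()
                    continue
            raise
```

Note the design choice the sketch mirrors: a probe that fails outright is treated the same as a missing workspace, and in both cases the handle is assumed stale and rebuilt exactly once.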
```diff
@@ -672,10 +717,8 @@ def main():
     if row_tuples is not None and len(row_tuples) == 0:
         return

-    print(f"Starting genomicsdb_query for workspace({workspace}) and intervals({intervals})")
-
     output_type = args.output_type
-    output = args.output
+    output = check_output(args.output)
     json_type = None
     if output_type == "json":
         json_type = parse_args_for_json_type(args.json_output_type)
```
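As a usage note for the new `check_output` called above, here is a hypothetical session; it assumes `check_output` can be imported from the query script, and the paths are made up for illustration.

```python
import os
import tempfile

# from genomicsdb_query import check_output  # hypothetical import path

# Illustrative walk-through of check_output's behavior, per the diff above.
with tempfile.TemporaryDirectory() as d:
    target = os.path.join(d, "calls.csv")
    # Parent directory exists: the path is returned unchanged.
    assert check_output(target) == target
    # An existing directory (or a trailing "/"): a default "query_output" name is appended.
    assert check_output(d).endswith("query_output")
    # A missing parent directory, or a parent that is a regular file, raises RuntimeError:
    # check_output(os.path.join(d, "missing", "calls.csv"))
```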
```diff
@@ -686,6 +729,8 @@ def main():
         max_arrow_bytes = parse_args_for_max_bytes(args.max_arrow_byte_size)
         print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files")

+    print(f"Starting genomicsdb_query for workspace({workspace}) and intervals({intervals})")
+
     export_config = GenomicsDBExportConfig(
         workspace, vidmap_file, callset_file, attributes, args.filter, args.bypass_intersecting_intervals_phase
     )
```
```diff
@@ -749,10 +794,18 @@ def main():
         sys.exit(0)

     if min(len(configs), args.nproc) == 1:
-        results = list(map(process, configs))
+        try:
+            results = list(map(process, configs))
+        except Exception as e:
+            raise RuntimeError(f"genomicsdb_query returned unexpectedly: {e}")
     else:
         with multiprocessing.Pool(processes=min(len(configs), args.nproc)) as pool:
-            results = list(pool.map(process, configs))
+            try:
+                results = list(pool.map(process, configs))
+            except Exception as e:
+                pool.terminate()
+                pool.join()
+                raise RuntimeError(f"Terminating as a query in the multiprocessing pool returned unexpectedly: {e}")

     msg = "successfully"
     for result in results:
```
Member: Can we check for a specific exception here, or even check the exception message, to only target the signature for what we think is expired tokens?

Member (Author): Most of the time we see a GenomicsDBIteratorException, but we cannot be really sure.
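One way to narrow the handler along the lines of this exchange, as a sketch: match the exception's type name plus the message fragment observed in practice. The type is matched by name because its import path in the genomicsdb bindings is not confirmed here; both strings come from the comments in the diff.

```python
def looks_like_expired_token(e: Exception) -> bool:
    """Sketch of a narrower check for the retry path, per the review discussion.

    Matches by type name since the exception class's import path is not
    confirmed here; the message fragment is the one quoted in the diff.
    """
    return (
        type(e).__name__ == "GenomicsDBIteratorException"
        and "Error while reading from TileDB array" in str(e)
    )
```

The trade-off the author notes still applies: other failure modes may surface as the same exception type, so any narrowing risks skipping a genuinely retryable case.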