diff --git a/genomicsdb/scripts/genomicsdb_query.py b/genomicsdb/scripts/genomicsdb_query.py
index e12988d..788d543 100644
--- a/genomicsdb/scripts/genomicsdb_query.py
+++ b/genomicsdb/scripts/genomicsdb_query.py
@@ -540,7 +540,7 @@ def __str__(self):
         else:
             filter_str = ""
         bypass_str = f" bypass_intersecting_intervals_phase={self.bypass_intersecting_intervals_phase}"
-        return f"workspace={self.workspace} attributes={self.attributes}{filter_str}{bypass_str}"
+        return f"workspace={self.workspace} vidmap={self.vidmap_file} callset={self.callset_file} attributes={self.attributes}{filter_str}{bypass_str}"  # noqa
 
 
 class GenomicsDBQueryConfig(NamedTuple):
@@ -607,6 +607,13 @@ def configure_query(config: GenomicsDBQueryConfig):
     return query_config
 
 
+def instantiate_genomicsdb(pb_config, msg):
+    logging.info("Instantiating genomicsdb to process " + msg + "...")
+    gdb = genomicsdb.connect_with_protobuf(pb_config)
+    logging.info("Instantiating genomicsdb to process " + msg + " DONE")
+    return gdb
+
+
 def process(config):
     export_config = config.export_config
     query_config = config.query_config
@@ -618,52 +625,90 @@ def process(config):
         logging.error(msg + f" not imported into workspace({export_config.workspace})")
         return -1
     global gdb
-    try:
-        if gdb:
-            logging.info("Found gdb to process " + msg)
+    # Allow one retry to account for expired access tokens for Azure URLs
+    if export_config.workspace.startswith("az://"):
+        allow_retry = True
+    else:
+        allow_retry = False
+    while True:
+        try:
+            if gdb:
+                logging.info("Found gdb to process " + msg)
+            else:
+                logging.info("Starting new gdb to process " + msg)
+                gdb = instantiate_genomicsdb(configure_export(export_config), msg)
+        except NameError:
+            gdb = instantiate_genomicsdb(configure_export(export_config), msg)
+
+        query_protobuf = configure_query(query_config)
+
+        try:
+            if output_config.type == "csv":
+                df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True)
+                df.to_csv(output_config.filename, index=False)
+            elif output_config.type == "json":
+                json_output = gdb.query_variant_calls(
+                    query_protobuf=query_protobuf, json_output=output_config.json_type
+                )
+                with open(output_config.filename, "wb") as f:
+                    f.write(json_output)
+            elif output_config.type == "arrow":
+                nbytes = 0
+                writer = None
+                i = 0
+                for out in gdb.query_variant_calls(query_protobuf=query_protobuf, arrow_output=True, batching=True):
+                    reader = pa.ipc.open_stream(out)
+                    for batch in reader:
+                        if nbytes > output_config.max_arrow_bytes:
+                            i += 1
+                            nbytes = 0
+                            if writer:
+                                writer.close()
+                                writer = None
+                        if not writer:
+                            writer = pq.ParquetWriter(f"{output_config.filename}__{i}.parquet", batch.schema)
+                        nbytes += batch.nbytes
+                        writer.write_batch(batch)
+                if writer:
+                    writer.close()
+                    writer = None
+            logging.info(f"Processed {msg}")
+            # Exit the loop as the query has completed
+            return 0
+        except Exception as e:
+            # Try to handle read errors from TileDB storage for Azure URLs
+            # e.g. GenomicsDBIteratorException exception : Error while reading from TileDB array
+            if allow_retry and "GenomicsDB" in str(type(e)) and "TileDB" in str(e):
+                # Check for the possibility of an expired access token
+                allow_retry = False
+                try:
+                    # If the check for the workspace under consideration succeeds and the workspace exists as it should,
+                    # the genomicsdb instance is functional! Probably not an expired token, so re-raise the outer exception
+                    if not gdb.workspace_exists(export_config.workspace):
+                        logging.info(f"Retrying query with a new instance of gdb for {msg}...")
+                        gdb = None
+                        continue
+                except Exception as ex:
+                    logging.info(f"Exception({ex}) encountered. Retrying query with a new instance of gdb for {msg}...")
+                    gdb = None
+                    continue
+            logging.critical(f"Unexpected exception while processing {msg} : {e}")
+            raise e
+
+
+def check_output(output):
+    parent_dir = os.path.dirname(output)
+    if parent_dir and not os.path.isdir(parent_dir):
+        if os.path.isfile(os.path.dirname(output)):
+            raise RuntimeError(f"Cannot proceed as output's parent directory({parent_dir}) is a file")
         else:
-            logging.error("Something wrong, gdb seems to be None")
-            return -1
-    except NameError:
-        logging.info("Instantiating genomicsdb to process " + msg + "...")
-        gdb = genomicsdb.connect_with_protobuf(configure_export(export_config))
-        logging.info("Instantiating genomicsdb to process " + msg + " DONE")
-    query_protobuf = configure_query(query_config)
-    try:
-        if output_config.type == "csv":
-            df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True)
-            df.to_csv(output_config.filename, index=False)
-        elif output_config.type == "json":
-            json_output = gdb.query_variant_calls(query_protobuf=query_protobuf, json_output=output_config.json_type)
-            with open(output_config.filename, "wb") as f:
-                f.write(json_output)
-        elif output_config.type == "arrow":
-            nbytes = 0
-            writer = None
-            i = 0
-            for out in gdb.query_variant_calls(query_protobuf=query_protobuf, arrow_output=True, batching=True):
-                reader = pa.ipc.open_stream(out)
-                for batch in reader:
-                    if nbytes > output_config.max_arrow_bytes:
-                        i += 1
-                        nbytes = 0
-                        if writer:
-                            writer.close()
-                            writer = None
-                    if not writer:
-                        writer = pq.ParquetWriter(f"{output_config.filename}__{i}.parquet", batch.schema)
-                    nbytes += batch.nbytes
-                    writer.write_batch(batch)
-            if writer:
-                writer.close()
-                writer = None
-    except Exception as e:
-        logging.critical(f"Unexpected exception : {e}")
-        gdb = None
-        return -1
-
-    logging.info(f"Processed {msg}")
-    return 0
+            raise RuntimeError(
+                f"Cannot proceed as output's parent directory({parent_dir}) does not exist. Create dir({parent_dir}) before restarting query"  # noqa
+            )
+    if output.endswith("/") or os.path.isdir(output):
+        return genomicsdb_common.join_paths(output, "query_output")
+    else:
+        return output
 
 
 def main():
@@ -672,10 +717,8 @@ def main():
     if row_tuples is not None and len(row_tuples) == 0:
         return
 
-    print(f"Starting genomicsdb_query for workspace({workspace}) and intervals({intervals})")
-
     output_type = args.output_type
-    output = args.output
+    output = check_output(args.output)
     json_type = None
     if output_type == "json":
         json_type = parse_args_for_json_type(args.json_output_type)
@@ -686,6 +729,8 @@ def main():
         max_arrow_bytes = parse_args_for_max_bytes(args.max_arrow_byte_size)
         print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files")
 
+    print(f"Starting genomicsdb_query for workspace({workspace}) and intervals({intervals})")
+
     export_config = GenomicsDBExportConfig(
         workspace, vidmap_file, callset_file, attributes, args.filter, args.bypass_intersecting_intervals_phase
     )
@@ -749,10 +794,18 @@ def main():
         sys.exit(0)
 
     if min(len(configs), args.nproc) == 1:
-        results = list(map(process, configs))
+        try:
+            results = list(map(process, configs))
+        except Exception as e:
+            raise RuntimeError(f"genomicsdb_query returned unexpectedly: {e}")
     else:
         with multiprocessing.Pool(processes=min(len(configs), args.nproc)) as pool:
-            results = list(pool.map(process, configs))
+            try:
+                results = list(pool.map(process, configs))
+            except Exception as e:
+                pool.terminate()
+                pool.join()
+                raise RuntimeError(f"Terminating as a query in the multiprocessing pool returned unexpectedly: {e}")
 
     msg = "successfully"
     for result in results:
diff --git a/test/scripts/test.sh b/test/scripts/test.sh
index f7fed15..dcd461c 100755
--- a/test/scripts/test.sh
+++ b/test/scripts/test.sh
@@ -183,7 +183,15 @@ run_command "genomicsdb_query -w $WORKSPACE -I $TEMP_DIR/contigs.list -S $TEMP_D
 run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -o $OUTPUT"
 run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -f $FILTER -o $OUTPUT"
 run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a $FIELDS -o $OUTPUT"
-run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a NON_EXISTENT_FIELD,$FIELDS -o $OUTPUT" 1
+run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a NON_EXISTENT_FIELD,$FIELDS -o $OUTPUT" 1
+run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o NON_EXISTENT_DIR/output" 1
+run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o NON_EXISTENT_DIR/" 1
+touch $TEMP_DIR/just_a_file
+run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/just_a_file/out" 1
+run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir/" 1
+mkdir -p $TEMP_DIR/output_dir
+genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir >& /dev/null || (echo "query output to $TEMP_DIR/output_dir not successful"; exit 1)
+genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir/ >& /dev/null || (echo "query output to $TEMP_DIR/output_dir/ not successful"; exit 1)
 rm -f loader.json callset.json vidmap.json
 
 run_command "genomicsdb_cache -w $WORKSPACE $INTERVAL_ARGS"
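
Reviewer note, not part of the patch: a minimal sketch of the output-path validation that check_output() introduces and that the new test.sh cases exercise. The module path is inferred from the file path in the diff and the directory names are hypothetical; expected results are shown as comments.

    # Sketch only: assumes genomicsdb_query is importable at this module path
    # and that "results_dir" can be created locally.
    import os
    from genomicsdb.scripts.genomicsdb_query import check_output

    os.makedirs("results_dir", exist_ok=True)
    check_output("out")                # no parent directory -> returned as-is
    check_output("results_dir/out")    # parent exists -> returned as-is
    check_output("results_dir")        # existing dir -> "results_dir/query_output"
    check_output("results_dir/")       # trailing slash -> "results_dir/query_output"
    check_output("missing_dir/out")    # parent does not exist -> RuntimeError

Because main() now calls check_output(args.output) before any query runs, these failures surface immediately instead of after a long-running query.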