From a4596a0d2bdd3038cef78c1fd7686980355de9ae Mon Sep 17 00:00:00 2001 From: Nalini Ganapati Date: Thu, 13 Feb 2025 11:23:35 -0800 Subject: [PATCH 1/7] Additional checks for genomicsdb_query when output's parent directory does not exist --- genomicsdb/scripts/genomicsdb_query.py | 14 +++++++++++--- test/scripts/test.sh | 3 ++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/genomicsdb/scripts/genomicsdb_query.py b/genomicsdb/scripts/genomicsdb_query.py index e12988d..8af0f96 100644 --- a/genomicsdb/scripts/genomicsdb_query.py +++ b/genomicsdb/scripts/genomicsdb_query.py @@ -665,6 +665,14 @@ def process(config): logging.info(f"Processed {msg}") return 0 +def check_output(output): + parent_dir = os.path.dirname(output) + if not os.path.isdir(parent_dir): + if os.path.isfile(os.path.dirname(output)): + raise RuntimeError(f"Cannot proceed as output's parent directory({parent_dir}) is a file") + else: + raise RuntimeError(f"Cannot proceed as output's parent directory({parent_dir}) does not exist. 
Create dir({parent_dir}) before restarting query") + return output def main(): workspace, callset_file, vidmap_file, partitions, contigs_map, intervals, row_tuples, attributes, args = setup() @@ -672,10 +680,8 @@ def main(): if row_tuples is not None and len(row_tuples) == 0: return - print(f"Starting genomicsdb_query for workspace({workspace}) and intervals({intervals})") - output_type = args.output_type - output = args.output + output = check_output(args.output) json_type = None if output_type == "json": json_type = parse_args_for_json_type(args.json_output_type) @@ -686,6 +692,8 @@ def main(): max_arrow_bytes = parse_args_for_max_bytes(args.max_arrow_byte_size) print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files") + print(f"Starting genomicsdb_query for workspace({workspace}) and intervals({intervals})") + export_config = GenomicsDBExportConfig( workspace, vidmap_file, callset_file, attributes, args.filter, args.bypass_intersecting_intervals_phase ) diff --git a/test/scripts/test.sh b/test/scripts/test.sh index f7fed15..aee5cd7 100755 --- a/test/scripts/test.sh +++ b/test/scripts/test.sh @@ -183,7 +183,8 @@ run_command "genomicsdb_query -w $WORKSPACE -I $TEMP_DIR/contigs.list -S $TEMP_D run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -o $OUTPUT" run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -f $FILTER -o $OUTPUT" run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a $FIELDS -o $OUTPUT" -run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a NON_EXISTENT_FIELD,$FIELDS -o $OUTPUT" 1 +run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a NON_EXISTENT_FIELD,$FIELDS -o $OUTPUT" 1 +run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o NON_EXISTENT_DIR/output" 1 rm -f loader.json callset.json vidmap.json run_command "genomicsdb_cache -w $WORKSPACE $INTERVAL_ARGS" From 9e2756525131695cd287845e3aaec4c724f7acf0 Mon Sep 17 
00:00:00 2001 From: Nalini Ganapati Date: Thu, 13 Feb 2025 11:40:53 -0800 Subject: [PATCH 2/7] Add to tests --- test/scripts/test.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/scripts/test.sh b/test/scripts/test.sh index aee5cd7..1e9c214 100755 --- a/test/scripts/test.sh +++ b/test/scripts/test.sh @@ -185,6 +185,8 @@ run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples. run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a $FIELDS -o $OUTPUT" run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a NON_EXISTENT_FIELD,$FIELDS -o $OUTPUT" 1 run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o NON_EXISTENT_DIR/output" 1 +touch $TEMP_DIR/just_a_file +run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/just_a_file/out" 1 rm -f loader.json callset.json vidmap.json run_command "genomicsdb_cache -w $WORKSPACE $INTERVAL_ARGS" From 14c6fb44d969a5f5f979b41c2e3ddee1fc0f5626 Mon Sep 17 00:00:00 2001 From: Nalini Ganapati Date: Thu, 13 Feb 2025 12:00:12 -0800 Subject: [PATCH 3/7] Lint --- genomicsdb/scripts/genomicsdb_query.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/genomicsdb/scripts/genomicsdb_query.py b/genomicsdb/scripts/genomicsdb_query.py index 8af0f96..6875557 100644 --- a/genomicsdb/scripts/genomicsdb_query.py +++ b/genomicsdb/scripts/genomicsdb_query.py @@ -665,15 +665,19 @@ def process(config): logging.info(f"Processed {msg}") return 0 + def check_output(output): parent_dir = os.path.dirname(output) if not os.path.isdir(parent_dir): if os.path.isfile(os.path.dirname(output)): raise RuntimeError(f"Cannot proceed as output's parent directory({parent_dir}) is a file") else: - raise RuntimeError(f"Cannot proceed as output's parent directory({parent_dir}) does not exist. Create dir({parent_dir}) before restarting query") + raise RuntimeError( + f"Cannot proceed as output's parent directory({parent_dir}) does not exist. 
Create dir({parent_dir}) before restarting query" # noqa + ) return output + def main(): workspace, callset_file, vidmap_file, partitions, contigs_map, intervals, row_tuples, attributes, args = setup() From 22771a8d684a5ac7b5ba70e683b9df063fb2ed85 Mon Sep 17 00:00:00 2001 From: Nalini Ganapati Date: Sat, 15 Feb 2025 11:03:10 -0800 Subject: [PATCH 4/7] Retry once with a new genomicsdb instance when azure access tokens expire --- genomicsdb/scripts/genomicsdb_query.py | 121 +++++++++++++++---------- test/scripts/test.sh | 8 ++ 2 files changed, 81 insertions(+), 48 deletions(-) diff --git a/genomicsdb/scripts/genomicsdb_query.py b/genomicsdb/scripts/genomicsdb_query.py index 6875557..41edd36 100644 --- a/genomicsdb/scripts/genomicsdb_query.py +++ b/genomicsdb/scripts/genomicsdb_query.py @@ -540,7 +540,7 @@ def __str__(self): else: filter_str = "" bypass_str = f" bypass_intersecting_intervals_phase={self.bypass_intersecting_intervals_phase}" - return f"workspace={self.workspace} attributes={self.attributes}{filter_str}{bypass_str}" + return f"workspace={self.workspace} vidmap={self.vidmap_file} callset={self.callset_file} attributes={self.attributes}{filter_str}{bypass_str}" class GenomicsDBQueryConfig(NamedTuple): @@ -606,6 +606,12 @@ def configure_query(config: GenomicsDBQueryConfig): query_config.query_row_ranges.extend([row_range_list]) return query_config +def instantiate_genomicsdb(pb_config, msg): + logging.info("Instantiating genomicsdb to process " + msg + "...") + gdb = genomicsdb.connect_with_protobuf(pb_config) + logging.info("Instantiating genomicsdb to process " + msg + " DONE") + return gdb + def process(config): export_config = config.export_config @@ -618,64 +624,83 @@ def process(config): logging.error(msg + f" not imported into workspace({export_config.workspace})") return -1 global gdb - try: - if gdb: - logging.info("Found gdb to process " + msg) - else: - logging.error("Something wrong, gdb seems to be None") + # Allow one retry to account for 
expired access tokens for azure URLs + if export_config.workspace.startswith("az://"): + allow_retry = True + else: + allow_retry = False + while True: + try: + if gdb: + logging.info("Found gdb to process " + msg) + else: + logging.info("Starting new gdb to process " + msg) + gdb = instantiate_genomicsdb(configure_export(export_config), msg); + except NameError: + gdb = instantiate_genomicsdb(configure_export(export_config), msg) + + query_protobuf = configure_query(query_config) + + try: + if output_config.type == "csv": + df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True) + df.to_csv(output_config.filename, index=False) + elif output_config.type == "json": + json_output = gdb.query_variant_calls(query_protobuf=query_protobuf, json_output=output_config.json_type) + with open(output_config.filename, "wb") as f: + f.write(json_output) + elif output_config.type == "arrow": + nbytes = 0 + writer = None + i = 0 + for out in gdb.query_variant_calls(query_protobuf=query_protobuf, arrow_output=True, batching=True): + reader = pa.ipc.open_stream(out) + for batch in reader: + if nbytes > output_config.max_arrow_bytes: + i += 1 + nbytes = 0 + if writer: + writer.close() + writer = None + if not writer: + writer = pq.ParquetWriter(f"{output_config.filename}__{i}.parquet", batch.schema) + nbytes += batch.nbytes + writer.write_batch(batch) + if writer: + writer.close() + writer = None + logging.info(f"Processed {msg}") + return 0 + except Exception as e: + if allow_retry: + # Check for the possibility of an expired access token + allow_retry = False + try: + if not gdb.workspace_exists(export_config.workspace): + logging.info(f"Retrying query with a new instance of gdb for {msg}...") + gdb = None + continue + except Exception as ex: + logging.info(f"Exception({ex}) encountered. 
Retrying query with a new instance of gdb for {msg}...") + gdb = None + continue + logging.critical(f"Unexpected exception : {e}") return -1 - except NameError: - logging.info("Instantiating genomicsdb to process " + msg + "...") - gdb = genomicsdb.connect_with_protobuf(configure_export(export_config)) - logging.info("Instantiating genomicsdb to process " + msg + " DONE") - query_protobuf = configure_query(query_config) - try: - if output_config.type == "csv": - df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True) - df.to_csv(output_config.filename, index=False) - elif output_config.type == "json": - json_output = gdb.query_variant_calls(query_protobuf=query_protobuf, json_output=output_config.json_type) - with open(output_config.filename, "wb") as f: - f.write(json_output) - elif output_config.type == "arrow": - nbytes = 0 - writer = None - i = 0 - for out in gdb.query_variant_calls(query_protobuf=query_protobuf, arrow_output=True, batching=True): - reader = pa.ipc.open_stream(out) - for batch in reader: - if nbytes > output_config.max_arrow_bytes: - i += 1 - nbytes = 0 - if writer: - writer.close() - writer = None - if not writer: - writer = pq.ParquetWriter(f"{output_config.filename}__{i}.parquet", batch.schema) - nbytes += batch.nbytes - writer.write_batch(batch) - if writer: - writer.close() - writer = None - except Exception as e: - logging.critical(f"Unexpected exception : {e}") - gdb = None - return -1 - - logging.info(f"Processed {msg}") - return 0 def check_output(output): parent_dir = os.path.dirname(output) - if not os.path.isdir(parent_dir): + if parent_dir and not os.path.isdir(parent_dir): if os.path.isfile(os.path.dirname(output)): raise RuntimeError(f"Cannot proceed as output's parent directory({parent_dir}) is a file") else: raise RuntimeError( f"Cannot proceed as output's parent directory({parent_dir}) does not exist. 
Create dir({parent_dir}) before restarting query" # noqa ) - return output + if output.endswith("/") or os.path.isdir(output): + return genomicsdb_common.join_paths(output, "query_output") + else: + return output def main(): diff --git a/test/scripts/test.sh b/test/scripts/test.sh index 1e9c214..2d8c3d9 100755 --- a/test/scripts/test.sh +++ b/test/scripts/test.sh @@ -185,8 +185,16 @@ run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples. run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a $FIELDS -o $OUTPUT" run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a NON_EXISTENT_FIELD,$FIELDS -o $OUTPUT" 1 run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o NON_EXISTENT_DIR/output" 1 +run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o NON_EXISTENT_DIR/" 1 touch $TEMP_DIR/just_a_file run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/just_a_file/out" 1 +run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir/" 1 +#mkdir $TEMP_DIR/output_dir +#run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir" +#run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir/" +#pushd $TEMP_DIR +#run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS" +#popd rm -f loader.json callset.json vidmap.json run_command "genomicsdb_cache -w $WORKSPACE $INTERVAL_ARGS" From f6076a1a7c3c4aab01c26d5b40549dddbddfbe91 Mon Sep 17 00:00:00 2001 From: Nalini Ganapati Date: Sat, 15 Feb 2025 11:09:08 -0800 Subject: [PATCH 5/7] Lint --- genomicsdb/scripts/genomicsdb_query.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/genomicsdb/scripts/genomicsdb_query.py b/genomicsdb/scripts/genomicsdb_query.py index 41edd36..13e842c 100644 --- a/genomicsdb/scripts/genomicsdb_query.py +++ b/genomicsdb/scripts/genomicsdb_query.py @@ -540,7 +540,7 @@ def __str__(self): else: filter_str = "" bypass_str = f" 
bypass_intersecting_intervals_phase={self.bypass_intersecting_intervals_phase}" - return f"workspace={self.workspace} vidmap={self.vidmap_file} callset={self.callset_file} attributes={self.attributes}{filter_str}{bypass_str}" + return f"workspace={self.workspace} vidmap={self.vidmap_file} callset={self.callset_file} attributes={self.attributes}{filter_str}{bypass_str}" # noqa class GenomicsDBQueryConfig(NamedTuple): @@ -606,6 +606,7 @@ def configure_query(config: GenomicsDBQueryConfig): query_config.query_row_ranges.extend([row_range_list]) return query_config + def instantiate_genomicsdb(pb_config, msg): logging.info("Instantiating genomicsdb to process " + msg + "...") gdb = genomicsdb.connect_with_protobuf(pb_config) @@ -635,7 +636,7 @@ def process(config): logging.info("Found gdb to process " + msg) else: logging.info("Starting new gdb to process " + msg) - gdb = instantiate_genomicsdb(configure_export(export_config), msg); + gdb = instantiate_genomicsdb(configure_export(export_config), msg) except NameError: gdb = instantiate_genomicsdb(configure_export(export_config), msg) @@ -646,7 +647,9 @@ def process(config): df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True) df.to_csv(output_config.filename, index=False) elif output_config.type == "json": - json_output = gdb.query_variant_calls(query_protobuf=query_protobuf, json_output=output_config.json_type) + json_output = gdb.query_variant_calls( + query_protobuf=query_protobuf, json_output=output_config.json_type + ) with open(output_config.filename, "wb") as f: f.write(json_output) elif output_config.type == "arrow": From 40ce3405b8d1f79ed2bf6ba1f0e2eb4346797e4b Mon Sep 17 00:00:00 2001 From: Nalini Ganapati Date: Sat, 15 Feb 2025 13:08:34 -0800 Subject: [PATCH 6/7] Add tests --- test/scripts/test.sh | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/test/scripts/test.sh b/test/scripts/test.sh index 2d8c3d9..6293b87 100755 --- a/test/scripts/test.sh +++ 
b/test/scripts/test.sh @@ -189,12 +189,9 @@ run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o NON_EXISTENT_DIR/" touch $TEMP_DIR/just_a_file run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/just_a_file/out" 1 run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir/" 1 -#mkdir $TEMP_DIR/output_dir -#run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir" -#run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir/" -#pushd $TEMP_DIR -#run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS" -#popd +mkdir -p $TEMP_DIR/output_dir +genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir || exit 1 +genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir/ || exit 1 rm -f loader.json callset.json vidmap.json run_command "genomicsdb_cache -w $WORKSPACE $INTERVAL_ARGS" From 8f20e32f2277c6f5c53fe24f7aede8f589c46354 Mon Sep 17 00:00:00 2001 From: Nalini Ganapati Date: Sun, 16 Feb 2025 13:58:55 -0800 Subject: [PATCH 7/7] Address PR comments somewhat --- genomicsdb/scripts/genomicsdb_query.py | 23 ++++++++++++++++++----- test/scripts/test.sh | 4 ++-- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/genomicsdb/scripts/genomicsdb_query.py b/genomicsdb/scripts/genomicsdb_query.py index 13e842c..788d543 100644 --- a/genomicsdb/scripts/genomicsdb_query.py +++ b/genomicsdb/scripts/genomicsdb_query.py @@ -673,12 +673,17 @@ def process(config): writer.close() writer = None logging.info(f"Processed {msg}") + # exit out of the loop as the query has completed return 0 except Exception as e: - if allow_retry: + # Try handle read errors from TileDB storage for azure urls + # e.g. 
GenomicsDBIteratorException exception : Error while reading from TileDB array + if allow_retry and "GenomicsDB" in str(type(e)) and "TileDB" in str(e): # Check for the possibility of an expired access token allow_retry = False try: + # If the check for workspace under consideration succeeds and the workspace exists as it should, + # genomicsdb instance is functional! Probably not an expired token, so re-raise outer exception if not gdb.workspace_exists(export_config.workspace): logging.info(f"Retrying query with a new instance of gdb for {msg}...") gdb = None @@ -687,8 +692,8 @@ def process(config): logging.info(f"Exception({ex}) encountered. Retrying query with a new instance of gdb for {msg}...") gdb = None continue - logging.critical(f"Unexpected exception : {e}") - return -1 + logging.critical(f"Unexpected exception while processing {msg} : {e}") + raise e def check_output(output): @@ -789,10 +794,18 @@ def main(): sys.exit(0) if min(len(configs), args.nproc) == 1: - results = list(map(process, configs)) + try: + results = list(map(process, configs)) + except Exception as e: + raise RuntimeError(f"genomicsdb_query returned unexpectedly: {e}") else: with multiprocessing.Pool(processes=min(len(configs), args.nproc)) as pool: - results = list(pool.map(process, configs)) + try: + results = list(pool.map(process, configs)) + except Exception as e: + pool.terminate() + pool.join() + raise RuntimeError(f"Terminating as a query in the multiprocessing pool returned unexpectedly: {e}") msg = "successfully" for result in results: diff --git a/test/scripts/test.sh b/test/scripts/test.sh index 6293b87..dcd461c 100755 --- a/test/scripts/test.sh +++ b/test/scripts/test.sh @@ -190,8 +190,8 @@ touch $TEMP_DIR/just_a_file run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/just_a_file/out" 1 run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir/" 1 mkdir -p $TEMP_DIR/output_dir -genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o 
$TEMP_DIR/output_dir || exit 1 -genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir/ || exit 1 +genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir >& /dev/null || (echo "query output to $TEMP_DIR/output_dir not successful"; exit 1) +genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir/ >& /dev/null || (echo "query output to $TEMP_DIR/output_dir/ not successful"; exit 1) rm -f loader.json callset.json vidmap.json run_command "genomicsdb_cache -w $WORKSPACE $INTERVAL_ARGS"