155 changes: 104 additions & 51 deletions genomicsdb/scripts/genomicsdb_query.py
@@ -540,7 +540,7 @@ def __str__(self):
else:
filter_str = ""
bypass_str = f" bypass_intersecting_intervals_phase={self.bypass_intersecting_intervals_phase}"
return f"workspace={self.workspace} attributes={self.attributes}{filter_str}{bypass_str}"
return f"workspace={self.workspace} vidmap={self.vidmap_file} callset={self.callset_file} attributes={self.attributes}{filter_str}{bypass_str}" # noqa


class GenomicsDBQueryConfig(NamedTuple):
@@ -607,6 +607,13 @@ def configure_query(config: GenomicsDBQueryConfig):
return query_config


def instantiate_genomicsdb(pb_config, msg):
logging.info("Instantiating genomicsdb to process " + msg + "...")
gdb = genomicsdb.connect_with_protobuf(pb_config)
logging.info("Instantiating genomicsdb to process " + msg + " DONE")
return gdb


def process(config):
export_config = config.export_config
query_config = config.query_config
@@ -618,52 +625,90 @@ def process(config):
logging.error(msg + f" not imported into workspace({export_config.workspace})")
return -1
global gdb
try:
if gdb:
logging.info("Found gdb to process " + msg)
# Allow one retry to account for expired access tokens for azure URLs
if export_config.workspace.startswith("az://"):
allow_retry = True
else:
allow_retry = False
while True:
try:
if gdb:
logging.info("Found gdb to process " + msg)
else:
logging.info("Starting new gdb to process " + msg)
gdb = instantiate_genomicsdb(configure_export(export_config), msg)
except NameError:
gdb = instantiate_genomicsdb(configure_export(export_config), msg)

query_protobuf = configure_query(query_config)

try:
if output_config.type == "csv":
df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True)
df.to_csv(output_config.filename, index=False)
elif output_config.type == "json":
json_output = gdb.query_variant_calls(
query_protobuf=query_protobuf, json_output=output_config.json_type
)
with open(output_config.filename, "wb") as f:
f.write(json_output)
elif output_config.type == "arrow":
nbytes = 0
writer = None
i = 0
for out in gdb.query_variant_calls(query_protobuf=query_protobuf, arrow_output=True, batching=True):
reader = pa.ipc.open_stream(out)
for batch in reader:
if nbytes > output_config.max_arrow_bytes:
i += 1
nbytes = 0
if writer:
writer.close()
writer = None
if not writer:
writer = pq.ParquetWriter(f"{output_config.filename}__{i}.parquet", batch.schema)
nbytes += batch.nbytes
writer.write_batch(batch)
if writer:
writer.close()
writer = None
logging.info(f"Processed {msg}")
# exit out of the loop as the query has completed
return 0
except Exception as e:
Member: Can we check for a specific exception here, or even check the exception message to only target the signature for what we think is expired tokens?

Member Author: Most of the time we see a GenomicsDBIteratorException. But we cannot really be sure.
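
A minimal sketch of the narrower check being discussed, assuming the exception's type name and message text are the only reliable signals here (the concrete exception class may not be importable from this script, and the helper name is illustrative):

def looks_like_expired_token_error(e: Exception) -> bool:
    # Heuristic only: GenomicsDB surfaces TileDB read failures as a
    # GenomicsDBIteratorException whose message mentions TileDB, e.g.
    # "GenomicsDBIteratorException exception : Error while reading from TileDB array".
    # An expired Azure access token is one possible cause, not the only one.
    return type(e).__name__ == "GenomicsDBIteratorException" and "TileDB" in str(e)

process() could call such a helper in place of the inline "GenomicsDB"/"TileDB" string checks below if a tighter signature is wanted.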

# Try handle read errors from TileDB storage for azure urls
# e.g. GenomicsDBIteratorException exception : Error while reading from TileDB array
if allow_retry and "GenomicsDB" in str(type(e)) and "TileDB" in str(e):
# Check for the possibility of an expired access token
allow_retry = False
try:
# If the check for workspace under consideration succeeds and the workspace exists as it should,
# genomicsdb instance is functional! Probably not an expired token, so re-raise outer exception
if not gdb.workspace_exists(export_config.workspace):
logging.info(f"Retrying query with a new instance of gdb for {msg}...")
Member: Could you explain what we're trying to do with this check?

Member Author (@nalinigans, Feb 16, 2025): I'm trying to check if the gdb is functional. If it is, it is probably not an expired token. I will document that better in the code and maybe improve the logging message. Comments?
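
A short sketch of the intent behind the workspace_exists probe, assuming that call exercises the same storage credentials as the failed query (the helper name is illustrative, not part of this change):

def gdb_is_functional(gdb, workspace) -> bool:
    # If the existing connection can still see the workspace, its credentials
    # are presumably still valid, so the earlier failure was probably not an
    # expired access token and should be re-raised rather than retried.
    try:
        return gdb.workspace_exists(workspace)
    except Exception:
        # Any failure here suggests the token (or the connection) has gone stale.
        return False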

gdb = None
continue
except Exception as ex:
logging.info(f"Exception({ex}) encountered. Retrying query with a new instance of gdb for {msg}...")
gdb = None
continue
logging.critical(f"Unexpected exception while processing {msg} : {e}")
raise e


        else:
            logging.error("Something wrong, gdb seems to be None")
            return -1
    except NameError:
        logging.info("Instantiating genomicsdb to process " + msg + "...")
        gdb = genomicsdb.connect_with_protobuf(configure_export(export_config))
        logging.info("Instantiating genomicsdb to process " + msg + " DONE")
    query_protobuf = configure_query(query_config)
    try:
        if output_config.type == "csv":
            df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True)
            df.to_csv(output_config.filename, index=False)
        elif output_config.type == "json":
            json_output = gdb.query_variant_calls(query_protobuf=query_protobuf, json_output=output_config.json_type)
            with open(output_config.filename, "wb") as f:
                f.write(json_output)
        elif output_config.type == "arrow":
            nbytes = 0
            writer = None
            i = 0
            for out in gdb.query_variant_calls(query_protobuf=query_protobuf, arrow_output=True, batching=True):
                reader = pa.ipc.open_stream(out)
                for batch in reader:
                    if nbytes > output_config.max_arrow_bytes:
                        i += 1
                        nbytes = 0
                        if writer:
                            writer.close()
                            writer = None
                    if not writer:
                        writer = pq.ParquetWriter(f"{output_config.filename}__{i}.parquet", batch.schema)
                    nbytes += batch.nbytes
                    writer.write_batch(batch)
            if writer:
                writer.close()
                writer = None
    except Exception as e:
        logging.critical(f"Unexpected exception : {e}")
        gdb = None
        return -1

    logging.info(f"Processed {msg}")
    return 0


def check_output(output):
    parent_dir = os.path.dirname(output)
    if parent_dir and not os.path.isdir(parent_dir):
        if os.path.isfile(os.path.dirname(output)):
            raise RuntimeError(f"Cannot proceed as output's parent directory({parent_dir}) is a file")
        raise RuntimeError(
            f"Cannot proceed as output's parent directory({parent_dir}) does not exist. Create dir({parent_dir}) before restarting query"  # noqa
        )
    if output.endswith("/") or os.path.isdir(output):
        return genomicsdb_common.join_paths(output, "query_output")
    else:
        return output


def main():
@@ -672,10 +717,8 @@ def main():
if row_tuples is not None and len(row_tuples) == 0:
return

print(f"Starting genomicsdb_query for workspace({workspace}) and intervals({intervals})")

output_type = args.output_type
output = args.output
output = check_output(args.output)
json_type = None
if output_type == "json":
json_type = parse_args_for_json_type(args.json_output_type)
Expand All @@ -686,6 +729,8 @@ def main():
max_arrow_bytes = parse_args_for_max_bytes(args.max_arrow_byte_size)
print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files")

print(f"Starting genomicsdb_query for workspace({workspace}) and intervals({intervals})")

export_config = GenomicsDBExportConfig(
workspace, vidmap_file, callset_file, attributes, args.filter, args.bypass_intersecting_intervals_phase
)
@@ -749,10 +794,18 @@ def main():
sys.exit(0)

if min(len(configs), args.nproc) == 1:
results = list(map(process, configs))
try:
results = list(map(process, configs))
except Exception as e:
raise RuntimeError(f"genomicsdb_query returned unexpectedly: {e}")
else:
with multiprocessing.Pool(processes=min(len(configs), args.nproc)) as pool:
results = list(pool.map(process, configs))
try:
results = list(pool.map(process, configs))
except Exception as e:
pool.terminate()
pool.join()
raise RuntimeError(f"Terminating as a query in the multiprocessing pool returned unexpectedly: {e}")

msg = "successfully"
for result in results:
10 changes: 9 additions & 1 deletion test/scripts/test.sh
@@ -183,7 +183,15 @@ run_command "genomicsdb_query -w $WORKSPACE -I $TEMP_DIR/contigs.list -S $TEMP_D
run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -o $OUTPUT"
run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -f $FILTER -o $OUTPUT"
run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a $FIELDS -o $OUTPUT"
run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a NON_EXISTENT_FIELD,$FIELDS -o $OUTPUT" 1
run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a NON_EXISTENT_FIELD,$FIELDS -o $OUTPUT" 1
run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o NON_EXISTENT_DIR/output" 1
run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o NON_EXISTENT_DIR/" 1
touch $TEMP_DIR/just_a_file
run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/just_a_file/out" 1
run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir/" 1
mkdir -p $TEMP_DIR/output_dir
genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir >& /dev/null || (echo "query output to $TEMP_DIR/output_dir not successful"; exit 1)
genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -o $TEMP_DIR/output_dir/ >& /dev/null || (echo "query output to $TEMP_DIR/output_dir/ not successful"; exit 1)

rm -f loader.json callset.json vidmap.json
run_command "genomicsdb_cache -w $WORKSPACE $INTERVAL_ARGS"