99 changes: 95 additions & 4 deletions examples/genomicsdb_query
@@ -96,6 +96,26 @@ def parse_callset_json_for_row_ranges(callset_file, samples=None):
return row_tuples


def parse_callset_json_for_split_row_ranges(callset_file, chunk_size):
callset = json.loads(genomicsdb.read_entire_file(callset_file))
callsets = callset["callsets"]
chunks = int(len(callsets) / chunk_size + 1)
last_chunk_size = len(callsets) - (chunks - 1) * chunk_size
# Collapse small last chunk into the last but one chunk
if last_chunk_size < chunk_size / 2:
chunks -= 1
last_chunk_size += chunk_size
if chunks == 1:
return None
split_row_ranges = []
for i in range(0, chunks):
if i == chunks - 1:
split_row_ranges.append((chunk_size * i, chunk_size * i + last_chunk_size - 1))
else:
split_row_ranges.append((chunk_size * i, chunk_size * (i + 1) - 1))
return split_row_ranges
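
For reference, a standalone sketch of the chunking arithmetic above, runnable without a workspace. `split_row_ranges` here is a hypothetical helper that mirrors `parse_callset_json_for_split_row_ranges` but takes a plain sample count instead of reading a callset JSON:

```python
# Hypothetical mirror of parse_callset_json_for_split_row_ranges: same
# arithmetic, but driven by a sample count instead of a callset JSON.
def split_row_ranges(num_callsets, chunk_size):
    chunks = int(num_callsets / chunk_size + 1)
    last_chunk_size = num_callsets - (chunks - 1) * chunk_size
    # A last chunk smaller than half a chunk is folded into the previous one
    if last_chunk_size < chunk_size / 2:
        chunks -= 1
        last_chunk_size += chunk_size
    if chunks == 1:
        return None  # a single chunk means there is nothing to split
    return [
        (chunk_size * i,
         chunk_size * i + last_chunk_size - 1 if i == chunks - 1
         else chunk_size * (i + 1) - 1)
        for i in range(chunks)
    ]

print(split_row_ranges(25, 10))  # [(0, 9), (10, 19), (20, 24)]
print(split_row_ranges(23, 10))  # [(0, 9), (10, 22)] - tail of 3 folded in
print(split_row_ranges(5, 10))   # None - one chunk, no split
```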


def parse_vidmap_json_for_attributes(vidmap_file, attributes=None):
if attributes is None:
return ["GT"]
@@ -229,6 +249,11 @@ def setup():
default=8,
help="Optional - number of processing units for multiprocessing(default: %(default)s). Run nproc from command line to print the number of processing units available to a process for the user", # noqa
)
parser.add_argument(
"--chunk-size",
default=10240,
help="Optional - hint to split number of samples for multiprocessing used in conjunction with -n/--nproc and when -s/-S/--sample/--sample-list is not specified (default: %(default)s)", # noqa
)
parser.add_argument(
"-t",
"--output-type",
@@ -253,7 +278,19 @@ def setup():
"-o",
"--output",
default="query_output",
help="a prefix filename to csv outputs from the tool. The filenames will be suffixed with the interval and .csv/.json (default: %(default)s)", # noqa
help="a prefix filename to outputs from the tool. The filenames will be suffixed with the interval and .csv/.json/... (default: %(default)s)", # noqa
)
parser.add_argument(
"-d",
"--dryrun",
action="store_true",
help="displays the query that will be run without actually executing the query (default: %(default)s)", # noqa
)
parser.add_argument(
"-b",
"--bypass-intersecting-intervals-phase",
action="store_true",
help="iterate only once bypassing the intersecting intervals phase (default: %(default)s)", # noqa
)

args = parser.parse_args()
Expand Down Expand Up @@ -369,6 +406,15 @@ class GenomicsDBExportConfig(NamedTuple):
callset_file: str
attributes: str
filter: str
bypass_intersecting_intervals_phase: bool

def __str__(self):
if self.filter:
filter_str = f" filter={self.filter}"
else:
filter_str = ""
bypass_str = f" bypass_intersecting_intervals_phase={self.bypass_intersecting_intervals_phase}"
return f"workspace={self.workspace} attributes={self.attributes}{filter_str}{bypass_str}"


class GenomicsDBQueryConfig(NamedTuple):
@@ -379,6 +425,13 @@ class GenomicsDBQueryConfig(NamedTuple):
array_name: str
row_tuples: List[tuple]

def __str__(self):
if self.row_tuples:
row_tuples_str = f"{self.row_tuples}"
else:
row_tuples_str = "ALL"
return f"\tinterval={self.interval} array={self.array_name} callset rows={row_tuples_str}"


class OutputConfig(NamedTuple):
filename: str
@@ -398,7 +451,7 @@ def configure_export(config: GenomicsDBExportConfig):
export_config.workspace = config.workspace
export_config.vid_mapping_file = config.vidmap_file
export_config.callset_mapping_file = config.callset_file
export_config.bypass_intersecting_intervals_phase = False
export_config.bypass_intersecting_intervals_phase = config.bypass_intersecting_intervals_phase
export_config.enable_shared_posixfs_optimizations = True
if config.attributes:
export_config.attributes.extend(config.attributes)
@@ -505,15 +558,17 @@ def main():
max_arrow_bytes = parse_args_for_max_bytes(args.max_arrow_byte_size)
print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files")

export_config = GenomicsDBExportConfig(workspace, vidmap_file, callset_file, attributes, args.filter)
export_config = GenomicsDBExportConfig(
workspace, vidmap_file, callset_file, attributes, args.filter, args.bypass_intersecting_intervals_phase
)
configs = []
for interval in intervals:
print(f"Processing interval({interval})...")

contig, start, end, arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions)
if len(arrays) == 0:
logging.error(f"No arrays in the workspace matched input interval({interval})")
continue
# continue

print(f"\tArrays:{arrays} under consideration for interval({interval})")
for idx, array in enumerate(arrays):
@@ -529,6 +584,42 @@ def main():
if len(configs) == 0:
print("Nothing to process!!. Check output for possible errors")
sys.exit(1)

# Check if there is room for row_tuples to be parallelized
chunk_size = int(args.chunk_size)
if row_tuples is None and len(configs) < args.nproc and chunk_size > 1:
row_tuples = parse_callset_json_for_split_row_ranges(callset_file, chunk_size)
if row_tuples:
new_configs = []
for idx_row, row_tuple in enumerate(row_tuples):
for idx, config in enumerate(configs):
query_config = config.query_config
split_query_config = GenomicsDBQueryConfig(
query_config.interval,
query_config.contig,
query_config.start,
query_config.end,
query_config.array_name,
[row_tuple],
)
output_config = config.output_config
split_output_config = OutputConfig(
generate_output_filename(
output, output_type, query_config.interval, idx_row * len(configs) + idx
),
output_type,
json_type,
max_arrow_bytes,
)
new_configs.append(Config(export_config, split_query_config, split_output_config))
configs = new_configs
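
A small sketch of the fan-out indexing above, with made-up stand-ins: every (base config, row range) pair must map to a distinct filename index, since two arrays of the same interval would otherwise collide on the interval-plus-index suffix:

```python
# Hypothetical stand-ins: 2 base (interval, array) configs and 3 row
# ranges fan out to 6 split configs with distinct filename indices.
base_configs = ["1$1$100000", "1$100001$200000"]  # imagined array names
row_ranges = [(0, 9), (10, 19), (20, 24)]
for idx_row, rows in enumerate(row_ranges):
    for idx, array in enumerate(base_configs):
        index = idx_row * len(base_configs) + idx  # unique per pair
        print(f"index={index} array={array} rows={rows}")
# index runs 0..5 with no repeats, so no two outputs share a filename
```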

if args.dryrun:
print(f"Query configurations for {export_config}:")
for config in configs:
print(config.query_config)
sys.exit(0)

if min(len(configs), args.nproc) == 1:
results = list(map(process, configs))
else:
6 changes: 6 additions & 0 deletions examples/test.sh
@@ -187,8 +187,14 @@ if [[ $PARTITION != "t0_1_2" ]]; then
exit 1
fi
run_command "genomicsdb_query -w $WORKSPACE -s HG00097 -s HG00100 -s HG00096 -o $OUTPUT"
run_command "genomicsdb_query -w $WORKSPACE -s HG00097 -s HG00100 -s HG00096 -o $OUTPUT -d"
run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -o $OUTPUT"
run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -a GT -o $OUTPUT"
run_command "genomicsdb_query -w $WORKSPACE -i 1 -o $OUTPUT -d"
run_command "genomicsdb_query -w $WORKSPACE -i 1 --chunk-size=2 -o $OUTPUT"
run_command "genomicsdb_query -w $WORKSPACE -i 1 --chunk-size=2 -b -o $OUTPUT -d"
run_command "genomicsdb_query -w $WORKSPACE -i 1 --chunk-size=2 -b -o $OUTPUT"
run_command "genomicsdb_query -w $WORKSPACE -i 4 --chunk-size=4 -b -o $OUTPUT -d"

OLDSTYLE_JSONS="-l $OLDSTYLE_DIR/loader.json -c $OLDSTYLE_DIR/callset_t0_1_2.json -v $OLDSTYLE_DIR/vid.json"
run_command "genomicsdb_cache -w $WORKSPACE $OLDSTYLE_JSONS $INTERVAL_ARGS"