diff --git a/examples/genomicsdb_query b/examples/genomicsdb_query index f952d55..04724d8 100755 --- a/examples/genomicsdb_query +++ b/examples/genomicsdb_query @@ -96,6 +96,26 @@ def parse_callset_json_for_row_ranges(callset_file, samples=None): return row_tuples +def parse_callset_json_for_split_row_ranges(callset_file, chunk_size): + callset = json.loads(genomicsdb.read_entire_file(callset_file)) + callsets = callset["callsets"] + chunks = int(len(callsets) / chunk_size + 1) + last_chunk_size = len(callsets) - (chunks - 1) * chunk_size + # Collapse small last chunk into the last but one chunk + if last_chunk_size < chunk_size / 2: + chunks -= 1 + last_chunk_size += chunk_size + if chunks == 1: + return None + split_row_ranges = [] + for i in range(0, chunks): + if i == chunks - 1: + split_row_ranges.append((chunk_size * i, chunk_size * i + last_chunk_size - 1)) + else: + split_row_ranges.append((chunk_size * i, chunk_size * (i + 1) - 1)) + return split_row_ranges + + def parse_vidmap_json_for_attributes(vidmap_file, attributes=None): if attributes is None: return ["GT"] @@ -229,6 +249,11 @@ def setup(): default=8, help="Optional - number of processing units for multiprocessing(default: %(default)s). Run nproc from command line to print the number of processing units available to a process for the user", # noqa ) + parser.add_argument( + "--chunk-size", + default=10240, + help="Optional - hint to split number of samples for multiprocessing used in conjunction with -n/--nproc and when -s/-S/--sample/--sample-list is not specified (default: %(default)s)", # noqa + ) parser.add_argument( "-t", "--output-type", @@ -253,7 +278,19 @@ def setup(): "-o", "--output", default="query_output", - help="a prefix filename to csv outputs from the tool. The filenames will be suffixed with the interval and .csv/.json (default: %(default)s)", # noqa + help="a prefix filename to outputs from the tool. The filenames will be suffixed with the interval and .csv/.json/... (default: %(default)s)", # noqa + ) + parser.add_argument( + "-d", + "--dryrun", + action="store_true", + help="displays the query that will be run without actually executing the query (default: %(default)s)", # noqa + ) + parser.add_argument( + "-b", + "--bypass-intersecting-intervals-phase", + action="store_true", + help="iterate only once bypassing the intersecting intervals phase (default: %(default)s)", # noqa ) args = parser.parse_args() @@ -369,6 +406,15 @@ class GenomicsDBExportConfig(NamedTuple): callset_file: str attributes: str filter: str + bypass_intersecting_intervals_phase: bool + + def __str__(self): + if self.filter: + filter_str = f" filter={self.filter}" + else: + filter_str = "" + bypass_str = f" bypass_intersecting_intervals_phase={self.bypass_intersecting_intervals_phase}" + return f"workspace={self.workspace} attributes={self.attributes}{filter_str}{bypass_str}" class GenomicsDBQueryConfig(NamedTuple): @@ -379,6 +425,13 @@ class GenomicsDBQueryConfig(NamedTuple): array_name: str row_tuples: List[tuple] + def __str__(self): + if self.row_tuples: + row_tuples_str = f"{self.row_tuples}" + else: + row_tuples_str = "ALL" + return f"\tinterval={self.interval} array={self.array_name} callset rows={row_tuples_str}" + class OutputConfig(NamedTuple): filename: str @@ -398,7 +451,7 @@ def configure_export(config: GenomicsDBExportConfig): export_config.workspace = config.workspace export_config.vid_mapping_file = config.vidmap_file export_config.callset_mapping_file = config.callset_file - export_config.bypass_intersecting_intervals_phase = False + export_config.bypass_intersecting_intervals_phase = config.bypass_intersecting_intervals_phase export_config.enable_shared_posixfs_optimizations = True if config.attributes: export_config.attributes.extend(config.attributes) @@ -505,7 +558,9 @@ def main(): max_arrow_bytes = parse_args_for_max_bytes(args.max_arrow_byte_size) print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files") - export_config = GenomicsDBExportConfig(workspace, vidmap_file, callset_file, attributes, args.filter) + export_config = GenomicsDBExportConfig( + workspace, vidmap_file, callset_file, attributes, args.filter, args.bypass_intersecting_intervals_phase + ) configs = [] for interval in intervals: print(f"Processing interval({interval})...") @@ -513,7 +568,7 @@ def main(): contig, start, end, arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions) if len(arrays) == 0: logging.error(f"No arrays in the workspace matched input interval({interval})") - continue + # continue print(f"\tArrays:{arrays} under consideration for interval({interval})") for idx, array in enumerate(arrays): @@ -529,6 +584,42 @@ def main(): if len(configs) == 0: print("Nothing to process!!. Check output for possible errors") sys.exit(1) + + # Check if there is room for row_tuples to be parallelized + chunk_size = int(args.chunk_size) + if row_tuples is None and len(configs) < args.nproc and chunk_size > 1: + row_tuples = parse_callset_json_for_split_row_ranges(callset_file, chunk_size) + if row_tuples: + new_configs = [] + for idx_row, row_tuple in enumerate(row_tuples): + for idx, config in enumerate(configs): + query_config = config.query_config + split_query_config = GenomicsDBQueryConfig( + query_config.interval, + query_config.contig, + query_config.start, + query_config.end, + query_config.array_name, + [row_tuple], + ) + output_config = config.output_config + split_output_config = OutputConfig( + generate_output_filename( + output, output_type, query_config.interval, len(configs) * idx + idx_row + ), + output_type, + json_type, + max_arrow_bytes, + ) + new_configs.append(Config(export_config, split_query_config, split_output_config)) + configs = new_configs + + if args.dryrun: + print(f"Query configurations for {export_config}:") + for config in configs: + print(config.query_config) + sys.exit(0) + if min(len(configs), args.nproc) == 1: results = list(map(process, configs)) else: diff --git a/examples/test.sh b/examples/test.sh index a4a648d..2c7e8dc 100755 --- a/examples/test.sh +++ b/examples/test.sh @@ -187,8 +187,14 @@ if [[ $PARTITION != "t0_1_2" ]]; then exit 1 fi run_command "genomicsdb_query -w $WORKSPACE -s HG00097 -s HG00100 -s HG00096 -o $OUTPUT" +run_command "genomicsdb_query -w $WORKSPACE -s HG00097 -s HG00100 -s HG00096 -o $OUTPUT -d" run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -o $OUTPUT" run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -a GT -o $OUTPUT" +run_command "genomicsdb_query -w $WORKSPACE -i 1 -o $OUTPUT -d" +run_command "genomicsdb_query -w $WORKSPACE -i 1 --chunk-size=2 -o $OUTPUT" +run_command "genomicsdb_query -w $WORKSPACE -i 1 --chunk-size=2 -b -o $OUTPUT -d" +run_command "genomicsdb_query -w $WORKSPACE -i 1 --chunk-size=2 -b -o $OUTPUT" +run_command "genomicsdb_query -w $WORKSPACE -i 4 --chunk-size=4 -b -o $OUTPUT -d" OLDSTYLE_JSONS="-l $OLDSTYLE_DIR/loader.json -c $OLDSTYLE_DIR/callset_t0_1_2.json -v $OLDSTYLE_DIR/vid.json" run_command "genomicsdb_cache -w $WORKSPACE $OLDSTYLE_JSONS $INTERVAL_ARGS"