From 8260318b72f04ba82ed394ce0f48bb2c1fa58d98 Mon Sep 17 00:00:00 2001
From: Nalini Ganapati
Date: Mon, 13 Jan 2025 08:59:31 -0800
Subject: [PATCH 1/5] Add multiprocessing for samples in callset

---
 examples/genomicsdb_query | 93 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 89 insertions(+), 4 deletions(-)

diff --git a/examples/genomicsdb_query b/examples/genomicsdb_query
index f952d55..99fe0c4 100755
--- a/examples/genomicsdb_query
+++ b/examples/genomicsdb_query
@@ -96,6 +96,26 @@ def parse_callset_json_for_row_ranges(callset_file, samples=None):
     return row_tuples


+def parse_callset_json_for_split_row_ranges(callset_file, chunk_length):
+    callset = json.loads(genomicsdb.read_entire_file(callset_file))
+    callsets = callset["callsets"]
+    chunks = int(len(callsets) / chunk_length + 1)
+    last_chunk_length = len(callsets) - (chunks - 1) * chunk_length
+    # Collapse small last chunk into the last but one chunk
+    if last_chunk_length < chunk_length / 2:
+        chunks -= 1
+        last_chunk_length += chunk_length
+    if chunks == 1:
+        return None
+    split_row_ranges = []
+    for i in range(0, chunks):
+        if i == chunks - 1:
+            split_row_ranges.append((chunk_length * i, chunk_length * i + last_chunk_length - 1))
+        else:
+            split_row_ranges.append((chunk_length * i, chunk_length * (i + 1) - 1))
+    return split_row_ranges
+
+
 def parse_vidmap_json_for_attributes(vidmap_file, attributes=None):
     if attributes is None:
         return ["GT"]
@@ -229,6 +249,11 @@ def setup():
         default=8,
         help="Optional - number of processing units for multiprocessing(default: %(default)s). Run nproc from command line to print the number of processing units available to a process for the user",  # noqa
     )
+    parser.add_argument(
+        "--chunk-size",
+        default=10240,
+        help="Optional - hint for splitting the number of samples per chunk for multiprocessing; used in conjunction with -n/--nproc and only when -s/-S/--sample/--sample-list is not specified (default: %(default)s)",  # noqa
+    )
     parser.add_argument(
         "-t",
         "--output-type",
@@ -253,7 +278,19 @@ def setup():
         "-o",
         "--output",
         default="query_output",
-        help="a prefix filename to csv outputs from the tool. The filenames will be suffixed with the interval and .csv/.json (default: %(default)s)",  # noqa
+        help="a filename prefix for outputs from the tool. The filenames will be suffixed with the interval and .csv/.json/... (default: %(default)s)",  # noqa
+    )
+    parser.add_argument(
+        "-d",
+        "--dryrun",
+        action="store_true",
+        help="displays the query that will be run without actually executing it (default: %(default)s)",  # noqa
+    )
+    parser.add_argument(
+        "-b",
+        "--bypass-intersecting-intervals-phase",
+        action="store_true",
+        help="iterate only once, bypassing the intersecting intervals phase (default: %(default)s)",  # noqa
     )

     args = parser.parse_args()
@@ -369,6 +406,15 @@ class GenomicsDBExportConfig(NamedTuple):
     callset_file: str
     attributes: str
     filter: str
+    bypass_intersecting_intervals_phase: bool
+
+    def __str__(self):
+        if self.filter:
+            filter_str = f" filter={self.filter}"
+        else:
+            filter_str = ""
+        bypass_str = f" bypass_intersecting_intervals_phase={self.bypass_intersecting_intervals_phase}"
+        return f"workspace={self.workspace} attributes={self.attributes}{filter_str}{bypass_str}"


 class GenomicsDBQueryConfig(NamedTuple):
@@ -379,6 +425,9 @@ class GenomicsDBQueryConfig(NamedTuple):
     array_name: str
     row_tuples: List[tuple]

+    def __str__(self):
+        return f"\tinterval={self.interval} array={self.array_name} callset rows={self.row_tuples}"
+

 class OutputConfig(NamedTuple):
     filename: str
@@ -398,7 +447,7 @@ def configure_export(config: GenomicsDBExportConfig):
     export_config.workspace = config.workspace
     export_config.vid_mapping_file = config.vidmap_file
     export_config.callset_mapping_file = config.callset_file
-    export_config.bypass_intersecting_intervals_phase = False
+    export_config.bypass_intersecting_intervals_phase = config.bypass_intersecting_intervals_phase
     export_config.enable_shared_posixfs_optimizations = True
     if config.attributes:
         export_config.attributes.extend(config.attributes)
@@ -505,7 +554,9 @@ def main():
         max_arrow_bytes = parse_args_for_max_bytes(args.max_arrow_byte_size)
         print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files")

-    export_config = GenomicsDBExportConfig(workspace, vidmap_file, callset_file, attributes, args.filter)
+    export_config = GenomicsDBExportConfig(
+        workspace, vidmap_file, callset_file, attributes, args.filter, args.bypass_intersecting_intervals_phase
+    )
     configs = []
     for interval in intervals:
         print(f"Processing interval({interval})...")
@@ -513,7 +564,7 @@ def main():
         contig, start, end, arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions)
         if len(arrays) == 0:
             logging.error(f"No arrays in the workspace matched input interval({interval})")
-            continue
+            # continue
         print(f"\tArrays:{arrays} under consideration for interval({interval})")

         for idx, array in enumerate(arrays):
@@ -529,6 +580,40 @@ def main():
     if len(configs) == 0:
         print("Nothing to process!!. Check output for possible errors")
         sys.exit(1)
+
+    # Check if there is room for row_tuples to be parallelized
+    if len(configs) < args.nproc:
+        chunk_size = args.chunk_size
+        if row_tuples is None:
+            row_tuples = parse_callset_json_for_split_row_ranges(callset_file, chunk_size)
+    new_configs = []
+    for idx_row, row_tuple in enumerate(row_tuples):
+        for idx, config in enumerate(configs):
+            query_config = config.query_config
+            split_query_config = GenomicsDBQueryConfig(
+                query_config.interval,
+                query_config.contig,
+                query_config.start,
+                query_config.end,
+                query_config.array_name,
+                [row_tuple],
+            )
+            output_config = config.output_config
+            split_output_config = OutputConfig(
+                generate_output_filename(output, output_type, query_config.interval, len(configs) * idx + idx_row),
+                output_type,
+                json_type,
+                max_arrow_bytes,
+            )
+            new_configs.append(Config(export_config, split_query_config, split_output_config))
+    configs = new_configs
+
+    if args.dryrun:
+        print(f"Query configurations for {export_config}:")
+        for config in configs:
+            print(config.query_config)
+        sys.exit(0)
+
     if min(len(configs), args.nproc) == 1:
         results = list(map(process, configs))
     else:
Check output for possible errors") sys.exit(1) + + # Check if there is room for row_tuples to be parallelized + if len(configs) < args.nproc: + chunk_size = args.chunk_size + if row_tuples is None: + row_tuples = parse_callset_json_for_split_row_ranges(callset_file, chunk_size) + new_configs = [] + for idx_row, row_tuple in enumerate(row_tuples): + for idx, config in enumerate(configs): + query_config = config.query_config + split_query_config = GenomicsDBQueryConfig( + query_config.interval, + query_config.contig, + query_config.start, + query_config.end, + query_config.array_name, + [row_tuple], + ) + output_config = config.output_config + split_output_config = OutputConfig( + generate_output_filename(output, output_type, query_config.interval, len(configs) * idx + idx_row), + output_type, + json_type, + max_arrow_bytes, + ) + new_configs.append(Config(export_config, split_query_config, split_output_config)) + configs = new_configs + + if args.dryrun: + print(f"Query configurations for {export_config}:") + for config in configs: + print(config.query_config) + sys.exit(0) + if min(len(configs), args.nproc) == 1: results = list(map(process, configs)) else: From 71052280687e3612e05ceb0e4fa7c2b1a4aa0bc2 Mon Sep 17 00:00:00 2001 From: Nalini Ganapati Date: Mon, 13 Jan 2025 10:20:15 -0800 Subject: [PATCH 2/5] Add tests --- examples/genomicsdb_query | 18 +++++++++--------- examples/test.sh | 5 +++++ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/examples/genomicsdb_query b/examples/genomicsdb_query index 99fe0c4..8a3012c 100755 --- a/examples/genomicsdb_query +++ b/examples/genomicsdb_query @@ -96,23 +96,23 @@ def parse_callset_json_for_row_ranges(callset_file, samples=None): return row_tuples -def parse_callset_json_for_split_row_ranges(callset_file, chunk_length): +def parse_callset_json_for_split_row_ranges(callset_file, chunk_size): callset = json.loads(genomicsdb.read_entire_file(callset_file)) callsets = callset["callsets"] - chunks = int(len(callsets) / chunk_length + 1) - last_chunk_length = len(callsets) - (chunks - 1) * chunk_length + chunks = int(len(callsets) / chunk_size + 1) + last_chunk_size = len(callsets) - (chunks - 1) * chunk_size # Collapse small last chunk into the last but one chunk - if last_chunk_length < chunk_length / 2: + if last_chunk_size < chunk_size / 2: chunks -= 1 - last_chunk_length += chunk_length + last_chunk_size += chunk_size if chunks == 1: return None split_row_ranges = [] for i in range(0, chunks): if i == chunks - 1: - split_row_ranges.append((chunk_length * i, chunk_length * i + last_chunk_length - 1)) + split_row_ranges.append((chunk_size * i, chunk_size * i + last_chunk_size - 1)) else: - split_row_ranges.append((chunk_length * i, chunk_length * (i + 1) - 1)) + split_row_ranges.append((chunk_size * i, chunk_size * (i + 1) - 1)) return split_row_ranges @@ -582,8 +582,8 @@ def main(): sys.exit(1) # Check if there is room for row_tuples to be parallelized - if len(configs) < args.nproc: - chunk_size = args.chunk_size + chunk_size = int(args.chunk_size) + if len(configs) < args.nproc and chunk_size > 1: if row_tuples is None: row_tuples = parse_callset_json_for_split_row_ranges(callset_file, chunk_size) new_configs = [] diff --git a/examples/test.sh b/examples/test.sh index a4a648d..e693b47 100755 --- a/examples/test.sh +++ b/examples/test.sh @@ -187,8 +187,13 @@ if [[ $PARTITION != "t0_1_2" ]]; then exit 1 fi run_command "genomicsdb_query -w $WORKSPACE -s HG00097 -s HG00100 -s HG00096 -o $OUTPUT" +run_command "genomicsdb_query -w 
From 5c603e74643e73e533d06c6150a1d8711d8d8c75 Mon Sep 17 00:00:00 2001
From: Nalini Ganapati
Date: Tue, 14 Jan 2025 12:43:07 -0800
Subject: [PATCH 3/5] Address PR comments

---
 examples/genomicsdb_query | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/examples/genomicsdb_query b/examples/genomicsdb_query
index 8a3012c..f6b7721 100755
--- a/examples/genomicsdb_query
+++ b/examples/genomicsdb_query
@@ -583,9 +583,8 @@ def main():

     # Check if there is room for row_tuples to be parallelized
     chunk_size = int(args.chunk_size)
-    if len(configs) < args.nproc and chunk_size > 1:
-        if row_tuples is None:
-            row_tuples = parse_callset_json_for_split_row_ranges(callset_file, chunk_size)
+    if len(configs) < args.nproc and chunk_size > 1 and row_tuples is None:
+        row_tuples = parse_callset_json_for_split_row_ranges(callset_file, chunk_size)
     new_configs = []
     for idx_row, row_tuple in enumerate(row_tuples):
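One subtlety that survives patch 3: parse_callset_json_for_split_row_ranges returns None when
the callset fits in a single chunk, and the expansion loop below the condition still iterates
over row_tuples unconditionally. A condensed reproduction of that failure mode (stand-in value
only, no GenomicsDB workspace needed), which the next patch guards against with if row_tuples:

    # row_tuples stays None when parse_callset_json_for_split_row_ranges finds a single chunk
    row_tuples = None
    try:
        for idx_row, row_tuple in enumerate(row_tuples):
            pass
    except TypeError as e:
        print(e)  # 'NoneType' object is not iterable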
From 105132a5a83c833bc2c43b3eae0574f64182acf7 Mon Sep 17 00:00:00 2001
From: Nalini Ganapati
Date: Tue, 14 Jan 2025 15:12:04 -0800
Subject: [PATCH 4/5] Check if row_tuples is None after processing

---
 examples/genomicsdb_query | 51 +++++++++++++++++++++------------------
 examples/test.sh          |  1 +
 2 files changed, 29 insertions(+), 23 deletions(-)

diff --git a/examples/genomicsdb_query b/examples/genomicsdb_query
index f6b7721..8faf650 100755
--- a/examples/genomicsdb_query
+++ b/examples/genomicsdb_query
@@ -426,7 +426,11 @@ class GenomicsDBQueryConfig(NamedTuple):
     row_tuples: List[tuple]

     def __str__(self):
-        return f"\tinterval={self.interval} array={self.array_name} callset rows={self.row_tuples}"
+        if self.row_tuples:
+            row_tuples_str = f"{self.row_tuples}"
+        else:
+            row_tuples_str = "ALL"
+        return f"\tinterval={self.interval} array={self.array_name} callset rows={row_tuples_str}"


 class OutputConfig(NamedTuple):
@@ -583,29 +587,30 @@ def main():

     # Check if there is room for row_tuples to be parallelized
     chunk_size = int(args.chunk_size)
-    if len(configs) < args.nproc and chunk_size > 1 and row_tuples is None:
+    if row_tuples is None and len(configs) < args.nproc and chunk_size > 1:
         row_tuples = parse_callset_json_for_split_row_ranges(callset_file, chunk_size)
-    new_configs = []
-    for idx_row, row_tuple in enumerate(row_tuples):
-        for idx, config in enumerate(configs):
-            query_config = config.query_config
-            split_query_config = GenomicsDBQueryConfig(
-                query_config.interval,
-                query_config.contig,
-                query_config.start,
-                query_config.end,
-                query_config.array_name,
-                [row_tuple],
-            )
-            output_config = config.output_config
-            split_output_config = OutputConfig(
-                generate_output_filename(output, output_type, query_config.interval, len(configs) * idx + idx_row),
-                output_type,
-                json_type,
-                max_arrow_bytes,
-            )
-            new_configs.append(Config(export_config, split_query_config, split_output_config))
-    configs = new_configs
+    if row_tuples:
+        new_configs = []
+        for idx_row, row_tuple in enumerate(row_tuples):
+            for idx, config in enumerate(configs):
+                query_config = config.query_config
+                split_query_config = GenomicsDBQueryConfig(
+                    query_config.interval,
+                    query_config.contig,
+                    query_config.start,
+                    query_config.end,
+                    query_config.array_name,
+                    [row_tuple],
+                )
+                output_config = config.output_config
+                split_output_config = OutputConfig(
+                    generate_output_filename(output, output_type, query_config.interval, len(configs) * idx + idx_row),
+                    output_type,
+                    json_type,
+                    max_arrow_bytes,
+                )
+                new_configs.append(Config(export_config, split_query_config, split_output_config))
+        configs = new_configs

     if args.dryrun:
         print(f"Query configurations for {export_config}:")
diff --git a/examples/test.sh b/examples/test.sh
index e693b47..2c7e8dc 100755
--- a/examples/test.sh
+++ b/examples/test.sh
@@ -194,6 +194,7 @@ run_command "genomicsdb_query -w $WORKSPACE -i 1 -o $OUTPUT -d"
 run_command "genomicsdb_query -w $WORKSPACE -i 1 --chunk-size=2 -o $OUTPUT"
 run_command "genomicsdb_query -w $WORKSPACE -i 1 --chunk-size=2 -b -o $OUTPUT -d"
 run_command "genomicsdb_query -w $WORKSPACE -i 1 --chunk-size=2 -b -o $OUTPUT"
+run_command "genomicsdb_query -w $WORKSPACE -i 4 --chunk-size=4 -b -o $OUTPUT -d"

 OLDSTYLE_JSONS="-l $OLDSTYLE_DIR/loader.json -c $OLDSTYLE_DIR/callset_t0_1_2.json -v $OLDSTYLE_DIR/vid.json"
 run_command "genomicsdb_cache -w $WORKSPACE $OLDSTYLE_JSONS $INTERVAL_ARGS"

From dd4091eaa3fe067d93747a8bb5c9884b2dd2276a Mon Sep 17 00:00:00 2001
From: Nalini Ganapati
Date: Tue, 14 Jan 2025 15:14:31 -0800
Subject: [PATCH 5/5] Lint

---
 examples/genomicsdb_query | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/examples/genomicsdb_query b/examples/genomicsdb_query
index 8faf650..04724d8 100755
--- a/examples/genomicsdb_query
+++ b/examples/genomicsdb_query
@@ -604,7 +604,9 @@ def main():
                 )
                 output_config = config.output_config
                 split_output_config = OutputConfig(
-                    generate_output_filename(output, output_type, query_config.interval, len(configs) * idx + idx_row),
+                    generate_output_filename(
+                        output, output_type, query_config.interval, len(configs) * idx + idx_row
+                    ),
                     output_type,
                     json_type,
                     max_arrow_bytes,
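To close out the series: the if row_tuples: block from patch 4 crosses every (interval, array)
config with every row range, so the pool in main() gets len(configs) * len(row_tuples) work
items. A minimal sketch of that expansion with stand-in values (plain strings and tuples,
no GenomicsDB types involved):

    # Hypothetical stand-ins for the real Config objects and split row ranges
    configs = ["chr1_array0", "chr1_array1"]
    row_tuples = [(0, 9), (10, 19), (20, 24)]

    expanded = []
    for idx_row, row_tuple in enumerate(row_tuples):
        for idx, config in enumerate(configs):
            suffix = len(configs) * idx + idx_row  # index passed to generate_output_filename
            expanded.append((config, row_tuple, suffix))

    for item in expanded:
        print(item)  # 2 configs x 3 row ranges -> 6 work items

Worth a second look in review: with more row ranges than configs, the expression
len(configs) * idx + idx_row repeats — above, ("chr1_array1", (0, 9)) and
("chr1_array0", (20, 24)) both get suffix 2 — so an enumeration such as
len(configs) * idx_row + idx may be safer when colliding configs share an interval.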