diff --git a/pyproject.toml b/pyproject.toml
index f4a62cf2..02f842b9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -78,7 +78,7 @@ Source = "https://github.com/ga4gh/vrs-python"
 "Bug Tracker" = "https://github.com/ga4gh/vrs-python/issues"
 
 [project.scripts]
-vrs-annotate = "ga4gh.vrs.extras.vcf_annotation:_cli"
+vrs-annotate = "ga4gh.vrs.extras.annotator.cli:_cli"
 
 [build-system]
 requires = ["setuptools>=65.3", "setuptools_scm>=8"]
@@ -193,9 +193,6 @@ exclude = [
     "ANN201",
     "ANN202",
 ]
-"src/ga4gh/vrs/extras/vcf_annotation.py" = [
-    "PTH123",  # see https://github.com/ga4gh/vrs-python/issues/482
-]
 "src/ga4gh/vrs/extras/object_store.py" = [
     "ANN",
     "D",
diff --git a/src/ga4gh/vrs/extras/annotator/__init__.py b/src/ga4gh/vrs/extras/annotator/__init__.py
new file mode 100644
index 00000000..7b787f94
--- /dev/null
+++ b/src/ga4gh/vrs/extras/annotator/__init__.py
@@ -0,0 +1 @@
+"""Provide tools for annotating data with corresponding VRS objects and attributes."""
diff --git a/src/ga4gh/vrs/extras/annotator/cli.py b/src/ga4gh/vrs/extras/annotator/cli.py
new file mode 100644
index 00000000..612335c2
--- /dev/null
+++ b/src/ga4gh/vrs/extras/annotator/cli.py
@@ -0,0 +1,182 @@
+"""Define command-line interface for VRS annotator tool.
+
+$ vrs-annotate vcf input.vcf.gz --vcf-out output.vcf.gz --pkl-out vrs_objects.pkl
+
+"""
+
+import logging
+from collections.abc import Callable
+from enum import Enum
+from pathlib import Path
+from timeit import default_timer as timer
+
+import click
+
+from ga4gh.vrs.dataproxy import create_dataproxy
+from ga4gh.vrs.extras.annotator.vcf import VCFAnnotator
+
+_logger = logging.getLogger(__name__)
+
+
+@click.group()
+def _cli() -> None:
+    """Annotate input files with VRS variation objects."""
+    logging.basicConfig(
+        filename="vrs-annotator.log",
+        level=logging.INFO,
+        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+    )
+
+
+class _LogLevel(str, Enum):
+    """Define legal values for `--log_level` option."""
+
+    DEBUG = "debug"
+    INFO = "info"
+    WARNING = "warning"
+    ERROR = "error"
+    CRITICAL = "critical"
+
+
+def _log_level_option(func: Callable) -> Callable:
+    """Provide reusable log level CLI option decorator.
+
+    Adds a `--log_level` CLI option to any decorated command. Doesn't pass on any
+    values, just sets the logging level for this module.
+
+    :param func: incoming click command
+    :return: same command, wrapped with log level option
+    """
+
+    def _set_log_level(ctx: dict, param: str, value: _LogLevel) -> None:  # noqa: ARG001
+        level_map = {
+            _LogLevel.DEBUG: logging.DEBUG,
+            _LogLevel.INFO: logging.INFO,
+            _LogLevel.WARNING: logging.WARNING,
+            _LogLevel.ERROR: logging.ERROR,
+            _LogLevel.CRITICAL: logging.CRITICAL,
+        }
+        logging.getLogger(__name__).setLevel(level_map[value])
+
+    return click.option(
+        "--log_level",
+        type=click.Choice([v.value for v in _LogLevel.__members__.values()]),
+        default="info",
+        help="Set the logging level.",
+        callback=_set_log_level,
+        expose_value=False,
+        is_eager=True,
+    )(func)
+
+
+@_cli.command(name="vcf")
+@_log_level_option
+@click.argument(
+    "vcf-in",
+    nargs=1,
+    type=click.Path(exists=True, readable=True, dir_okay=False, path_type=Path),
+)
+@click.option(
+    "--vcf-out",
+    required=False,
+    type=click.Path(writable=True, allow_dash=False, path_type=Path),
+    help=(
+        "Declare save location for output annotated VCF. If not provided, must provide --pkl-out."
+    ),
+)
+@click.option(
+    "--pkl-out",
+    required=False,
+    type=click.Path(writable=True, allow_dash=False, path_type=Path),
+    help=(
+        "Declare save location for output VCF pickle. If not provided, must provide --vcf-out."
+    ),
+)
+@click.option(
+    "--incl-vrs-attrs",
+    is_flag=True,
+    default=False,
+    help="Include VRS_Starts, VRS_Ends, and VRS_States fields in the VCF output INFO field.",
+)
+@click.option(
+    "--dataproxy-uri",
+    required=False,
+    default="seqrepo+http://localhost:5000/seqrepo",
+    help="URI declaring source of sequence data. See subcommand description for more information.",
+    show_default=True,
+)
+@click.option(
+    "--assembly",
+    required=False,
+    default="GRCh38",
+    show_default=True,
+    help="Specify assembly that was used to create input VCF.",
+    type=str,
+)
+@click.option(
+    "--incl-ref-allele",
+    is_flag=True,
+    default=False,
+    help="Compute VRS objects for REF alleles in addition to ALT alleles.",
+)
+@click.option(
+    "--require-validation",
+    is_flag=True,
+    default=False,
+    help="Require validation checks to pass to construct a VRS object.",
+)
+@click.option(
+    "--silent",
+    "-s",
+    is_flag=True,
+    default=False,
+    help="Suppress messages printed to stdout",
+)
+def _annotate_vcf_cli(
+    vcf_in: Path,
+    vcf_out: Path | None,
+    pkl_out: Path | None,
+    dataproxy_uri: str,
+    assembly: str,
+    incl_vrs_attrs: bool,
+    incl_ref_allele: bool,
+    require_validation: bool,
+    silent: bool,
+) -> None:
+    """Extract VRS objects from VCF located at VCF_IN.
+
+    $ vrs-annotate vcf input.vcf.gz --vcf-out output.vcf.gz --pkl-out vrs_objects.pkl
+
+    Note that at least one of --vcf-out or --pkl-out must be provided.
+
+    Sequence data from a provider such as SeqRepo is required. Use the `--dataproxy-uri`
+    option or the environment variable `GA4GH_VRS_DATAPROXY_URI` to define its location.
+    Currently accepted URI schemes:
+
+    \b
+    * seqrepo+file:///path/to/seqrepo/root
+    * seqrepo+:../relative/path/to/seqrepo/root
+    * seqrepo+http://localhost:5000/seqrepo
+    * seqrepo+https://somewhere:5000/seqrepo
+    """  # noqa: D301
+    data_proxy = create_dataproxy(dataproxy_uri)
+    annotator = VCFAnnotator(data_proxy)
+    start = timer()
+    msg = f"Annotating {vcf_in} with the VCF Annotator..."
+    _logger.info(msg)
+    if not silent:
+        click.echo(msg)
+    annotator.annotate(
+        vcf_in,
+        output_vcf_path=vcf_out,
+        output_pkl_path=pkl_out,
+        incl_vrs_attrs=incl_vrs_attrs,
+        incl_ref_allele=incl_ref_allele,
+        assembly=assembly,
+        require_validation=require_validation,
+    )
+    end = timer()
+    msg = f"VCF Annotator finished in {(end - start):.5f} seconds"
+    _logger.info(msg)
+    if not silent:
+        click.echo(msg)
diff --git a/src/ga4gh/vrs/extras/annotator/vcf.py b/src/ga4gh/vrs/extras/annotator/vcf.py
new file mode 100644
index 00000000..9b703967
--- /dev/null
+++ b/src/ga4gh/vrs/extras/annotator/vcf.py
@@ -0,0 +1,311 @@
+"""Annotate VCFs with VRS."""
+
+import logging
+import pickle
+from pathlib import Path
+from typing import ClassVar
+
+import pysam
+
+from ga4gh.core import VrsObjectIdentifierIs, use_ga4gh_compute_identifier_when
+from ga4gh.vrs.dataproxy import _DataProxy
+from ga4gh.vrs.extras.translator import AlleleTranslator
+
+_logger = logging.getLogger(__name__)
+
+
+class VCFAnnotatorError(Exception):
+    """Raise for errors specific to the VCF annotation process."""
+
+
+class VCFAnnotator:
+    """Annotate VCFs with VRS allele IDs.
+
+    Uses pysam to read, store, and (optionally) output VCFs. Alleles are translated
+    into VRS IDs using the VRS-Python translator class.
+ """ + + # Field names for VCF + VRS_ALLELE_IDS_FIELD = "VRS_Allele_IDs" + VRS_STARTS_FIELD = "VRS_Starts" + VRS_ENDS_FIELD = "VRS_Ends" + VRS_STATES_FIELD = "VRS_States" + VRS_ERROR_FIELD = "VRS_Error" + # VCF character escape map + VCF_ESCAPE_MAP: ClassVar = str.maketrans( + { + "%": "%25", + ";": "%3B", + ",": "%2C", + "\r": "%0D", + "\n": "%0A", + "\t": "%09", + } + ) + + def __init__(self, data_proxy: _DataProxy) -> None: + """Initialize the VCFAnnotator class. + + :param data_proxy: GA4GH sequence dataproxy instance. + """ + self.data_proxy = data_proxy + self.tlr = AlleleTranslator(self.data_proxy) + + def _update_vcf_header( + self, + vcf: pysam.VariantFile, + incl_ref_allele: bool, + incl_vrs_attrs: bool, + ) -> None: + """Add new fields to VCF header + + :param vcf: pysam VCF object to annotate + :param incl_ref_allele: whether VRS alleles will be calculated for REFs + :param incl_vrs_attrs: whether INFO properties should be defined for VRS attributes + (normalized coordinates/state) + """ + info_field_num = "R" if incl_ref_allele else "A" + info_field_desc = "REF and ALT" if incl_ref_allele else "ALT" + vcf.header.info.add( + self.VRS_ALLELE_IDS_FIELD, + info_field_num, + "String", + f"The computed identifiers for the GA4GH VRS Alleles corresponding to the GT indexes of the {info_field_desc} alleles", + ) + vcf.header.info.add( + self.VRS_ERROR_FIELD, + ".", + "String", + "If an error occurred computing a VRS Identifier, the error message", + ) + + if incl_vrs_attrs: + vcf.header.info.add( + self.VRS_STARTS_FIELD, + info_field_num, + "String", + f"Interresidue coordinates used as the location starts for the GA4GH VRS Alleles corresponding to the GT indexes of the {info_field_desc} alleles", + ) + vcf.header.info.add( + self.VRS_ENDS_FIELD, + info_field_num, + "String", + f"Interresidue coordinates used as the location ends for the GA4GH VRS Alleles corresponding to the GT indexes of the {info_field_desc} alleles", + ) + vcf.header.info.add( + self.VRS_STATES_FIELD, + info_field_num, + "String", + f"The literal sequence states used for the GA4GH VRS Alleles corresponding to the GT indexes of the {info_field_desc} alleles", + ) + + def _process_allele( + self, + vcf_coords: str, + vrs_data: dict, + annotations: dict, + assembly: str, + vrs_data_key: str | None = None, + create_pickle: bool = True, + incl_vrs_attrs: bool = False, + require_validation: bool = True, + ) -> None: + """Get VRS object given `vcf_coords`. `vrs_data` and `vrs_field_data` will + be mutated. 
+
+        :param vcf_coords: allele to translate, in gnomAD-style chr-pos-ref-alt format
+        :param vrs_data: dictionary containing VRS object information for the VCF
+        :param annotations: mapping of VRS INFO field names to lists of values for
+            the current record
+        :param assembly: assembly used in `vcf_coords`
+        :param vrs_data_key: key to update in `vrs_data`. If not provided, `vcf_coords`
+            is used as the key.
+        :param create_pickle: if `True`, add the VRS object to `vrs_data` for pickling
+        :param incl_vrs_attrs: if `True`, append VRS_Starts, VRS_Ends, and VRS_States
+            values to `annotations`
+        :param require_validation: if `True`, validation checks must pass in order to
+            return a VRS object; if `False`, a VRS object is returned even if
+            validation checks fail
+        """
+        try:
+            vrs_obj = self.tlr._from_gnomad(  # noqa: SLF001
+                vcf_coords,
+                assembly_name=assembly,
+                require_validation=require_validation,
+            )
+        except Exception:
+            vrs_obj = None
+            _logger.exception(
+                "Exception encountered during translation of variation: %s", vcf_coords
+            )
+            raise
+        if vrs_obj is None:
+            _logger.debug(
+                "None was returned when translating %s from gnomad", vcf_coords
+            )
+
+        if create_pickle and vrs_obj:
+            key = vrs_data_key if vrs_data_key else vcf_coords
+            vrs_data[key] = str(vrs_obj.model_dump(exclude_none=True))
+
+        if annotations:
+            allele_id = vrs_obj.id if vrs_obj else ""
+            annotations[self.VRS_ALLELE_IDS_FIELD].append(allele_id)
+
+            if incl_vrs_attrs:
+                if vrs_obj:
+                    start = str(vrs_obj.location.start)
+                    end = str(vrs_obj.location.end)
+                    alt = (
+                        str(vrs_obj.state.sequence.root)
+                        if vrs_obj.state.sequence
+                        else ""
+                    )
+                else:
+                    start = end = alt = ""
+
+                annotations[self.VRS_STARTS_FIELD].append(start)
+                annotations[self.VRS_ENDS_FIELD].append(end)
+                annotations[self.VRS_STATES_FIELD].append(alt)
+
+    def _process_vcf_row(
+        self,
+        record: pysam.VariantRecord,
+        vrs_data: dict,
+        assembly: str,
+        vrs_info_fields: list[str],
+        incl_vrs_attrs: bool,
+        incl_ref_allele: bool,
+        create_pickle: bool,
+        require_validation: bool,
+    ) -> dict:
+        """Compute VRS objects for a VCF row.
+
+        Get VRS data for record's reference (if requested) and alt alleles. Return
+        INFO field values to annotate VCF row with.
+
+        :param record: a row in the VCF file
+        :param vrs_data: dictionary containing VRS object information for the VCF;
+            mutated if `create_pickle` is `True`
+        :param assembly: assembly used in `record`
+        :param vrs_info_fields: names of VRS INFO fields to annotate the record with
+        :param incl_vrs_attrs: if `True`, include VRS_Starts, VRS_Ends, and VRS_States
+            values in the returned annotations
+        :param incl_ref_allele: if `True`, also compute a VRS object for the REF allele
+        :param create_pickle: if `True`, add VRS objects to `vrs_data` for pickling
+        :param require_validation: if `True`, validation checks must pass in order to
+            return a VRS object for an allele
+        :return: mapping of VRS INFO field names to lists of associated values
+        """
+        info_field_annotations = {field: [] for field in vrs_info_fields}
+
+        # Get VRS data for reference allele
+        gnomad_loc = f"{record.chrom}-{record.pos}"
+        if incl_ref_allele:
+            reference_allele = f"{gnomad_loc}-{record.ref}-{record.ref}"
+            self._process_allele(
+                reference_allele,
+                vrs_data,
+                info_field_annotations,
+                assembly,
+                create_pickle=create_pickle,
+                incl_vrs_attrs=incl_vrs_attrs,
+                require_validation=require_validation,
+            )
+
+        # Get VRS data for alts
+        alts = record.alts or []
+        alleles = [f"{gnomad_loc}-{record.ref}-{a}" for a in [*alts]]
+        data_key = f"{record.chrom}\t{record.pos}\t{record.ref}\t{record.alts}"
+        for allele in alleles:
+            if "*" in allele:
+                _logger.debug("Star allele found: %s", allele)
+                for field in vrs_info_fields:
+                    info_field_annotations[field].append("")
+            else:
+                self._process_allele(
+                    allele,
+                    vrs_data,
+                    info_field_annotations,
+                    assembly,
+                    vrs_data_key=data_key,
+                    create_pickle=create_pickle,
+                    incl_vrs_attrs=incl_vrs_attrs,
+                    require_validation=require_validation,
+                )
+
+        return info_field_annotations
+
+    @use_ga4gh_compute_identifier_when(VrsObjectIdentifierIs.MISSING)
+    def annotate(
+        self,
+        input_vcf_path: Path,
+        output_vcf_path: Path | None = None,
+        output_pkl_path: Path | None = None,
+        incl_vrs_attrs: bool = False,
+        incl_ref_allele: bool = True,
+        assembly: str = "GRCh38",
+        require_validation: bool = True,
+    ) -> None:
+        """Given a VCF, produce an output VCF annotated with VRS allele IDs, and/or
+        a pickle file containing the full VRS objects.
+
+        :param input_vcf_path: location of input VCF
+        :param output_vcf_path: location at which to save output VCF (optional)
+        :param output_pkl_path: location at which to save output PKL file (optional)
+        :param incl_vrs_attrs: whether ``VRS_Starts``, ``VRS_Ends``, and ``VRS_States``
+            attributes should be included in output VCF INFO field. These properties
+            may be useful to retain outside of the VRS object for reasons like
+            searchability. Does nothing if ``output_vcf_path`` is left unset.
+ :param incl_ref_allele: If true, perform VRS ID computation for REF allele and + include the corresponding VRS object in any data dumps + :param assembly: The assembly used in `vcf_in` data + :param require_validation: If ``True``, validation checks (i.e., REF value + matches expected REF at given location) must pass in order to return a VRS + object for a record. If ``False`` then VRS object will be returned even if + validation checks fail, although all instances of failed validation are + logged as warnings regardless. + """ + if not any((output_vcf_path, output_pkl_path)): + msg = "Must provide one of: `output_vcf_path` or `output_pkl_path`" + raise VCFAnnotatorError(msg) + + vcf = pysam.VariantFile(filename=str(input_vcf_path.absolute())) + if output_vcf_path: + self._update_vcf_header(vcf, incl_ref_allele, incl_vrs_attrs) + + vcf_out = ( + pysam.VariantFile(str(output_vcf_path.absolute()), "w", header=vcf.header) + if output_vcf_path + else None + ) + create_pkl = bool(output_pkl_path) + + vrs_data = {} + for record in vcf: + if vcf_out: + vrs_info_fields = [self.VRS_ALLELE_IDS_FIELD] + if incl_vrs_attrs: + vrs_info_fields += [ + self.VRS_STARTS_FIELD, + self.VRS_ENDS_FIELD, + self.VRS_STATES_FIELD, + ] + else: + # no info fields are necessary if we aren't producing an annotated VCF + vrs_info_fields = [] + try: + vrs_info_field_annotations = self._process_vcf_row( + record, + vrs_data, + assembly, + vrs_info_fields, + incl_vrs_attrs=incl_vrs_attrs, + incl_ref_allele=incl_ref_allele, + create_pickle=create_pkl, + require_validation=require_validation, + ) + except Exception as ex: + _logger.exception("VRS error on %s-%s", record.chrom, record.pos) + err_msg = f"{ex}" or f"{type(ex)}" + err_msg = err_msg.translate(self.VCF_ESCAPE_MAP) + vrs_info_fields = [self.VRS_ERROR_FIELD] + vrs_info_field_annotations = {self.VRS_ERROR_FIELD: [err_msg]} + + _logger.debug( + "VCF record %s-%s generated vrs_field_data %s", + record.chrom, + record.pos, + vrs_info_field_annotations, + ) + if output_vcf_path and vcf_out: + for k in vrs_info_fields: + record.info[k] = [ + value or "." 
for value in vrs_info_field_annotations[k] + ] + vcf_out.write(record) + + vcf.close() + + if vcf_out: + vcf_out.close() + if output_pkl_path: + with output_pkl_path.open("wb") as wf: + pickle.dump(vrs_data, wf) diff --git a/src/ga4gh/vrs/extras/vcf_annotation.py b/src/ga4gh/vrs/extras/vcf_annotation.py deleted file mode 100644 index d0748bdf..00000000 --- a/src/ga4gh/vrs/extras/vcf_annotation.py +++ /dev/null @@ -1,567 +0,0 @@ -"""Annotate VCFs with VRS - -$ vrs-annotate vcf input.vcf.gz --vcf_out output.vcf.gz --vrs_pickle_out vrs_objects.pkl - -""" - -import logging -import pathlib -import pickle -from collections.abc import Callable -from enum import Enum -from timeit import default_timer as timer - -import click -import pysam -from biocommons.seqrepo import SeqRepo -from pydantic import ValidationError - -from ga4gh.core import VrsObjectIdentifierIs, use_ga4gh_compute_identifier_when -from ga4gh.vrs.dataproxy import ( - DataProxyValidationError, - SeqRepoDataProxy, - SeqRepoRESTDataProxy, -) -from ga4gh.vrs.extras.translator import AlleleTranslator - -_logger = logging.getLogger(__name__) -_logger.setLevel(logging.DEBUG) - - -class VCFAnnotatorException(Exception): # noqa: N818 - """Custom exceptions for VCF Annotator tool""" - - -class SeqRepoProxyType(str, Enum): - """Define constraints for SeqRepo Data Proxy types""" - - LOCAL = "local" - REST = "rest" - - -@click.group() -def _cli() -> None: - """Annotate input files with VRS variation objects.""" - logging.basicConfig( - filename="vrs-annotate.log", - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - ) - - -class _LogLevel(str, Enum): - """Define legal values for `--log_level` option.""" - - DEBUG = "debug" - INFO = "info" - WARNING = "warning" - ERROR = "error" - CRITICAL = "critical" - - -def _log_level_option(func: Callable) -> Callable: - """Provide reusable log level CLI option decorator. - - Adds a `--log_level` CLI option to any decorated command. Doesn't pass on any - values, just sets the logging level for this module. - - :param func: incoming click command - :return: same command, wrapped with log level option - """ - - def _set_log_level(ctx: dict, param: str, value: _LogLevel) -> None: # noqa: ARG001 - level_map = { - _LogLevel.DEBUG: logging.DEBUG, - _LogLevel.INFO: logging.INFO, - _LogLevel.WARNING: logging.WARNING, - _LogLevel.ERROR: logging.ERROR, - _LogLevel.CRITICAL: logging.CRITICAL, - } - logging.getLogger(__name__).setLevel(level_map[value]) - - return click.option( - "--log_level", - type=click.Choice([v.value for v in _LogLevel.__members__.values()]), - default="info", - help="Set the logging level.", - callback=_set_log_level, - expose_value=False, - is_eager=True, - )(func) - - -@_cli.command(name="vcf") -@_log_level_option -@click.argument( - "vcf_in", - nargs=1, - type=click.Path(exists=True, readable=True, dir_okay=False, path_type=pathlib.Path), -) -@click.option( - "--vcf_out", - required=False, - type=click.Path(writable=True, allow_dash=False, path_type=pathlib.Path), - help=( - "Declare save location for output annotated VCF. If not provided, must provide --vrs_pickle_out." - ), -) -@click.option( - "--vrs_pickle_out", - required=False, - type=click.Path(writable=True, allow_dash=False, path_type=pathlib.Path), - help=( - "Declare save location for output VCF pickle. If not provided, must provide --vcf_out." 
- ), -) -@click.option( - "--vrs_attributes", - is_flag=True, - default=False, - help="Include VRS_Start, VRS_End, and VRS_State fields in the VCF output INFO field.", -) -@click.option( - "--seqrepo_dp_type", - required=False, - default=SeqRepoProxyType.LOCAL, - type=click.Choice( - [v.value for v in SeqRepoProxyType.__members__.values()], case_sensitive=True - ), - help="Specify type of SeqRepo dataproxy to use.", - show_default=True, - show_choices=True, -) -@click.option( - "--seqrepo_root_dir", - required=False, - default=pathlib.Path("/usr/local/share/seqrepo/latest"), - type=click.Path(path_type=pathlib.Path), - help="Define root directory for local SeqRepo instance, if --seqrepo_dp_type=local.", - show_default=True, -) -@click.option( - "--seqrepo_base_url", - required=False, - default="http://localhost:5000/seqrepo", - help="Specify base URL for SeqRepo REST API, if --seqrepo_dp_type=rest.", - show_default=True, -) -@click.option( - "--assembly", - required=False, - default="GRCh38", - show_default=True, - help="Specify assembly that was used to create input VCF.", - type=str, -) -@click.option( - "--skip_ref", - is_flag=True, - default=False, - help="Skip VRS computation for REF alleles.", -) -@click.option( - "--require_validation", - is_flag=True, - default=False, - help="Require validation checks to pass to construct a VRS object.", -) -@click.option( - "--silent", - "-s", - is_flag=True, - default=False, - help="Suppress messages printed to stdout", -) -def _annotate_vcf_cli( - vcf_in: pathlib.Path, - vcf_out: pathlib.Path | None, - vrs_pickle_out: pathlib.Path | None, - vrs_attributes: bool, - seqrepo_dp_type: SeqRepoProxyType, - seqrepo_root_dir: pathlib.Path, - seqrepo_base_url: str, - assembly: str, - skip_ref: bool, - require_validation: bool, - silent: bool, -) -> None: - """Extract VRS objects from VCF located at VCF_IN. - - $ vrs-annotate vcf input.vcf.gz --vcf_out output.vcf.gz --vrs_pickle_out vrs_objects.pkl - - Note that at least one of --vcf_out or --vrs_pickle_out must be selected and defined. - """ - annotator = VCFAnnotator( - seqrepo_dp_type, seqrepo_base_url, str(seqrepo_root_dir.absolute()) - ) - vcf_out_str = str(vcf_out.absolute()) if vcf_out is not None else vcf_out - vrs_pkl_out_str = ( - str(vrs_pickle_out.absolute()) if vrs_pickle_out is not None else vrs_pickle_out - ) - start = timer() - msg = f"Annotating {vcf_in} with the VCF Annotator..." - _logger.info(msg) - if not silent: - click.echo(msg) - annotator.annotate( - str(vcf_in.absolute()), - vcf_out=vcf_out_str, - vrs_pickle_out=vrs_pkl_out_str, - vrs_attributes=vrs_attributes, - assembly=assembly, - compute_for_ref=(not skip_ref), - require_validation=require_validation, - ) - end = timer() - msg = f"VCF Annotator finished in {(end - start):.5f} seconds" - _logger.info(msg) - if not silent: - click.echo(msg) - - -class VCFAnnotator: - """Annotate VCFs with VRS allele IDs. - - Uses pysam to read, store, and (optionally) output VCFs. Alleles are translated - into VRS IDs using the VRS-Python translator class. 
- """ - - # Field names for VCF - VRS_ALLELE_IDS_FIELD = "VRS_Allele_IDs" - VRS_STARTS_FIELD = "VRS_Starts" - VRS_ENDS_FIELD = "VRS_Ends" - VRS_STATES_FIELD = "VRS_States" - VRS_ERROR_FIELD = "VRS_Error" - # VCF character escape map - VCF_ESCAPE_MAP = [ # noqa: RUF012 - ("%", "%25"), - (";", "%3B"), - (",", "%2C"), - ("\r", "%0D"), - ("\n", "%0A"), - ("\t", "%09"), - ] - - def __init__( - self, - seqrepo_dp_type: SeqRepoProxyType = SeqRepoProxyType.LOCAL, - seqrepo_base_url: str = "http://localhost:5000/seqrepo", - seqrepo_root_dir: str = "/usr/local/share/seqrepo/latest", - ) -> None: - """Initialize the VCFAnnotator class. - - :param seqrepo_dp_type: The type of SeqRepo Data Proxy to use - (i.e., local vs REST) - :param seqrepo_base_url: The base url for SeqRepo REST API - :param seqrepo_root_dir: The root directory for the local SeqRepo instance - """ - if seqrepo_dp_type == SeqRepoProxyType.LOCAL: - self.dp = SeqRepoDataProxy(SeqRepo(seqrepo_root_dir)) - else: - self.dp = SeqRepoRESTDataProxy(seqrepo_base_url) - self.tlr = AlleleTranslator(self.dp) - - @use_ga4gh_compute_identifier_when(VrsObjectIdentifierIs.MISSING) - def annotate( - self, - vcf_in: str, - vcf_out: str | None = None, - vrs_pickle_out: str | None = None, - vrs_attributes: bool = False, - assembly: str = "GRCh38", - compute_for_ref: bool = True, - require_validation: bool = True, - ) -> None: - """Given a VCF, produce an output VCF annotated with VRS allele IDs, and/or - a pickle file containing the full VRS objects. - - :param vcf_in: Location of input VCF - :param vcf_out: The path for the output VCF file - :param vrs_pickle_out: The path for the output VCF pickle file - :param vrs_attributes: If `True`, include VRS_Start, VRS_End, VRS_State - properties in the VCF INFO field. If `False` will not include these - properties. Only used if `vcf_out` is defined. - :param assembly: The assembly used in `vcf_in` data - :param compute_for_ref: If true, compute VRS IDs for the reference allele - :param require_validation: If `True`, validation checks (i.e., REF value - matches expected REF at given location) must pass in order to return a VRS - object for a record. If `False` then VRS object will be returned even if - validation checks fail, although all instances of failed validation are - logged as warnings regardless. 
- """ - if not any((vcf_out, vrs_pickle_out)): - msg = "Must provide one of: `vcf_out` or `vrs_pickle_out`" - raise VCFAnnotatorException(msg) - - info_field_num = "R" if compute_for_ref else "A" - info_field_desc = "REF and ALT" if compute_for_ref else "ALT" - - vrs_data = {} - vcf_in = pysam.VariantFile(filename=vcf_in) - vcf_in.header.info.add( - self.VRS_ALLELE_IDS_FIELD, - info_field_num, - "String", - ( - "The computed identifiers for the GA4GH VRS Alleles corresponding to the " - f"GT indexes of the {info_field_desc} alleles" - ), - ) - vcf_in.header.info.add( - self.VRS_ERROR_FIELD, - ".", - "String", - ("If an error occurred computing a VRS Identifier, the error message"), - ) - - if vrs_attributes: - vcf_in.header.info.add( - self.VRS_STARTS_FIELD, - info_field_num, - "String", - ( - "Interresidue coordinates used as the location starts for the GA4GH " - f"VRS Alleles corresponding to the GT indexes of the {info_field_desc} alleles" - ), - ) - vcf_in.header.info.add( - self.VRS_ENDS_FIELD, - info_field_num, - "String", - ( - "Interresidue coordinates used as the location ends for the GA4GH VRS " - f"Alleles corresponding to the GT indexes of the {info_field_desc} alleles" - ), - ) - vcf_in.header.info.add( - self.VRS_STATES_FIELD, - info_field_num, - "String", - ( - "The literal sequence states used for the GA4GH VRS Alleles " - f"corresponding to the GT indexes of the {info_field_desc} alleles" - ), - ) - - if vcf_out: - vcf_out = pysam.VariantFile(vcf_out, "w", header=vcf_in.header) - - output_vcf = bool(vcf_out) - output_pickle = bool(vrs_pickle_out) - - for record in vcf_in: - additional_info_fields = [self.VRS_ALLELE_IDS_FIELD] - if vrs_attributes: - additional_info_fields += [ - self.VRS_STARTS_FIELD, - self.VRS_ENDS_FIELD, - self.VRS_STATES_FIELD, - ] - try: - vrs_field_data = self._get_vrs_data( - record, - vrs_data, - assembly, - additional_info_fields, - vrs_attributes=vrs_attributes, - output_pickle=output_pickle, - output_vcf=output_vcf, - compute_for_ref=compute_for_ref, - require_validation=require_validation, - ) - except Exception as ex: - _logger.exception("VRS error on %s-%s", record.chrom, record.pos) - err_msg = f"{ex}" or f"{type(ex)}" - for search_repl in VCFAnnotator.VCF_ESCAPE_MAP: - err_msg = err_msg.replace(search_repl[0], search_repl[1]) - additional_info_fields = [self.VRS_ERROR_FIELD] - vrs_field_data = {self.VRS_ERROR_FIELD: [err_msg]} - - _logger.debug( - "VCF record %s-%s generated vrs_field_data %s", - record.chrom, - record.pos, - vrs_field_data, - ) - - if output_vcf: - for k in additional_info_fields: - record.info[k] = [value or "." for value in vrs_field_data[k]] - vcf_out.write(record) - - vcf_in.close() - - if output_vcf: - vcf_out.close() - - if vrs_pickle_out: - with open(vrs_pickle_out, "wb") as wf: - pickle.dump(vrs_data, wf) - - def _get_vrs_object( - self, - vcf_coords: str, - vrs_data: dict, - vrs_field_data: dict, - assembly: str, - vrs_data_key: str | None = None, - output_pickle: bool = True, - output_vcf: bool = False, - vrs_attributes: bool = False, - require_validation: bool = True, - ) -> None: - """Get VRS object given `vcf_coords`. `vrs_data` and `vrs_field_data` will - be mutated. - - :param vcf_coords: Allele to get VRS object for. Format is chr-pos-ref-alt - :param vrs_data: Dictionary containing the VRS object information for the VCF - :param vrs_field_data: If `output_vcf`, keys are VRS Fields and values are list - of VRS data. Empty otherwise. 
- :param assembly: The assembly used in `vcf_coords` - :param vrs_data_key: The key to update in `vrs_data`. If not provided, will use - `vcf_coords` as the key. - :param output_pickle: If `True`, VRS pickle file will be output. - :param output_vcf: If `True`, annotated VCF file will be output. - :param vrs_attributes: If `True` will include VRS_Start, VRS_End, VRS_State - fields in the INFO field. If `False` will not include these fields. Only - used if `output_vcf` set to `True`. - :param require_validation: If `True` then validation checks must pass in order - to return a VRS object. If `False` then VRS object will be returned even if - validation checks fail. Defaults to `True`. - """ - try: - vrs_obj = self.tlr._from_gnomad( # noqa: SLF001 - vcf_coords, - assembly_name=assembly, - require_validation=require_validation, - ) - except (ValidationError, DataProxyValidationError): - vrs_obj = None - _logger.exception( - "ValidationError when translating %s from gnomad", vcf_coords - ) - raise - except KeyError: - vrs_obj = None - _logger.exception("KeyError when translating %s from gnomad", vcf_coords) - raise - except AssertionError: - vrs_obj = None - _logger.exception( - "AssertionError when translating %s from gnomad", vcf_coords - ) - raise - except Exception: - vrs_obj = None - _logger.exception( - "Unhandled Exception when translating %s from gnomad", vcf_coords - ) - raise - else: - if not vrs_obj: - _logger.debug( - "None was returned when translating %s from gnomad", vcf_coords - ) - - if output_pickle and vrs_obj: - key = vrs_data_key if vrs_data_key else vcf_coords - vrs_data[key] = str(vrs_obj.model_dump(exclude_none=True)) - - if output_vcf: - allele_id = vrs_obj.id if vrs_obj else "" - vrs_field_data[self.VRS_ALLELE_IDS_FIELD].append(allele_id) - - if vrs_attributes: - if vrs_obj: - start = str(vrs_obj.location.start) - end = str(vrs_obj.location.end) - alt = ( - str(vrs_obj.state.sequence.root) - if vrs_obj.state.sequence - else "" - ) - else: - start = "" - end = "" - alt = "" - - vrs_field_data[self.VRS_STARTS_FIELD].append(start) - vrs_field_data[self.VRS_ENDS_FIELD].append(end) - vrs_field_data[self.VRS_STATES_FIELD].append(alt) - - def _get_vrs_data( - self, - record: pysam.VariantRecord, - vrs_data: dict, - assembly: str, - additional_info_fields: list[str], - vrs_attributes: bool = False, - output_pickle: bool = True, - output_vcf: bool = True, - compute_for_ref: bool = True, - require_validation: bool = True, - ) -> dict: - """Get VRS data for record's reference and alt alleles. - - :param record: A row in the VCF file - :param vrs_data: Dictionary containing the VRS object information for the VCF. - Will be mutated if `output_pickle = True` - :param assembly: The assembly used in `record` - :param additional_info_fields: Additional VRS fields to add in INFO field - :param vrs_attributes: If `True` will include VRS_Start, VRS_End, VRS_State - fields in the INFO field. If `False` will not include these fields. Only - used if `output_vcf` set to `True`. - :param output_pickle: If `True`, VRS pickle file will be output. - :param output_vcf: If `True`, annotated VCF file will be output. - :param compute_for_ref: If true, compute VRS IDs for the reference allele - :param require_validation: If `True` then validation checks must pass in - order to return a VRS object. A `DataProxyValidationError` will be raised if - validation checks fail. If `False` then VRS object will be returned even if - validation checks fail. Defaults to `True`. 
- :return: If `output_vcf = True`, a dictionary containing VRS Fields and list - of associated values. If `output_vcf = False`, an empty dictionary will be - returned. - """ - vrs_field_data = ( - {field: [] for field in additional_info_fields} if output_vcf else {} - ) - - # Get VRS data for reference allele - gnomad_loc = f"{record.chrom}-{record.pos}" - if compute_for_ref: - reference_allele = f"{gnomad_loc}-{record.ref}-{record.ref}" - self._get_vrs_object( - reference_allele, - vrs_data, - vrs_field_data, - assembly, - output_pickle=output_pickle, - output_vcf=output_vcf, - vrs_attributes=vrs_attributes, - require_validation=require_validation, - ) - - # Get VRS data for alts - alts = record.alts or [] - alleles = [f"{gnomad_loc}-{record.ref}-{a}" for a in [*alts]] - data = f"{record.chrom}\t{record.pos}\t{record.ref}\t{record.alts}" - for allele in alleles: - if "*" in allele: - _logger.debug("Star allele found: %s", allele) - if output_vcf: - for field in additional_info_fields: - vrs_field_data[field].append("") - else: - self._get_vrs_object( - allele, - vrs_data, - vrs_field_data, - assembly, - vrs_data_key=data, - output_pickle=output_pickle, - output_vcf=output_vcf, - vrs_attributes=vrs_attributes, - require_validation=require_validation, - ) - - return vrs_field_data diff --git a/tests/extras/test_vcf_annotation.py b/tests/extras/test_annotate_vcf.py similarity index 53% rename from tests/extras/test_vcf_annotation.py rename to tests/extras/test_annotate_vcf.py index fd034b0b..f12a0b65 100644 --- a/tests/extras/test_vcf_annotation.py +++ b/tests/extras/test_annotate_vcf.py @@ -1,30 +1,50 @@ """Ensure proper functionality of VCFAnnotator""" import gzip +import logging +import os import re from pathlib import Path import pytest -from ga4gh.vrs.dataproxy import DataProxyValidationError -from ga4gh.vrs.extras.vcf_annotation import VCFAnnotator, VCFAnnotatorException +from ga4gh.vrs.dataproxy import ( + DataProxyValidationError, + SeqRepoRESTDataProxy, + _DataProxy, +) +from ga4gh.vrs.extras.annotator.vcf import VCFAnnotator, VCFAnnotatorError -TEST_DATA_DIR = "tests/extras/data" +TEST_DATA_DIR = Path("tests/extras/data") @pytest.fixture -def vcf_annotator(): - return VCFAnnotator("rest") +def rest_dataproxy_fn_scope(): + """REST dataproxy scoped to individual test functions, rather than the entire session""" + return SeqRepoRESTDataProxy( + base_url=os.environ.get("SEQREPO_REST_URL", "http://localhost:5000/seqrepo") + ) + + +@pytest.fixture +def vcf_annotator(rest_dataproxy_fn_scope: _DataProxy): + return VCFAnnotator(rest_dataproxy_fn_scope) + + +@pytest.fixture +def input_vcf(): + return TEST_DATA_DIR / "test_vcf_input.vcf" @pytest.mark.vcr -def test_annotate_vcf_grch38_noattrs(vcf_annotator, vcr_cassette): +def test_annotate_vcf_grch38_noattrs( + vcf_annotator: VCFAnnotator, vcr_cassette, input_vcf: Path, tmp_path: Path +): vcr_cassette.allow_playback_repeats = False - input_vcf = f"{TEST_DATA_DIR}/test_vcf_input.vcf" - output_vcf = f"{TEST_DATA_DIR}/test_vcf_output_grch38_noattrs.vcf.gz" - output_vrs_pkl = f"{TEST_DATA_DIR}/test_vcf_pkl_grch38_noattrs.pkl" + output_vcf = tmp_path / "test_vcf_output_grch38_noattrs.vcf.gz" + output_vrs_pkl = tmp_path / "test_vcf_pkl_grch38_noattrs.pkl" expected_vcf_no_vrs_attrs = ( - f"{TEST_DATA_DIR}/test_vcf_expected_output_no_vrs_attrs.vcf.gz" + TEST_DATA_DIR / "test_vcf_expected_output_no_vrs_attrs.vcf.gz" ) # Test GRCh38 assembly, which was used for input_vcf and no vrs attributes @@ -37,22 +57,21 @@ def 
test_annotate_vcf_grch38_noattrs(vcf_annotator, vcr_cassette): out_vcf_lines, expected_output_lines, strict=False ): assert actual_line == expected_line - assert Path(output_vrs_pkl).exists() + assert output_vrs_pkl.exists() assert vcr_cassette.all_played - Path(output_vcf).unlink() - Path(output_vrs_pkl).unlink() @pytest.mark.vcr -def test_annotate_vcf_grch38_attrs(vcf_annotator, vcr_cassette): +def test_annotate_vcf_grch38_attrs( + vcf_annotator: VCFAnnotator, vcr_cassette, input_vcf: Path, tmp_path: Path +): vcr_cassette.allow_playback_repeats = False - input_vcf = f"{TEST_DATA_DIR}/test_vcf_input.vcf" - output_vcf = f"{TEST_DATA_DIR}/test_vcf_output_grch38_attrs.vcf.gz" - output_vrs_pkl = f"{TEST_DATA_DIR}/test_vcf_pkl_grch38_attrs.pkl" - expected_vcf = f"{TEST_DATA_DIR}/test_vcf_expected_output.vcf.gz" + output_vcf = tmp_path / "test_vcf_output_grch38_attrs.vcf.gz" + output_vrs_pkl = tmp_path / "test_vcf_pkl_grch38_attrs.pkl" + expected_vcf = TEST_DATA_DIR / "test_vcf_expected_output.vcf.gz" # Test GRCh38 assembly, which was used for input_vcf and vrs attributes - vcf_annotator.annotate(input_vcf, output_vcf, output_vrs_pkl, vrs_attributes=True) + vcf_annotator.annotate(input_vcf, output_vcf, output_vrs_pkl, incl_vrs_attrs=True) with gzip.open(output_vcf, "rt") as out_vcf: out_vcf_lines = out_vcf.readlines() with gzip.open(expected_vcf, "rt") as expected_output: @@ -61,27 +80,26 @@ def test_annotate_vcf_grch38_attrs(vcf_annotator, vcr_cassette): out_vcf_lines, expected_output_lines, strict=False ): assert actual_line == expected_line - assert Path(output_vrs_pkl).exists() + assert output_vrs_pkl.exists() assert vcr_cassette.all_played - Path(output_vcf).unlink() - Path(output_vrs_pkl).unlink() @pytest.mark.vcr -def test_annotate_vcf_grch38_attrs_altsonly(vcf_annotator, vcr_cassette): +def test_annotate_vcf_grch38_attrs_altsonly( + vcf_annotator: VCFAnnotator, vcr_cassette, input_vcf: Path, tmp_path: Path +): vcr_cassette.allow_playback_repeats = False - input_vcf = f"{TEST_DATA_DIR}/test_vcf_input.vcf" - output_vcf = f"{TEST_DATA_DIR}/test_vcf_output_grch38_attrs_altsonly.vcf.gz" - output_vrs_pkl = f"{TEST_DATA_DIR}/test_vcf_pkl_grch38_attrs_altsonly.pkl" - expected_altsonly_vcf = f"{TEST_DATA_DIR}/test_vcf_expected_altsonly_output.vcf.gz" + output_vcf = tmp_path / "test_vcf_output_grch38_attrs_altsonly.vcf.gz" + output_vrs_pkl = tmp_path / "test_vcf_pkl_grch38_attrs_altsonly.pkl" + expected_altsonly_vcf = TEST_DATA_DIR / "test_vcf_expected_altsonly_output.vcf.gz" # Test GRCh38 assembly with VRS computed for ALTs only, which was used for input_vcf and vrs attributes vcf_annotator.annotate( input_vcf, output_vcf, output_vrs_pkl, - vrs_attributes=True, - compute_for_ref=False, + incl_vrs_attrs=True, + incl_ref_allele=False, ) with gzip.open(output_vcf, "rt") as out_vcf: out_vcf_lines = out_vcf.readlines() @@ -91,23 +109,22 @@ def test_annotate_vcf_grch38_attrs_altsonly(vcf_annotator, vcr_cassette): out_vcf_lines, expected_output_lines, strict=False ): assert actual_line == expected_line - assert Path(output_vrs_pkl).exists() + assert output_vrs_pkl.exists() assert vcr_cassette.all_played - Path(output_vcf).unlink() - Path(output_vrs_pkl).unlink() @pytest.mark.vcr -def test_annotate_vcf_grch37_attrs(vcf_annotator, vcr_cassette): +def test_annotate_vcf_grch37_attrs( + vcf_annotator: VCFAnnotator, vcr_cassette, input_vcf: Path, tmp_path: Path +): vcr_cassette.allow_playback_repeats = False - input_vcf = f"{TEST_DATA_DIR}/test_vcf_input.vcf" - output_vcf = 
f"{TEST_DATA_DIR}/test_vcf_output_grch37_attrs.vcf.gz" - output_vrs_pkl = f"{TEST_DATA_DIR}/test_vcf_pkl_grch37_attrs.pkl" - expected_vcf = f"{TEST_DATA_DIR}/test_vcf_expected_output.vcf.gz" + output_vcf = tmp_path / "test_vcf_output_grch37_attrs.vcf.gz" + output_vrs_pkl = tmp_path / "test_vcf_pkl_grch37_attrs.pkl" + expected_vcf = TEST_DATA_DIR / "test_vcf_expected_output.vcf.gz" # Test GRCh37 assembly, which was not used for input_vcf vcf_annotator.annotate( - input_vcf, output_vcf, output_vrs_pkl, vrs_attributes=True, assembly="GRCh37" + input_vcf, output_vcf, output_vrs_pkl, incl_vrs_attrs=True, assembly="GRCh37" ) with gzip.open(output_vcf, "rt") as out_vcf: out_vcf_lines = out_vcf.readlines() @@ -116,37 +133,36 @@ def test_annotate_vcf_grch37_attrs(vcf_annotator, vcr_cassette): assert out_vcf_lines != expected_output_lines assert Path(output_vrs_pkl).exists() assert vcr_cassette.all_played - Path(output_vcf).unlink() - Path(output_vrs_pkl).unlink() @pytest.mark.vcr -def test_annotate_vcf_pickle_only(vcf_annotator, vcr_cassette): +def test_annotate_vcf_pickle_only( + vcf_annotator: VCFAnnotator, vcr_cassette, input_vcf: Path, tmp_path: Path +): vcr_cassette.allow_playback_repeats = False - input_vcf = f"{TEST_DATA_DIR}/test_vcf_input.vcf" - output_vcf = f"{TEST_DATA_DIR}/test_vcf_output_pickle_only.vcf.gz" - output_vrs_pkl = f"{TEST_DATA_DIR}/test_vcf_pkl_pickle_only.pkl" + output_vcf = tmp_path / "test_vcf_output_pickle_only.vcf.gz" + output_vrs_pkl = tmp_path / "test_vcf_pkl_pickle_only.pkl" # Test only pickle output vcf_annotator.annotate( - input_vcf, vrs_pickle_out=output_vrs_pkl, vrs_attributes=True + input_vcf, output_pkl_path=output_vrs_pkl, incl_vrs_attrs=True ) assert Path(output_vrs_pkl).exists() assert not Path(output_vcf).exists() assert vcr_cassette.all_played - Path(output_vrs_pkl).unlink() @pytest.mark.vcr -def test_annotate_vcf_vcf_only(vcf_annotator, vcr_cassette): +def test_annotate_vcf_vcf_only( + vcf_annotator: VCFAnnotator, vcr_cassette, input_vcf: Path, tmp_path: Path +): vcr_cassette.allow_playback_repeats = False - input_vcf = f"{TEST_DATA_DIR}/test_vcf_input.vcf" - output_vcf = f"{TEST_DATA_DIR}/test_vcf_output_vcf_only.vcf.gz" - output_vrs_pkl = f"{TEST_DATA_DIR}/test_vcf_pkl_vcf_only.pkl" - expected_vcf = f"{TEST_DATA_DIR}/test_vcf_expected_output.vcf.gz" + output_vcf = tmp_path / "test_vcf_output_vcf_only.vcf.gz" + output_vrs_pkl = tmp_path / "test_vcf_pkl_vcf_only.pkl" + expected_vcf = TEST_DATA_DIR / "test_vcf_expected_output.vcf.gz" # Test only VCF output - vcf_annotator.annotate(input_vcf, vcf_out=output_vcf, vrs_attributes=True) + vcf_annotator.annotate(input_vcf, output_vcf_path=output_vcf, incl_vrs_attrs=True) with gzip.open(output_vcf, "rt") as out_vcf: out_vcf_lines = out_vcf.readlines() with gzip.open(expected_vcf, "rt") as expected_output: @@ -154,50 +170,51 @@ def test_annotate_vcf_vcf_only(vcf_annotator, vcr_cassette): assert out_vcf_lines == expected_output_lines assert vcr_cassette.all_played assert not Path(output_vrs_pkl).exists() - Path(output_vcf).unlink() -def test_annotate_vcf_input_validation(vcf_annotator): - input_vcf = f"{TEST_DATA_DIR}/test_vcf_input.vcf" - - with pytest.raises(VCFAnnotatorException) as e: +def test_annotate_vcf_input_validation(vcf_annotator: VCFAnnotator, input_vcf: Path): + with pytest.raises( + VCFAnnotatorError, + match="Must provide one of: `output_vcf_path` or `output_pkl_path`", + ): vcf_annotator.annotate(input_vcf) - assert str(e.value) == "Must provide one of: `vcf_out` or `vrs_pickle_out`" 
 @pytest.mark.vcr
-def test_get_vrs_object_invalid_input(vcf_annotator, caplog):
-    """Test that _get_vrs_object method works as expected with invalid input"""
+def test_get_vrs_object_invalid_input(vcf_annotator: VCFAnnotator, caplog):
+    """Test that _process_allele method works as expected with invalid input"""
+    caplog.set_level(logging.DEBUG)
+
     # No CHROM
-    vcf_annotator._get_vrs_object(".-140753336-A-T", {}, [], "GRCh38")
+    vcf_annotator._process_allele(".-140753336-A-T", {}, {}, "GRCh38")
     assert "KeyError when getting refget accession: GRCh38:." in caplog.text
 
     # No POS
-    vcf_annotator._get_vrs_object("7-.-A-T", {}, [], "GRCh38")
+    vcf_annotator._process_allele("7-.-A-T", {}, {}, "GRCh38")
     assert "None was returned when translating 7-.-A-T from gnomad" in caplog.text
 
     # No REF
-    vcf_annotator._get_vrs_object("7-140753336-.-T", {}, [], "GRCh38")
+    vcf_annotator._process_allele("7-140753336-.-T", {}, {}, "GRCh38")
     assert (
         "None was returned when translating 7-140753336-.-T from gnomad" in caplog.text
     )
 
     # No ALT
-    vcf_annotator._get_vrs_object("7-140753336-A-.", {}, [], "GRCh38")
+    vcf_annotator._process_allele("7-140753336-A-.", {}, {}, "GRCh38")
    assert (
         "None was returned when translating 7-140753336-A-. from gnomad" in caplog.text
     )
 
     # Invalid ref, but not requiring validation checks so no error is raised
-    vcf_annotator._get_vrs_object(
-        "7-140753336-G-T", {}, [], "GRCh38", require_validation=False
+    vcf_annotator._process_allele(
+        "7-140753336-G-T", {}, {}, "GRCh38", require_validation=False
     )
     assert "" in caplog.text
 
     # Invalid ref, but requiring validation checks so an error is raised
     invalid_ref_seq_msg = "Expected reference sequence C on GRCh38:7 at positions (140753335, 140753336) but found A"
     with pytest.raises(DataProxyValidationError, match=re.escape(invalid_ref_seq_msg)):
-        vcf_annotator._get_vrs_object(
-            "7-140753336-C-T", {}, [], "GRCh38", require_validation=True
+        vcf_annotator._process_allele(
+            "7-140753336-C-T", {}, {}, "GRCh38", require_validation=True
         )
     assert invalid_ref_seq_msg in caplog.text
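
Reviewer note, not part of the patch: a minimal sketch of calling the relocated annotator directly from Python, mirroring what _annotate_vcf_cli does above. The dataproxy URI and file paths are illustrative placeholders; only create_dataproxy, VCFAnnotator, and the annotate() keyword arguments come from this diff.

from pathlib import Path

from ga4gh.vrs.dataproxy import create_dataproxy
from ga4gh.vrs.extras.annotator.vcf import VCFAnnotator

# Resolve a sequence data source from a URI (same scheme family as --dataproxy-uri).
data_proxy = create_dataproxy("seqrepo+http://localhost:5000/seqrepo")
annotator = VCFAnnotator(data_proxy)

# At least one of output_vcf_path / output_pkl_path must be given,
# otherwise annotate() raises VCFAnnotatorError.
annotator.annotate(
    Path("input.vcf.gz"),  # hypothetical input VCF
    output_vcf_path=Path("output.vcf.gz"),  # annotated VCF with VRS_Allele_IDs INFO field
    output_pkl_path=Path("vrs_objects.pkl"),  # pickle of full VRS objects
    incl_vrs_attrs=True,  # also write VRS_Starts / VRS_Ends / VRS_States
    incl_ref_allele=True,  # compute VRS objects for REF as well as ALT alleles
    assembly="GRCh38",
    require_validation=True,
)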
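
Also not part of the patch: a short sketch of reading the pickle written via output_pkl_path. Per _process_allele and _process_vcf_row above, keys are the gnomAD-style coordinate string for REF alleles or the tab-joined record key for ALT alleles, and values are stringified model_dump() output of each VRS Allele. The filename is a placeholder.

import pickle
from pathlib import Path

with Path("vrs_objects.pkl").open("rb") as f:
    vrs_data = pickle.load(f)

for key, allele_repr in vrs_data.items():
    # key: "<chrom>-<pos>-<ref>-<ref>" for REF alleles, or
    #      "<chrom>\t<pos>\t<ref>\t<alts>" for ALT alleles
    # allele_repr: str(vrs_obj.model_dump(exclude_none=True))
    print(key, allele_repr)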