diff --git a/idc_index/index.py b/idc_index/index.py
index c2bbc7b..24ed86f 100644
--- a/idc_index/index.py
+++ b/idc_index/index.py
@@ -2,6 +2,7 @@
 from __future__ import annotations
 
+import json
 import logging
 import os
 import re
@@ -25,6 +26,7 @@
 aws_endpoint_url = "https://s3.amazonaws.com"
 gcp_endpoint_url = "https://storage.googleapis.com"
 asset_endpoint_url = f"https://github.com/ImagingDataCommons/idc-index-data/releases/download/{idc_index_data.__version__}"
+github_api_url = f"https://api.github.com/repos/ImagingDataCommons/idc-index-data/releases/tags/{idc_index_data.__version__}"
 
 logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -125,40 +127,14 @@ def __init__(self):
         )
         self.clinical_data_dir = None
 
-        self.indices_overview = {
-            "index": {
-                "description": "Main index containing one row per DICOM series.",
-                "installed": True,
-                "url": None,
-                "file_path": idc_index_data.IDC_INDEX_PARQUET_FILEPATH,
-            },
-            "prior_versions_index": {
-                "description": "index containing one row per DICOM series from all previous IDC versions that are not in current version.",
-                "installed": True,
-                "url": None,
-                "file_path": idc_index_data.PRIOR_VERSIONS_INDEX_PARQUET_FILEPATH,
-            },
-            "sm_index": {
-                "description": "DICOM Slide Microscopy series-level index.",
-                "installed": False,
-                "url": f"{asset_endpoint_url}/sm_index.parquet",
-                "file_path": None,
-            },
-            "sm_instance_index": {
-                "description": "DICOM Slide Microscopy instance-level index.",
-                "installed": False,
-                "url": f"{asset_endpoint_url}/sm_instance_index.parquet",
-                "file_path": None,
-            },
-            "clinical_index": {
-                "description": "Index of clinical data accompanying the available images.",
-                "installed": False,
-                "url": f"{asset_endpoint_url}/clinical_index.parquet",
-                "file_path": None,
-            },
-        }
+        # Cache for index schemas fetched from release assets
+        self._index_schemas: dict = {}
+
+        # Initialize indices overview with automatic discovery
+        self.indices_overview = self._discover_available_indices()
 
-        # these will point to the dataframes containing the respective indices, once installed
+        # These will point to the dataframes containing the respective indices, once installed
+        # Initialize as None to allow checking before they are fetched
         self.sm_index = None
         self.sm_instance_index = None
         self.clinical_index = None
@@ -182,6 +158,319 @@ def __init__(self):
         # ... and check it can be executed
         subprocess.check_call([self.s5cmdPath, "--help"], stdout=subprocess.DEVNULL)
 
+    def _discover_available_indices(self, refresh: bool = False) -> dict:
+        """Discover available index tables from the idc-index-data GitHub release assets.
+
+        This method discovers available index parquet files by querying the GitHub
+        releases API to dynamically find all available indices. Descriptions are
+        populated from the accompanying JSON schema files.
+
+        Schemas are cached to disk in the indices_data_dir. On subsequent calls,
+        schemas are loaded from disk unless the idc-index-data version changes or
+        refresh is requested.
+
+        Args:
+            refresh: If True, forces a refresh of the cached index list and schemas.
+                If False, loads from disk cache if available.
+
+        Returns:
+            dict: A dictionary of available indices with their descriptions, URLs,
+                installation status, and file paths.
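+
+        Example of the returned structure (illustrative; the exact set of
+        indices depends on the assets published with the release):
+
+            {
+                "sm_index": {
+                    "description": "DICOM Slide Microscopy series-level index.",
+                    "installed": False,
+                    "url": "<asset_endpoint_url>/sm_index.parquet",
+                    "file_path": None,
+                },
+                ...
+            }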
+ """ + # Return cached data if available and refresh is not requested + if ( + not refresh + and hasattr(self, "indices_overview") + and self.indices_overview is not None + ): + return self.indices_overview + + # Try to load from disk cache first (if not forcing refresh) + if not refresh: + cached_data = self._load_indices_cache_from_disk() + if cached_data: + logger.debug("Loaded indices overview from disk cache") + # Populate the in-memory schema cache + if "schemas" in cached_data: + self._index_schemas = cached_data["schemas"] + return cached_data["indices"] + + # Mapping of asset filenames to canonical index names for bundled indices + bundled_indices = { + "idc_index": { + "canonical_name": "index", + "file_path": idc_index_data.IDC_INDEX_PARQUET_FILEPATH, + }, + "prior_versions_index": { + "canonical_name": "prior_versions_index", + "file_path": idc_index_data.PRIOR_VERSIONS_INDEX_PARQUET_FILEPATH, + }, + } + + indices = {} + + # Discover indices from the GitHub release API + try: + response = requests.get(github_api_url, timeout=30) + if response.status_code == 200: + release_data = response.json() + assets = release_data.get("assets", []) + + # Find all parquet files in the release assets + parquet_assets = { + a["name"]: a["browser_download_url"] + for a in assets + if a["name"].endswith(".parquet") + } + + # Find all json schema files in the release assets + json_assets = { + a["name"]: a["browser_download_url"] + for a in assets + if a["name"].endswith(".json") + } + + # Process all discovered parquet files + for parquet_name, parquet_url in parquet_assets.items(): + # Extract index name from filename (e.g., "sm_index.parquet" -> "sm_index") + asset_index_name = parquet_name.replace(".parquet", "") + + # Check if this is a bundled index + if asset_index_name in bundled_indices: + bundled_info = bundled_indices[asset_index_name] + index_name = bundled_info["canonical_name"] + installed = True + file_path = bundled_info["file_path"] + url = None # Bundled indices don't need URL + else: + index_name = asset_index_name + local_path = os.path.join( + self.indices_data_dir, f"{index_name}.parquet" + ) + installed = os.path.exists(local_path) + file_path = local_path if installed else None + url = parquet_url + + # Determine description from schema file and cache the full schema + description = "" + schema_name = f"{asset_index_name}.json" + if schema_name in json_assets: + schema = self._fetch_index_schema_from_url( + json_assets[schema_name] + ) + if schema: + description = schema.get("table_description", "") + # Cache the full schema in memory + self._index_schemas[index_name] = schema + + indices[index_name] = { + "description": description, + "installed": installed, + "url": url, + "file_path": str(file_path) if file_path else None, + } + + else: + logger.warning( + f"GitHub API returned status {response.status_code}. " + "Unable to discover available indices." + ) + except requests.exceptions.RequestException as e: + logger.warning( + f"GitHub API request failed: {e}. Unable to discover available indices." 
+                # Find all parquet files in the release assets
+                parquet_assets = {
+                    a["name"]: a["browser_download_url"]
+                    for a in assets
+                    if a["name"].endswith(".parquet")
+                }
+
+                # Find all json schema files in the release assets
+                json_assets = {
+                    a["name"]: a["browser_download_url"]
+                    for a in assets
+                    if a["name"].endswith(".json")
+                }
+
+                # Process all discovered parquet files
+                for parquet_name, parquet_url in parquet_assets.items():
+                    # Extract index name from filename (e.g., "sm_index.parquet" -> "sm_index")
+                    asset_index_name = parquet_name.replace(".parquet", "")
+
+                    # Check if this is a bundled index
+                    if asset_index_name in bundled_indices:
+                        bundled_info = bundled_indices[asset_index_name]
+                        index_name = bundled_info["canonical_name"]
+                        installed = True
+                        file_path = bundled_info["file_path"]
+                        url = None  # Bundled indices don't need a URL
+                    else:
+                        index_name = asset_index_name
+                        local_path = os.path.join(
+                            self.indices_data_dir, f"{index_name}.parquet"
+                        )
+                        installed = os.path.exists(local_path)
+                        file_path = local_path if installed else None
+                        url = parquet_url
+
+                    # Determine description from schema file and cache the full schema
+                    description = ""
+                    schema_name = f"{asset_index_name}.json"
+                    if schema_name in json_assets:
+                        schema = self._fetch_index_schema_from_url(
+                            json_assets[schema_name]
+                        )
+                        if schema:
+                            description = schema.get("table_description", "")
+                            # Cache the full schema in memory
+                            self._index_schemas[index_name] = schema
+
+                    indices[index_name] = {
+                        "description": description,
+                        "installed": installed,
+                        "url": url,
+                        "file_path": str(file_path) if file_path else None,
+                    }
+
+            else:
+                logger.warning(
+                    f"GitHub API returned status {response.status_code}. "
+                    "Unable to discover available indices."
+                )
+        except requests.exceptions.RequestException as e:
+            logger.warning(
+                f"GitHub API request failed: {e}. Unable to discover available indices."
+            )
+
+        # If no indices were discovered, add at least the bundled indices with default descriptions
+        if not indices:
+            indices = {
+                "index": {
+                    "description": "Main index containing one row per DICOM series.",
+                    "installed": True,
+                    "url": None,
+                    "file_path": str(idc_index_data.IDC_INDEX_PARQUET_FILEPATH),
+                },
+                "prior_versions_index": {
+                    "description": "Index containing one row per DICOM series from all previous IDC versions that are not in the current version.",
+                    "installed": True,
+                    "url": None,
+                    "file_path": str(
+                        idc_index_data.PRIOR_VERSIONS_INDEX_PARQUET_FILEPATH
+                    ),
+                },
+            }
+
+            # Try to fetch schemas for bundled indices even when the API fails
+            for index_name, schema_filename in [
+                ("index", "idc_index.json"),
+                ("prior_versions_index", "prior_versions_index.json"),
+            ]:
+                schema_url = f"{asset_endpoint_url}/{schema_filename}"
+                schema = self._fetch_index_schema_from_url(schema_url)
+                if schema:
+                    indices[index_name]["description"] = schema.get(
+                        "table_description", indices[index_name]["description"]
+                    )
+                    self._index_schemas[index_name] = schema
+
+        # Save to disk cache
+        self._save_indices_cache_to_disk(indices, self._index_schemas)
+
+        return indices
+
+    def _load_indices_cache_from_disk(self) -> dict | None:
+        """Load cached indices overview and schemas from disk.
+
+        Returns:
+            dict or None: Dictionary containing 'indices' and 'schemas' if the
+                cache is valid, None otherwise.
+        """
+        cache_file = os.path.join(self.indices_data_dir, "indices_cache.json")
+
+        if not os.path.exists(cache_file):
+            return None
+
+        try:
+            with open(cache_file) as f:
+                cache_data = json.load(f)
+
+            # Verify cache is for the current version
+            if cache_data.get("version") != idc_index_data.__version__:
+                logger.debug(
+                    f"Cache version mismatch: {cache_data.get('version')} != {idc_index_data.__version__}"
+                )
+                return None
+
+            return {
+                "indices": cache_data.get("indices", {}),
+                "schemas": cache_data.get("schemas", {}),
+            }
+        except (json.JSONDecodeError, OSError) as e:
+            logger.debug(f"Failed to load indices cache from disk: {e}")
+            return None
+
+    def _save_indices_cache_to_disk(self, indices: dict, schemas: dict) -> None:
+        """Save indices overview and schemas to disk cache.
+
+        Args:
+            indices: Dictionary of indices overview.
+            schemas: Dictionary of index schemas.
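+
+        The cache file is a single JSON document of the form (illustrative):
+
+            {
+                "version": "<idc-index-data version>",
+                "indices": { ... },
+                "schemas": { ... }
+            }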
+        """
+        cache_file = os.path.join(self.indices_data_dir, "indices_cache.json")
+
+        try:
+            os.makedirs(self.indices_data_dir, exist_ok=True)
+
+            cache_data = {
+                "version": idc_index_data.__version__,
+                "indices": indices,
+                "schemas": schemas,
+            }
+
+            with open(cache_file, "w") as f:
+                json.dump(cache_data, f, indent=2)
+
+            logger.debug(f"Saved indices cache to {cache_file}")
+        except (OSError, TypeError) as e:
+            logger.warning(f"Failed to save indices cache to disk: {e}")
+
+    def _fetch_index_schema_from_url(self, url: str) -> dict | None:
+        """Fetch an index schema JSON from a URL.
+
+        Args:
+            url: The URL to fetch the schema from.
+
+        Returns:
+            dict or None: The parsed schema dictionary, or None if fetching fails.
+        """
+        try:
+            response = requests.get(url, timeout=30)
+            if response.status_code == 200:
+                return response.json()
+        except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
+            logger.debug(f"Failed to fetch schema from {url}: {e}")
+        return None
+
+    def refresh_indices_overview(self) -> dict:
+        """Refresh the list of available indices by re-querying the GitHub release.
+
+        This method forces a refresh of the indices_overview dictionary by querying
+        the GitHub releases API again, even if a cached version is available.
+
+        Returns:
+            dict: The refreshed indices_overview dictionary.
+        """
+        self.indices_overview = self._discover_available_indices(refresh=True)
+        return self.indices_overview
+
+    def get_index_schema(self, index_name: str, refresh: bool = False) -> dict | None:
+        """Get the full schema for an index, including column definitions.
+
+        This method returns the JSON schema for the specified index. The schema
+        includes table_description and column definitions with name, type, mode,
+        and description. Schemas are cached in memory and on disk during discovery.
+
+        Args:
+            index_name: The name of the index to get the schema for.
+            refresh: If True, forces a refresh by re-discovering all indices.
+
+        Returns:
+            dict or None: The schema dictionary containing 'table_description' and
+                'columns', or None if the schema is not available.
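+
+        Example of a returned schema (illustrative):
+
+            {
+                "table_description": "DICOM Slide Microscopy series-level index.",
+                "columns": [
+                    {"name": "SeriesInstanceUID", "type": "STRING",
+                     "mode": "NULLABLE", "description": "..."},
+                ],
+            }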
+        """
+        if index_name not in self.indices_overview:
+            logger.error(f"Index {index_name} is not available.")
+            return None
+
+        # If refresh is requested, re-discover indices to refresh all schemas
+        if refresh:
+            self.indices_overview = self._discover_available_indices(refresh=True)
+
+        # Return cached schema if available
+        if index_name in self._index_schemas:
+            return self._index_schemas[index_name]
+
+        # Schema was not cached during discovery (shouldn't happen in normal operation)
+        logger.warning(
+            f"Schema for {index_name} not available in cache. "
+            "This may indicate the index was discovered but schema fetch failed."
+        )
+        return None
+
     @staticmethod
     def _replace_aws_with_gcp_buckets(dataframe, column_name):
         # mapping from AWS to GCS buckets is fixed
@@ -368,11 +657,33 @@ def fetch_index(self, index_name) -> None:
         """
         if index_name not in self.indices_overview:
             logger.error(f"Index {index_name} is not available and can not be fetched.")
-        elif self.indices_overview[index_name]["installed"]:
-            logger.warning(
-                f"Index {index_name} already installed and will not be fetched again."
-            )
-        else:
+            return
+        if self.indices_overview[index_name]["installed"]:
+            # Index is already installed; load it from disk if not already loaded
+            if not hasattr(self, index_name) or getattr(self, index_name) is None:
+                filepath = self.indices_overview[index_name]["file_path"]
+                if filepath and os.path.exists(filepath):
+                    logger.info(
+                        f"Index {index_name} already installed, loading from {filepath}"
+                    )
+                    index_table = pd.read_parquet(filepath)
+                    setattr(self, index_name, index_table)
+                else:
+                    logger.warning(
+                        f"Index {index_name} marked as installed but file not found. Re-downloading."
+                    )
+                    # Reset installed status to allow download
+                    self.indices_overview[index_name]["installed"] = False
+                    self.indices_overview[index_name]["file_path"] = None
+                    # Fall through to the download logic below instead of a recursive call
+            else:
+                logger.warning(
+                    f"Index {index_name} already installed and will not be fetched again."
+                )
+                return
+
+        # Download the index if not installed
+        if not self.indices_overview[index_name]["installed"]:
             logger.info("Fetching index %s", index_name)
             response = requests.get(
                 self.indices_overview[index_name]["url"], timeout=30
diff --git a/tests/idcindex.py b/tests/idcindex.py
index ba12004..4007aa8 100644
--- a/tests/idcindex.py
+++ b/tests/idcindex.py
@@ -563,10 +563,69 @@ def test_list_indices(self):
         i = IDCClient()
         assert i.indices_overview  # assert that dict was created
 
+    def test_discovered_indices_have_descriptions(self):
+        """Test that discovered indices have descriptions from schema files."""
+        i = IDCClient()
+        # All indices should have descriptions
+        for index_name, index_info in i.indices_overview.items():
+            assert "description" in index_info
+            # Most indices should have non-empty descriptions from schema files
+            # (though some may be empty if the schema fetch fails)
+
+    def test_get_index_schema(self):
+        """Test that get_index_schema returns valid schema data."""
+        i = IDCClient()
+        # Test getting schema for the main index (always available as bundled)
+        schema = i.get_index_schema("index")
+        assert schema is not None
+        assert "table_description" in schema
+        assert "columns" in schema
+        assert len(schema["columns"]) > 0
+
+        # Test getting schema for a remote index if available
+        # (may not be available if GitHub API is rate-limited)
+        if "sm_index" in i.indices_overview:
+            schema = i.get_index_schema("sm_index")
+            assert schema is not None
+            assert "table_description" in schema
+            assert "columns" in schema
+
+    def test_get_index_schema_caching(self):
+        """Test that schemas are cached after the first fetch."""
+        i = IDCClient()
+        # First fetch should populate the cache for the bundled index
+        schema1 = i.get_index_schema("index")
+        assert "index" in i._index_schemas
+
+        # Second fetch should use the cache
+        schema2 = i.get_index_schema("index")
+        assert schema1 is schema2  # Same object from cache
+
+        # Refresh should fetch new data
+        schema3 = i.get_index_schema("index", refresh=True)
+        assert schema3 is not None
+
+    def test_get_index_schema_invalid_index(self):
+        """Test that get_index_schema returns None for an invalid index."""
+        i = IDCClient()
+        schema = i.get_index_schema("nonexistent_index")
+        assert schema is None
+
+    def test_refresh_indices_overview(self):
+        """Test that refresh_indices_overview updates the indices list."""
+        i = IDCClient()
+        original_count = len(i.indices_overview)
+        # Refresh should return updated indices
+        refreshed = i.refresh_indices_overview()
+        assert refreshed is not None
+        assert len(refreshed) == original_count
+
     def test_fetch_index(self):
         i = IDCClient()
-        assert i.indices_overview["sm_index"]["installed"] is False
+        # sm_index should be discovered from the release assets
+        assert "sm_index" in i.indices_overview
         i.fetch_index("sm_index")
+        # After fetch_index, it should be installed
         assert i.indices_overview["sm_index"]["installed"] is True
         assert hasattr(i, "sm_index")
@@ -578,8 +637,10 @@ def test_indices_urls(self):
 
     def test_clinical_index_install(self):
         i = IDCClient()
-        assert i.indices_overview["clinical_index"]["installed"] is False
+        # clinical_index should be discovered from the release assets
+        assert "clinical_index" in i.indices_overview
         i.fetch_index("clinical_index")
+        # After fetch_index, it should be installed
         assert i.indices_overview["clinical_index"]["installed"] is True
         assert len(os.listdir(i.clinical_data_dir)) > 0
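
Usage sketch (illustrative, not part of the patch): with these changes, index
discovery runs when the client is constructed, and index schemas become
queryable. The name "sm_index" below is an example; the available names depend
on the assets published with the idc-index-data release.

    from idc_index.index import IDCClient

    client = IDCClient()                    # discovery runs during __init__
    print(sorted(client.indices_overview))  # bundled + discovered index names

    schema = client.get_index_schema("sm_index")
    if schema:
        print(schema["table_description"])

    client.fetch_index("sm_index")          # downloads, then sets client.sm_index
    client.refresh_indices_overview()       # re-queries GitHub, bypassing the cache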