Support discovery of index tables available from idc-index-data #198
```diff
@@ -2,6 +2,7 @@
 from __future__ import annotations

+import json
 import logging
 import os
 import re
@@ -25,6 +26,7 @@
 aws_endpoint_url = "https://s3.amazonaws.com"
 gcp_endpoint_url = "https://storage.googleapis.com"
 asset_endpoint_url = f"https://github.com/ImagingDataCommons/idc-index-data/releases/download/{idc_index_data.__version__}"
+github_api_url = f"https://api.github.com/repos/ImagingDataCommons/idc-index-data/releases/tags/{idc_index_data.__version__}"

 logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO)
 logger = logging.getLogger(__name__)
```
```diff
@@ -125,40 +127,14 @@ def __init__(self):
         )
         self.clinical_data_dir = None

-        self.indices_overview = {
-            "index": {
-                "description": "Main index containing one row per DICOM series.",
-                "installed": True,
-                "url": None,
-                "file_path": idc_index_data.IDC_INDEX_PARQUET_FILEPATH,
-            },
-            "prior_versions_index": {
-                "description": "index containing one row per DICOM series from all previous IDC versions that are not in current version.",
-                "installed": True,
-                "url": None,
-                "file_path": idc_index_data.PRIOR_VERSIONS_INDEX_PARQUET_FILEPATH,
-            },
-            "sm_index": {
-                "description": "DICOM Slide Microscopy series-level index.",
-                "installed": False,
-                "url": f"{asset_endpoint_url}/sm_index.parquet",
-                "file_path": None,
-            },
-            "sm_instance_index": {
-                "description": "DICOM Slide Microscopy instance-level index.",
-                "installed": False,
-                "url": f"{asset_endpoint_url}/sm_instance_index.parquet",
-                "file_path": None,
-            },
-            "clinical_index": {
-                "description": "Index of clinical data accompanying the available images.",
-                "installed": False,
-                "url": f"{asset_endpoint_url}/clinical_index.parquet",
-                "file_path": None,
-            },
-        }
+        # Cache for index schemas fetched from release assets
+        self._index_schemas: dict = {}

-        # these will point to the dataframes containing the respective indices, once installed
+        # Initialize indices overview with automatic discovery
+        self.indices_overview = self._discover_available_indices()
+
+        # These will point to the dataframes containing the respective indices, once installed
+        # Initialize as None to allow checking before they are fetched
         self.sm_index = None
         self.sm_instance_index = None
         self.clinical_index = None
```
```diff
@@ -182,6 +158,196 @@ def __init__(self):
         # ... and check it can be executed
         subprocess.check_call([self.s5cmdPath, "--help"], stdout=subprocess.DEVNULL)

+    def _discover_available_indices(self, refresh: bool = False) -> dict:
```
Member:
> Improve this function to add disk caching. On the first init, the schema should be fetched and saved to disk. On subsequent calls, it should be loaded from disk instead of re-fetching. The cache should be matched against the version of idc-index-data.

Contributor (Author):
> Implemented disk caching in commit ee35afb. Added …
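A minimal sketch of what such a version-keyed disk cache could look like, assuming the overview is serialized as JSON next to the downloaded indices; the helper names `_overview_cache_path` and `_load_or_discover` are hypothetical and not part of this PR:

```python
import json
import os

import idc_index_data


def _overview_cache_path(cache_dir: str) -> str:
    # Key the cache file to the idc-index-data version so that upgrading
    # the package automatically invalidates stale cache entries.
    return os.path.join(
        cache_dir, f"indices_overview_{idc_index_data.__version__}.json"
    )


def _load_or_discover(self) -> dict:
    cache_file = _overview_cache_path(self.indices_data_dir)
    if os.path.exists(cache_file):
        # Subsequent inits: load the overview from disk instead of re-fetching
        with open(cache_file) as f:
            return json.load(f)
    # First init: query the GitHub API, then persist the result to disk
    overview = self._discover_available_indices()
    with open(cache_file, "w") as f:
        # default=str serializes pathlib.Path values stored under "file_path"
        json.dump(overview, f, default=str)
    return overview
```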
```diff
+        """Discover available index tables from the idc-index-data GitHub release assets.
+
+        This method discovers available index parquet files by querying the GitHub
+        releases API to dynamically find all available indices. Descriptions are
+        populated from the accompanying JSON schema files.
+
+        Args:
+            refresh: If True, forces a refresh of the cached index list. If False,
+                returns the cached list if available.
+
+        Returns:
+            dict: A dictionary of available indices with their descriptions, URLs,
+                installation status, and file paths.
+        """
+        # Return cached data if available and refresh is not requested
+        if (
+            not refresh
+            and hasattr(self, "indices_overview")
+            and self.indices_overview is not None
+        ):
+            return self.indices_overview
+
+        # Mapping of asset filenames to canonical index names for bundled indices
+        bundled_indices = {
+            "idc_index": {
+                "canonical_name": "index",
+                "file_path": idc_index_data.IDC_INDEX_PARQUET_FILEPATH,
+            },
+            "prior_versions_index": {
+                "canonical_name": "prior_versions_index",
+                "file_path": idc_index_data.PRIOR_VERSIONS_INDEX_PARQUET_FILEPATH,
+            },
+        }
+
+        indices = {}
+
+        # Discover indices from the GitHub release API
+        try:
+            response = requests.get(github_api_url, timeout=30)
+            if response.status_code == 200:
+                release_data = response.json()
+                assets = release_data.get("assets", [])
+
+                # Find all parquet files in the release assets
+                parquet_assets = {
+                    a["name"]: a["browser_download_url"]
+                    for a in assets
+                    if a["name"].endswith(".parquet")
+                }
+
+                # Find all json schema files in the release assets
+                json_assets = {
+                    a["name"]: a["browser_download_url"]
+                    for a in assets
+                    if a["name"].endswith(".json")
+                }
+
+                # Process all discovered parquet files
+                for parquet_name, parquet_url in parquet_assets.items():
+                    # Extract index name from filename (e.g., "sm_index.parquet" -> "sm_index")
+                    asset_index_name = parquet_name.replace(".parquet", "")
+
+                    # Check if this is a bundled index
+                    if asset_index_name in bundled_indices:
+                        bundled_info = bundled_indices[asset_index_name]
+                        index_name = bundled_info["canonical_name"]
+                        installed = True
+                        file_path = bundled_info["file_path"]
+                        url = None  # Bundled indices don't need URL
+                    else:
+                        index_name = asset_index_name
+                        local_path = os.path.join(
+                            self.indices_data_dir, f"{index_name}.parquet"
+                        )
+                        installed = os.path.exists(local_path)
+                        file_path = local_path if installed else None
+                        url = parquet_url
+
+                    # Determine description from schema file
+                    description = ""
+                    schema_name = f"{asset_index_name}.json"
+                    if schema_name in json_assets:
+                        schema = self._fetch_index_schema_from_url(
+                            json_assets[schema_name]
+                        )
+                        if schema:
+                            description = schema.get("table_description", "")
+
+                    indices[index_name] = {
+                        "description": description,
+                        "installed": installed,
+                        "url": url,
+                        "file_path": file_path,
+                    }
+
+            else:
+                logger.warning(
+                    f"GitHub API returned status {response.status_code}. "
+                    "Unable to discover available indices."
+                )
+        except requests.exceptions.RequestException as e:
+            logger.warning(
+                f"GitHub API request failed: {e}. Unable to discover available indices."
+            )
+
+        # If no indices were discovered, add at least the bundled indices with default descriptions
+        if not indices:
+            indices = {
+                "index": {
+                    "description": "Main index containing one row per DICOM series.",
+                    "installed": True,
+                    "url": None,
+                    "file_path": idc_index_data.IDC_INDEX_PARQUET_FILEPATH,
+                },
+                "prior_versions_index": {
+                    "description": "Index containing one row per DICOM series from all previous IDC versions that are not in current version.",
+                    "installed": True,
+                    "url": None,
+                    "file_path": idc_index_data.PRIOR_VERSIONS_INDEX_PARQUET_FILEPATH,
+                },
+            }
+
+        return indices
```
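For illustration, a hedged usage sketch of the resulting overview, assuming the package's public `IDCClient` class is importable from the top level:

```python
from idc_index import IDCClient

client = IDCClient()

# List which indices were discovered and whether they are installed locally
for name, info in client.indices_overview.items():
    status = "installed" if info["installed"] else "available for download"
    print(f"{name}: {status}. {info['description']}")
```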
```diff
+    def _fetch_index_schema_from_url(self, url: str) -> dict | None:
+        """Fetch an index schema JSON from a URL.
+
+        Args:
+            url: The URL to fetch the schema from.
+
+        Returns:
+            dict or None: The parsed schema dictionary, or None if fetching fails.
+        """
+        try:
+            response = requests.get(url, timeout=30)
+            if response.status_code == 200:
+                return response.json()
+        except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
+            logger.debug(f"Failed to fetch schema from {url}: {e}")
+        return None
+
+    def refresh_indices_overview(self) -> dict:
+        """Refresh the list of available indices by re-querying the GitHub release.
+
+        This method forces a refresh of the indices_overview dictionary by querying
+        the GitHub releases API again, even if a cached version is available.
+
+        Returns:
+            dict: The refreshed indices_overview dictionary.
+        """
+        self.indices_overview = self._discover_available_indices(refresh=True)
+        return self.indices_overview
```
```diff
+    def get_index_schema(self, index_name: str, refresh: bool = False) -> dict | None:
+        """Get the full schema for an index, including column definitions.
+
+        This method fetches the JSON schema file for the specified index from the
+        idc-index-data release assets. The schema includes table_description and
+        column definitions with name, type, mode, and description.
+
+        Args:
+            index_name: The name of the index to get the schema for.
+            refresh: If True, forces a refresh of the cached schema.
+
+        Returns:
+            dict or None: The schema dictionary containing 'table_description' and
+                'columns', or None if the schema could not be fetched.
+        """
+        if index_name not in self.indices_overview:
+            logger.error(f"Index {index_name} is not available.")
+            return None
+
+        # Return cached schema if available and refresh is not requested
+        if not refresh and index_name in self._index_schemas:
+            return self._index_schemas[index_name]
+
+        # Construct the schema URL
+        schema_url = f"{asset_endpoint_url}/{index_name}.json"
+
+        # Special handling for the main index - it uses idc_index.json
+        if index_name == "index":
+            schema_url = f"{asset_endpoint_url}/idc_index.json"
+
+        schema = self._fetch_index_schema_from_url(schema_url)
+
+        if schema:
+            self._index_schemas[index_name] = schema
+
+        return schema
+
     @staticmethod
     def _replace_aws_with_gcp_buckets(dataframe, column_name):
         # mapping from AWS to GCS buckets is fixed
```
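Continuing the earlier sketch, a hedged example of the schema APIs added above; the schema keys follow the docstring (`table_description`, `columns` with `name`, `type`, `mode`, `description`):

```python
# Re-query the GitHub release, then fetch the schema for one of the
# optional indices; both calls hit the release assets over HTTPS.
client.refresh_indices_overview()

schema = client.get_index_schema("sm_index")
if schema is not None:
    print(schema["table_description"])
    for column in schema.get("columns", []):
        print(column["name"], column.get("type"), column.get("description"))
```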
```diff
@@ -368,11 +534,33 @@ def fetch_index(self, index_name) -> None:
         """
         if index_name not in self.indices_overview:
             logger.error(f"Index {index_name} is not available and can not be fetched.")
-        elif self.indices_overview[index_name]["installed"]:
-            logger.warning(
-                f"Index {index_name} already installed and will not be fetched again."
-            )
-        else:
+            return
+        if self.indices_overview[index_name]["installed"]:
+            # Index is already installed, load it from disk if not already loaded
+            if not hasattr(self, index_name) or getattr(self, index_name) is None:
+                filepath = self.indices_overview[index_name]["file_path"]
+                if filepath and os.path.exists(filepath):
+                    logger.info(
+                        f"Index {index_name} already installed, loading from {filepath}"
+                    )
+                    index_table = pd.read_parquet(filepath)
+                    setattr(self, index_name, index_table)
+                else:
+                    logger.warning(
+                        f"Index {index_name} marked as installed but file not found. Re-downloading."
+                    )
+                    # Reset installed status to allow download
+                    self.indices_overview[index_name]["installed"] = False
+                    self.indices_overview[index_name]["file_path"] = None
+                    # Fall through to the download logic below instead of recursive call
+            else:
+                logger.warning(
+                    f"Index {index_name} already installed and will not be fetched again."
+                )
+                return
+
+        # Download the index if not installed
+        if not self.indices_overview[index_name]["installed"]:
             logger.info("Fetching index %s", index_name)
             response = requests.get(
                 self.indices_overview[index_name]["url"], timeout=30
```
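For context, a short usage sketch of the revised `fetch_index()` behavior: a first call downloads the index, a repeat call in the same session logs a warning and returns, and a new session loads the parquet file from disk instead of re-downloading (same hypothetical `client` as above):

```python
client.fetch_index("sm_index")  # downloads sm_index.parquet on first use
client.fetch_index("sm_index")  # already loaded: warns and returns early

# After fetching, the index is available as a pandas DataFrame attribute
df = client.sm_index
print(df.shape)
```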
@copilot The endpoint URL is switched from github.com to api.github.com. Is there a rate limit? Is the rate limit applied to both endpoints?
Yes, both endpoints have rate limits for unauthenticated access; the REST API at api.github.com allows only 60 unauthenticated requests per hour per IP. The implementation handles this by falling back to the bundled indices with default descriptions whenever the API request fails or returns a non-200 status. This fallback approach ensures the package works even when rate limited: users still get the known bundled indices rather than failing entirely.
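One possible mitigation, not part of this PR: sending an authenticated request when a token is available raises the REST API quota substantially. A hedged sketch, assuming a `GITHUB_TOKEN` environment variable:

```python
import os

import requests
import idc_index_data

github_api_url = (
    "https://api.github.com/repos/ImagingDataCommons/idc-index-data"
    f"/releases/tags/{idc_index_data.__version__}"
)

headers = {}
token = os.environ.get("GITHUB_TOKEN")
if token:
    # Authenticated requests get 5,000 requests/hour instead of the
    # 60 requests/hour allowed for anonymous clients.
    headers["Authorization"] = f"Bearer {token}"

response = requests.get(github_api_url, headers=headers, timeout=30)
response.raise_for_status()
assets = response.json().get("assets", [])
```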