264 changes: 226 additions & 38 deletions idc_index/index.py
@@ -2,6 +2,7 @@

from __future__ import annotations

import json
import logging
import os
import re
@@ -25,6 +26,7 @@
aws_endpoint_url = "https://s3.amazonaws.com"
gcp_endpoint_url = "https://storage.googleapis.com"
asset_endpoint_url = f"https://github.com/ImagingDataCommons/idc-index-data/releases/download/{idc_index_data.__version__}"
github_api_url = f"https://api.github.com/repos/ImagingDataCommons/idc-index-data/releases/tags/{idc_index_data.__version__}"
@fedorov (Member) commented on Nov 26, 2025:

@copilot Endpoint URL is switched from github.com to api.github.com. Is there a rate limit? Is rate limit applied to both endpoints?

Contributor Author replied:

Yes, both endpoints have rate limits for unauthenticated access:

  1. api.github.com (REST API): 60 requests/hour for unauthenticated users
  2. github.com/.../releases/download/ (direct download): No strict rate limit, but subject to abuse detection

The implementation handles this by:

  • First trying the API endpoint to discover all available indices dynamically
  • If API returns 403 (rate limited), falling back to a known list of indices while still fetching descriptions directly from the schema JSON files via the download endpoint

This fallback approach ensures the package works even when rate-limited: users get the known indices, with descriptions fetched directly, rather than failing entirely.
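As a quick way to check where a client stands against that limit, GitHub's documented rate_limit endpoint reports the remaining quota and does not itself count against it. A minimal sketch:

import requests

# Query GitHub's rate-limit status; requests to this endpoint do not
# count against the unauthenticated quota.
response = requests.get("https://api.github.com/rate_limit", timeout=30)
core = response.json()["resources"]["core"]
print(f"core API: {core['remaining']}/{core['limit']} remaining, resets at epoch {core['reset']}")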


logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -125,40 +127,14 @@ def __init__(self):
)
self.clinical_data_dir = None

self.indices_overview = {
"index": {
"description": "Main index containing one row per DICOM series.",
"installed": True,
"url": None,
"file_path": idc_index_data.IDC_INDEX_PARQUET_FILEPATH,
},
"prior_versions_index": {
"description": "index containing one row per DICOM series from all previous IDC versions that are not in current version.",
"installed": True,
"url": None,
"file_path": idc_index_data.PRIOR_VERSIONS_INDEX_PARQUET_FILEPATH,
},
"sm_index": {
"description": "DICOM Slide Microscopy series-level index.",
"installed": False,
"url": f"{asset_endpoint_url}/sm_index.parquet",
"file_path": None,
},
"sm_instance_index": {
"description": "DICOM Slide Microscopy instance-level index.",
"installed": False,
"url": f"{asset_endpoint_url}/sm_instance_index.parquet",
"file_path": None,
},
"clinical_index": {
"description": "Index of clinical data accompanying the available images.",
"installed": False,
"url": f"{asset_endpoint_url}/clinical_index.parquet",
"file_path": None,
},
}
# Cache for index schemas fetched from release assets
self._index_schemas: dict = {}

# these will point to the dataframes containing the respective indices, once installed
# Initialize indices overview with automatic discovery
self.indices_overview = self._discover_available_indices()

# These will point to the dataframes containing the respective indices, once installed
# Initialize as None to allow checking before they are fetched
self.sm_index = None
self.sm_instance_index = None
self.clinical_index = None
@@ -182,6 +158,196 @@ def __init__(self):
# ... and check it can be executed
subprocess.check_call([self.s5cmdPath, "--help"], stdout=subprocess.DEVNULL)

def _discover_available_indices(self, refresh: bool = False) -> dict:
Reviewer (Member) commented:

Improve this function to add disk caching. On the first init, the schema should be fetched and saved to disk. On subsequent calls, it should be loaded from disk instead of re-fetching. The cache should be matched against the version of idc-index-data and refreshed if the installed version does not match. The user should have an option to force a cache refresh if needed.

Contributor Author replied:

Implemented disk caching in commit ee35afb. Added _load_indices_cache_from_disk() and _save_indices_cache_to_disk() methods (see the sketch after this list). Schemas are:

  • Fetched and saved to disk on first init (in indices_data_dir/indices_cache.json)
  • Loaded from disk on subsequent calls
  • Cache matched against idc-index-data version and refreshed if version doesn't match
  • User can force refresh via refresh=True parameter in _discover_available_indices() or by calling refresh_indices_overview()
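A minimal sketch of what this version-matched disk cache can look like (the helper names come from this thread; the exact JSON layout and the default=str handling of Path values are assumptions, not the committed code):

def _save_indices_cache_to_disk(self) -> None:
    """Persist discovered indices and schemas, keyed to the installed data version."""
    cache = {
        "idc_index_data_version": idc_index_data.__version__,
        "indices_overview": self.indices_overview,
        "index_schemas": self._index_schemas,
    }
    cache_path = os.path.join(self.indices_data_dir, "indices_cache.json")
    with open(cache_path, "w") as f:
        # default=str serializes any Path objects stored under "file_path"
        json.dump(cache, f, default=str)

def _load_indices_cache_from_disk(self) -> dict | None:
    """Load the cache, returning None on a miss or an idc-index-data version mismatch."""
    cache_path = os.path.join(self.indices_data_dir, "indices_cache.json")
    if not os.path.exists(cache_path):
        return None
    with open(cache_path) as f:
        cache = json.load(f)
    if cache.get("idc_index_data_version") != idc_index_data.__version__:
        return None
    return cache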

"""Discover available index tables from the idc-index-data GitHub release assets.

This method discovers available index parquet files by querying the GitHub
releases API to dynamically find all available indices. Descriptions are
populated from the accompanying JSON schema files.

Args:
refresh: If True, forces a refresh of the cached index list. If False,
returns the cached list if available.

Returns:
dict: A dictionary of available indices with their descriptions, URLs,
installation status, and file paths.
"""
# Return cached data if available and refresh is not requested
if (
not refresh
and hasattr(self, "indices_overview")
and self.indices_overview is not None
):
return self.indices_overview

# Mapping of asset filenames to canonical index names for bundled indices
bundled_indices = {
"idc_index": {
"canonical_name": "index",
"file_path": idc_index_data.IDC_INDEX_PARQUET_FILEPATH,
},
"prior_versions_index": {
"canonical_name": "prior_versions_index",
"file_path": idc_index_data.PRIOR_VERSIONS_INDEX_PARQUET_FILEPATH,
},
}

indices = {}

# Discover indices from the GitHub release API
try:
response = requests.get(github_api_url, timeout=30)
if response.status_code == 200:
release_data = response.json()
assets = release_data.get("assets", [])

# Find all parquet files in the release assets
parquet_assets = {
a["name"]: a["browser_download_url"]
for a in assets
if a["name"].endswith(".parquet")
}

# Find all json schema files in the release assets
json_assets = {
a["name"]: a["browser_download_url"]
for a in assets
if a["name"].endswith(".json")
}

# Process all discovered parquet files
for parquet_name, parquet_url in parquet_assets.items():
# Extract index name from filename (e.g., "sm_index.parquet" -> "sm_index")
asset_index_name = parquet_name.replace(".parquet", "")

# Check if this is a bundled index
if asset_index_name in bundled_indices:
bundled_info = bundled_indices[asset_index_name]
index_name = bundled_info["canonical_name"]
installed = True
file_path = bundled_info["file_path"]
url = None # Bundled indices don't need URL
else:
index_name = asset_index_name
local_path = os.path.join(
self.indices_data_dir, f"{index_name}.parquet"
)
installed = os.path.exists(local_path)
file_path = local_path if installed else None
url = parquet_url

# Determine description from schema file
description = ""
schema_name = f"{asset_index_name}.json"
if schema_name in json_assets:
schema = self._fetch_index_schema_from_url(
json_assets[schema_name]
)
if schema:
description = schema.get("table_description", "")

indices[index_name] = {
"description": description,
"installed": installed,
"url": url,
"file_path": file_path,
}

else:
logger.warning(
f"GitHub API returned status {response.status_code}. "
"Unable to discover available indices."
)
except requests.exceptions.RequestException as e:
logger.warning(
f"GitHub API request failed: {e}. Unable to discover available indices."
)

# If no indices were discovered, add at least the bundled indices with default descriptions
if not indices:
indices = {
"index": {
"description": "Main index containing one row per DICOM series.",
"installed": True,
"url": None,
"file_path": idc_index_data.IDC_INDEX_PARQUET_FILEPATH,
},
"prior_versions_index": {
"description": "Index containing one row per DICOM series from all previous IDC versions that are not in current version.",
"installed": True,
"url": None,
"file_path": idc_index_data.PRIOR_VERSIONS_INDEX_PARQUET_FILEPATH,
},
}

return indices

def _fetch_index_schema_from_url(self, url: str) -> dict | None:
"""Fetch an index schema JSON from a URL.

Args:
url: The URL to fetch the schema from.

Returns:
dict or None: The parsed schema dictionary, or None if fetching fails.
"""
try:
response = requests.get(url, timeout=30)
if response.status_code == 200:
return response.json()
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
logger.debug(f"Failed to fetch schema from {url}: {e}")
return None

def refresh_indices_overview(self) -> dict:
"""Refresh the list of available indices by re-querying the GitHub release.

This method forces a refresh of the indices_overview dictionary by querying
the GitHub releases API again, even if a cached version is available.

Returns:
dict: The refreshed indices_overview dictionary.
"""
self.indices_overview = self._discover_available_indices(refresh=True)
return self.indices_overview

def get_index_schema(self, index_name: str, refresh: bool = False) -> dict | None:
"""Get the full schema for an index, including column definitions.

This method fetches the JSON schema file for the specified index from the
idc-index-data release assets. The schema includes table_description and
column definitions with name, type, mode, and description.

Args:
index_name: The name of the index to get the schema for.
refresh: If True, forces a refresh of the cached schema.

Returns:
dict or None: The schema dictionary containing 'table_description' and
'columns', or None if the schema could not be fetched.
"""
if index_name not in self.indices_overview:
logger.error(f"Index {index_name} is not available.")
return None

# Return cached schema if available and refresh is not requested
if not refresh and index_name in self._index_schemas:
return self._index_schemas[index_name]

# Construct the schema URL
schema_url = f"{asset_endpoint_url}/{index_name}.json"

# Special handling for the main index - it uses idc_index.json
if index_name == "index":
schema_url = f"{asset_endpoint_url}/idc_index.json"

schema = self._fetch_index_schema_from_url(schema_url)
Reviewer (Member) commented:

There is no need to re-fetch the schema from the URL, since _discover_available_indices was called in __init__ and the schema should be available for all indices.

Contributor Author replied:

Fixed in commit ee35afb. get_index_schema() now uses schemas cached during _discover_available_indices() instead of re-fetching from URL. Schemas are stored in _index_schemas during discovery and reused.
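Under that change, the lookup presumably reduces to a cache read along these lines (a sketch of the described behavior, not the exact committed code):

# Serve schemas from the cache populated during _discover_available_indices();
# on an explicit refresh or a cache miss, re-run discovery instead of
# fetching the schema URL directly.
if refresh or index_name not in self._index_schemas:
    self._discover_available_indices(refresh=True)
return self._index_schemas.get(index_name)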

if schema:
self._index_schemas[index_name] = schema

return schema

@staticmethod
def _replace_aws_with_gcp_buckets(dataframe, column_name):
# mapping from AWS to GCS buckets is fixed
@@ -368,11 +534,33 @@ def fetch_index(self, index_name) -> None:
"""
if index_name not in self.indices_overview:
logger.error(f"Index {index_name} is not available and can not be fetched.")
elif self.indices_overview[index_name]["installed"]:
logger.warning(
f"Index {index_name} already installed and will not be fetched again."
)
else:
return
if self.indices_overview[index_name]["installed"]:
# Index is already installed, load it from disk if not already loaded
if not hasattr(self, index_name) or getattr(self, index_name) is None:
filepath = self.indices_overview[index_name]["file_path"]
if filepath and os.path.exists(filepath):
logger.info(
f"Index {index_name} already installed, loading from {filepath}"
)
index_table = pd.read_parquet(filepath)
setattr(self, index_name, index_table)
else:
logger.warning(
f"Index {index_name} marked as installed but file not found. Re-downloading."
)
# Reset installed status to allow download
self.indices_overview[index_name]["installed"] = False
self.indices_overview[index_name]["file_path"] = None
# Fall through to the download logic below instead of recursive call
else:
logger.warning(
f"Index {index_name} already installed and will not be fetched again."
)
return

# Download the index if not installed
if not self.indices_overview[index_name]["installed"]:
logger.info("Fetching index %s", index_name)
response = requests.get(
self.indices_overview[index_name]["url"], timeout=30
Expand Down
65 changes: 63 additions & 2 deletions tests/idcindex.py
@@ -563,10 +563,69 @@ def test_list_indices(self):
i = IDCClient()
assert i.indices_overview # assert that dict was created

def test_discovered_indices_have_descriptions(self):
"""Test that discovered indices have descriptions from schema files."""
i = IDCClient()
# All indices should have descriptions
for index_info in i.indices_overview.values():
    assert "description" in index_info
# Most indices should have non-empty descriptions from schema files
# (though some may be empty if the schema fetch fails)

def test_get_index_schema(self):
"""Test that get_index_schema returns valid schema data."""
i = IDCClient()
# Test getting schema for the main index (always available as bundled)
schema = i.get_index_schema("index")
assert schema is not None
assert "table_description" in schema
assert "columns" in schema
assert len(schema["columns"]) > 0

# Test getting schema for a remote index if available
# (may not be available if GitHub API is rate-limited)
if "sm_index" in i.indices_overview:
schema = i.get_index_schema("sm_index")
assert schema is not None
assert "table_description" in schema
assert "columns" in schema

def test_get_index_schema_caching(self):
"""Test that schemas are cached after first fetch."""
i = IDCClient()
# First fetch should populate cache for the bundled index
schema1 = i.get_index_schema("index")
assert "index" in i._index_schemas

# Second fetch should use cache
schema2 = i.get_index_schema("index")
assert schema1 is schema2 # Same object from cache

# Refresh should fetch new data
schema3 = i.get_index_schema("index", refresh=True)
assert schema3 is not None

def test_get_index_schema_invalid_index(self):
"""Test that get_index_schema returns None for invalid index."""
i = IDCClient()
schema = i.get_index_schema("nonexistent_index")
assert schema is None

def test_refresh_indices_overview(self):
"""Test that refresh_indices_overview updates the indices list."""
i = IDCClient()
original_count = len(i.indices_overview)
# Refresh should return updated indices
refreshed = i.refresh_indices_overview()
assert refreshed is not None
assert len(refreshed) == original_count

def test_fetch_index(self):
i = IDCClient()
assert i.indices_overview["sm_index"]["installed"] is False
# Check if sm_index is discovered
assert "sm_index" in i.indices_overview
i.fetch_index("sm_index")
# After fetch_index, it should be installed
assert i.indices_overview["sm_index"]["installed"] is True
assert hasattr(i, "sm_index")

@@ -578,8 +637,10 @@ def test_indices_urls(self):

def test_clinical_index_install(self):
i = IDCClient()
assert i.indices_overview["clinical_index"]["installed"] is False
# Check if clinical_index is discovered
assert "clinical_index" in i.indices_overview
i.fetch_index("clinical_index")
# After fetch_index, it should be installed
assert i.indices_overview["clinical_index"]["installed"] is True
assert len(os.listdir(i.clinical_data_dir)) > 0
