352 changes: 314 additions & 38 deletions idc_index/index.py
@@ -2,6 +2,7 @@

from __future__ import annotations

import json
import logging
import os
import re
@@ -25,6 +26,7 @@
aws_endpoint_url = "https://s3.amazonaws.com"
gcp_endpoint_url = "https://storage.googleapis.com"
asset_endpoint_url = f"https://github.com/ImagingDataCommons/idc-index-data/releases/download/{idc_index_data.__version__}"
github_api_url = f"https://api.github.com/repos/ImagingDataCommons/idc-index-data/releases/tags/{idc_index_data.__version__}"

logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -125,40 +127,14 @@ def __init__(self):
)
self.clinical_data_dir = None

self.indices_overview = {
"index": {
"description": "Main index containing one row per DICOM series.",
"installed": True,
"url": None,
"file_path": idc_index_data.IDC_INDEX_PARQUET_FILEPATH,
},
"prior_versions_index": {
"description": "index containing one row per DICOM series from all previous IDC versions that are not in current version.",
"installed": True,
"url": None,
"file_path": idc_index_data.PRIOR_VERSIONS_INDEX_PARQUET_FILEPATH,
},
"sm_index": {
"description": "DICOM Slide Microscopy series-level index.",
"installed": False,
"url": f"{asset_endpoint_url}/sm_index.parquet",
"file_path": None,
},
"sm_instance_index": {
"description": "DICOM Slide Microscopy instance-level index.",
"installed": False,
"url": f"{asset_endpoint_url}/sm_instance_index.parquet",
"file_path": None,
},
"clinical_index": {
"description": "Index of clinical data accompanying the available images.",
"installed": False,
"url": f"{asset_endpoint_url}/clinical_index.parquet",
"file_path": None,
},
}
# Cache for index schemas fetched from release assets
self._index_schemas: dict = {}

# Initialize indices overview with automatic discovery
self.indices_overview = self._discover_available_indices()

# these will point to the dataframes containing the respective indices, once installed
# These will point to the dataframes containing the respective indices, once installed
# Initialize as None to allow checking before they are fetched
self.sm_index = None
self.sm_instance_index = None
self.clinical_index = None
@@ -182,6 +158,284 @@ def __init__(self):
# ... and check it can be executed
subprocess.check_call([self.s5cmdPath, "--help"], stdout=subprocess.DEVNULL)

def _discover_available_indices(self, refresh: bool = False) -> dict:
Member:

Improve this function to add disk caching. On the first init, the schema should be fetched and saved to disk. On subsequent calls, it should be loaded from disk instead of re-fetching. The cache should be matched against the version of idc-index-data and refreshed if the installed version does not match. The user should have an option to force a cache refresh if needed.

Contributor Author:

Implemented disk caching in commit ee35afb. Added _load_indices_cache_from_disk() and _save_indices_cache_to_disk() methods. Schemas are:

  • Fetched and saved to disk on first init (in indices_data_dir/indices_cache.json)
  • Loaded from disk on subsequent calls
  • Cache matched against idc-index-data version and refreshed if version doesn't match
  • User can force refresh via refresh=True parameter in _discover_available_indices() or by calling refresh_indices_overview()
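
Below is a minimal, hedged usage sketch of the caching flow described above. It assumes the enclosing class is the idc-index client (referred to here as IDCClient, imported from idc_index.index) and that indices_overview, refresh_indices_overview(), and get_index_schema() behave as shown in this diff; the index name "sm_index" is only an illustrative example.

```python
# Sketch only: IDCClient is assumed to be the class that contains the
# methods added in this PR; the class name itself is not visible in the diff.
from idc_index.index import IDCClient

client = IDCClient()

# On first init, schemas are fetched from the GitHub release assets and cached
# to <indices_data_dir>/indices_cache.json along with the idc-index-data
# version; subsequent inits reuse that file while the version still matches.
print(list(client.indices_overview))

# Force a re-query of the release assets, bypassing the disk cache.
client.refresh_indices_overview()

# Full schema (table_description + columns) for a discovered index.
schema = client.get_index_schema("sm_index")
if schema is not None:
    print(schema.get("table_description"))
```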

"""Discover available index tables from the idc-index-data GitHub release assets.

This method discovers available index parquet files by querying the GitHub
releases API to dynamically find all available indices. Descriptions are
populated from the accompanying JSON schema files.

Schemas are cached to disk in the indices_data_dir. On subsequent calls,
schemas are loaded from disk unless the idc-index-data version changes or
refresh is requested.

Args:
refresh: If True, forces a refresh of the cached index list and schemas.
If False, loads from disk cache if available.

Returns:
dict: A dictionary of available indices with their descriptions, URLs,
installation status, and file paths.
"""
# Return cached data if available and refresh is not requested
if (
not refresh
and hasattr(self, "indices_overview")
and self.indices_overview is not None
):
return self.indices_overview

# Try to load from disk cache first (if not forcing refresh)
if not refresh:
cached_data = self._load_indices_cache_from_disk()
if cached_data:
logger.debug("Loaded indices overview from disk cache")
# Populate the in-memory schema cache
if "schemas" in cached_data:
self._index_schemas = cached_data["schemas"]
return cached_data["indices"]

# Mapping of asset filenames to canonical index names for bundled indices
bundled_indices = {
"idc_index": {
"canonical_name": "index",
"file_path": idc_index_data.IDC_INDEX_PARQUET_FILEPATH,
},
"prior_versions_index": {
"canonical_name": "prior_versions_index",
"file_path": idc_index_data.PRIOR_VERSIONS_INDEX_PARQUET_FILEPATH,
},
}

indices = {}

# Discover indices from the GitHub release API
try:
response = requests.get(github_api_url, timeout=30)
if response.status_code == 200:
release_data = response.json()
assets = release_data.get("assets", [])

# Find all parquet files in the release assets
parquet_assets = {
a["name"]: a["browser_download_url"]
for a in assets
if a["name"].endswith(".parquet")
}

# Find all json schema files in the release assets
json_assets = {
a["name"]: a["browser_download_url"]
for a in assets
if a["name"].endswith(".json")
}

# Process all discovered parquet files
for parquet_name, parquet_url in parquet_assets.items():
# Extract index name from filename (e.g., "sm_index.parquet" -> "sm_index")
asset_index_name = parquet_name.replace(".parquet", "")

# Check if this is a bundled index
if asset_index_name in bundled_indices:
bundled_info = bundled_indices[asset_index_name]
index_name = bundled_info["canonical_name"]
installed = True
file_path = bundled_info["file_path"]
url = None # Bundled indices don't need URL
else:
index_name = asset_index_name
local_path = os.path.join(
self.indices_data_dir, f"{index_name}.parquet"
)
installed = os.path.exists(local_path)
file_path = local_path if installed else None
url = parquet_url

# Determine description from schema file and cache the full schema
description = ""
schema_name = f"{asset_index_name}.json"
if schema_name in json_assets:
schema = self._fetch_index_schema_from_url(
json_assets[schema_name]
)
if schema:
description = schema.get("table_description", "")
# Cache the full schema in memory
self._index_schemas[index_name] = schema

indices[index_name] = {
"description": description,
"installed": installed,
"url": url,
"file_path": str(file_path) if file_path else None,
}

else:
logger.warning(
f"GitHub API returned status {response.status_code}. "
"Unable to discover available indices."
)
except requests.exceptions.RequestException as e:
logger.warning(
f"GitHub API request failed: {e}. Unable to discover available indices."
)

# If no indices were discovered, add at least the bundled indices with default descriptions
if not indices:
indices = {
"index": {
"description": "Main index containing one row per DICOM series.",
"installed": True,
"url": None,
"file_path": str(idc_index_data.IDC_INDEX_PARQUET_FILEPATH),
},
"prior_versions_index": {
"description": "Index containing one row per DICOM series from all previous IDC versions that are not in current version.",
"installed": True,
"url": None,
"file_path": str(
idc_index_data.PRIOR_VERSIONS_INDEX_PARQUET_FILEPATH
),
},
}

# Try to fetch schemas for bundled indices even when API fails
for index_name, schema_filename in [
("index", "idc_index.json"),
("prior_versions_index", "prior_versions_index.json"),
]:
schema_url = f"{asset_endpoint_url}/{schema_filename}"
schema = self._fetch_index_schema_from_url(schema_url)
if schema:
indices[index_name]["description"] = schema.get(
"table_description", indices[index_name]["description"]
)
self._index_schemas[index_name] = schema

# Save to disk cache
self._save_indices_cache_to_disk(indices, self._index_schemas)

return indices

def _load_indices_cache_from_disk(self) -> dict | None:
"""Load cached indices overview and schemas from disk.

Returns:
dict or None: Dictionary containing 'indices' and 'schemas' if cache is valid,
None otherwise.
"""
cache_file = os.path.join(self.indices_data_dir, "indices_cache.json")

if not os.path.exists(cache_file):
return None

try:
with open(cache_file) as f:
cache_data = json.load(f)

# Verify cache is for current version
if cache_data.get("version") != idc_index_data.__version__:
logger.debug(
f"Cache version mismatch: {cache_data.get('version')} != {idc_index_data.__version__}"
)
return None

return {
"indices": cache_data.get("indices", {}),
"schemas": cache_data.get("schemas", {}),
}
except (json.JSONDecodeError, OSError) as e:
logger.debug(f"Failed to load indices cache from disk: {e}")
return None

def _save_indices_cache_to_disk(self, indices: dict, schemas: dict) -> None:
"""Save indices overview and schemas to disk cache.

Args:
indices: Dictionary of indices overview
schemas: Dictionary of index schemas
"""
cache_file = os.path.join(self.indices_data_dir, "indices_cache.json")

try:
os.makedirs(self.indices_data_dir, exist_ok=True)

cache_data = {
"version": idc_index_data.__version__,
"indices": indices,
"schemas": schemas,
}

with open(cache_file, "w") as f:
json.dump(cache_data, f, indent=2)

logger.debug(f"Saved indices cache to {cache_file}")
except (OSError, TypeError) as e:
logger.warning(f"Failed to save indices cache to disk: {e}")

def _fetch_index_schema_from_url(self, url: str) -> dict | None:
"""Fetch an index schema JSON from a URL.

Args:
url: The URL to fetch the schema from.

Returns:
dict or None: The parsed schema dictionary, or None if fetching fails.
"""
try:
response = requests.get(url, timeout=30)
if response.status_code == 200:
return response.json()
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
logger.debug(f"Failed to fetch schema from {url}: {e}")
return None

def refresh_indices_overview(self) -> dict:
"""Refresh the list of available indices by re-querying the GitHub release.

This method forces a refresh of the indices_overview dictionary by querying
the GitHub releases API again, even if a cached version is available.

Returns:
dict: The refreshed indices_overview dictionary.
"""
self.indices_overview = self._discover_available_indices(refresh=True)
return self.indices_overview

def get_index_schema(self, index_name: str, refresh: bool = False) -> dict | None:
"""Get the full schema for an index, including column definitions.

This method returns the JSON schema for the specified index. The schema
includes table_description and column definitions with name, type, mode,
and description. Schemas are cached in memory and on disk during discovery.

Args:
index_name: The name of the index to get the schema for.
refresh: If True, forces a refresh by re-discovering all indices.

Returns:
dict or None: The schema dictionary containing 'table_description' and
'columns', or None if the schema is not available.
"""
if index_name not in self.indices_overview:
logger.error(f"Index {index_name} is not available.")
return None

# If refresh is requested, re-discover indices to refresh all schemas
if refresh:
self.indices_overview = self._discover_available_indices(refresh=True)

# Return cached schema if available
if index_name in self._index_schemas:
return self._index_schemas[index_name]

# Schema was not cached during discovery (shouldn't happen in normal operation)
logger.warning(
f"Schema for {index_name} not available in cache. "
"This may indicate the index was discovered but schema fetch failed."
)
return None

@staticmethod
def _replace_aws_with_gcp_buckets(dataframe, column_name):
# mapping from AWS to GCS buckets is fixed
@@ -368,11 +622,33 @@ def fetch_index(self, index_name) -> None:
"""
if index_name not in self.indices_overview:
logger.error(f"Index {index_name} is not available and can not be fetched.")
elif self.indices_overview[index_name]["installed"]:
logger.warning(
f"Index {index_name} already installed and will not be fetched again."
)
else:
return
if self.indices_overview[index_name]["installed"]:
# Index is already installed, load it from disk if not already loaded
if not hasattr(self, index_name) or getattr(self, index_name) is None:
filepath = self.indices_overview[index_name]["file_path"]
if filepath and os.path.exists(filepath):
logger.info(
f"Index {index_name} already installed, loading from {filepath}"
)
index_table = pd.read_parquet(filepath)
setattr(self, index_name, index_table)
else:
logger.warning(
f"Index {index_name} marked as installed but file not found. Re-downloading."
)
# Reset installed status to allow download
self.indices_overview[index_name]["installed"] = False
self.indices_overview[index_name]["file_path"] = None
# Fall through to the download logic below instead of recursive call
else:
logger.warning(
f"Index {index_name} already installed and will not be fetched again."
)
return

# Download the index if not installed
if not self.indices_overview[index_name]["installed"]:
logger.info("Fetching index %s", index_name)
response = requests.get(
self.indices_overview[index_name]["url"], timeout=30