From fc818ab13921f7bb72c9824d97fe780e5f0246de Mon Sep 17 00:00:00 2001 From: Carlos Uribe Date: Sat, 24 Jan 2026 15:04:33 -0800 Subject: [PATCH 1/8] reformatted dicom receiver by splitting organization of files to a new module for better post processing --- pytheranostics/dicomtools/__init__.py | 7 + pytheranostics/dicomtools/dicom_organizer.py | 414 +++++++++++++++++++ pytheranostics/dicomtools/dicom_receiver.py | 328 +-------------- 3 files changed, 433 insertions(+), 316 deletions(-) create mode 100644 pytheranostics/dicomtools/dicom_organizer.py diff --git a/pytheranostics/dicomtools/__init__.py b/pytheranostics/dicomtools/__init__.py index 1ab4f90..16510a2 100644 --- a/pytheranostics/dicomtools/__init__.py +++ b/pytheranostics/dicomtools/__init__.py @@ -1 +1,8 @@ """DICOM utilities exposed at the package level.""" + +from .dicom_organizer import organize_folder_by_cycles, summarize_timepoints + +__all__ = [ + "organize_folder_by_cycles", + "summarize_timepoints", +] diff --git a/pytheranostics/dicomtools/dicom_organizer.py b/pytheranostics/dicomtools/dicom_organizer.py new file mode 100644 index 0000000..2f03040 --- /dev/null +++ b/pytheranostics/dicomtools/dicom_organizer.py @@ -0,0 +1,414 @@ +"""Utilities for organizing DICOM files by patient, cycle, and timepoint.""" + +import logging +from datetime import datetime, timedelta +from pathlib import Path +from typing import Dict, List, Optional + +import pydicom + +logger = logging.getLogger(__name__) + + +def _parse_dt(date_str: Optional[str], time_str: Optional[str]) -> Optional[datetime]: + """Parse common DICOM date/time fields to a datetime object. 
+ + Parameters + ---------- + date_str : str | None + DICOM DA (YYYYMMDD) + time_str : str | None + DICOM TM (HHMMSS.frac) + + Returns + ------- + datetime | None + Parsed datetime or None if not enough info + """ + if not date_str: + return None + try: + y = int(date_str[0:4]) + m = int(date_str[4:6]) + d = int(date_str[6:8]) + if time_str: + hh = int(time_str[0:2]) if len(time_str) >= 2 else 0 + mm = int(time_str[2:4]) if len(time_str) >= 4 else 0 + ss = int(time_str[4:6]) if len(time_str) >= 6 else 0 + micro = 0 + if len(time_str) > 7 and "." in time_str: + frac = time_str.split(".")[-1] + # pad/cut to microseconds + frac = (frac + "000000")[:6] + micro = int(frac) + return datetime(y, m, d, hh, mm, ss, micro) + return datetime(y, m, d) + except Exception: + return None + + +def _series_datetime_from_any(dcm: pydicom.Dataset) -> Optional[datetime]: + """Best-effort extraction of a datetime for a DICOM series instance. + + Tries AcquisitionDate/Time first (most accurate), then SeriesDate/Time, + then ContentDate/Time, finally falls back to StudyDate/Time. + """ + # Acquisition (most accurate for actual scan time) + dt = _parse_dt( + getattr(dcm, "AcquisitionDate", None), getattr(dcm, "AcquisitionTime", None) + ) + if dt: + return dt + # Series + dt = _parse_dt(getattr(dcm, "SeriesDate", None), getattr(dcm, "SeriesTime", None)) + if dt: + return dt + # Content + dt = _parse_dt(getattr(dcm, "ContentDate", None), getattr(dcm, "ContentTime", None)) + if dt: + return dt + # Study + return _parse_dt(getattr(dcm, "StudyDate", None), getattr(dcm, "StudyTime", None)) + + +def organize_folder_by_cycles( + storage_root: Path | str, + output_base: Path | str | None = None, + *, + cycle_gap_days: float = 15, + timepoint_separation_days: float = 1, + move: bool = True, + patient_id_filter: Optional[List[str]] = None, +) -> Dict[str, Dict[str, List[Path]]]: + """Organize a folder of DICOM files into Patient/Cycle/Timepoint structure. 
+ + This scans ``storage_root`` recursively for ``*.dcm`` files, groups them by + PatientID and StudyDate, then creates folders like: + + PatientID/Cycle1/tp1/CT + PatientID/Cycle1/tp1/SPECT + PatientID/Cycle1/tp1/CT/RTstruct + + Behavior mirrors ``organize_by_cycles()`` but does not require + a running receiver nor a metadata file; grouping is inferred from DICOM tags. + + Parameters + ---------- + storage_root : Path | str + Root directory to scan for DICOM files (searched recursively). + output_base : Path | str | None + Base directory where organized output will be created. Defaults to + ``storage_root`` when None. + cycle_gap_days : float + New cycle if consecutive study dates differ by >= this many days. + timepoint_separation_days : float + New timepoint when datetime gap is >= this many days (can be fractional, e.g., 0.2 ≈ 4.8 hours). + move : bool + If True, move files (and prune emptied dirs opportunistically). If False, copy files. + patient_id_filter : list[str] | None + If provided, only organize these PatientIDs. 
+ + Returns + ------- + dict + Mapping: {PatientID: {"CycleX": {"tpY": [Path, ...]}}} + """ + storage_root = Path(storage_root) + if output_base is None: + output_base = storage_root + output_base = Path(output_base) + + index: Dict[str, Dict[tuple, List[Path]]] = {} + rep_dt_by_series: Dict[str, Dict[tuple, List[datetime]]] = {} + + def _read_minimal(dcm_path: Path) -> Optional[pydicom.Dataset]: + try: + return pydicom.dcmread(str(dcm_path), stop_before_pixels=True, force=True) + except Exception: + return None + + candidates: set[Path] = set() + for pattern in ("*.dcm", "*.DCM"): + candidates.update(storage_root.rglob(pattern)) + + for dcm_file in sorted(candidates): + ds = _read_minimal(dcm_file) + if ds is None: + continue + + patient_id = getattr(ds, "PatientID", None) or "UNKNOWN" + if patient_id_filter and patient_id not in patient_id_filter: + continue + + dt = _series_datetime_from_any(ds) + study_date = getattr(ds, "StudyDate", None) + if dt is None: + # Last-resort fallback to file modification time to help split same-day scans + try: + dt = datetime.fromtimestamp(dcm_file.stat().st_mtime) + except Exception: + dt = None + + if not study_date: + if dt: + study_date = dt.strftime("%Y%m%d") + else: + study_date = "00000000" + + modality = getattr(ds, "Modality", None) or "UNKNOWN" + series_number = getattr(ds, "SeriesNumber", None) + try: + series_number = int(series_number) if series_number is not None else -1 + except Exception: + series_number = -1 + + key = (study_date, modality, series_number) + index.setdefault(patient_id, {}).setdefault(key, []).append(dcm_file) + + if dt is None: + try: + dt = datetime.strptime(study_date, "%Y%m%d") + except Exception: + dt = datetime(1900, 1, 1) + + rep_dt_by_series.setdefault(patient_id, {}).setdefault(key, []).append(dt) + + results: Dict[str, Dict[str, Dict[str, List[Path]]]] = {} + + for patient_id, series_map in index.items(): + series_entries: List[Dict[str, object]] = [] + for key, files in 
series_map.items(): + study_date, modality, series_number = key + dt_list = rep_dt_by_series.get(patient_id, {}).get(key, []) + rep_dt = min(dt_list) if dt_list else datetime(1900, 1, 1) + + # Sub-group files within this series by datetime gaps to split same series_number across timepoints + # Sort files by their datetime and split into subgroups when gap >= timepoint_separation_days + file_dts: List[tuple[Path, datetime]] = [] + for f in files: + try: + ds = pydicom.dcmread(str(f), stop_before_pixels=True, force=True) + fdt = _series_datetime_from_any(ds) + if fdt is None: + try: + fdt = datetime.fromtimestamp(f.stat().st_mtime) + except Exception: + fdt = datetime.strptime(study_date, "%Y%m%d") + except Exception: + try: + fdt = datetime.fromtimestamp(f.stat().st_mtime) + except Exception: + fdt = datetime.strptime(study_date, "%Y%m%d") + file_dts.append((f, fdt)) + + file_dts = sorted(file_dts, key=lambda x: x[1]) + + # Split into subgroups when gap >= timepoint_separation_days + subgroups: List[List[Path]] = [] + current_group: List[Path] = [] + prev_dt: Optional[datetime] = None + for f, fdt in file_dts: + if prev_dt is not None and (fdt - prev_dt) >= timedelta( + days=timepoint_separation_days + ): + # Start new subgroup + subgroups.append(current_group) + current_group = [f] + else: + current_group.append(f) + prev_dt = fdt + + if current_group: + subgroups.append(current_group) + + # Create a series_entry per subgroup + for sg_idx, sg_files in enumerate(subgroups): + sg_dts = [fdt for f, fdt in file_dts if f in sg_files] + sg_rep_dt = min(sg_dts) if sg_dts else rep_dt + series_entries.append( + { + "study_date": study_date, + "datetime": sg_rep_dt, + "modality": modality, + "series_number": series_number, + "files": sg_files, + } + ) + + series_entries = sorted(series_entries, key=lambda s: s["datetime"]) + if not series_entries: + continue + + patient_root = output_base / patient_id + patient_root.mkdir(parents=True, exist_ok=True) + + cycle_idx = 1 + 
tp_idx = 1 + prev_dt = series_entries[0]["datetime"] + src_dirs_for_cleanup: set[Path] = set() + + for i, s in enumerate(series_entries): + this_dt = s["datetime"] + if i > 0: + if (this_dt - prev_dt) >= timedelta(days=cycle_gap_days): + cycle_idx += 1 + tp_idx = 1 + elif (this_dt - prev_dt) >= timedelta(days=timepoint_separation_days): + tp_idx += 1 + + cycle_dir = patient_root / f"Cycle{cycle_idx}" / f"tp{tp_idx}" + cycle_dir.mkdir(parents=True, exist_ok=True) + + cycle_key = f"Cycle{cycle_idx}" + tp_key = f"tp{tp_idx}" + results.setdefault(patient_id, {}).setdefault(cycle_key, {}).setdefault( + tp_key, [] + ) + + modality = s["modality"] + series_number = s["series_number"] + files = s["files"] + + if modality in ["NM", "PT"]: + dest_dir = cycle_dir / "SPECT" + elif modality == "RTSTRUCT": + dest_dir = cycle_dir / "CT" / "RTstruct" + else: + dest_dir = cycle_dir / (modality or "UNKNOWN") + + dest_dir.mkdir(parents=True, exist_ok=True) + + imported = 0 + for src_file in files: + try: + target = dest_dir / src_file.name + if target.exists(): + continue + if move: + import shutil + + shutil.move(str(src_file), str(target)) + src_dirs_for_cleanup.add(src_file.parent) + else: + import shutil + + shutil.copy2(str(src_file), str(target)) + imported += 1 + except Exception: + continue + + logger.info( + f"Organized {imported} files -> {dest_dir} ({modality}, Series{series_number}, {this_dt})" + ) + + results[patient_id][cycle_key][tp_key].append(dest_dir) + + prev_dt = this_dt + + if move: + for src_dir in list(src_dirs_for_cleanup): + try: + if src_dir.exists() and not any(src_dir.iterdir()): + src_dir.rmdir() + except Exception: + pass + try: + parent1 = src_dir.parent + if parent1.exists() and not any(parent1.iterdir()): + parent1.rmdir() + except Exception: + pass + + return results + + +def summarize_timepoints( + storage_root: Path | str, + *, + patient_id_filter: Optional[List[str]] = None, +) -> Dict[str, List[Dict[str, object]]]: + """Summarize detected series 
(one per modality/series_number per StudyDate) for debugging. + + Scans ``storage_root`` with the same datetime extraction logic used by + ``organize_folder_by_cycles`` and returns, per patient, the ordered list of + all distinct (study_date, modality, series_number) with their representative + datetimes and gaps in hours to the previous entry. + """ + storage_root = Path(storage_root) + + # Track all unique (study_date, modality, series_number) per patient with min datetime + index: Dict[str, Dict[tuple, List[datetime]]] = {} + + def _read_minimal(dcm_path: Path) -> Optional[pydicom.Dataset]: + try: + return pydicom.dcmread(str(dcm_path), stop_before_pixels=True, force=True) + except Exception: + return None + + candidates: set[Path] = set() + for pattern in ("*.dcm", "*.DCM"): + candidates.update(storage_root.rglob(pattern)) + + for dcm_file in sorted(candidates): + ds = _read_minimal(dcm_file) + if ds is None: + continue + patient_id = getattr(ds, "PatientID", None) or "UNKNOWN" + if patient_id_filter and patient_id not in patient_id_filter: + continue + + dt = _series_datetime_from_any(ds) + study_date = getattr(ds, "StudyDate", None) + if dt is None: + try: + dt = datetime.fromtimestamp(dcm_file.stat().st_mtime) + except Exception: + dt = None + + if not study_date: + study_date = dt.strftime("%Y%m%d") if dt else "00000000" + + modality = getattr(ds, "Modality", None) or "UNKNOWN" + series_number = getattr(ds, "SeriesNumber", None) + try: + series_number = int(series_number) if series_number is not None else -1 + except Exception: + series_number = -1 + + if dt is None: + try: + dt = datetime.strptime(study_date, "%Y%m%d") + except Exception: + dt = datetime(1900, 1, 1) + + key = (study_date, modality, series_number) + index.setdefault(patient_id, {}).setdefault(key, []).append(dt) + + summary: Dict[str, List[Dict[str, object]]] = {} + for patient_id, by_key in index.items(): + entries: List[Dict[str, object]] = [] + for key, dts in sorted(by_key.items(), 
key=lambda kv: min(kv[1])): + sd, mod, sn = key + rep_dt = min(dts) + entries.append( + { + "study_date": sd, + "modality": mod, + "series_number": sn, + "datetime": rep_dt, + } + ) + + # Compute deltas after sorting by datetime + entries = sorted(entries, key=lambda e: e["datetime"]) + prev_dt: Optional[datetime] = None + for entry in entries: + delta_hours = None + if prev_dt is not None: + delta_hours = (entry["datetime"] - prev_dt).total_seconds() / 3600.0 + entry["delta_hours"] = delta_hours + prev_dt = entry["datetime"] + + summary[patient_id] = entries + + return summary diff --git a/pytheranostics/dicomtools/dicom_receiver.py b/pytheranostics/dicomtools/dicom_receiver.py index 6f7edb7..c372687 100644 --- a/pytheranostics/dicomtools/dicom_receiver.py +++ b/pytheranostics/dicomtools/dicom_receiver.py @@ -6,7 +6,7 @@ import json import logging -from datetime import datetime, timedelta +from datetime import datetime from pathlib import Path from typing import Callable, Dict, List, Optional @@ -22,6 +22,8 @@ from pynetdicom import AE, AllStoragePresentationContexts, evt from pynetdicom.sop_class import Verification +from .dicom_organizer import organize_folder_by_cycles + # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -36,6 +38,8 @@ class DICOMReceiver: - Metadata extraction for dosimetry parameters - Support for CT, SPECT/NM, PET, and RT Structure Sets - Configurable storage paths and callbacks + + Requires pynetdicom to be installed. 
""" def __init__( @@ -163,11 +167,15 @@ def _runner(): logger.info( f"Auto-organizing cycles for patient {patient_id} after {self.auto_organize_debounce_seconds}s idle" ) - self.organize_by_cycles( - patient_id=patient_id, + organize_folder_by_cycles( + storage_root=self.storage_root, output_base=self.auto_organize_output_base, cycle_gap_days=self.auto_organize_cycle_gap_days, - timepoint_separation_days=self.auto_organize_timepoint_separation_days, + timepoint_separation_days=float( + self.auto_organize_timepoint_separation_days + ), + move=True, + patient_id_filter=[patient_id], ) except Exception as e: logger.exception(f"Auto-organize failed for {patient_id}: {e}") @@ -275,318 +283,6 @@ def _get_storage_path(self, ds: pydicom.Dataset) -> Path: path.mkdir(parents=True, exist_ok=True) return path - # -------------------------- - # Post-processing utilities - # -------------------------- - @staticmethod - def _parse_dt( - date_str: Optional[str], time_str: Optional[str] - ) -> Optional[datetime]: - """Parse common DICOM date/time fields to a datetime object. - - Parameters - ---------- - date_str : str | None - DICOM DA (YYYYMMDD) - time_str : str | None - DICOM TM (HHMMSS.frac) - - Returns - ------- - datetime | None - Parsed datetime or None if not enough info - """ - if not date_str: - return None - try: - y = int(date_str[0:4]) - m = int(date_str[4:6]) - d = int(date_str[6:8]) - if time_str: - hh = int(time_str[0:2]) if len(time_str) >= 2 else 0 - mm = int(time_str[2:4]) if len(time_str) >= 4 else 0 - ss = int(time_str[4:6]) if len(time_str) >= 6 else 0 - micro = 0 - if len(time_str) > 7 and "." 
in time_str: - frac = time_str.split(".")[-1] - # pad/cut to microseconds - frac = (frac + "000000")[:6] - micro = int(frac) - return datetime(y, m, d, hh, mm, ss, micro) - return datetime(y, m, d) - except Exception: - return None - - @staticmethod - def _series_datetime_from_any(dcm: pydicom.Dataset) -> Optional[datetime]: - """Best-effort extraction of a datetime for a DICOM series instance. - - Tries SeriesDate/Time, then AcquisitionDate/Time, then ContentDate/Time, - finally falls back to StudyDate/Time. - """ - # Series - dt = DICOMReceiver._parse_dt( - getattr(dcm, "SeriesDate", None), getattr(dcm, "SeriesTime", None) - ) - if dt: - return dt - # Acquisition - dt = DICOMReceiver._parse_dt( - getattr(dcm, "AcquisitionDate", None), getattr(dcm, "AcquisitionTime", None) - ) - if dt: - return dt - # Content - dt = DICOMReceiver._parse_dt( - getattr(dcm, "ContentDate", None), getattr(dcm, "ContentTime", None) - ) - if dt: - return dt - # Study - return DICOMReceiver._parse_dt( - getattr(dcm, "StudyDate", None), getattr(dcm, "StudyTime", None) - ) - - @staticmethod - def _get_any_dicom_datetime_in_path(path: Path) -> Optional[datetime]: - """Find any DICOM file in a directory and return its best-effort datetime. - - Parameters - ---------- - path : Path - Directory containing DICOM files - - Returns - ------- - datetime | None - """ - try: - for dcm_file in sorted(path.glob("*.dcm")): - try: - ds = pydicom.dcmread( - str(dcm_file), stop_before_pixels=True, force=True - ) - dt = DICOMReceiver._series_datetime_from_any(ds) - if dt: - return dt - except Exception: - continue - return None - except Exception: - return None - - def _collect_patient_series(self, patient_id: str) -> List[Dict]: - """Collect all known series for a patient across all studies. - - Returns list of dicts with keys: modality, series_number, series_description, - path (Path), datetime (datetime | None), study_date (str | None). 
- """ - series_list: List[Dict] = [] - for key, info in self.metadata.items(): - if not key.startswith(f"{patient_id}_"): - continue - study_date = info.get("patient_info", {}).get("StudyDate") - series = info.get("series", {}) - for s_key, s in series.items(): - src_path = Path(s.get("path", self.storage_root)) - # Determine a representative datetime for the series - rep_dt = self._get_any_dicom_datetime_in_path(src_path) - if rep_dt is None and study_date: - # Fallback to study_date - rep_dt = self._parse_dt( - study_date, info.get("patient_info", {}).get("StudyTime") - ) - series_list.append( - { - "modality": s.get("modality", "UNKNOWN"), - "series_number": s.get("series_number", 0), - "series_description": s.get("series_description", ""), - "path": src_path, - "datetime": rep_dt, - "study_date": study_date, - } - ) - # Filter out those without any path - return [x for x in series_list if x.get("path") is not None] - - def organize_by_cycles( - self, - patient_id: str, - output_base: Path, - cycle_gap_days: int = 15, - timepoint_separation_days: int = 1, - ) -> Dict[str, Dict[str, List[Path]]]: - """Post-process received DICOMs into Cycle/Timepoint structure. - - Creates folders like: - PatientID/Cycle1/tp1/CT/Series3 - PatientID/Cycle1/tp1/SPECT/Series5 - PatientID/Cycle1/tp2/CT/Series2 - - RTSTRUCT will be placed under the corresponding CT timepoint: - PatientID/Cycle1/tp1/CT/RTstruct/Series7 - - Parameters - ---------- - patient_id : str - Patient identifier - output_base : Path - Directory under which the new structure will be created - cycle_gap_days : int - Start a new cycle if the gap since the previous scan is >= this many days (default 15 days). 
- timepoint_separation_days : int - Start a new timepoint when acquisition date changes by this many days or more (default 1 day) - - Returns - ------- - dict - Nested dict with created directories per cycle and timepoint - """ - series_list = self._collect_patient_series(patient_id) - if not series_list: - raise ValueError(f"No series found for patient '{patient_id}'.") - - # Ensure we have datetimes; if some missing, use file mtime as last resort - for s in series_list: - if s["datetime"] is None: - try: - any_file = next(iter(sorted(s["path"].glob("*.dcm")))) - mtime = datetime.fromtimestamp(any_file.stat().st_mtime) - s["datetime"] = mtime - except StopIteration: - # No files present - skip later - s["datetime"] = None - - # Drop any without datetime ultimately - series_list = [s for s in series_list if s["datetime"] is not None] - - # Group series by StudyDate to define timepoints, so RTSTRUCT doesn't create new cycles - # Build mapping: study_date -> list[series] - tp_by_date: Dict[str, List[Dict]] = {} - for s in series_list: - sd = s.get("study_date") or s["datetime"].strftime("%Y%m%d") - tp_by_date.setdefault(sd, []).append(s) - - # Sort timepoints by study date - sorted_dates = sorted(tp_by_date.keys()) - - out: Dict[str, Dict[str, List[Path]]] = {} - patient_root = Path(output_base) / patient_id - patient_root.mkdir(parents=True, exist_ok=True) - - if not sorted_dates: - return out - - # Compute cycles from consecutive study date gaps - cycle_idx = 1 - tp_idx = 1 - prev_date_dt = datetime.strptime(sorted_dates[0], "%Y%m%d") - - for i, sd in enumerate(sorted_dates): - this_date_dt = datetime.strptime(sd, "%Y%m%d") - if i > 0: - if (this_date_dt - prev_date_dt) >= timedelta(days=cycle_gap_days): - # New cycle - cycle_idx += 1 - tp_idx = 1 - else: - # Same cycle, next timepoint (optionally collapse same-day scans if needed) - if ( - this_date_dt.date() - prev_date_dt.date() - ).days >= timepoint_separation_days: - tp_idx += 1 - - # For all series in this 
study date, place under tp folder - cycle_dir = patient_root / f"Cycle{cycle_idx}" / f"tp{tp_idx}" - cycle_dir.mkdir(parents=True, exist_ok=True) - - # Track source modality directories seen for cleanup after moving - src_dirs_for_cleanup: set[Path] = set() - - for s in tp_by_date[sd]: - modality = s["modality"] - # Normalize modality names for destination - if modality in ["NM", "PT"]: - modality_folder = "SPECT" - elif modality == "RTSTRUCT": - modality_folder = "CT" # RTSTRUCT under CT/RTstruct - else: - modality_folder = modality - - series_number = s.get("series_number", 0) or 0 - # Destination folders drop the Series subfolder; put instances directly under modality - if modality == "RTSTRUCT": - dest_dir = cycle_dir / "CT" / "RTstruct" - else: - dest_dir = cycle_dir / modality_folder - - dest_dir.mkdir(parents=True, exist_ok=True) - - # Copy only files belonging to this SeriesNumber - src_path: Path = s["path"] - src_dirs_for_cleanup.add(src_path) - copied = 0 - for dcm_file in src_path.glob("*.dcm"): - try: - ds = pydicom.dcmread( - str(dcm_file), stop_before_pixels=True, force=True - ) - if int(getattr(ds, "SeriesNumber", -1) or -1) == int( - series_number - ): - import shutil - - dest_file = dest_dir / dcm_file.name - if dest_file.exists(): - # Skip if already present to avoid accidental overwrite - continue - # Move instead of copy to avoid duplication - shutil.move(str(dcm_file), str(dest_file)) - copied += 1 - except Exception: - continue - logger.info( - f"Organized {copied} files -> {dest_dir} ({modality}, Series{int(series_number)}, {sd})" - ) - - # Record in output mapping - cycle_key = f"Cycle{cycle_idx}" - tp_key = f"tp{tp_idx}" - out.setdefault(cycle_key, {}).setdefault(tp_key, []).append(dest_dir) - - # After processing all series for this StudyDate, prune empty source directories - try: - for src_dir in src_dirs_for_cleanup: - # Remove dir if empty - try: - if src_dir.exists() and not any(src_dir.iterdir()): - src_dir.rmdir() - except 
Exception: - pass - # Attempt to remove parent StudyDate dir if empty - try: - study_parent = src_dir.parent - if study_parent.exists() and not any(study_parent.iterdir()): - study_parent.rmdir() - except Exception: - pass - # Attempt to remove patient dir if now empty (rare) - try: - patient_dir = study_parent.parent - if patient_dir.exists() and not any(patient_dir.iterdir()): - patient_dir.rmdir() - except Exception: - pass - except Exception: - logger.debug("Cleanup after move encountered issues; continuing.") - - prev_date_dt = this_date_dt - - logger.info( - f"Cycle/Timepoint organization complete for patient {patient_id} at {patient_root}" - ) - return out - def _handle_store(self, event): """ Handle an incoming C-STORE request. From 7869ad07c9af2e05806ca6db8e0d31222dad0fb9 Mon Sep 17 00:00:00 2001 From: Carlos Uribe Date: Sat, 24 Jan 2026 15:08:41 -0800 Subject: [PATCH 2/8] Add documentation for the dicom organization --- .../dicom_organization/dicom_organization.rst | 178 ++++++++++++++++++ docs/source/tutorials/index.rst | 3 +- 2 files changed, 180 insertions(+), 1 deletion(-) create mode 100644 docs/source/tutorials/dicom_organization/dicom_organization.rst diff --git a/docs/source/tutorials/dicom_organization/dicom_organization.rst b/docs/source/tutorials/dicom_organization/dicom_organization.rst new file mode 100644 index 0000000..cf1025b --- /dev/null +++ b/docs/source/tutorials/dicom_organization/dicom_organization.rst @@ -0,0 +1,178 @@ +DICOM File Organization +======================= + +PyTheranostics provides utilities to organize DICOM files into a structured format suitable for dosimetry workflows. The ``dicom_organizer`` module can process folders of DICOM files and automatically organize them by patient, cycle, and timepoint. 
+ +Overview +-------- + +The organizer handles: + +* **Multiple patients** in a single folder +* **Multiple imaging cycles** per patient (e.g., therapy cycles separated by weeks) +* **Multiple timepoints** per cycle (e.g., scans at different times during a cycle) +* **Mixed modalities** (CT, SPECT/NM, PET, RTSTRUCT) +* **Same-day acquisitions** at different times (using datetime-based splitting) + +Output structure +---------------- + +The organizer creates a hierarchical folder structure:: + + PatientID/ + Cycle1/ + tp1/ + CT/ + *.dcm + SPECT/ + *.dcm + CT/ + RTstruct/ + *.dcm + tp2/ + CT/ + SPECT/ + Cycle2/ + tp1/ + ... + +Basic Usage +----------- + +Organize all DICOM files in a folder +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. code-block:: python + + from pytheranostics.dicomtools import organize_folder_by_cycles + + # Organize all patients in a folder + result = organize_folder_by_cycles( + storage_root="/path/to/dicom/files", + output_base="/path/to/organized/output", + cycle_gap_days=15, # New cycle if gap >= 15 days + timepoint_separation_days=1, # New timepoint if gap >= 1 day + move=True # Move files (False to copy) + ) + + # Result is a dict: {PatientID: {CycleX: {tpY: [Path, ...]}}} + print(result) + +Organize specific patients only +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. code-block:: python + + # Filter to specific patient IDs + result = organize_folder_by_cycles( + storage_root="/path/to/dicom/files", + output_base="/path/to/organized/output", + patient_id_filter=["PATIENT001", "PATIENT002"] + ) + +Handle same-day acquisitions +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +For protocols with multiple scans on the same day (e.g., morning CT and afternoon SPECT+CT), use fractional days to separate timepoints based on actual acquisition times: + +.. 
code-block:: python + + # Separate timepoints if acquisition times differ by >= 4.8 hours + result = organize_folder_by_cycles( + storage_root="/path/to/dicom/files", + output_base="/path/to/organized/output", + timepoint_separation_days=0.2 # 0.2 days ≈ 4.8 hours + ) + +This uses ``AcquisitionDateTime`` from DICOM tags (or file modification time as fallback) to split same-day scans into separate timepoints. + +Debugging and Inspection +------------------------- + +Use ``summarize_timepoints()`` to inspect detected series before organizing: + +.. code-block:: python + + from pytheranostics.dicomtools import summarize_timepoints + + # Get summary of all detected series + summary = summarize_timepoints( + storage_root="/path/to/dicom/files", + patient_id_filter=["PATIENT001"] + ) + + # Summary shows: study_date, modality, series_number, datetime, and gaps + for patient_id, entries in summary.items(): + print(f"\n{patient_id}:") + for entry in entries: + print(f" {entry['study_date']} - {entry['modality']} " + f"Series{entry['series_number']} at {entry['datetime']} " + f"(gap: {entry['delta_hours']:.1f}h)") + +Example output:: + + PATIENT001: + 20190409 - CT Series2 at 2019-04-09 11:34:57 (gap: None) + 20190409 - NM Series5 at 2019-04-09 16:06:50 (gap: 4.5h) + 20190409 - CT Series2 at 2019-04-09 16:26:59 (gap: 0.3h) + 20190410 - CT Series2 at 2019-04-10 10:15:23 (gap: 17.8h) + 20190413 - NM Series4 at 2019-04-13 14:22:10 (gap: 76.1h) + +This helps diagnose issues like: + +* Missing timepoints +* Incorrectly merged same-day scans +* Unexpected gaps between acquisitions + +Parameters Reference +-------------------- + +``organize_folder_by_cycles()`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +:storage_root: Root directory to scan recursively for ``.dcm`` files +:output_base: Base directory for organized output (defaults to ``storage_root``) +:cycle_gap_days: Gap threshold (days) to start a new cycle (default: 15) +:timepoint_separation_days: Gap threshold (days) to start a new 
timepoint within a cycle (default: 1, can be fractional like 0.1 for 2.4 hours)
:move: If ``True``, move files; if ``False``, copy files (default: ``True``)
:patient_id_filter: List of PatientIDs to process; if ``None``, process all (default: ``None``)

Returns a nested dictionary: ``{PatientID: {"CycleX": {"tpY": [Path, ...]}}}``

``summarize_timepoints()``
~~~~~~~~~~~~~~~~~~~~~~~~~~~

:storage_root: Root directory to scan for DICOM files
:patient_id_filter: Optional list of PatientIDs to summarize

Returns: ``{PatientID: [{study_date, modality, series_number, datetime, delta_hours}, ...]}``

Advanced: Integration with DICOM Receiver
------------------------------------------

The organizer can be triggered automatically after receiving DICOM files via C-STORE:

.. code-block:: python

    from pytheranostics.dicomtools.dicom_receiver import DICOMReceiver

    receiver = DICOMReceiver(
        ae_title="PYTHERANOSTICS",
        port=11112,
        storage_root="/path/to/storage",
        auto_organize=True,  # Enable auto-organize
        auto_organize_output_base="/path/to/organized",
        auto_organize_cycle_gap_days=15,
        auto_organize_timepoint_separation_days=0.2,  # 4.8 hours
        auto_organize_debounce_seconds=60  # Wait 60s after last file
    )

    receiver.start()

The receiver will automatically call ``organize_folder_by_cycles()`` 60 seconds after the last DICOM file is received for each patient.

See Also
--------

* :doc:`../Data_Ingestion_Examples/Data_Ingestion_Examples` - General data ingestion workflows
* :doc:`../getting_started/project_setup_tutorial` - Initial project setup
diff --git a/docs/source/tutorials/index.rst b/docs/source/tutorials/index.rst
index 7d02989..e55dd70 100644
--- a/docs/source/tutorials/index.rst
+++ b/docs/source/tutorials/index.rst
@@ -7,7 +7,8 @@ Hands-on walkthroughs that demonstrate common PyTheranostics workflows.
:maxdepth: 1 getting_started/project_setup_tutorial + dicom_organization/dicom_organization + Data_Ingestion_Examples/Data_Ingestion_Examples segmentation/total_segmentator_tutorial SPECT2SUV/SPECT2SUV ROI_Mapping_Tutorial/ROI_Mapping_Tutorial - Data_Ingestion_Examples/Data_Ingestion_Examples From edb702b1bf28c7bf4aa985ea82df573c5d9d3420 Mon Sep 17 00:00:00 2001 From: "Carlos F. Uribe" Date: Sat, 24 Jan 2026 16:41:20 -0800 Subject: [PATCH 3/8] Update pytheranostics/dicomtools/dicom_organizer.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pytheranostics/dicomtools/dicom_organizer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytheranostics/dicomtools/dicom_organizer.py b/pytheranostics/dicomtools/dicom_organizer.py index 2f03040..c93736c 100644 --- a/pytheranostics/dicomtools/dicom_organizer.py +++ b/pytheranostics/dicomtools/dicom_organizer.py @@ -79,7 +79,7 @@ def organize_folder_by_cycles( timepoint_separation_days: float = 1, move: bool = True, patient_id_filter: Optional[List[str]] = None, -) -> Dict[str, Dict[str, List[Path]]]: +) -> Dict[str, Dict[str, Dict[str, List[Path]]]]: """Organize a folder of DICOM files into Patient/Cycle/Timepoint structure. This scans ``storage_root`` recursively for ``*.dcm`` files, groups them by From 080936ab8a372886f9abbb2dff7884ff1bec4f5f Mon Sep 17 00:00:00 2001 From: "Carlos F. 
Uribe" Date: Sat, 24 Jan 2026 16:42:21 -0800 Subject: [PATCH 4/8] Update docs/source/tutorials/dicom_organization/dicom_organization.rst Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- docs/source/tutorials/dicom_organization/dicom_organization.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/tutorials/dicom_organization/dicom_organization.rst b/docs/source/tutorials/dicom_organization/dicom_organization.rst index cf1025b..b522204 100644 --- a/docs/source/tutorials/dicom_organization/dicom_organization.rst +++ b/docs/source/tutorials/dicom_organization/dicom_organization.rst @@ -84,7 +84,7 @@ For protocols with multiple scans on the same day (e.g., morning CT and afternoo timepoint_separation_days=0.2 # 0.2 days ≈ 4.8 hours ) -This uses ``AcquisitionDateTime`` from DICOM tags (or file modification time as fallback) to split same-day scans into separate timepoints. +This uses DICOM acquisition date/time tags (e.g., ``AcquisitionDate``/``AcquisitionTime``, or related series/content/study date/time tags) with file modification time as a fallback to split same-day scans into separate timepoints. Debugging and Inspection ------------------------- From cc45416e8b32ef7c2c8cbc554ac1c8b63b2875f9 Mon Sep 17 00:00:00 2001 From: "Carlos F. 
Uribe" Date: Sat, 24 Jan 2026 16:43:09 -0800 Subject: [PATCH 5/8] Update pytheranostics/dicomtools/dicom_organizer.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pytheranostics/dicomtools/dicom_organizer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytheranostics/dicomtools/dicom_organizer.py b/pytheranostics/dicomtools/dicom_organizer.py index c93736c..1d563bc 100644 --- a/pytheranostics/dicomtools/dicom_organizer.py +++ b/pytheranostics/dicomtools/dicom_organizer.py @@ -222,7 +222,7 @@ def _read_minimal(dcm_path: Path) -> Optional[pydicom.Dataset]: subgroups.append(current_group) # Create a series_entry per subgroup - for sg_idx, sg_files in enumerate(subgroups): + for _, sg_files in enumerate(subgroups): sg_dts = [fdt for f, fdt in file_dts if f in sg_files] sg_rep_dt = min(sg_dts) if sg_dts else rep_dt series_entries.append( From d8e7be6a96ce83d6d00ca70d6f319f8c2e24206c Mon Sep 17 00:00:00 2001 From: "Carlos F. Uribe" Date: Sat, 24 Jan 2026 16:43:36 -0800 Subject: [PATCH 6/8] Update docs/source/tutorials/dicom_organization/dicom_organization.rst Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../tutorials/dicom_organization/dicom_organization.rst | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/source/tutorials/dicom_organization/dicom_organization.rst b/docs/source/tutorials/dicom_organization/dicom_organization.rst index b522204..6255b40 100644 --- a/docs/source/tutorials/dicom_organization/dicom_organization.rst +++ b/docs/source/tutorials/dicom_organization/dicom_organization.rst @@ -105,14 +105,16 @@ Use ``summarize_timepoints()`` to inspect detected series before organizing: for patient_id, entries in summary.items(): print(f"\n{patient_id}:") for entry in entries: + gap = entry['delta_hours'] + gap_str = "N/A" if gap is None else f"{gap:.1f}h" print(f" {entry['study_date']} - {entry['modality']} " f"Series{entry['series_number']} at 
{entry['datetime']} " - f"(gap: {entry['delta_hours']:.1f}h)") + f"(gap: {gap_str})") Example output:: PATIENT001: - 20190409 - CT Series2 at 2019-04-09 11:34:57 (gap: None) + 20190409 - CT Series2 at 2019-04-09 11:34:57 (gap: N/A) 20190409 - NM Series5 at 2019-04-09 16:06:50 (gap: 4.5h) 20190409 - CT Series2 at 2019-04-09 16:26:59 (gap: 0.3h) 20190410 - CT Series2 at 2019-04-10 10:15:23 (gap: 17.8h) From deaa98fbc21a2a42cd630f545f06ded3923baa89 Mon Sep 17 00:00:00 2001 From: "Carlos F. Uribe" Date: Sat, 24 Jan 2026 16:44:01 -0800 Subject: [PATCH 7/8] Update docs/source/tutorials/dicom_organization/dicom_organization.rst Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- docs/source/tutorials/dicom_organization/dicom_organization.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/tutorials/dicom_organization/dicom_organization.rst b/docs/source/tutorials/dicom_organization/dicom_organization.rst index 6255b40..dee59a7 100644 --- a/docs/source/tutorials/dicom_organization/dicom_organization.rst +++ b/docs/source/tutorials/dicom_organization/dicom_organization.rst @@ -139,7 +139,7 @@ Parameters Reference :move: If ``True``, move files; if ``False``, copy files (default: ``True``) :patient_id_filter: List of PatientIDs to process; if ``None``, process all (default: ``None``) -Returns a nested dictionary: ``{PatientID: {"CycleX": {"tpY": [Path, ...]}}}}`` +Returns a nested dictionary: ``{PatientID: {"CycleX": {"tpY": [Path, ...]}}}`` ``summarize_timepoints()`` ~~~~~~~~~~~~~~~~~~~~~~~~~~~ From e13c672f35674634052febcf60d5594b53e50939 Mon Sep 17 00:00:00 2001 From: "Carlos F. 
Uribe" Date: Sat, 24 Jan 2026 16:44:23 -0800 Subject: [PATCH 8/8] Update pytheranostics/dicomtools/dicom_organizer.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pytheranostics/dicomtools/dicom_organizer.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pytheranostics/dicomtools/dicom_organizer.py b/pytheranostics/dicomtools/dicom_organizer.py index 1d563bc..6670f88 100644 --- a/pytheranostics/dicomtools/dicom_organizer.py +++ b/pytheranostics/dicomtools/dicom_organizer.py @@ -311,13 +311,21 @@ def _read_minimal(dcm_path: Path) -> Optional[pydicom.Dataset]: if src_dir.exists() and not any(src_dir.iterdir()): src_dir.rmdir() except Exception: - pass + logger.warning( + "Failed to remove source directory %s during cleanup", + src_dir, + exc_info=True, + ) try: parent1 = src_dir.parent if parent1.exists() and not any(parent1.iterdir()): parent1.rmdir() except Exception: - pass + logger.warning( + "Failed to remove parent directory %s during cleanup", + parent1, + exc_info=True, + ) return results