fix_worker_issue #570

Merged
Nicholas-Schaub merged 17 commits into PolusAI:master from hamshkhawar:fix_worker_omeconverter on Mar 6, 2026

Conversation

@hamshkhawar
Member

Toil throws an error if the minimum number of CPU workers is more than one, so this is a fix for running jobs with Toil.

ndonyapour added a commit to ndonyapour/image-tools that referenced this pull request Feb 27, 2026
Contributor

@Nicholas-Schaub left a comment

  1. It looks like we removed preadator. I think that's okay, but let's not lose sight of what preadator was doing: it allowed multiple images to be processed in parallel using multiple processes, with each BioReader/BioWriter pair using multiple threads. In this code, there seems to be no distinction between the two.
  2. It's unclear to me why we try to do a full direct read and write of a file and then, if there's an error, fall back to a tiled read. Even when we do a tiled read, we still load the full image into memory by storing it in "final_image", which defeats the purpose of doing tiled reads and writes. Then we don't even do a tiled write.

I realize this change was made somewhere along the way, but we cannot lose one of the primary reasons this tool exists: a scalable way to convert images to OME format that can run on more or less any hardware.
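The two-level model described above (a pool of worker processes across images, each conversion using several threads internally) can be sketched roughly as follows; NUM_WORKERS, NUM_THREADS, and convert_one are illustrative names, not the tool's actual API:

```python
# Sketch only: process-level parallelism across images, thread-level
# parallelism inside each conversion. Names here are illustrative.
import os
from concurrent.futures import ProcessPoolExecutor, as_completed

NUM_WORKERS = max(1, (os.cpu_count() or 2) // 2)  # separate processes
NUM_THREADS = 2                                   # threads per process

def convert_one(path: str) -> str:
    # The real worker would open BioReader(path, max_workers=NUM_THREADS)
    # and stream tiles to a BioWriter; here it just tags the input.
    return f"{path}:converted"

def convert_all(paths: list) -> list:
    # One task per image, bounded by NUM_WORKERS processes.
    with ProcessPoolExecutor(max_workers=NUM_WORKERS) as executor:
        futures = [executor.submit(convert_one, p) for p in paths]
        return [f.result() for f in as_completed(futures)]
```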

```diff
-) as executor:
-    threads = []
+# Use ProcessPoolExecutor for multiprocessing
+with ProcessPoolExecutor(max_workers=NUM_THREADS) as executor:
```
Contributor

We should make sure to draw a distinction between the number of threads and the number of processes.

Member Author

I added separate ENV variables.


```python
if platform.startswith("linux"):
    NUM_THREADS = len(os.sched_getaffinity(0)) // 2  # type: ignore
if not NUM_THREADS_ENV or NUM_THREADS_ENV == "1":
```
Contributor

I'm not sure I understand this. If we define the number of threads in the environment as 1, we want to override it?

Member Author

I have redefined it. It wasn't overriding the ENV variable; it was only setting it to half of the CPU cores when not defined.
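A minimal sketch of that precedence, assuming an env var named NUM_THREADS (the exact variable names in the tool may differ): an explicitly set value always wins, and the half-the-cores default applies only when it is unset.

```python
# Sketch only: an explicitly set env var always wins; the half-the-cores
# default applies only when the variable is unset or empty.
import os

def resolve_num_threads(env: dict) -> int:
    value = env.get("NUM_THREADS")
    if value:  # user set it, even to "1": respect it
        return int(value)
    try:
        cores = len(os.sched_getaffinity(0))  # Linux: cores usable by us
    except AttributeError:
        cores = os.cpu_count() or 2           # macOS/Windows fallback
    return max(1, cores // 2)
```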

```python
if br.Z > 1:
    suffix_parts.append(f"_z{z}")
if num_series > 1:
    suffix_parts.append(f"_level_{idx}")
```
Contributor

This seems to break with how we define other components. For example, "z{z}" instead of "z_{z}".

Should we maybe do "s{idx}"?

Member Author

Each series is saved independently; I renamed "level" to "s".
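The convention settled on here ("_z{z}" only when there are multiple z-slices, "_s{idx}" only when there are multiple series) could look like this; the function and its arguments are illustrative, not the tool's exact code:

```python
# Sketch only: append a suffix part only when that dimension is ambiguous.
def build_suffix(z: int, num_z: int, idx: int, num_series: int) -> str:
    parts = []
    if num_z > 1:
        parts.append(f"_z{z}")
    if num_series > 1:
        parts.append(f"_s{idx}")
    return "".join(parts)
```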

"""Process a single view (t,c,z) from a BioReader."""
try:
# Try direct read first
final_image = br[:, :, z, c, t]
Contributor

This is a complete rewrite of our scalable read/write algorithm. We need to restore our previous method because it was designed for scalability.
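The scalable pattern being asked for is a tile-at-a-time copy, so only one tile is ever resident in memory. A sketch with numpy arrays standing in for the BioReader/BioWriter slicing (the tile size and function name are illustrative; bfio commonly uses a 1024-pixel tile edge):

```python
# Sketch only: copy a large 2D plane tile by tile, never materializing
# the whole image. numpy arrays stand in for BioReader/BioWriter here.
import numpy as np

TILE = 1024  # typical tile edge

def copy_tiled(src, dst, tile=TILE):
    height, width = src.shape[:2]
    for y in range(0, height, tile):
        for x in range(0, width, tile):
            y2, x2 = min(y + tile, height), min(x + tile, width)
            dst[y:y2, x:x2] = src[y:y2, x:x2]  # one tile resident at a time
```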

```python
),
try:
    # Explicitly set the series/level parameter when opening the file
    with BioReader(inp_image, max_workers=NUM_THREADS, level=idx) as br:
```
Contributor

Remember, workers here is actually threads and not processes.

Member Author

I now have two separate ENV variables: NUM_WORKERS defines the number of separate processes, and NUM_THREADS defines how many threads each process uses internally.

Comment on lines +199 to +205
```python
write_image(
    br=br,
    c=c,
    image=final_image,
    out_path=out_path,
    max_workers=NUM_THREADS,
)
```
Contributor

Same as above. Reading and writing a whole plane is not viable for certain image types like whole brain slices, pathology, etc.

Member Author

fixed

Comment on lines 250 to 256

```diff
@@ -141,6 +254,14 @@ def convert_image(
     out_path=out_path,
     max_workers=NUM_THREADS,
 )
```
Contributor

I realize this isn't your code, but we shouldn't be trying to store the entire image output in memory when we may have memory bound nodes.

Member Author

I am now writing each tile separately

Comment on lines +290 to +300

```diff
 with ProcessPoolExecutor(max_workers=NUM_THREADS) as executor:
     futures = []
     for files in fps():
         file = files[1][0]
-        threads.append(
-            executor.submit(convert_image, file, file_extension, out_dir),
-        )
+        futures.append(executor.submit(convert_image, file, POLUS_IMG_EXT, out_dir))

 for f in tqdm(
-    as_completed(threads),
-    total=len(threads),
+    as_completed(futures),
+    total=len(futures),
     mininterval=5,
-    desc=f"converting images to {file_extension}",
+    desc=f"converting images to {POLUS_IMG_EXT}",
```
Contributor

This code seems redundant with what's in the main typer function. It seems like we should maybe remove the code in the main typer function and just call "batch_convert"

Member Author

fixed
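The deduplication suggested above might look like the following, where the CLI entrypoint reduces to argument handling plus one call; batch_convert's name and signature are assumed here, not taken from the tool:

```python
# Sketch only: one home for the conversion loop; the CLI just delegates.
from pathlib import Path

def batch_convert(files: list, file_extension: str, out_dir: Path) -> list:
    # In the real tool this would submit convert_image to a
    # ProcessPoolExecutor; here it only computes the output paths.
    return [out_dir / (f.stem + file_extension) for f in files]

def main(inp_dir: Path, file_extension: str, out_dir: Path) -> list:
    # The typer-decorated entrypoint would reduce to this single call.
    return batch_convert(sorted(inp_dir.glob("*")), file_extension, out_dir)
```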

@hamshkhawar force-pushed the fix_worker_omeconverter branch from 55e95ee to 745b2c2 on March 2, 2026 at 17:30
Member Author

@hamshkhawar left a comment

@Nicholas-Schaub
I’ve addressed the comments and updated the dependencies and Python version. It appears some dependencies (e.g., Typer and others) require Python 3.10 and are not installed under Python 3.9 in GitHub Actions, which is causing the failures. We likely need to update the Python version used in the GitHub Actions workflow.


@Nicholas-Schaub Nicholas-Schaub merged commit c36895b into PolusAI:master Mar 6, 2026
3 of 4 checks passed