Conversation
Nicholas-Schaub
left a comment
- It looks like we removed preadator. I think that's okay, but let's not lose sight of what preadator was doing: it allowed multiple images to be processed in parallel using multiple processes, with each BioReader/BioWriter pair using multiple threads. In this code, there seems to be no distinction between the two.
- It's unclear to me why we try a full direct read and write of a file and then, if there's an error, fall back to a tiled read. Even when we do a tiled read, we still load the full image into memory by storing it in "final_image", which defeats the purpose of tiled reads and writes. Then we don't even do a tiled write.

I realize this change was made somewhere along the way, but we cannot lose one of the primary reasons this tool exists: a scalable way to convert images to OME format that can run on more or less any hardware.
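The two-level model described above (processes across images, threads within each BioReader/BioWriter pair) can be sketched roughly as follows. This is a hedged illustration, not the tool's real implementation: `NUM_WORKERS`, `NUM_THREADS`, and the `convert_image` stub are illustrative stand-ins for whatever the code actually defines.

```python
# Sketch of preadator-style two-level parallelism (assumed names, not real code).
import os
from concurrent.futures import ProcessPoolExecutor, as_completed

NUM_WORKERS = int(os.environ.get("NUM_WORKERS", "4"))  # processes: one image each
NUM_THREADS = int(os.environ.get("NUM_THREADS", "2"))  # threads inside each reader/writer

def convert_image(path: str, num_threads: int) -> str:
    # In the real tool this would open a BioReader/BioWriter pair with
    # max_workers=num_threads; here it just echoes its inputs.
    return f"{path} converted with {num_threads} threads"

def batch_convert(paths: list[str]) -> list[str]:
    # Outer level: multiple images in parallel across separate processes.
    with ProcessPoolExecutor(max_workers=NUM_WORKERS) as executor:
        futures = [executor.submit(convert_image, p, NUM_THREADS) for p in paths]
        return [f.result() for f in as_completed(futures)]
```

The key point is that the outer pool size (processes) and the inner worker count (threads per reader/writer) are independent knobs.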
```diff
-) as executor:
-    threads = []
+# Use ProcessPoolExecutor for multiprocessing
+with ProcessPoolExecutor(max_workers=NUM_THREADS) as executor:
```
We should make sure to draw a distinction between the number of threads and the number of processes.
I added ENV variables separately
```diff
-if platform.startswith("linux"):
-    NUM_THREADS = len(os.sched_getaffinity(0)) // 2  # type: ignore
+if not NUM_THREADS_ENV or NUM_THREADS_ENV == "1":
```
I'm not sure I understand this. If we define the number of threads in the environment as 1, we want to override it?
I have redefined it. It wasn't overriding the ENV variable; it was only defaulting to half of the CPU cores when the variable wasn't defined.
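One way the default could be computed without clobbering a user-set value (including "1") is sketched below; `resolve_num_threads` is a hypothetical helper written for this discussion, not the tool's actual code.

```python
import os

def resolve_num_threads(env: dict[str, str]) -> int:
    """Use NUM_THREADS from the environment when set; otherwise default
    to half the available CPU cores (at least 1). Hypothetical helper."""
    value = env.get("NUM_THREADS")
    if value:  # a user-set value, even "1", is respected
        return int(value)
    cpu_count = os.cpu_count() or 2
    return max(1, cpu_count // 2)
```

Passing the environment mapping in explicitly (rather than reading `os.environ` inside) keeps the default logic easy to test.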
```python
if br.Z > 1:
    suffix_parts.append(f"_z{z}")
if num_series > 1:
    suffix_parts.append(f"_level_{idx}")
```
This seems to break with how we define other components. For example, "z{z}" instead of "z_{z}".
Should we maybe do "s{idx}"?
Each series is saved independently. I renamed level to s.
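A minimal sketch of the agreed-on naming, assuming a hypothetical `build_suffix` helper and the "s{idx}" series suffix discussed above:

```python
def build_suffix(z: int, idx: int, num_z: int, num_series: int) -> str:
    """Hypothetical reconstruction of the suffix logic, using the
    "s{idx}" series naming instead of "_level_{idx}"."""
    suffix_parts = []
    if num_z > 1:
        suffix_parts.append(f"_z{z}")
    if num_series > 1:
        suffix_parts.append(f"_s{idx}")
    return "".join(suffix_parts)
```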
| """Process a single view (t,c,z) from a BioReader.""" | ||
| try: | ||
| # Try direct read first | ||
| final_image = br[:, :, z, c, t] |
This is a complete rewrite of our scalable read/write algorithm. We need to restore our previous method because it was designed for scalability.
```python
),
try:
    # Explicitly set the series/level parameter when opening the file
    with BioReader(inp_image, max_workers=NUM_THREADS, level=idx) as br:
```
Remember, max_workers here is actually threads, not processes.
I now have two separate ENV variables:
NUM_WORKERS defines the number of separate processes
NUM_THREADS defines the number of threads each process uses internally
```python
write_image(
    br=br,
    c=c,
    image=final_image,
    out_path=out_path,
    max_workers=NUM_THREADS,
)
```
Same as above. Reading and writing a whole plane is not viable for certain image types like whole brain slices, pathology, etc.
```diff
@@ -141,6 +254,14 @@ def convert_image(
     out_path=out_path,
     max_workers=NUM_THREADS,
 )
```
I realize this isn't your code, but we shouldn't be trying to store the entire image output in memory when we may have memory-bound nodes.
I am now writing each tile separately
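A rough sketch of the tile-by-tile pattern being asked for, with plain NumPy arrays standing in for the BioReader/BioWriter pair so the loop structure is the focus; the tile size and helper name are illustrative, and the real code would read each tile from a BioReader and write it to a BioWriter instead.

```python
import numpy as np

TILE_SIZE = 1024  # illustrative tile edge length

def copy_tiles(src: np.ndarray, dst: np.ndarray, tile: int = TILE_SIZE) -> None:
    """Stream src into dst one tile at a time, so only a single tile is
    materialized per iteration, never the full plane. Sketch only."""
    height, width = src.shape[:2]
    for y in range(0, height, tile):
        for x in range(0, width, tile):
            y_max = min(y + tile, height)
            x_max = min(x + tile, width)
            dst[y:y_max, x:x_max] = src[y:y_max, x:x_max]
```

Because each read is immediately followed by the matching write, peak memory stays at one tile regardless of plane size.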
```diff
 with ProcessPoolExecutor(max_workers=NUM_THREADS) as executor:
-    threads = []
+    futures = []
     for files in fps():
         file = files[1][0]
-        threads.append(
-            executor.submit(convert_image, file, file_extension, out_dir),
-        )
+        futures.append(executor.submit(convert_image, file, POLUS_IMG_EXT, out_dir))

 for f in tqdm(
-    as_completed(threads),
-    total=len(threads),
+    as_completed(futures),
+    total=len(futures),
     mininterval=5,
-    desc=f"converting images to {file_extension}",
+    desc=f"converting images to {POLUS_IMG_EXT}",
```
This code seems redundant with what's in the main typer function. It seems like we should maybe remove the code in the main typer function and just call "batch_convert"
55e95ee to 745b2c2
hamshkhawar
left a comment
@Nicholas-Schaub
I’ve addressed the comments and updated the dependencies and Python version. It appears some dependencies (e.g., Typer and others) require Python 3.10 and are not installed under Python 3.9 in GitHub Actions, which is causing the failures. We likely need to update the Python version used in the GitHub Actions workflow.
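If the workflow does need a newer interpreter, the change would presumably be a `python-version` bump in the Actions workflow. The fragment below is illustrative only; the file path and step layout are assumptions, not the repository's actual workflow.

```yaml
# Hypothetical excerpt of a .github/workflows job; only the version bump matters.
- uses: actions/setup-python@v5
  with:
    python-version: "3.10"
```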
```python
write_image(
    br=br,
    c=c,
    image=final_image,
    out_path=out_path,
    max_workers=NUM_THREADS,
)
```
Toil throws an error if the minimum number of CPU workers is more than one, so this is a fix for running jobs with Toil.
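One reading of that fix is defaulting the process count to 1 unless it is explicitly raised via the environment, so a Toil node that advertises a single CPU is never over-subscribed. `resolve_num_workers` is purely hypothetical and only illustrates that reading.

```python
def resolve_num_workers(env: dict[str, str], default: int = 1) -> int:
    """Hypothetical helper: default NUM_WORKERS to 1 so schedulers that
    reject jobs requesting more CPUs than advertised (e.g. Toil) can run
    the tool, while still honoring an explicitly raised value."""
    value = env.get("NUM_WORKERS")
    return max(1, int(value)) if value else default
```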