Commit 85014a4b authored by fima

Merge branch 'omezarr-progressbar' into 'main'

OME-ZARR Progress bar

See merge request !118
parents a38d5a03 a7bf4f42
@@ -26,7 +26,7 @@ import qim3d
from qim3d.utils.logger import log
from qim3d.utils.misc import get_file_size, sizeof, stringify_path
from qim3d.utils.system import Memory
-from qim3d.utils import ProgressBar
+from qim3d.utils.progress_bar import FileLoadingProgressBar
import trimesh
dask.config.set(scheduler="processes")
@@ -824,7 +824,7 @@ def load(
    )
    if progress_bar and os.name == 'posix':
-        with ProgressBar(path):
+        with FileLoadingProgressBar(path):
            data = loader.load(path)
    else:
        data = loader.load(path)
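With this change, loading shows a byte-level progress bar on POSIX systems. A minimal usage sketch; the file path is hypothetical, and `qim3d.io.load` as the public entry point is assumed from the imports above:

```python
import qim3d

# On POSIX systems this wraps the read in FileLoadingProgressBar, which polls
# the process's I/O counters; on other platforms the data loads without a bar.
vol = qim3d.io.load("big_volume.tif", progress_bar=True)
```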
......
@@ -3,8 +3,12 @@ Exporting data to different formats.
"""
import os
+import math
+import shutil
import numpy as np
import zarr
+import tqdm
from ome_zarr.io import parse_url
from ome_zarr.writer import (
    write_image,
@@ -16,10 +20,6 @@ from ome_zarr.writer import (
from ome_zarr.scale import dask_resize
from ome_zarr.reader import Reader
from ome_zarr import scale
-import math
-import shutil
-from qim3d.utils.logger import log
from scipy.ndimage import zoom
from typing import Any, Callable, Iterator, List, Tuple, Union
import dask.array as da
@@ -27,6 +27,10 @@ from skimage.transform import (
    resize,
)
+from qim3d.utils.logger import log
+from qim3d.utils.progress_bar import OmeZarrExportProgressBar
+from qim3d.utils.ome_zarr import get_n_chunks
ListOfArrayLike = Union[List[da.Array], List[np.ndarray]]
ArrayLike = Union[da.Array, np.ndarray]
@@ -79,6 +83,8 @@ def export_ome_zarr(
    order=0,
    replace=False,
    method="scaleZYX",
+    progress_bar: bool = True,
+    progress_bar_repeat_time="auto",
):
    """
    Export image data to OME-Zarr format with pyramidal downsampling.
@@ -92,7 +98,7 @@
        downsample_rate (int, optional): Factor by which to downsample the data for each scale. Must be greater than 1. Defaults to 2.
        order (int, optional): Interpolation order to use when downsampling. Defaults to 0 (nearest-neighbor).
        replace (bool, optional): Whether to replace the existing directory if it already exists. Defaults to False.
+        progress_bar (bool, optional): Whether to display progress while writing data to disk. Defaults to True.
    Raises:
        ValueError: If the directory already exists and `replace` is False.
        ValueError: If `downsample_rate` is less than or equal to 1.
@@ -145,14 +151,23 @@
    mip, axes = _create_mip(image=data, fmt=fmt, scaler=scaler, axes="zyx")
    log.info("Writing data to disk")
-    write_multiscale(
-        mip,
+    kwargs = dict(
+        pyramid=mip,
        group=root,
        fmt=fmt,
        axes=axes,
        name=None,
        compute=True,
    )
+    if progress_bar:
+        n_chunks = get_n_chunks(
+            shapes=(scaled_data.shape for scaled_data in mip),
+            dtypes=(scaled_data.dtype for scaled_data in mip),
+        )
+        with OmeZarrExportProgressBar(path=path, n_chunks=n_chunks, repeat_time=progress_bar_repeat_time):
+            write_multiscale(**kwargs)
+    else:
+        write_multiscale(**kwargs)
    log.info("All done!")
    return
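For reference, a minimal sketch of calling the export with the new options; the array and output path are hypothetical, and the positional argument order (path, data) is assumed rather than shown in this hunk:

```python
import numpy as np
import qim3d

vol = np.random.randint(0, 256, size=(128, 256, 256), dtype=np.uint8)

# progress_bar=True (the new default) estimates the total chunk count with
# get_n_chunks, then tracks chunk files as they appear under the output path.
qim3d.io.export_ome_zarr(
    "vol.zarr",
    vol,
    downsample_rate=2,
    replace=True,
    progress_bar=True,
    progress_bar_repeat_time="auto",
)
```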
......
from . import doi
from .progress_bar import ProgressBar
from .system import Memory
from .misc import (
......
from zarr.util import normalize_chunks, normalize_dtype, normalize_shape
import numpy as np
def get_chunk_size(shape: tuple, dtype):
    """
    Computes the chunk size the same way as zarr.storage.init_array_metadata,
    which is run in the chain of functions used by qim3d.io.export_ome_zarr.

    Parameters
    ----------
    - shape: shape of the data
    - dtype: dtype of the data
    """
    object_codec = None
    dtype, object_codec = normalize_dtype(dtype, object_codec)
    shape = normalize_shape(shape) + dtype.shape
    dtype = dtype.base
    # chunks=None lets zarr pick its default chunking, exactly as it will do
    # later during the actual export.
    chunks = normalize_chunks(None, shape, dtype.itemsize)
    return chunks
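A quick way to see what this returns (shape and dtype are arbitrary; the resulting chunk shape depends on zarr's default chunking heuristics):

```python
import numpy as np
from qim3d.utils.ome_zarr import get_chunk_size

# No explicit chunks are passed, so zarr derives the chunk shape from the
# array shape and itemsize, mirroring what the export will do later.
print(get_chunk_size((512, 512, 512), np.uint8))
```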
def get_n_chunks(shapes: tuple, dtypes: tuple):
    """
    Estimates in advance how many chunks will be written, so the number can be passed
    to a progress bar that tracks how many have already been written to disk.

    Parameters
    ----------
    - shapes: list of shapes of the data for each scale
    - dtypes: list of dtypes of the data for each scale
    """
    n_chunks = 0
    for shape, dtype in zip(shapes, dtypes):
        chunk_size = np.array(get_chunk_size(shape, dtype))
        shape = np.array(shape)
        # Partial chunks at the array edges still produce a file, so round up
        # per axis before multiplying.
        n_chunks += np.prod(np.ceil(shape / chunk_size))
    return int(n_chunks)
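The rounding matters: a (100, 100, 100) scale stored in (64, 64, 64) chunks needs ceil(100/64) = 2 chunks per axis, i.e. 8 chunk files, while the raw ratio would give only (100/64)^3 ≈ 3.8. A small sketch of the intended call, with a hypothetical two-scale pyramid:

```python
import numpy as np
from qim3d.utils.ome_zarr import get_n_chunks

# Full resolution plus one 2x downsample, as produced by export_ome_zarr.
shapes = [(512, 512, 512), (256, 256, 256)]
dtypes = [np.uint8, np.uint8]

n_chunks = get_n_chunks(shapes=shapes, dtypes=dtypes)
print(n_chunks)  # the value handed to OmeZarrExportProgressBar
```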
from threading import Timer
import psutil
import sys
import os
from abc import ABC, abstractmethod
from tqdm.auto import tqdm
@@ -21,21 +23,69 @@ class RepeatTimer(Timer):
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)
+class ProgressBar(ABC):
+    def __init__(self, tqdm_kwargs: dict, repeat_time: float, *args, **kwargs):
+        """
+        Context manager ('with' statement) to track progress of a long-running
+        process we have no control over (such as loading a file), where tqdm
+        updates cannot be inserted into a loop. A parallel timer thread is
+        therefore run, periodically forced to check the state.
+
+        Parameters:
+        ------------
+        - tqdm_kwargs (dict): Passed directly to the tqdm constructor
+        - repeat_time (float): How often the timer runs the function (in seconds)
+        """
+        self.timer = RepeatTimer(repeat_time, self.update_pbar)
+        self.pbar = tqdm(**tqdm_kwargs)
+        self.last_update = 0
+
+    def update_pbar(self):
+        new_update = self.get_new_update()
+        update = new_update - self.last_update
+        try:
+            self.pbar.update(update)
+        except AttributeError:
+            # When we leave the context manager we delete the pbar, so it cannot
+            # be updated anymore. The timer takes a while to shut down and might
+            # try to update the pbar one more time, which would mess up the final
+            # display; ignore that late update instead.
+            pass
+        self.last_update = new_update
+
+    @abstractmethod
+    def get_new_update(self):
+        pass
+
+    def __enter__(self):
+        self.timer.start()
+
+    def __exit__(self, exception_type, exception_value, exception_traceback):
+        self.timer.cancel()
+        self.pbar.clear()
+        self.pbar.n = self.pbar.total
+        self.pbar.display()
+        del self.pbar  # So the update thread cannot update it anymore
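The division of labour the ABC sets up: a subclass only reports an absolute progress value from `get_new_update`, and the base class converts it into tqdm increments on a timer thread. A minimal hypothetical subclass as illustration:

```python
import time

class ElapsedTimeProgressBar(ProgressBar):
    """Hypothetical subclass: shows wall-clock progress of a blocking call."""

    def __init__(self, total_seconds: float):
        self._start = time.time()
        super().__init__(dict(total=total_seconds, unit="s"), repeat_time=0.1)

    def get_new_update(self):
        # Absolute progress, not an increment; update_pbar computes the delta.
        return min(time.time() - self._start, self.pbar.total)

with ElapsedTimeProgressBar(2.0):
    time.sleep(2.0)  # stands in for any long operation we cannot instrument
```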
-class ProgressBar:
+class FileLoadingProgressBar(ProgressBar):
    def __init__(self, filename: str, repeat_time: float = 0.5, *args, **kwargs):
        """
-        Creates class for 'with' statement to track progress during loading a file into memory
+        Context manager ('with' statement) to track progress while loading a file into memory

        Parameters:
        ------------
        - filename (str): used to get the size of the file
        - repeat_time (float, optional): How often the timer checks how many bytes were loaded. Even a very
          small value does not make the progress bar smoother, as there are only a few visible changes in the
          number of read_chars.
-          Defaults to 0.25
+          Defaults to 0.5
        """
-        self.timer = RepeatTimer(repeat_time, self.memory_check)
-        self.pbar = tqdm(
+        tqdm_kwargs = dict(
            total=get_file_size(filename),
            desc="Loading: ",
            unit="B",
@@ -45,33 +95,77 @@
            bar_format="{l_bar}{bar}| {n_fmt}{unit}/{total_fmt}{unit} [{elapsed}<{remaining}, "
            "{rate_fmt}{postfix}]",
        )
-        self.last_memory = 0
+        super().__init__(tqdm_kwargs, repeat_time)
        self.process = psutil.Process()

-    def memory_check(self):
+    def get_new_update(self):
        counters = self.process.io_counters()
        try:
            memory = counters.read_chars
        except AttributeError:
            memory = counters.read_bytes + counters.other_bytes
-        try:
-            self.pbar.update(memory - self.last_memory)
-        except (
-            AttributeError
-        ):  # When we leave the context manager, we delete the pbar so it can not be updated anymore
-            # It's because it takes quite a long time for the timer to end and might update the pbar
-            # one more time before ending which messes up the whole thing
-            pass
-        self.last_memory = memory
+        return memory

+class OmeZarrExportProgressBar(ProgressBar):
+    def __init__(self, path: str, n_chunks: int, repeat_time="auto"):
+        """
+        Context manager to track the exporting of OME-Zarr files.
+
+        Parameters
+        ----------
+        path : str
+            The folder path where the files will be saved.
+        n_chunks : int
+            The total number of chunks to track.
+        repeat_time : int or float, optional
+            The interval (in seconds) for updating the progress bar. Defaults to "auto",
+            which sets the update frequency based on the number of chunks.
+        """
-    def __enter__(self):
-        self.timer.start()
-
-    def __exit__(self, exception_type, exception_value, exception_traceback):
-        self.timer.cancel()
-        self.pbar.clear()
-        self.pbar.n = self.pbar.total
-        self.pbar.display()
-        del self.pbar  # So the update process can not update it anymore
+        # Calculate the repeat time for the progress bar
+        if repeat_time == "auto":
+            # Approximate the repeat time based on the number of chunks.
+            # This ratio is based on reading the HOA dataset over the network:
+            # 620,000 files took 300 seconds to read.
+            # The ratio is a little smaller than needed, to avoid disk stress.
+            repeat_time = n_chunks / 1500
+        else:
+            repeat_time = float(repeat_time)
+
+        # We don't want to update the progress bar too often anyway
+        if repeat_time < 0.5:
+            repeat_time = 0.5
+
+        self.path = path
+        tqdm_kwargs = dict(
+            total=n_chunks,
+            unit="Chunks",
+            desc="Saving",
+            unit_scale=True,
+        )
+        super().__init__(tqdm_kwargs, repeat_time)
+        self.last_update = 0
+
+    def get_new_update(self):
+        def file_count(folder_path: str):
+            """
+            Goes recursively through the folders and counts how many files are there.
+            Doesn't count hidden metadata files (e.g. .zarray, .zattrs).
+            """
+            count = 0
+            for path in os.listdir(folder_path):
+                new_path = os.path.join(folder_path, path)
+                if os.path.isfile(new_path):
+                    filename = os.path.basename(os.path.normpath(new_path))
+                    if not filename.startswith("."):
+                        count += 1
+                else:
+                    count += file_count(new_path)
+            return count
+
+        return file_count(self.path)
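A note on the "auto" interval: `get_new_update` re-walks the whole output tree on every tick, so one poll costs time proportional to `n_chunks`; scaling the interval as `n_chunks / 1500` (clamped to at least 0.5 s) keeps the relative polling overhead roughly constant. The resulting intervals, from the arithmetic above:

```python
# Effective "auto" polling interval for a few pyramid sizes (pure arithmetic):
for n_chunks in (100, 1_500, 150_000):
    print(n_chunks, "chunks ->", max(n_chunks / 1500, 0.5), "s between polls")
# 100 chunks -> 0.5 s between polls
# 1500 chunks -> 1.0 s between polls
# 150000 chunks -> 100.0 s between polls
```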