Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
Loading items

Target

Select target project
  • QIM/tools/qim3d
1 result
Select Git revision
Loading items
Show changes
Commits on Source (14)
%% Cell type:markdown id: tags:
# Image annotation tool
This notebook shows how the annotation interface can be used to create masks for images
%% Cell type:code id: tags:
``` python
import qim3d
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
%matplotlib inline
```
%% Cell type:code id: tags:
``` python
# Load 2D example image
img = qim3d.examples.blobs_256x256
# Display image
plt.imshow(img)
plt.show()
```
%% Cell type:code id: tags:
``` python
# Start annotation tool
interface = qim3d.gui.annotation_tool.Interface()
interface.max_masks = 4
# We can directly pass the image we loaded to the interface
interface.launch(img=img)
```
%% Cell type:code id: tags:
``` python
# When 'prepare mask for download' is pressed once, the mask can be retrieved with the get_result() method
mask = interface.get_result()
```
%% Cell type:markdown id: tags:
## Check the obtained mask
%% Cell type:code id: tags:
``` python
print (f"Original image shape..: {img.shape}")
print (f"Mask image shape......: {mask.shape}")
print (f"\nNumber of masks: {np.max(mask)}")
```
%% Cell type:markdown id: tags:
## Show the masked regions
%% Cell type:code id: tags:
``` python
%matplotlib inline
nmasks = np.max(mask)
fig, axs = plt.subplots(nrows=1, ncols=nmasks+2, figsize=(12,3))
# Show original image
axs[0].imshow(img)
axs[0].set_title("Original")
axs[0].axis('off')
# Show masks
cmap = mpl.colormaps["rainbow"].copy()
cmap.set_under(color='black') # Sets the background to black
axs[1].imshow(mask, interpolation='none', cmap=cmap, vmin=1, vmax=nmasks+1)
axs[1].set_title("Masks")
axs[1].axis('off')
# Show masked regions
for idx in np.arange(2, nmasks+2):
mask_id = idx-1
submask = mask.copy()
submask[submask != mask_id] = 0
masked_img = img.copy()
masked_img[submask==0] = 0
axs[idx].imshow(masked_img)
axs[idx].set_title(f"Mask {mask_id}")
axs[idx].axis('off')
plt.show()
```
import qim3d.io
import qim3d.gui
import qim3d.viz
import qim3d.utils
import qim3d.models
import qim3d.io as io
import qim3d.gui as gui
import qim3d.viz as viz
import qim3d.utils as utils
import qim3d.models as models
import logging
examples = qim3d.io.ImgExamples()
\ No newline at end of file
examples = io.ImgExamples()
downloader = io.Downloader()
from . import data_explorer
from . import iso3d
from . import local_thickness
from . import annotation_tool
\ No newline at end of file
import tifffile
import os
import numpy as np
import gradio as gr
from qim3d.io import load # load or DataLoader?
class Interface:
def __init__(self):
self.verbose = False
self.title = "Annotation tool"
#self.plot_height = 768
self.height = 1024
#self.width = 960
self.max_masks = 3
self.mask_opacity = 0.5
self.cmy_hex = ['#00ffff','#ff00ff','#ffff00'] # Colors for max_masks>3?
# CSS path
current_dir = os.path.dirname(os.path.abspath(__file__))
self.css_path = os.path.join(current_dir, "..", "css", "gradio.css")
def launch(self, img=None, **kwargs):
# Create gradio interfaces
self.interface = self.create_interface(img=img)
# Set gradio verbose level
if self.verbose:
quiet = False
else:
quiet = True
self.interface.launch(
quiet=quiet,
height=self.height,
#width=self.width,
show_tips=False,
**kwargs
)
return
def get_result(self):
# Get the temporary files from gradio
temp_sets = self.interface.temp_file_sets
for temp_set in temp_sets:
if "mask" in str(temp_set):
# Get the list of the temporary files
temp_path_list = list(temp_set)
# Files are not in creation order,
# so we need to get find the latest
creation_time_list = []
for path in temp_path_list:
creation_time_list.append(os.path.getctime(path))
# Get index for the latest file
file_idx = np.argmax(creation_time_list)
# Load the temporary file
mask = load(temp_path_list[file_idx])
return mask
def create_interface(self, img=None):
with gr.Blocks(css=self.css_path) as gradio_interface:
masks_state = gr.State(value={})
counts = gr.Number(value=1,visible=False)
with gr.Row():
with gr.Column(scale=1,min_width=320):
upload_img_btn = gr.UploadButton(
label='Upload image',
file_types=['image'],
interactive=True if img is None else False
)
clear_img_btn = gr.Button(
value='Clear image',
interactive=False if img is None else True
)
with gr.Row():
with gr.Column(scale=2,min_width=32):
selected_mask = gr.Radio(
choices = ["Mask 1"],
value = "Mask 1",
label="Choose which mask to draw",
scale=1
)
with gr.Column(scale=1,min_width=64):
add_mask_btn = gr.Button(
value='Add mask',
scale=2,
)
with gr.Row():
prep_dl_btn = gr.Button(
value='Prepare mask for download',
visible=False if img is None else True
)
with gr.Row():
save_output = gr.File(
show_label=True,
label="Output file",
visible=False,
)
with gr.Column(scale=4):
with gr.Row():
input_img = gr.Image(
label="Input",
tool='sketch',
value=img,
height=600,
width=600,
brush_color='#00ffff',
mask_opacity=self.mask_opacity,
interactive=False if img is None else True
)
output_masks = []
for mask_idx in range(self.max_masks):
with gr.Row(): # make a new row for every mask
output_mask=gr.Image(
label=f"Mask {mask_idx+1}",
visible=True if mask_idx==0 else False,
image_mode='L',
height=600,
width=600,
interactive=False if img is None else True, # If statement added bc of bug after Gradio 3.44.x
show_download_button=False
)
output_masks.append(output_mask)
# Operations
operations = Operations(max_masks=self.max_masks,cmy_hex=self.cmy_hex)
# Update component configuration when image is uploaded
upload_img_btn.upload(fn=operations.upload_img_update,
inputs=upload_img_btn,
outputs=[input_img,clear_img_btn,upload_img_btn,prep_dl_btn] + output_masks
)
# Add mask below when 'add mask' button is clicked
add_mask_btn.click(
fn=operations.increment_mask,
inputs=counts,
outputs=[counts, selected_mask] + output_masks
)
# Draw mask when input image is edited
input_img.edit(
fn=operations.update_masks,
inputs=[input_img,selected_mask,masks_state,upload_img_btn],
outputs=output_masks
)
# Update brush color according to radio setting
selected_mask.change(
fn=operations.update_brush_color,
inputs=selected_mask,outputs=input_img
)
# Make file download visible
prep_dl_btn.click(
fn=operations.save_mask,
inputs=output_masks,
outputs=[save_output,save_output]
)
# Update 'Add mask' button interactivit according to the current count
counts.change(
fn=operations.set_add_mask_btn_interactivity,
inputs=counts,
outputs=add_mask_btn
)
# Reset component configuration when image is cleared
clear_img_btn.click(
fn=operations.clear_img_update,
inputs=None,
outputs=[selected_mask,prep_dl_btn,save_output,counts,input_img,upload_img_btn,clear_img_btn] + output_masks
)
return gradio_interface
class Operations:
def __init__(self, max_masks, cmy_hex):
self.max_masks = max_masks
self.cmy_hex = cmy_hex
def update_masks(self,input_img,selected_mask,masks_state,file):
# Binarize mask (it is not per default due to anti-aliasing)
input_mask = input_img['mask']
input_mask[input_mask>0]=255
try:
file_name = file.name
except AttributeError:
file_name = 'nb_img'
# Add new file to state dictionary when this function sees it first time
if file_name not in masks_state.keys():
masks_state[file_name]=[[] for _ in range(self.max_masks)]
# Get index of currently selected and non-selected masks
sel_mask_idx = int(selected_mask[-1])-1
nonsel_mask_idxs = [mask_idx for mask_idx in list(range(self.max_masks)) if mask_idx != sel_mask_idx]
# Add background to state first time function is invoked in current session
if len(masks_state[file_name][0])==0:
for i in range(len(masks_state[file_name])):
masks_state[file_name][i].append(input_mask)
# Check for discrepancy between what is drawn and what is shown as output masks
masks_state_combined = 0
for i in range(len(masks_state[file_name])):
masks_state_combined+=masks_state[file_name][i][-1]
discrepancy = masks_state_combined!=input_mask
if np.any(discrepancy): # Correct discrepancy in output masks
for i in range(self.max_masks):
masks_state[file_name][i][-1][discrepancy]=0
# Add most recent change in input to currently selected mask
mask2append = input_mask
for mask_idx in nonsel_mask_idxs:
mask2append -= masks_state[file_name][mask_idx][-1]
masks_state[file_name][sel_mask_idx].append(mask2append)
return [masks_state[file_name][i][-1] for i in range(self.max_masks)]
def save_mask(self,*masks):
# Go from multi-channel to single-channel mask
stacked_masks = np.stack(masks,axis=-1)
final_mask = np.zeros_like(masks[0])
final_mask[np.where(stacked_masks==255)[:2]]=np.where(stacked_masks==255)[-1]+1
# Save output image in a temp space (and to current directory which is a bug)
filename = "mask.tif"
tifffile.imwrite(filename,final_mask)
save_output_update = gr.File(visible=True)
return save_output_update, filename
def increment_mask(self,counts):
# increment count by 1
counts+=1
counts=int(counts)
counts_update = gr.Number(value=counts)
selected_mask_update = gr.Radio(value = f"Mask {counts}", choices = [f"Mask {i+1}" for i in range(counts)])
output_masks_update = [gr.Image(visible=True)]*counts + [gr.Image(visible=False)]*(self.max_masks-counts)
return [counts_update, selected_mask_update] + output_masks_update
def update_brush_color(self,selected_mask):
sel_mask_idx = int(selected_mask[-1])-1
if sel_mask_idx<len(self.cmy_hex):
input_img_update = gr.Image(brush_color=self.cmy_hex[sel_mask_idx])
else:
input_img_update = gr.Image(brush_color='#000000') # Return black brush
return input_img_update
def set_add_mask_btn_interactivity(self,counts):
add_mask_btn_update = gr.Button(interactive=True) if counts<self.max_masks else gr.Button(interactive=False)
return add_mask_btn_update
def clear_img_update(self):
selected_mask_update = gr.Radio(choices = ["Mask 1"], value = "Mask 1") # Reset radio component to only show 'Mask 1'
prep_dl_btn_update = gr.Button(visible=False) # Make 'Prepare mask for download' button invisible
save_output_update = gr.File(visible=False) # Make File save box invisible
counts_update = gr.Number(value=1) # Reset invisible counter to 1
input_img_update = gr.Image(value=None,interactive=False) # Set input image component to non-interactive (so a new image cannot be uploaded directly in the component)
upload_img_btn_update = gr.Button(interactive=True) # Make 'Upload image' button interactive
clear_img_btn_update = gr.Button(interactive=False) # Make 'Clear image' button non-interactive
output_masks_update = [gr.Image(value=None,visible=True if i==0 else False,interactive=False) for i in range(self.max_masks)] # Remove drawn masks and set as invisible except mask 1. 'interactive=False' added bc of bug after Gradio 3.44.x
return [selected_mask_update,
prep_dl_btn_update,
save_output_update,
counts_update,
input_img_update,
upload_img_btn_update,
clear_img_btn_update] + output_masks_update
def upload_img_update(self,file):
input_img_update = gr.Image(value=load(file.name),interactive=True) # Upload image from button to Image components
clear_img_btn_update = gr.Button(interactive=True) # Make 'Clear image' button interactive
upload_img_btn_update = gr.Button(interactive=False) # Make 'Upload image' button non-interactive
prep_dl_btn_update = gr.Button(visible=True) # Make 'Prepare mask for download' button visible
output_masks_update = [gr.Image(interactive=True)]*self.max_masks # This line is added bc of bug in Gradio after 3.44.x
return [input_img_update,
clear_img_btn_update,
upload_img_btn_update,
prep_dl_btn_update] + output_masks_update
\ No newline at end of file
from .downloader import Downloader
from .load import DataLoader, load, ImgExamples
from .save import save
from .save import DataSaver, save
from .sync import Sync
from . import logger
\ No newline at end of file
"Class for downloading larger images from the QIM Data Repository"
import os
import urllib.request
from urllib.parse import quote
from tqdm import tqdm
from pathlib import Path
from qim3d.io.load import load
from qim3d.io.logger import log
import outputformat as ouf
class Downloader:
"""Class for downloading large data files available on the QIM data repository.
Attributes:
[folder_name_1] (str): folder class with the name of the first folder in the QIM data repository.
[folder_name_2] (str): folder class with the name of the second folder in the QIM data repository.
...
[folder_name_n] (str): folder class with the name of the n-th folder in the QIM data repository.
Example:
dl = Downloader()
# Downloads and Loads (optional) image:
img = dl.Corals.Coral2_DOWNSAMPLED(load = True)
"""
def __init__(self):
folders = _extract_names()
for idx, folder in enumerate(folders):
exec(f"self.{folder} = self._Myfolder(folder)")
class _Myfolder:
"""Class for extracting the files from each folder in the Downloader class.
Args:
folder(str): name of the folder of interest in the QIM data repository.
Methods:
_make_fn(folder,file): creates custom functions for each file found in the folder.
[file_name_1](load_file,optional): Function to download file number 1 in the given folder.
[file_name_2](load_file,optional): Function to download file number 2 in the given folder.
...
[file_name_n](load_file,optional): Function to download file number n in the given folder.
"""
def __init__(self, folder):
files = _extract_names(folder)
for idx, file in enumerate(files):
# Changes names to usable function name.
file_name = file
if ("%20" in file) or ("-" in file):
file_name = file_name.replace("%20", "_")
file_name = file_name.replace("-", "_")
setattr(self, f'{file_name.split(".")[0]}', self._make_fn(folder, file))
def _make_fn(self, folder, file):
"""Private method that returns a function. The function downloads the chosen file from the folder.
Args:
folder(str): Folder where the file is located.
file(str): Name of the file to be downloaded.
Returns:
function: the function used to download the file.
"""
url_dl = "https://archive.compute.dtu.dk/download/public/projects/viscomp_data_repository"
def _download(load_file=False, virtual_stack=True):
"""Downloads the file and optionally also loads it.
Args:
load_file(bool,optional): Whether to simply download or also load the file.
Returns:
virtual_stack: The loaded image.
"""
download_file(url_dl, folder, file)
if load_file == True:
log.info(f"\nLoading {file}")
file_path = os.path.join(folder, file)
return load(path=file_path, virtual_stack=virtual_stack)
return _download
def _update_progress(pbar, blocknum, bs):
"""
Helper function for the ´download_file()´ function. Updates the progress bar.
"""
pbar.update(blocknum * bs - pbar.n)
def _get_file_size(url):
"""
Helper function for the ´download_file()´ function. Finds the size of the file.
"""
return int(urllib.request.urlopen(url).info().get("Content-Length", -1))
def download_file(path, name, file):
"""Downloads the file from path / name / file.
Args:
path(str): path to the folders available.
name(str): name of the folder of interest.
file(str): name of the file to be downloaded.
"""
if not os.path.exists(name):
os.makedirs(name)
url = os.path.join(path, name, file).replace("\\", "/") # if user is on windows
file_path = os.path.join(name, file)
if os.path.exists(file_path):
log.warning(f"File already downloaded:\n{os.path.abspath(file_path)}")
return
else:
log.info(
f"Downloading {ouf.b(file, return_str=True)}\n{os.path.join(path,name,file)}"
)
if " " in url:
url = quote(url, safe=":/")
with tqdm(
total=_get_file_size(url),
unit="B",
unit_scale=True,
unit_divisor=1024,
ncols=80,
) as pbar:
urllib.request.urlretrieve(
url,
file_path,
reporthook=lambda blocknum, bs, total: _update_progress(pbar, blocknum, bs),
)
def _extract_html(url):
"""Extracts the html content of a webpage in "utf-8"
Args:
url(str): url to the location where all the data is stored.
Returns:
html_content(str): decoded html.
"""
try:
with urllib.request.urlopen(url) as response:
html_content = response.read().decode(
"utf-8"
) # Assuming the content is in UTF-8 encoding
except urllib.error.URLError as e:
log.warning(f"Failed to retrieve data from {url}. Error: {e}")
return html_content
def _extract_names(name=None):
"""Extracts the names of the folders and files.
Finds the names of either the folders if no name is given,
or all the names of all files in the given folder.
Args:
name(str,optional): name of the folder from which the names should be extracted.
Returns:
list: If name is None, returns a list of all folders available.
If name is not None, returns a list of all files available in the given 'name' folder.
"""
url = "https://archive.compute.dtu.dk/files/public/projects/viscomp_data_repository"
if name:
datapath = os.path.join(url, name).replace("\\", "/")
html_content = _extract_html(datapath)
data_split = html_content.split(
"files/public/projects/viscomp_data_repository/"
)[3:]
data_files = [
element.split(" ")[0][(len(name) + 1) : -3] for element in data_split
]
return data_files
else:
html_content = _extract_html(url)
split = html_content.split('"icon-folder-open">')[2:]
folders = [element.split(" ")[0][4:-4] for element in split]
return folders
"""Provides functionality for loading data from various file formats."""
import os
import sys
import difflib
import tifffile
import h5py
import nibabel as nib
import numpy as np
from pathlib import Path
import qim3d
from qim3d.io.logger import log
from qim3d.utils.internal_tools import sizeof
from qim3d.utils.internal_tools import sizeof, stringify_path
from qim3d.utils.system import Memory
class DataLoader:
"""Utility class for loading data from different file formats.
Args:
virtual_stack (bool, optional): Specifies whether to use virtual stack
when loading files. Default is False.
Attributes:
virtual_stack (bool): Specifies whether virtual stack is enabled.
dataset_name (str): Specifies the name of the dataset to be loaded
(only relevant for HDF5 files)
return_metadata (bool): Specifies if metadata is returned or not
(only relevant for HDF5, TXRM/TXM/XRM and NIfTI files)
contains (str): Specifies a part of the name that is common for the
TIFF file stack to be loaded (only relevant for TIFF stacks)
Methods:
load_tiff(path): Load a TIFF file from the specified path.
load_h5(path): Load an HDF5 file from the specified path.
load_tiff_stack(path): Load a stack of TIFF files from the specified path.
load_txrm(path): Load a TXRM/TXM/XRM file from the specified path
load(path): Load a file or directory based on the given path.
Raises:
ValueError: If the file format is not supported or the path is invalid.
load(path): Load a file or directory based on the given path
Example:
loader = DataLoader(virtual_stack=True)
loader = qim3d.io.DataLoader(virtual_stack=True)
data = loader.load_tiff("image.tif")
"""
......@@ -42,15 +41,14 @@ class DataLoader:
"""Initializes a new instance of the DataLoader class.
Args:
path (str): The path to the file or directory.
virtual_stack (bool, optional): Specifies whether to use virtual
stack when loading files. Default is False.
dataset_name (str, optional): Specifies the name of the dataset to be loaded
in case multiple dataset exist within the same file. Default is None (only for HDF5 files)
return_metadata (bool, optional): Specifies whether to return metadata or not. Default is False (only for HDF5 files)
contains (str, optional): Specifies a part of the name that is common for the TIFF file stack to be loaded (only for TIFF stacks)
return_metadata (bool, optional): Specifies whether to return metadata or not. Default is False (only for HDF5 and TXRM/TXM/XRM files)
contains (str, optional): Specifies a part of the name that is common for the TIFF file stack to be loaded (only for TIFF stacks).
Default is None.
"""
# Virtual stack is False by default
self.virtual_stack = kwargs.get("virtual_stack", False)
self.dataset_name = kwargs.get("dataset_name", None)
self.return_metadata = kwargs.get("return_metadata", False)
......@@ -83,7 +81,7 @@ class DataLoader:
Returns:
numpy.ndarray or tuple: The loaded volume as a NumPy array.
If 'return_metadata' is True, returns a tuple (volume, metadata).
If 'self.return_metadata' is True, returns a tuple (volume, metadata).
Raises:
ValueError: If the specified dataset_name is not found or is invalid.
......@@ -217,7 +215,7 @@ class DataLoader:
Returns:
numpy.ndarray or tuple: The loaded volume as a NumPy array.
If 'return_metadata' is True, returns a tuple (volume, metadata).
If 'self.return_metadata' is True, returns a tuple (volume, metadata).
Raises:
ValueError: If the dxchange library is not installed
......@@ -247,24 +245,55 @@ class DataLoader:
else:
return vol
def load_nifti(self,path):
"""Load a NIfTI file from the specified path.
Args:
path (str): The path to the NIfTI file.
Returns:
numpy.ndarray or tuple: The loaded volume as a NumPy array.
If 'self.return_metadata' is True, returns a tuple (volume, metadata).
"""
data = nib.load(path)
# Get image array proxy
vol = data.dataobj
if not self.virtual_stack:
vol = np.asarray(vol,dtype=data.get_data_dtype())
if self.return_metadata:
metadata = {}
for key in data.header:
metadata[key]=data.header[key]
return vol, metadata
else:
return vol
def load(self, path):
"""
Load a file or directory based on the given path.
Args:
path (str): The path to the file or directory.
path (str or os.PathLike): The path to the file or directory.
Returns:
numpy.ndarray: The loaded volume as a NumPy array.
Raises:
ValueError: If the format is not supported or the path is invalid.
FileNotFoundError: If the file or directory does not exist.
ValueError: If the format is not supported
ValueError: If the file or directory does not exist.
Example:
loader = DataLoader()
loader = qim3d.io.DataLoader()
data = loader.load("image.tif")
"""
path = stringify_path(path)
# Load a file
if os.path.isfile(path):
# Choose the loader based on the file extension
......@@ -274,6 +303,8 @@ class DataLoader:
return self.load_h5(path)
elif path.endswith((".txrm", ".txm", ".xrm")):
return self.load_txrm(path)
elif path.endswith((".nii",".nii.gz")):
return self.load_nifti(path)
else:
raise ValueError("Unsupported file format")
......@@ -285,7 +316,7 @@ class DataLoader:
else:
# Find the closest matching path to warn the user
parent_dir = os.path.dirname(path) or '.'
parent_files = os.listdir(parent_dir)
parent_files = os.listdir(parent_dir) if os.path.isdir(parent_dir) else ''
valid_paths = [os.path.join(parent_dir, file) for file in parent_files]
similar_paths = difflib.get_close_matches(path, valid_paths)
if similar_paths:
......@@ -314,27 +345,25 @@ def load(
Load data from the specified file or directory.
Args:
path (str): The path to the file or directory.
path (str or os.PathLike): The path to the file or directory.
virtual_stack (bool, optional): Specifies whether to use virtual
stack when loading TIFF and HDF5 files. Default is False.
stack when loading files. Default is False.
dataset_name (str, optional): Specifies the name of the dataset to be loaded
in case multiple dataset exist within the same file. Default is None (only for HDF5 files)
return_metadata (bool, optional): Specifies whether to return metadata or not. Default is False (only for HDF5 and TXRM files)
contains (str, optional): Specifies a part of the name that is common for the TIFF file stack to be loaded (only for TIFF stacks)
return_metadata (bool, optional): Specifies whether to return metadata or not. Default is False (only for HDF5 and TXRM/TXM/XRM files)
contains (str, optional): Specifies a part of the name that is common for the TIFF file stack to be loaded (only for TIFF stacks).
Default is None.
**kwargs: Additional keyword arguments to be passed
to the DataLoader constructor.
Returns:
numpy.ndarray: The loaded volume as a NumPy array.
Raises:
ValueError: If the file format is not supported or the path is invalid.
NotImplementedError: If loading from a directory is not implemented yet.
FileNotFoundError: If the file or directory does not exist.
If 'return_metadata' is True and file format is either HDF5 or TXRM/TXM/XRM, returns a tuple (volume, metadata).
Example:
data = load("image.tif", virtual_stack=True)
data = qim3d.io.load("image.tif", virtual_stack=True)
"""
loader = DataLoader(
virtual_stack=virtual_stack,
dataset_name=dataset_name,
......@@ -365,10 +394,11 @@ class ImgExamples:
def __init__(self):
img_examples_path = Path(qim3d.__file__).parents[0] / "img_examples"
img_paths = list(img_examples_path.glob("*.tif"))
img_names = []
for path in img_paths:
img_names.append(path.stem)
# Generate loader for each image found
for idx, name in enumerate(img_names):
exec(f"self.{name} = qim3d.io.load('{img_paths[idx]}')")
exec(f"self.{name} = qim3d.io.load(path = img_paths[idx])")
"""Provides functionality for saving data to various file formats."""
import os
import tifffile
import numpy as np
from qim3d.io.logger import log
class DataSaver:
def __init__(self):
self.verbose = False
"""Utility class for saving data to different file formats.
Attributes:
replace (bool): Specifies if an existing file with identical path is replaced.
compression (bool): Specifies if the file is with Deflate compression.
Methods:
save_tiff(path,data): Save data to a TIFF file to the given path.
load(path,data): Save data to the given path.
Example:
image = qim3d.examples.blobs_256x256
saver = qim3d.io.DataSaver(compression=True)
saver.save_tiff("image.tif",image)
"""
def __init__(self, **kwargs):
"""Initializes a new instance of the DataSaver class.
Args:
replace (bool, optional): Specifies if an existing file with identical path should be replaced.
Default is False.
compression (bool, optional): Specifies if the file should be saved with Deflate compression.
Default is False.
"""
self.replace = kwargs.get("replace",False)
self.compression = kwargs.get("compression",False)
def save_tiff(self,path,data):
"""Save data to a TIFF file to the given path.
Args:
path (str): The path to save file to
data (numpy.ndarray): The data to be saved
"""
tifffile.imwrite(path,data,compression=self.compression)
def save(self, path, data):
raise NotImplementedError("Save is not implemented yet")
"""Save data to the given path.
Args:
path (str): The path to save file to
data (numpy.ndarray): The data to be saved
Raises:
ValueError: If the file format is not supported.
ValueError: If the specified folder does not exist.
ValueError: If a file extension is not provided.
ValueError: if a file with the specified path already exists and replace=False.
Example:
image = qim3d.examples.blobs_256x256
saver = qim3d.io.DataSaver(compression=True)
saver.save("image.tif",image)
"""
folder = os.path.dirname(path) or '.'
# Check if directory exists
if os.path.isdir(folder):
_, ext = os.path.splitext(path)
# Check if provided path contains file extension
if ext:
# Check if a file with the given path already exists
if os.path.isfile(path) and not self.replace:
raise ValueError("A file with the provided path already exists. To replace it set 'replace=True'")
if path.endswith((".tif",".tiff")):
return self.save_tiff(path,data)
else:
raise ValueError("Unsupported file format")
else:
raise ValueError('Please provide a file extension')
else:
raise ValueError(f'The directory {folder} does not exist. Please provide a valid directory')
def save(path,
data,
replace=False,
compression=False,
**kwargs
):
"""Save data to a specified file path.
Args:
path (str): The path to save file to
data (numpy.ndarray): The data to be saved
replace (bool, optional): Specifies if an existing file with identical path should be replaced.
Default is False.
compression (bool, optional): Specifies if the file should be saved with Deflate compression (lossless).
Default is False.
**kwargs: Additional keyword arguments to be passed to the DataSaver constructor
Example:
image = qim3d.examples.blobs_256x256
qim3d.io.save("image.tif",image,compression=True)
"""
def save(path, data):
return DataSaver().save(path, data)
DataSaver(replace=replace, compression=compression, **kwargs).save(path, data)
\ No newline at end of file
import qim3d
import multiprocessing
import time
def test_starting_class():
app = qim3d.gui.annotation_tool.Interface()
assert app.title == "Annotation tool"
def test_app_launch():
ip = "0.0.0.0"
port = 65432
def start_server(ip, port):
app = qim3d.gui.annotation_tool.Interface()
app.launch(server_name=ip, server_port=port)
proc = multiprocessing.Process(target=start_server, args=(ip, port))
proc.start()
# App is running in a separate process
# So we try to get a response for a while
max_checks = 5
check = 0
server_running = False
while check < max_checks and not server_running:
server_running = qim3d.utils.internal_tools.is_server_running(ip, port)
time.sleep(1)
check += 1
# Terminate tre process before assertions
proc.terminate()
assert server_running is True
import qim3d
import os
def test_download():
folder = 'Cowry_Shell'
file = 'Cowry_DOWNSAMPLED.tif'
path = os.path.join(folder,file)
dl = qim3d.io.Downloader()
dl.Cowry_Shell.Cowry_DOWNSAMPLED()
img = qim3d.io.load(path)
# Remove temp file
os.remove(path)
os.rmdir(folder)
assert img.shape == (500,350,350)
def test_get_file_size_right():
coal_file = 'https://archive.compute.dtu.dk/download/public/projects/viscomp_data_repository/Coal/CoalBrikett.tif'
size = qim3d.io.downloader._get_file_size(coal_file)
assert size == 2_400_082_900
def test_get_file_size_wrong():
file_to_folder = 'https://archive.compute.dtu.dk/files/public/projects/viscomp_data_repository/'
size = qim3d.io.downloader._get_file_size(file_to_folder)
assert size == -1
def test_extract_html():
url = 'https://archive.compute.dtu.dk/files/public/projects/viscomp_data_repository'
html = qim3d.io.downloader._extract_html(url)
assert 'data-path="/files/public/projects/viscomp_data_repository"' in html
import qim3d
import numpy as np
from pathlib import Path
import os
import pytest
# Load blobs volume into memory
vol = qim3d.examples.blobs_256x256
# Ceate memory map to blobs
blobs_path = Path(qim3d.__file__).parents[0] / "img_examples" / "blobs_256x256.tif"
vol_memmap = qim3d.io.load(blobs_path,virtual_stack=True)
def test_load_shape():
assert vol.shape == vol_memmap.shape == (256,256)
def test_load_type():
assert isinstance(vol,np.ndarray)
def test_load_type_memmap():
assert isinstance(vol_memmap,np.memmap)
def test_invalid_path():
invalid_path = os.path.join('this','path','doesnt','exist.tif')
with pytest.raises(ValueError,match='Invalid path'):
qim3d.io.load(invalid_path)
def test_did_you_mean():
# Remove last two characters from the path
blobs_path_misspelled = str(blobs_path)[:-2]
with pytest.raises(ValueError,match=f"Invalid path.\nDid you mean '{blobs_path}'?"):
qim3d.io.load(blobs_path_misspelled)
\ No newline at end of file
import qim3d
import tempfile
import numpy as np
import os
import hashlib
import pytest
def test_image_exist():
# Create random test image
test_image = np.random.randint(0,256,(100,100,100),'uint8')
# Create temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
image_path = os.path.join(temp_dir,"test_image.tif")
# Save to temporary directory
qim3d.io.save(image_path,test_image)
# Assert that test image has been saved
assert os.path.exists(image_path)
def test_compression():
# Get test image (should not be random in order for compression to function)
test_image = qim3d.examples.blobs_256x256
# Create temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
image_path = os.path.join(temp_dir,"test_image.tif")
compressed_image_path = os.path.join(temp_dir,"compressed_test_image.tif")
# Save to temporary directory with and without compression
qim3d.io.save(image_path,test_image)
qim3d.io.save(compressed_image_path,test_image,compression=True)
# Compute file sizes
file_size = os.path.getsize(image_path)
compressed_file_size = os.path.getsize(compressed_image_path)
# Assert that compressed file size is smaller than non-compressed file size
assert compressed_file_size < file_size
def test_image_matching():
# Create random test image
original_image = np.random.randint(0,256,(100,100,100),'uint8')
# Create temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
image_path = os.path.join(temp_dir,"original_image.tif")
# Save to temporary directory
qim3d.io.save(image_path,original_image)
# Load from temporary directory
saved_image = qim3d.io.load(image_path)
# Get hashes
original_hash = calculate_image_hash(original_image)
saved_hash = calculate_image_hash(saved_image)
# Assert that original image is identical to saved_image
assert original_hash == saved_hash
def test_compressed_image_matching():
# Get test image (should not be random in order for compression to function)
original_image = qim3d.examples.blobs_256x256
# Create temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
image_path = os.path.join(temp_dir,"original_image.tif")
# Save to temporary directory
qim3d.io.save(image_path,original_image,compression=True)
# Load from temporary directory
saved_image_compressed = qim3d.io.load(image_path)
# Get hashes
original_hash = calculate_image_hash(original_image)
compressed_hash = calculate_image_hash(saved_image_compressed)
# Assert that original image is identical to saved_image
assert original_hash == compressed_hash
def test_file_replace():
# Create random test image
test_image1 = np.random.randint(0,256,(100,100,100),'uint8')
test_image2 = np.random.randint(0,256,(100,100,100),'uint8')
# Create temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
image_path = os.path.join(temp_dir,"test_image.tif")
# Save first test image to temporary directory
qim3d.io.save(image_path,test_image1)
# Get hash
hash1 = calculate_image_hash(qim3d.io.load(image_path))
# Replace existing file
qim3d.io.save(image_path,test_image2,replace=True)
# Get hash again
hash2 = calculate_image_hash(qim3d.io.load(image_path))
# Assert that the file was modified by checking if the second modification time is newer than the first
assert hash1 != hash2
def test_file_already_exists():
# Create random test image
test_image1 = np.random.randint(0,256,(100,100,100),'uint8')
test_image2 = np.random.randint(0,256,(100,100,100),'uint8')
# Create temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
image_path = os.path.join(temp_dir,"test_image.tif")
# Save first test image to temporary directory
qim3d.io.save(image_path,test_image1)
with pytest.raises(ValueError,match="A file with the provided path already exists. To replace it set 'replace=True'"):
# Try to save another image to the existing path
qim3d.io.save(image_path,test_image2)
def test_no_file_ext():
# Create random test image
test_image = np.random.randint(0,256,(100,100,100),'uint8')
# Create filename without extension
filename = 'test_image'
# Create temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
image_path = os.path.join(temp_dir,filename)
with pytest.raises(ValueError,match='Please provide a file extension'):
# Try to save the test image to a path witout file extension
qim3d.io.save(image_path,test_image)
def test_folder_doesnt_exist():
# Create random test image
test_image = np.random.randint(0,256,(100,100,100),'uint8')
# Create invalid path
invalid_path = os.path.join('this','path','doesnt','exist.tif')
with pytest.raises(ValueError,match=f'The directory {os.path.dirname(invalid_path)} does not exist. Please provide a valid directory'):
# Try to save test image to an invalid path
qim3d.io.save(invalid_path,test_image)
def test_unsupported_file_format():
# Create random test image
test_image = np.random.randint(0,256,(100,100,100),'uint8')
# Create filename with unsupported format
filename = 'test_image.unsupported'
# Create temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
image_path = os.path.join(temp_dir,filename)
with pytest.raises(ValueError,match='Unsupported file format'):
# Try to save test image with an unsupported file extension
qim3d.io.save(image_path,test_image)
def calculate_image_hash(image):
image_bytes = image.tobytes()
hash_object = hashlib.md5(image_bytes)
return hash_object.hexdigest()
\ No newline at end of file
import qim3d
import os
import re
from pathlib import Path
def test_mock_plot():
......@@ -35,3 +36,18 @@ def test_get_local_ip():
local_ip = qim3d.utils.internal_tools.get_local_ip()
assert validate_ip(local_ip) == True
def test_stringify_path1():
"""Test that the function converts os.PathLike objects to strings
"""
blobs_path = Path(qim3d.__file__).parents[0] / "img_examples" / "blobs_256x256.tif"
assert str(blobs_path) == qim3d.utils.internal_tools.stringify_path(blobs_path)
def test_stringify_path2():
"""Test that the function returns input unchanged if input is a string
"""
# Create test_path
test_path = os.path.join('this','path','doesnt','exist.tif')
assert test_path == qim3d.utils.internal_tools.stringify_path(test_path)
......@@ -38,7 +38,13 @@ def _make_request(doi, header):
def _log_and_get_text(doi, header):
response = _make_request(doi, header)
if response:
if response and response.encoding:
# Explicitly decode the response content using the specified encoding
text = response.content.decode(response.encoding)
log.info(text)
return text
elif response:
# If encoding is not specified, default to UTF-8
text = response.text
log.info(text)
return text
......
......@@ -7,6 +7,7 @@ import matplotlib.pyplot as plt
import matplotlib
import numpy as np
import socket
import os
......@@ -176,3 +177,10 @@ def is_server_running(ip, port):
return True
except:
return False
def stringify_path(path):
"""Converts an os.PathLike object to a string
"""
if isinstance(path,os.PathLike):
path = path.__fspath__()
return path
\ No newline at end of file