"""pado image abstraction to hide image loading implementation"""
from __future__ import annotations
import json
import logging
from datetime import datetime
from typing import TYPE_CHECKING
from typing import Any
from typing import Collection
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import tiffslide
from fsspec import AbstractFileSystem
from fsspec.implementations.local import LocalFileSystem
from fsspec.implementations.memory import MemoryFileSystem
from numpy.typing import NDArray
from pydantic import BaseModel
from pydantic import ByteSize
from pydantic import Extra
from pydantic import PositiveFloat
from pydantic import PositiveInt
from pydantic import validator
from pydantic.color import Color
from tifffile import ZarrTiffStore
from tiffslide import TiffSlide
# noinspection PyProtectedMember
from tiffslide._zarr import get_zarr_chunk_sizes
from pado.images.utils import MPP
from pado.images.utils import IntPoint
from pado.images.utils import IntSize
from pado.io.checksum import Checksum
from pado.io.checksum import compare_checksums
from pado.io.checksum import compute_checksum
from pado.io.files import update_fs_storage_options
from pado.io.files import urlpathlike_get_fs_cls
from pado.io.files import urlpathlike_is_localfile
from pado.io.files import urlpathlike_local_via_fs
from pado.io.files import urlpathlike_to_fs_and_path
from pado.io.files import urlpathlike_to_fsspec
from pado.io.files import urlpathlike_to_string
from pado.io.paths import get_dataset_fs
from pado.types import UrlpathLike
if TYPE_CHECKING:
import numpy as np
import PIL.Image
from pado.dataset import PadoDataset
from pado.images.ids import ImageId
try:
import cv2
except ImportError:
cv2 = None
_log = logging.getLogger(__name__)
# --- metadata and info models ---
[docs]class FileInfo(BaseModel):
"""information related to the file on disk"""
size_bytes: ByteSize
md5_computed: Optional[str] = None
time_last_access: Optional[datetime] = None
time_last_modified: Optional[datetime] = None
time_status_changed: Optional[datetime] = None
[docs]class PadoInfo(BaseModel):
"""information regarding the file loading"""
urlpath: str
pado_image_backend: str
pado_image_backend_version: str
class _SerializedImage(ImageMetadata, FileInfo, PadoInfo):
class Config:
extra = Extra.forbid
[docs]class Image:
"""pado.img.Image is a wrapper around whole slide image data"""
__slots__ = (
"urlpath",
"_metadata",
"_file_info",
"_slide",
) # prevent attribute errors during refactor
__fields__: tuple[str, ...] = tuple(_SerializedImage.__fields__)
def __init__(
self,
urlpath: UrlpathLike,
*,
load_metadata: bool = False,
load_file_info: bool = False,
checksum: bool | str = False,
):
"""instantiate an image from an urlpath"""
self.urlpath = urlpath
self._metadata: Optional[ImageMetadata] = None
self._file_info: Optional[FileInfo] = None
# file handles
self._slide: Optional[TiffSlide] = None
# optional load on init
if load_metadata or load_file_info or checksum:
with self:
if load_metadata:
self._metadata = self._load_metadata()
if load_file_info or checksum:
self._file_info = self._load_file_info(checksum=checksum)
[docs] @classmethod
def from_obj(cls, obj: Any) -> Image:
"""instantiate an image from an object, i.e. a pd.Series"""
md = _SerializedImage.parse_obj(obj)
# get metadata
metadata = ImageMetadata.parse_obj(md)
file_info = FileInfo.parse_obj(md)
pado_info = PadoInfo.parse_obj(md)
# get extra data
inst = cls(pado_info.urlpath)
inst._metadata = metadata
inst._file_info = file_info
# todo: warn if tiffslide version difference
# pado_info ...
return inst
[docs] def to_record(
self,
image_id: ImageId | None = None,
*,
urlpath_ignore_options: Collection[str] = (),
**kwargs: Any,
) -> dict[str, Any]:
"""return a record for serializing"""
pado_info = PadoInfo(
urlpath=urlpathlike_to_string(
self.urlpath, ignore_options=urlpath_ignore_options
),
pado_image_backend=TiffSlide.__class__.__qualname__,
pado_image_backend_version=tiffslide.__version__,
)
return _SerializedImage.parse_obj(
{
**pado_info.dict(),
**self.metadata.dict(),
**self.file_info.dict(),
}
).dict()
def __enter__(self) -> Image:
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def __del__(self):
self.close()
@property
def is_open(self):
return self._slide is not None
[docs] def open(
self,
*,
via: AbstractFileSystem | None = None,
storage_options: dict[str, Any] | None = None,
) -> Image:
"""open an image instance
This will instantiate the filesystem. Dependent on the
filesystem this will establish connections to servers, etc.
If open has been called, following calls will be no-ops.
Parameters
----------
via:
allows to provide a filesystem that will be used instead of
the Image.urlpath's filesystem to access the path.
storage_options:
allows providing storage options for the filesystem used to
access the image.
Returns
-------
self:
returns the opened image instance
"""
if not self._slide:
if via is None or isinstance(via, MemoryFileSystem):
of = urlpathlike_to_fsspec(
self.urlpath, storage_options=storage_options
)
elif isinstance(via, AbstractFileSystem):
of = urlpathlike_local_via_fs(
self.urlpath,
fs=update_fs_storage_options(via, storage_options=storage_options),
)
else:
raise TypeError(
f"via not an AbstractFileSystem, got {type(via).__name__}"
)
try:
self._slide = TiffSlide(of)
except Exception as e:
_log.error(f"{self.urlpath!r} with error {e!r}")
self.close()
raise
return self
[docs] def via(
self,
ds: PadoDataset,
*,
storage_options: dict[str, Any] | None = None,
) -> Image:
"""open an image instance via a pado dataset
Similar behavior to .open() with the difference that only if
the dataset is accessed remotely and the images are referenced
locally (so on the same remote) will the image be accessed via
the dataset filesystem.
A common example is a pado dataset stored on a server with the
images stored next to it on the server filesystem. If this
dataset is now accessed via ssh, the images will be accessible
via ssh too.
Parameters
----------
ds:
this pado dataset's filesystem will be used for access
storage_options:
allows providing storage options for the filesystem used to
access the image.
Returns
-------
self:
returns the opened image instance
"""
ds_fs = get_dataset_fs(ds)
# check if we are accessing a dataset remotely, that has references to
# files locally. For now access via ssh is the primary use case for this.
if not isinstance(ds_fs, LocalFileSystem):
im_fs_cls = urlpathlike_get_fs_cls(self.urlpath)
if issubclass(im_fs_cls, LocalFileSystem):
self.open(via=ds_fs, storage_options=storage_options)
return self
self.open() # to make .via()'s behavior consistent we have to call open here
return self
[docs] def close(self):
"""close and image instance"""
if self._slide:
self._slide.close()
self._slide = None
def __repr__(self):
return f"{type(self).__name__}({self.urlpath!r})"
def __eq__(self, other: Any) -> bool:
"""compare if two images are identical"""
if not isinstance(other, Image):
return False
# if checksum available for both
if self.file_info.md5_computed and other.file_info.md5_computed:
try:
return compare_checksums(
self.file_info.md5_computed, other.file_info.md5_computed
)
except ValueError:
pass
if self.file_info.size_bytes != other.file_info.size_bytes:
return False
return self.metadata == other.metadata
def _load_metadata(self, *, force: bool = False) -> ImageMetadata:
"""load the metadata from the file"""
if self._metadata is None or force:
if self._slide is None:
raise RuntimeError(f"{self!r} not opened and not in context manager")
slide = self._slide
props = slide.properties
dimensions = slide.dimensions
_used_keys: Dict[str, Any] = {}
def pget(key):
return _used_keys.setdefault(key, props.get(key))
return ImageMetadata(
width=dimensions[0],
height=dimensions[1],
objective_power=pget(tiffslide.PROPERTY_NAME_OBJECTIVE_POWER),
mpp_x=pget(tiffslide.PROPERTY_NAME_MPP_X),
mpp_y=pget(tiffslide.PROPERTY_NAME_MPP_Y),
downsamples=list(slide.level_downsamples),
vendor=pget(tiffslide.PROPERTY_NAME_VENDOR),
background_color=pget(tiffslide.PROPERTY_NAME_BACKGROUND_COLOR),
quickhash1=pget(tiffslide.PROPERTY_NAME_QUICKHASH1),
comment=pget(tiffslide.PROPERTY_NAME_COMMENT),
bounds_x=pget(tiffslide.PROPERTY_NAME_BOUNDS_X),
bounds_y=pget(tiffslide.PROPERTY_NAME_BOUNDS_Y),
bounds_width=pget(tiffslide.PROPERTY_NAME_BOUNDS_WIDTH),
bounds_height=pget(tiffslide.PROPERTY_NAME_BOUNDS_HEIGHT),
extra_json=json.dumps(
{
key: value
for key, value in sorted(props.items())
if key not in _used_keys
}
),
)
else:
return self._metadata
def _load_file_info(
self, *, force: bool = False, checksum: bool | str = False
) -> FileInfo:
"""load the file information from the file"""
if self._file_info is None or force:
if self._slide is None:
raise RuntimeError(f"{self!r} not opened and not in context manager")
fs, path = urlpathlike_to_fs_and_path(self.urlpath)
if checksum is True:
checksums = compute_checksum(self.urlpath, available_only=not force)
_checksum = Checksum.join_checksums(checksums)
elif checksum is False:
_checksum = None
elif isinstance(checksum, str):
checksums = Checksum.from_str(checksum, unpack_single=False)
_checksum = Checksum.join_checksums(checksums)
else:
raise TypeError(
f"checksum must be bool or str, got: {type(checksum).__name__!r}"
)
info = fs.info(path)
return FileInfo(
size_bytes=info["size"],
md5_computed=_checksum,
time_last_access=info.get("atime"),
time_last_modified=info.get("mtime"),
time_status_changed=info.get("created"),
)
else:
return self._file_info
@property
def metadata(self) -> ImageMetadata:
"""the image metadata"""
if self._metadata is None:
# we need to load the image metadata
if self._slide is None:
raise RuntimeError(f"{self!r} not opened and not in context manager")
self._metadata = self._load_metadata()
return self._metadata
@property
def file_info(self) -> FileInfo:
"""stats regarding the image file"""
if self._file_info is None:
# we need to load the file_info data
if self._slide is None:
raise RuntimeError(f"{self!r} not opened and not in context manager")
self._file_info = self._load_file_info(checksum=False)
return self._file_info
@property
def level_count(self) -> int:
if self._slide is None:
raise RuntimeError(f"{self!r} not opened and not in context manager")
return self._slide.level_count
@property
def level_dimensions(self) -> Dict[int, IntSize]:
if self._slide is None:
raise RuntimeError(f"{self!r} not opened and not in context manager")
dims = self._slide.level_dimensions
down = self._slide.level_downsamples
mpp0 = self.mpp
return {
lvl: IntSize(x, y, mpp0.scale(ds))
for lvl, ((x, y), ds) in enumerate(zip(dims, down))
}
@property
def level_mpp(self) -> Dict[int, MPP]:
return {
lvl: self.mpp.scale(ds) for lvl, ds in enumerate(self.metadata.downsamples)
}
@property
def mpp(self) -> MPP:
return MPP(self.metadata.mpp_x, self.metadata.mpp_y)
@property
def dimensions(self) -> IntSize:
return IntSize(
x=self.metadata.width,
y=self.metadata.height,
mpp=self.mpp,
)
def get_thumbnail(self, size: Union[IntSize, Tuple[int, int]]) -> PIL.Image.Image:
if self._slide is None:
raise RuntimeError(f"{self!r} not opened and not in context manager")
if isinstance(size, tuple):
_, _ = size
elif isinstance(size, IntSize):
size = size.as_tuple()
else:
raise TypeError(
f"expected tuple or IntSize, got {size!r} of cls {type(size).__name__}"
)
return self._slide.get_thumbnail(size=size, use_embedded=True)
[docs] def get_array(
self,
location: IntPoint,
region: IntSize,
level: int,
*,
runtime_type_checks: bool = True,
) -> np.ndarray:
"""return array from a defined level"""
if runtime_type_checks:
if self._slide is None:
raise RuntimeError(f"{self!r} not opened and not in context manager")
# location
if not isinstance(location, IntPoint):
raise TypeError(
f"location requires IntPoint, got: {location!r} of {type(location).__name__}"
)
elif location.mpp is not None and location.mpp != self.mpp:
_guess = next( # improve error for user
(idx for idx, mpp in self.level_mpp.items() if mpp == location.mpp),
"level-not-in-image",
)
raise ValueError(
f"location not at level 0, got {location!r} at {_guess}"
)
# level (indirectly)
try:
level_mpp = self.level_mpp[level]
except KeyError:
raise ValueError(f"level error: 0 <= {level} <= {self.level_count}")
# region
if not isinstance(region, IntSize):
raise TypeError(
f"region requires IntSize, got: {region!r} of {type(region).__name__}"
)
elif region.mpp is not None and region.mpp != level_mpp:
_guess = next( # improve error for user
(idx for idx, mpp in self.level_mpp.items() if mpp == region.mpp),
"level-not-in-image",
)
raise ValueError(
f"region not at level {level}, got {region!r} at {_guess}"
)
if self._slide is None:
raise RuntimeError(f"{self!r} not opened and not in context manager")
return self._slide.read_region(
location.as_tuple(), level, region.as_tuple(), as_array=True
)
[docs] def get_array_at_mpp(
self, location: IntPoint, region: IntSize, target_mpp: MPP
) -> np.ndarray:
"""return array from a defined mpp and a position (in the target mpp)"""
if self._slide is None:
raise RuntimeError("need to open slide")
if location.mpp != target_mpp:
raise ValueError(
f"location.mpp != target_mpp -> {location.mpp!r} != {target_mpp!r}"
)
if target_mpp.x != target_mpp.y:
raise NotImplementedError("currently assuming same x and y mpp")
if region.mpp is None:
pass
elif region.mpp != target_mpp:
raise ValueError(
f"region.mpp != target_mpp -> {region.mpp!r} != {target_mpp!r}"
)
# we find the corresponding location at level0
lvl0_xy = _scale_xy(
location,
mpp_current=target_mpp,
mpp_target=self.mpp,
)
region_wh = region.as_tuple()
for lvl_best, mpp_best in self.level_mpp.items():
if target_mpp > mpp_best or target_mpp == mpp_best:
break
else:
raise NotImplementedError(
f"requesting a smaller mpp {target_mpp!r} "
f"than provided in the image {self.level_mpp.items()!r}"
)
if target_mpp == mpp_best:
# no need to rescale
array = self._slide.read_region(
location=lvl0_xy, level=lvl_best, size=region_wh, as_array=True
)
else:
# we need to rescale to the target_mpp
region_best = _scale_xy(
region, mpp_current=location.mpp, mpp_target=mpp_best
)
array = self._slide.read_region(
location=lvl0_xy, level=lvl_best, size=region_best, as_array=True
)
if array.shape[0:2:-1] != region_wh:
array = cv2.resize(array, dsize=region_wh)
return array
[docs] def get_zarr_store(
self,
level: int,
*,
chunkmode: int = 0,
zattrs: dict[str, Any] | None = None,
) -> ZarrTiffStore:
"""return the entire level as a zarr store"""
if self._slide is None:
raise RuntimeError(f"{self!r} not opened and not in context manager")
return self._slide.ts_tifffile.aszarr(
key=None,
series=None,
level=level,
chunkmode=chunkmode,
zattrs=zattrs,
)
[docs] def get_chunk_sizes(
self,
level: int = 0,
) -> NDArray[np.int_]:
"""return a chunk bytesize array"""
if self._slide is None:
raise RuntimeError(f"{self!r} not opened and not in context manager")
axes = self._slide.properties["tiffslide.series-axes"]
if axes == "YXS":
sum_axis = 2
elif axes == "CYX":
sum_axis = 0
else:
raise NotImplementedError(f"axes: {axes!r}")
return get_zarr_chunk_sizes(
self._slide.zarr_group, level=level, sum_axis=sum_axis
)
[docs] def is_local(self, must_exist=True) -> bool:
"""Return True if the image is stored locally"""
return urlpathlike_is_localfile(self.urlpath, must_exist=must_exist)
def _scale_xy(
to_transform: Union[IntPoint, IntSize], mpp_current: MPP, mpp_target: MPP
):
pos_x, pos_y = to_transform.as_tuple()
mpp_x_current, mpp_y_current = mpp_current.as_tuple()
mpp_x_target, mpp_y_target = mpp_target.as_tuple()
x = int(round(pos_x * mpp_x_current / mpp_x_target))
y = int(round(pos_y * mpp_y_current / mpp_y_target))
return x, y