Source code for pado.annotations.annotation

from __future__ import annotations

import copy
import warnings
from reprlib import Repr
from typing import TYPE_CHECKING
from typing import Any
from typing import Iterable
from typing import MutableSequence
from typing import Optional
from typing import Union
from typing import overload

import numpy as np
import orjson
import pandas as pd
from pydantic.color import Color
from shapely.affinity import scale as shapely_scale
from shapely.affinity import translate as shapely_translate
from shapely.geometry.base import BaseGeometry
from shapely.strtree import STRtree
from shapely.wkt import loads as wkt_loads

from pado.annotations.formats import AnnotationModel
from pado.annotations.formats import AnnotationState
from pado.annotations.formats import AnnotationStyle
from pado.annotations.formats import Annotator
from pado.images.ids import ImageId

if TYPE_CHECKING:
    from pado.images.utils import MPP
    from pado.images.utils import IntPoint


class Annotation:
    """Read-only wrapper exposing the fields of an ``AnnotationModel``.

    Every field yielded by the model is copied onto the instance as a plain
    attribute. Once construction has finished, any attribute assignment
    raises ``AttributeError``.
    """

    image_id: Optional[ImageId]
    identifier: Optional[str]
    project: Optional[str]
    annotator: Annotator
    state: AnnotationState
    style: AnnotationStyle
    classification: str
    color: Color
    description: str
    comment: str
    geometry: BaseGeometry

    def __init__(self, model: AnnotationModel):
        # Copy the model fields straight into the instance namespace.
        # noinspection PyProtectedMember
        vars(self).update(model._iter())
        self._model = model
        # Has to be the last assignment: from here on __setattr__ refuses
        # every write.
        self._readonly = True

    def __setattr__(self, key, value):
        # `_readonly` does not exist while __init__ is still running, hence
        # the False default.
        locked = getattr(self, "_readonly", False)
        if locked:
            raise AttributeError(f"{key} is readonly")
        super().__setattr__(key, value)

    def __repr__(self):
        cls_name = type(self).__name__
        return f"{cls_name}(model={self._model!r})"

    def __eq__(self, other):
        """Compare all instance attributes except ``_model`` and ``color``."""
        if not isinstance(other, Annotation):
            return False
        skipped = {"_model", "color"}
        mine = self.__dict__
        theirs = other.__dict__
        for name in mine:
            if name in skipped:
                continue
            if not (mine[name] == theirs[name]):
                return False
        return True

    @classmethod
    def from_obj(cls, obj: Any) -> Annotation:
        """Build an annotation from an object (e.g. a ``pd.Series``).

        The object is parsed via ``AnnotationModel.parse_obj``.
        """
        model = AnnotationModel.parse_obj(obj)
        return cls(model)

    def to_record(self, image_id: Optional[ImageId] = None) -> dict:
        """Return a dict record of this annotation for serializing.

        ``image_id`` is used when the annotation itself carries none. The
        record stores the image id via ``to_str()``, the color via
        ``as_rgb()`` and the geometry as WKT.

        Raises:
            ValueError: if both the annotation and the caller provide an
                image id and the two differ.
        """
        model = self._model
        record = model.dict(exclude={"image_id", "color", "geometry"})

        conflicting = (
            model.image_id is not None
            and image_id is not None
            and model.image_id != image_id
        )
        if conflicting:
            raise ValueError(
                f"Annotation has different image_id: has {model.image_id} requested {image_id}"
            )

        # The annotation's own id takes precedence over the requested one.
        effective_id = model.image_id or image_id
        if effective_id is None:
            record["image_id"] = None
        else:
            record["image_id"] = effective_id.to_str()

        if model.color is None:
            record["color"] = None
        else:
            record["color"] = model.color.as_rgb()

        record["geometry"] = model.geometry.wkt
        return record
# Module-level reprlib helper used by Annotations.__repr__; with maxlist = 3
# only the first three items of a list are shown before it is abbreviated.
_r = Repr()
_r.maxlist = 3
class Annotations(MutableSequence[Annotation]):
    """Mutable sequence of ``Annotation`` objects stored in a DataFrame.

    Rows of ``df`` are addressed strictly by position, like list items; the
    index labels of ``df`` carry no meaning for this class.
    """

    df: pd.DataFrame

    def __init__(
        self, df: Optional[pd.DataFrame] = None, *, image_id: Optional[ImageId] = None
    ) -> None:
        """Wrap ``df`` (or an empty frame with the model's columns).

        Raises:
            TypeError: if ``df`` is neither ``None`` nor a ``pd.DataFrame``.
        """
        if df is None:
            self.df = pd.DataFrame(columns=AnnotationModel.__fields__)
        elif isinstance(df, pd.DataFrame):
            self.df = df
        else:
            raise TypeError(f"requires a pd.DataFrame, not {type(df).__name__}")
        self._image_id = image_id
        if image_id is not None:
            self._update_df_image_id(image_id)

    def __repr__(self):
        return f"{type(self).__name__}({_r.repr_list(self, 0)}, image_id={self._image_id!r})"  # type: ignore

    def __eq__(self, other):
        if not isinstance(other, Annotations):
            return False
        # zip() stops at the shorter operand, so the lengths have to be
        # compared explicitly; otherwise an empty collection would equal
        # every other one.
        if len(self) != len(other):
            return False
        return all(a == b for a, b in zip(self, other))

    @property
    def image_id(self) -> Optional[ImageId]:
        return self._image_id

    @image_id.setter
    def image_id(self, value: ImageId):
        if not isinstance(value, ImageId):
            raise TypeError(
                f"{value!r} not of type ImageId, got {type(value).__name__}"
            )
        self._update_df_image_id(image_id=value)
        self._image_id = value

    def _update_df_image_id(self, image_id: ImageId):
        """internal: fill missing ``image_id`` cells of ``df`` with ``image_id``

        Raises:
            ValueError: if the column holds more than two distinct values.
            AssertionError: if the column holds values other than ``None``
                and ``image_id.to_str()``.
        """
        if self.df.empty:
            return
        ids = set(self.df["image_id"].unique())
        if len(ids) > 2:
            raise ValueError(f"image_ids in provider not unique: {ids!r}")
        if None not in ids and image_id.to_str() in ids:
            return
        elif {None, image_id.to_str()}.issuperset(ids):
            self.df.loc[self.df["image_id"].isna(), "image_id"] = image_id.to_str()
        else:
            raise AssertionError(f"unexpected image_ids in Annotations.df: {ids!r}")

    @overload
    def __getitem__(self, index: int) -> Annotation:
        ...

    @overload
    def __getitem__(self, index: slice) -> Annotations:
        ...

    def __getitem__(self, index: Union[int, slice]) -> Union[Annotation, Annotations]:
        """Return the annotation at ``index`` or a sub-collection for a slice."""
        if isinstance(index, (int, np.int32, np.int64)):
            return Annotation.from_obj(self.df.iloc[index, :])
        elif isinstance(index, slice):
            # Positional slicing: label-based `.loc` would include the end
            # label and depends on the frame's index labels.
            return Annotations(self.df.iloc[index, :], image_id=self.image_id)
        else:
            raise TypeError(
                f"Annotations: indices must be integers or slices, not {type(index).__name__}"
            )

    @overload
    def __setitem__(self, index: int, value: Annotation) -> None:
        ...

    @overload
    def __setitem__(self, index: slice, value: Iterable[Annotation]) -> None:
        ...

    def __setitem__(
        self, index: Union[int, slice], value: Union[Annotation, Iterable[Annotation]]
    ) -> None:
        """Overwrite the row(s) at ``index`` with the record(s) of ``value``."""
        if isinstance(index, int):
            assert isinstance(value, Annotation)
            self.df.iloc[index, :] = pd.DataFrame(
                [value.to_record(self._image_id)],
                columns=list(AnnotationModel.__fields__),
            )
        elif isinstance(index, slice):
            assert hasattr(value, "__iter__")
            self.df.iloc[index, :] = pd.DataFrame(
                [x.to_record(self._image_id) for x in value],
                columns=list(AnnotationModel.__fields__),
            )
        else:
            raise TypeError(
                f"Annotations: indices must be integers or slices, not {type(index).__name__}"
            )

    def __delitem__(self, index: Union[int, slice]) -> None:
        """Remove the row(s) at position ``index``.

        ``self.df`` is rebound to a new frame without the removed rows.

        Raises:
            IndexError: if an integer ``index`` is out of range.
            TypeError: if ``index`` is neither an integer nor a slice.
        """
        if not isinstance(index, (int, np.int32, np.int64, slice)):
            raise TypeError(
                f"Annotations: indices must be integers or slices, not {type(index).__name__}"
            )
        # Delete by position, not by index label: labels need not match
        # positions (e.g. after an earlier deletion) and may be duplicated
        # (e.g. after insert()).
        keep = np.ones(len(self.df), dtype=bool)
        keep[index] = False  # numpy raises IndexError for out-of-range ints
        self.df = self.df.iloc[keep]

    def insert(self, index: int, value: Annotation) -> None:
        """Insert ``value`` before position ``index``.

        Raises:
            TypeError: if ``value`` is not an ``Annotation``.
        """
        if not isinstance(value, Annotation):
            raise TypeError(
                f"can only insert type Annotation, got {type(value).__name__!r}"
            )
        df_a = self.df.iloc[:index, :]
        df_i = pd.DataFrame(
            [value.to_record(self._image_id)], columns=AnnotationModel.__fields__
        )
        df_b = self.df.iloc[index:, :]
        self.df = pd.concat([df_a, df_i, df_b])

    def __len__(self) -> int:
        return len(self.df)

    @classmethod
    def from_records(
        cls, annotation_records: Iterable[dict], *, image_id: Optional[ImageId] = None
    ) -> Annotations:
        """Build a collection from an iterable of record dicts."""
        df = pd.DataFrame(list(annotation_records), columns=AnnotationModel.__fields__)
        return Annotations(df, image_id=image_id)
class AnnotationIndex:
    """Spatial index over a list of geometries, built on a shapely ``STRtree``."""

    def __init__(self, geometries: list[BaseGeometry]) -> None:
        # Keep a shallow copy of the list for to_json().
        self.geometries = copy.copy(geometries)
        with warnings.catch_warnings():
            # Silence FutureWarnings emitted while building the tree.
            warnings.simplefilter("ignore", FutureWarning)
            strtree = STRtree(geometries)
        self._strtree = strtree

    # noinspection PyShadowingNames
    @classmethod
    def from_annotations(
        cls, annotations: Annotations | None
    ) -> AnnotationIndex | None:
        """Index the geometries of ``annotations``; ``None`` maps to ``None``."""
        if annotations is None:
            return None
        geometries = [a.geometry for a in annotations]
        return cls(geometries)

    def query_items(self, geom: BaseGeometry) -> list[int]:
        """Return the result of querying the tree with ``geom`` as a list."""
        return list(self._strtree.query(geom))

    def to_json(self, *, as_string: bool = False) -> str | dict:
        """Serialize the index (geometries as WKT).

        Returns a dict by default, or a JSON string when ``as_string`` is true.
        """
        obj = {
            "type": "pado.annotations.annotation.AnnotationIndex",
            "version": 1,
            "geometries": [o.wkt for o in self.geometries],
        }
        if as_string:
            return orjson.dumps(obj, option=orjson.OPT_SERIALIZE_NUMPY).decode()
        else:
            return obj

    @classmethod
    def from_json(cls, obj: str | dict | None) -> AnnotationIndex | None:
        """Rebuild an index from the output of ``to_json``; ``None`` maps to ``None``.

        Raises:
            TypeError: if ``obj`` is not a JSON string or dict.
            NotImplementedError: if the ``"type"`` entry is not this class's tag.
        """
        if obj is None:
            return None
        if isinstance(obj, str):
            obj = orjson.loads(obj.encode())
        if not isinstance(obj, dict):
            raise TypeError("expected json str or dict")
        t = obj["type"]
        if t != "pado.annotations.annotation.AnnotationIndex":
            raise NotImplementedError(t)
        geometries = obj["geometries"]
        return cls([wkt_loads(o) for o in geometries])


def shapely_fix_shape(
    shape: BaseGeometry, buffer_size: tuple[int, int]
) -> BaseGeometry:
    """Buffer ``shape`` by ``buffer_size[0]``, and by ``buffer_size[1]`` if still invalid."""
    shape = shape.buffer(buffer_size[0])
    if not shape.is_valid:
        shape = shape.buffer(buffer_size[1])
    return shape


def _replace_geometry(annotation: Annotation, geometry: BaseGeometry) -> Annotation:
    """Return a new annotation like ``annotation`` but carrying ``geometry``."""
    # Annotation rejects attribute assignment once constructed, so the
    # geometry cannot be set in place; rebuild from a copy of the model.
    # noinspection PyProtectedMember
    model = annotation._model.copy(update={"geometry": geometry})
    return type(annotation)(model)


def ensure_validity(
    annotation: Annotation,
) -> Annotation:
    """Return an annotation whose geometry has been repaired if it was invalid.

    The input is never modified; it is returned as-is when its geometry is
    already valid, otherwise a new annotation is returned.
    """
    geom = annotation.geometry
    if geom.is_valid:
        return annotation
    fixed = shapely_fix_shape(geom, buffer_size=(0, 0))
    return _replace_geometry(annotation, fixed)


def scale_annotation(
    annotation: Annotation,
    *,
    level0_mpp: MPP,
    target_mpp: MPP,
) -> Annotation:
    """Return ``annotation`` with its geometry rescaled from level-0 to target mpp.

    Invalid geometries are repaired before and after scaling. The input is
    never modified; it is returned as-is when no repair or scaling was needed.
    """
    original = annotation.geometry
    geom = original
    if not geom.is_valid:
        geom = shapely_fix_shape(geom, buffer_size=(0, 0))
    # We rescale if target_mpp differs from slide_mpp
    if target_mpp != level0_mpp:
        geom = shapely_scale(
            geom,
            xfact=level0_mpp.x / target_mpp.x,
            yfact=level0_mpp.y / target_mpp.y,
            origin=(0, 0),
        )
        if not geom.is_valid:
            geom = shapely_fix_shape(geom, buffer_size=(0, 0))
    if geom is original:
        return annotation
    return _replace_geometry(annotation, geom)


def translate_annotation(annotation: Annotation, *, location: IntPoint) -> Annotation:
    """Return a new annotation with its geometry shifted by ``-location``.

    The input annotation is not modified.
    """
    geom = shapely_translate(annotation.geometry, xoff=-location.x, yoff=-location.y)
    return _replace_geometry(annotation, geom)