Source code for pado.metadata.providers

"""pado.meta.store

provides a single file parquet store for pd.DataFrames with per store metadata
"""
from __future__ import annotations

import uuid
from abc import ABC
from functools import lru_cache
from reprlib import Repr
from typing import Any
from typing import Callable
from typing import Collection
from typing import Dict
from typing import Iterator
from typing import MutableMapping
from typing import Optional
from typing import cast

import pandas as pd

from pado._compat import cached_property
from pado.collections import validate_dataframe_index
from pado.images.ids import GetImageIdFunc
from pado.images.ids import ImageId
from pado.io.store import Store
from pado.io.store import StoreType
from pado.types import UrlpathLike

# === storage =================================================================


class MetadataProviderStore(Store):
    """Single-file store for metadata dataframes with per-store metadata.

    On top of what ``Store`` handles, this store writes its own dataset
    version into the store metadata and validates it again when reading.
    """

    METADATA_KEY_PROVIDER_VERSION = "dataset_version"
    DATASET_VERSION = 1

    def __init__(self, version: int = 1, store_type: StoreType = StoreType.METADATA):
        """Initialize the store.

        Raises:
            ValueError: if ``store_type`` is anything but ``StoreType.METADATA``.
        """
        if store_type != StoreType.METADATA:
            raise ValueError("changing store_type in subclasses unsupported")
        super().__init__(version=version, store_type=store_type)

    def __metadata_set_hook__(
        self, dct: Dict[bytes, bytes], setter: Callable[[dict, str, Any], None]
    ) -> None:
        """Record the current dataset version in ``dct`` via ``setter``."""
        key = self.METADATA_KEY_PROVIDER_VERSION
        setter(dct, key, self.DATASET_VERSION)

    def __metadata_get_hook__(
        self, dct: Dict[bytes, bytes], getter: Callable[[dict, str, Any], Any]
    ) -> Optional[dict]:
        """Read and validate the dataset version stored in ``dct``.

        Returns:
            A dict mapping the provider version key to the stored version.

        Raises:
            RuntimeError: if the stored version is missing, older or newer
                than ``DATASET_VERSION``.
        """
        key = self.METADATA_KEY_PROVIDER_VERSION
        expected = self.DATASET_VERSION
        found = getter(dct, key, None)

        # a missing version is treated like an outdated one
        if found is None or found < expected:
            raise RuntimeError("Please migrate MetadataProviderStore to newer version.")
        if found > expected:
            raise RuntimeError(
                "MetadataProviderStore is newer. Please upgrade pado to newer version."
            )
        return {key: found}
# === provider ================================================================
class BaseMetadataProvider(MutableMapping[ImageId, pd.DataFrame], ABC):
    """Abstract base of all metadata providers.

    A metadata provider is a mutable mapping whose keys are ``ImageId``
    instances and whose values are ``pd.DataFrame`` objects.
    """
# module-level reprlib.Repr instance used by MetadataProvider.__repr__
_r = Repr()
# reprlib setting: limit on the number of dict entries shown in a repr
_r.maxdict = 4
class MetadataProvider(BaseMetadataProvider):
    """Metadata provider backed by a single ``pd.DataFrame``.

    The dataframe index holds image id strings (``ImageId.to_str()``); all
    rows that share one index value form the metadata of that image.
    Lookups via ``__getitem__`` are memoized per instance and the memo is
    reset whenever the provider is modified.
    """

    df: pd.DataFrame
    identifier: str

    def __init__(
        self,
        provider: BaseMetadataProvider | pd.DataFrame | dict,
        *,
        identifier: Optional[str] = None,
    ) -> None:
        """Create a provider from another provider, a dataframe or a mapping.

        Args:
            provider: a ``MetadataProvider`` (its dataframe is copied), a
                ``pd.DataFrame`` (checked with ``validate_dataframe_index``
                and copied), or a ``BaseMetadataProvider`` / ``dict`` mapping
                image ids to dataframes (concatenated into one dataframe).
            identifier: optional identifier; falls back to the identifier of
                the given ``MetadataProvider`` or to a random uuid4 string.

        Raises:
            TypeError: if ``provider`` is of an unsupported type.
            ValueError: if a per-image dataframe has more than two distinct
                index values.
            AssertionError: if a per-image dataframe index contains values
                other than the image id string or ``None``.
            RuntimeError: if the per-image dataframes do not all share the
                same set of columns.
        """
        if isinstance(provider, MetadataProvider):
            self.df = provider.df.copy()
            self.identifier = str(identifier) if identifier else provider.identifier
        elif isinstance(provider, pd.DataFrame):
            validate_dataframe_index(provider)
            self.df = provider.copy()
            self.identifier = str(identifier) if identifier else str(uuid.uuid4())
        elif isinstance(provider, (BaseMetadataProvider, dict)):
            if not provider:
                self.df = pd.DataFrame(index=[], data={})
                self.identifier = str(identifier) if identifier else str(uuid.uuid4())
            else:
                columns = set()
                dfs = []
                for image_id, df in provider.items():
                    if df.empty:
                        continue
                    ids = set(df.index.unique())
                    # at most the image id itself plus a missing-value marker
                    if len(ids) > 2:
                        raise ValueError(f"image_ids in provider not unique: {ids!r}")
                    image_id_str = image_id.to_str()
                    if {image_id_str} == ids:
                        pass
                    elif {None, image_id_str}.issuperset(ids):
                        # fill missing index entries with the image id
                        index = df.index.fillna(image_id_str)
                        df = df.set_index(index)
                    else:
                        raise AssertionError(f"{image_id_str} with Index: {ids!r}")
                    dfs.append(df)
                    columns.add(frozenset(df.columns))
                if len(columns) != 1:
                    raise RuntimeError(
                        f"dataframe columns in provider don't match {columns!r}"
                    )
                self.df = pd.concat(dfs)
                self.identifier = str(identifier) if identifier else str(uuid.uuid4())
        else:
            raise TypeError(
                f"expected `BaseMetadataProvider`, got: {type(provider).__name__!r}"
            )

        # per-instance memo for __getitem__
        self.__getitem_cached__ = lru_cache(maxsize=None)(self.__getitem_uncached__)

    def __getitem__(self, image_id: ImageId) -> pd.DataFrame:
        """Return the (memoized) metadata rows of ``image_id``."""
        return self.__getitem_cached__(image_id)

    def __getitem_uncached__(self, image_id: ImageId) -> pd.DataFrame:
        """Select all rows of ``image_id`` from the dataframe.

        Raises:
            TypeError: if ``image_id`` is not an ``ImageId``.
        """
        if not isinstance(image_id, ImageId):
            raise TypeError(
                f"keys must be ImageId instances, got {type(image_id).__name__!r}"
            )
        return self.df.loc[[image_id.to_str()]]

    def __setitem__(self, image_id: ImageId, value: pd.DataFrame) -> None:
        """Replace (or add) the metadata rows of ``image_id`` with ``value``.

        Raises:
            TypeError: if ``image_id`` is not an ``ImageId``.
            ValueError: if the columns of ``value`` differ from the columns
                of the provider's dataframe.
        """
        if not isinstance(image_id, ImageId):
            raise TypeError(
                f"keys must be ImageId instances, got {type(image_id).__name__!r}"
            )
        # `==` on two indexes compares elementwise and cannot be used as a
        # single truth value; Index.equals yields one bool
        if not value.columns.equals(self.df.columns):
            raise ValueError("dataframe columns do not match")
        key = image_id.to_str()
        self.df = pd.concat(
            [
                # errors="ignore": the image id may not be present yet
                self.df.drop(key, errors="ignore"),
                value.set_index(pd.Index([key] * len(value))),
            ]
        )
        # memoized lookups would otherwise keep returning the old rows
        self.__getitem_cached__.cache_clear()

    def __delitem__(self, image_id: ImageId) -> None:
        """Remove all metadata rows of ``image_id``.

        Raises:
            TypeError: if ``image_id`` is not an ``ImageId``.
        """
        if not isinstance(image_id, ImageId):
            raise TypeError(
                f"keys must be ImageId instances, got {type(image_id).__name__!r}"
            )
        self.df.drop(image_id.to_str(), inplace=True)
        # memoized lookups would otherwise keep returning the deleted rows
        self.__getitem_cached__.cache_clear()

    def __repr__(self):
        _akw = [_r.repr_dict(cast(dict, self), 0)]
        if self.identifier is not None:
            _akw.append(f"identifier={self.identifier!r}")
        return f"{type(self).__name__}({', '.join(_akw)})"

    def __len__(self) -> int:
        """Return the number of distinct, non-missing index values."""
        return self.df.index.nunique(dropna=True)

    def __iter__(self) -> Iterator[ImageId]:
        """Iterate over the distinct index values converted to ``ImageId``."""
        return map(ImageId.from_str, self.df.index.unique())

    def to_parquet(
        self, urlpath: UrlpathLike, *, storage_options: dict[str, Any] | None = None
    ) -> None:
        """Write dataframe and identifier to ``urlpath`` via a ``MetadataProviderStore``."""
        store = MetadataProviderStore()
        store.to_urlpath(
            self.df,
            urlpath,
            identifier=self.identifier,
            storage_options=storage_options,
        )

    @classmethod
    def from_parquet(cls, urlpath: UrlpathLike) -> MetadataProvider:
        """Load a provider from ``urlpath`` via a ``MetadataProviderStore``.

        Raises:
            NotImplementedError: if the keys of the stored user metadata
                differ from the set of keys expected here.
        """
        store = MetadataProviderStore()
        df, identifier, user_metadata = store.from_urlpath(urlpath)
        if {
            store.METADATA_KEY_STORE_TYPE,
            store.METADATA_KEY_STORE_VERSION,
            store.METADATA_KEY_PADO_VERSION,
            store.METADATA_KEY_PROVIDER_VERSION,
            store.METADATA_KEY_CREATED_AT,
            store.METADATA_KEY_CREATED_BY,
        } != set(user_metadata):
            raise NotImplementedError(f"currently unused {user_metadata!r}")
        # bypass __init__: the stored dataframe is used as-is, without copying
        inst = cls.__new__(cls)
        inst.df = df
        inst.identifier = identifier
        inst.__getitem_cached__ = lru_cache(maxsize=None)(inst.__getitem_uncached__)
        return inst
class GroupedMetadataProvider(MetadataProvider):
    """Read-only view that combines several metadata providers.

    Nested ``GroupedMetadataProvider`` instances are flattened into the list
    of wrapped providers. Items can neither be added nor deleted.
    """

    # todo: deduplicate

    # noinspection PyMissingConstructor
    def __init__(self, *providers: BaseMetadataProvider):
        """Wrap ``providers``, converting each to a ``MetadataProvider`` if needed."""
        # super().__init__() ... violating Liskov anyways ...
        self.providers: list[MetadataProvider] = []
        for p in providers:
            if not isinstance(p, MetadataProvider):
                p = MetadataProvider(p)
            if isinstance(p, GroupedMetadataProvider):
                self.providers.extend(p.providers)
            else:
                self.providers.append(p)
        # standardized: all wrapped dataframes have the same columns in the same order
        self.is_standardized = len({tuple(p.df.columns) for p in self.providers}) == 1
        self.identifier = "-".join(["grouped", *(p.identifier for p in self.providers)])

    @cached_property
    def df(self):
        """Return the concatenation of all wrapped dataframes.

        Raises:
            RuntimeError: if the wrapped providers are not standardized.
        """
        if not self.is_standardized:
            raise RuntimeError(
                "can't access a combined pd.DataFrame on a non standardized "
            )
        return pd.concat([p.df for p in self.providers])

    def __getitem__(self, image_id: ImageId) -> pd.DataFrame:
        """Return the metadata from the first wrapped provider containing ``image_id``.

        Raises:
            KeyError: if no wrapped provider contains ``image_id``.
        """
        for ap in self.providers:
            try:
                return ap[image_id]
            except KeyError:
                pass
        raise KeyError(image_id)

    def __setitem__(self, image_id: ImageId, value: pd.DataFrame) -> None:
        """Unsupported; always raises ``RuntimeError``."""
        raise RuntimeError(f"can't add new item to {type(self).__name__}")

    def __delitem__(self, image_id: ImageId) -> None:
        """Unsupported; always raises ``RuntimeError``."""
        raise RuntimeError(f"can't delete from {type(self).__name__}")

    def __len__(self) -> int:
        """Return the number of distinct image ids over all wrapped providers."""
        return len(set().union(*self.providers))

    def __iter__(self) -> Iterator[ImageId]:
        """Iterate over distinct image ids, visiting the providers in reverse order."""
        d = {}
        for provider in reversed(self.providers):
            d.update(dict.fromkeys(provider))
        return iter(d)

    def __repr__(self):
        return f'{type(self).__name__}({", ".join(map(repr, self.providers))})'

    def to_parquet(
        self, urlpath: UrlpathLike, *, storage_options: dict[str, Any] | None = None
    ) -> None:
        """Write the combined dataframe to ``urlpath`` (see ``MetadataProvider.to_parquet``)."""
        super().to_parquet(urlpath, storage_options=storage_options)

    @classmethod
    def from_parquet(cls, urlpath: UrlpathLike) -> MetadataProvider:
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError(f"unsupported operation for {cls.__name__!r}()")
# === manipulation ============================================================ MetadataFromFileFunc = Callable[[UrlpathLike], Optional[pd.DataFrame]]
def create_metadata_provider(
    search_urlpath: UrlpathLike,
    search_glob: str,
    *,
    output_urlpath: Optional[UrlpathLike],
    image_id_func: GetImageIdFunc,
    metadata_func: MetadataFromFileFunc,
    identifier: Optional[str] = None,
    resume: bool = False,
    valid_image_ids: Optional[Collection[ImageId]] = None,
    progress: bool = False,
) -> MetadataProvider:
    """Build a metadata provider from a directory of metadata files.

    Not implemented yet: every call raises ``NotImplementedError``.
    """
    raise NotImplementedError("todo")