Source code for pado.shutil

from __future__ import annotations

import os.path
from contextlib import ExitStack
from functools import partial
from typing import Any
from typing import Callable

from fsspec import AbstractFileSystem

from pado.dataset import PadoDataset
from pado.images.providers import update_image_provider_urlpaths
from pado.io.files import urlpathlike_local_via_fs
from pado.predictions.providers import ImagePredictionProvider


def transfer(
    ds0: PadoDataset,
    ds1: PadoDataset,
    *,
    image_providers: bool = False,
    metadata_providers: bool = False,
    annotation_providers: bool = False,
    image_prediction_providers: bool = False,
    images_path: str | None = None,
    image_predictions_path: str | None = None,
    keep_individual_providers: bool = True,
    progress_callback: Callable[[str], Any] = lambda _: None,
) -> None:
    """Transfer the selected providers of ``ds0`` into ``ds1``.

    Each ``*_providers`` flag selects one provider of ``ds0``; selected
    providers are handed to ``_transfer`` in a fixed order: images,
    metadata, annotations, image predictions.

    Args:
        ds0: source dataset.
        ds1: destination dataset; must not be readonly.
        image_providers: transfer ``ds0.images``.
        metadata_providers: transfer ``ds0.metadata``.
        annotation_providers: transfer ``ds0.annotations``.
        image_prediction_providers: transfer ``ds0.predictions.images``.
        images_path: forwarded as ``item_urlpath_destination`` for the
            image provider; ``None`` requests no item copying.
        image_predictions_path: same as ``images_path``, for the image
            prediction provider.
        keep_individual_providers: forwarded unchanged to ``_transfer``.
        progress_callback: called with status strings while transferring.

    Raises:
        TypeError: if ``ds0`` or ``ds1`` is not a ``PadoDataset``.
        ValueError: if ``ds1`` is readonly.
    """
    # validate both datasets before touching any provider
    if not isinstance(ds0, PadoDataset):
        raise TypeError(f"ds0={ds0!r} not a PadoDataset")
    if not isinstance(ds1, PadoDataset):
        raise TypeError(f"ds1={ds1!r} not a PadoDataset")
    if ds1.readonly:
        raise ValueError(f"ds1 must be writable: {ds1!r}")

    def send(provider, destination=None):
        # every provider shares the same datasets and reporting options
        _transfer(
            provider,
            ds0=ds0,
            ds1=ds1,
            progress_callback=progress_callback,
            keep_individual_providers=keep_individual_providers,
            item_urlpath_destination=destination,
        )

    # provider attributes are only read from ds0 when their flag is set
    if image_providers:
        send(ds0.images, images_path)
    if metadata_providers:
        send(ds0.metadata)
    if annotation_providers:
        send(ds0.annotations)
    if image_prediction_providers:
        send(ds0.predictions.images, image_predictions_path)
def _transfer( p, *, ds0: PadoDataset, ds1: PadoDataset, progress_callback, keep_individual_providers, item_urlpath_destination=None, chunked=False, ) -> None: # noinspection PyProtectedMember fs0: AbstractFileSystem = ds0._fs # noinspection PyProtectedMember fs1: AbstractFileSystem = ds1._fs # noinspection PyProtectedMember get_fspath1 = ds1._get_fspath if keep_individual_providers and hasattr(p, "providers"): providers = p.providers else: providers = [p] for _p in providers: progress_callback(repr(_p)) # copy items referenced by urlpath in the provider if requested if item_urlpath_destination is not None and hasattr(_p, "df"): # create destination dir dst_dir = get_fspath1(item_urlpath_destination) if not fs1.isdir(dst_dir): fs1.mkdir(dst_dir, create_parents=True) for item_urlpath_src in _p.df.urlpath: # we need to see access locally referenced items remotely, # if we access the dataset remotely of0 = urlpathlike_local_via_fs(item_urlpath_src, fs0) # create the remote path _, name = os.path.split(of0.path) item_urlpath_dst = get_fspath1(item_urlpath_destination, name) # skip if remote exists if fs1.isfile(item_urlpath_dst) and fs1.size(item_urlpath_dst) > 0: progress_callback(f". EXISTS {name}") continue else: of1 = fs1.open(item_urlpath_dst, mode="wb") # transfer file with ExitStack() as stack: try: f0 = stack.enter_context(of0) except FileNotFoundError: progress_callback(f"NOT FOUND {name}") continue else: progress_callback(name) f1 = stack.enter_context(of1) if chunked: f0_read = f0.read f1_write = f1.write buf_size = fs1.blocksize while True: progress_callback(".") buf = f0_read(buf_size) if not buf: break f1_write(buf) else: buf = f0.read() progress_callback(". received") f1.write(buf) progress_callback(". 
transferred") # re-associate the uploaded provider's item urlpaths _p = update_image_provider_urlpaths( fs1.open(get_fspath1(item_urlpath_destination)), search_glob="*.*", provider=_p, inplace=False, ignore_ambiguous=True, progress=True, provider_cls=type(_p), ) try: ds1.ingest_obj(_p) except FileExistsError: progress_callback(". PROVIDER EXISTED ALREADY, OVERWRITING") if not isinstance(_p, ImagePredictionProvider): raise RuntimeError("provider not an image provider") ds1.ingest_obj(_p, overwrite=True)