Source code for pado.io.paths

"""helpers for dealing with fsspec OpenFile"""
from __future__ import annotations

from operator import itemgetter
from typing import TYPE_CHECKING
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple

from fsspec import AbstractFileSystem
from fsspec.core import OpenFile
from tqdm import tqdm

from pado.io.files import fsopen
from pado.io.files import urlpathlike_to_path_parts
from pado.types import FsspecIOMode
from pado.types import UrlpathLike

if TYPE_CHECKING:
    from pado.dataset import PadoDataset


__all__ = [
    "match_partial_paths_reversed",
    "search_dataset",
    "get_dataset_fs",
]


[docs]def match_partial_paths_reversed( current_urlpaths: Sequence[UrlpathLike], new_urlpaths: Sequence[UrlpathLike], *, ignore_ambiguous: bool = False, progress: bool = False, ) -> Sequence[UrlpathLike]: """match paths for re-assigning files returns a new sequence of paths replacing current with new ones raises ValueError in case match is ambiguous """ current_path_parts = {} for _idx, urlpathlike in enumerate(current_urlpaths): _parts = urlpathlike_to_path_parts(urlpathlike) current_path_parts[_parts] = { "index": _idx, "cur": urlpathlike, "new": None, } parts = set(current_path_parts) def match( x: Tuple[str, ...], s: Set[Tuple[str, ...]], idx: int ) -> Optional[Tuple[str, ...]]: try: xi = x[idx] # raises index error when out of parts to match except IndexError: if ignore_ambiguous: return None else: raise ValueError(f"ambiguous: {x!r} -> {s!r}") sj = {sx for sx in s if sx[idx] == xi or xi is None} if len(sj) == 1: return sj.pop() elif len(sj) == 0: return None else: return match(x, sj, idx - 1) if progress: new_urlpaths = tqdm(new_urlpaths, desc="trying to match new files") for new_up in new_urlpaths: new_parts = urlpathlike_to_path_parts(new_up) m = match(new_parts, parts, -1) if m: current_path_parts[m]["new"] = new_up return [ x["new"] or x["cur"] # type: ignore for x in sorted(current_path_parts.values(), key=itemgetter("index")) ]
[docs]def search_dataset( ds: PadoDataset, glob: str, *, mode: FsspecIOMode = "rb" ) -> list[OpenFile]: """search for files in a pado dataset root path""" # noinspection PyProtectedMember fs, get_fspath = ds._fs, ds._get_fspath return [fsopen(fs, p, mode=mode) for p in fs.glob(get_fspath(glob))]
[docs]def get_dataset_fs(ds: PadoDataset) -> AbstractFileSystem: """return the dataset's filesystem""" # noinspection PyProtectedMember try: return ds._fs except AttributeError: raise TypeError(f"not a PadoDataset, got {type(ds).__name__}")