"""file utility functions"""
from __future__ import annotations
import gzip
import inspect
import json
import lzma
import os
import pickle
import sys
import tarfile
import warnings
import zipfile
from ast import literal_eval
from contextlib import ExitStack
from contextlib import contextmanager
from typing import IO
from typing import Any
from typing import AnyStr
from typing import Collection
from typing import ContextManager
from typing import Iterator
from typing import NamedTuple
from typing import Tuple
from typing import Union
from typing import cast
import fsspec
from fsspec import AbstractFileSystem
from fsspec.core import OpenFile
from fsspec.core import strip_protocol
from fsspec.implementations.local import LocalFileSystem
from fsspec.registry import _import_class
from fsspec.registry import get_filesystem_class
from fsspec.utils import get_protocol
from fsspec.utils import infer_storage_options
from pado.types import FsspecIOMode
from pado.types import OpenFileLike
from pado.types import UrlpathLike
if sys.version_info[:2] >= (3, 10):
from typing import Literal
from typing import TypeGuard
from typing import get_args
else:
from typing_extensions import Literal
from typing_extensions import TypeGuard
from typing_extensions import get_args
__all__ = [
"find_files",
"is_fsspec_open_file_like",
"update_fs_storage_options",
"urlpathlike_local_via_fs",
"urlpathlike_get_fs_cls",
"urlpathlike_get_path",
"urlpathlike_get_storage_args_options",
"urlpathlike_is_localfile",
"urlpathlike_to_string",
"urlpathlike_to_fsspec",
"urlpathlike_to_fs_and_path",
"urlpathlike_to_path_parts",
"urlpathlike_to_localpath",
"urlpathlike_to_uri",
"fsopen",
"uncompressed",
]
_PADO_FSSPEC_PICKLE_PROTOCOL = 4
_PADO_ALLOW_PICKLED_URLPATHS = None
def _pado_pickle_load(obj: Any):
"""load a pickled urlpath fs"""
global _PADO_ALLOW_PICKLED_URLPATHS
if _PADO_ALLOW_PICKLED_URLPATHS is None:
from pado.settings import settings
_PADO_ALLOW_PICKLED_URLPATHS = settings.allow_pickled_urlpaths
if not _PADO_ALLOW_PICKLED_URLPATHS:
raise Exception(
"Loading pickled urlpaths is disabled. "
"Use PADO_ALLOW_PICKLED_URLPATHS=1 to enable, or set in your"
".pado.toml config file."
)
else:
return pickle.loads(literal_eval(obj)) # nosec B301
def _os_path_parts(pth: str) -> tuple[str, ...]:
"""os.path version of pathlib.Path().parts"""
remaining, part = os.path.split(pth)
if remaining == pth:
return (remaining,)
elif part == pth:
return (pth,)
else:
return (*_os_path_parts(remaining), part)
class _OpenFileAndParts(NamedTuple):
file: OpenFile
parts: Tuple[str, ...]
[docs]def find_files(
urlpath: UrlpathLike,
*,
glob: str = "**/*",
storage_options: dict[str, Any] | None = None,
) -> list[_OpenFileAndParts]:
"""iterate over the files with matching at all paths"""
if storage_options is None:
storage_options = {}
ofile = urlpathlike_to_fsspec(urlpath, storage_options=storage_options)
fs = ofile.fs
pth = ofile.path
if not fs.isdir(pth):
# todo: check if this makes sense for all fsspec...
raise NotADirectoryError(pth)
def split_into_parts(base, p):
rpth = os.path.relpath(p, base)
return tuple(os.path.normpath(rpth).split(os.sep))
return [
_OpenFileAndParts(
file=OpenFile(fs=fs, path=opth),
parts=split_into_parts(pth, opth),
)
for opth in fs.glob(os.path.join(pth, glob))
]
[docs]def is_fsspec_open_file_like(obj: Any) -> TypeGuard[OpenFileLike]:
"""test if an object is like a fsspec.core.OpenFile instance"""
# if isinstance(obj, fsspec.core.OpenFile) doesn't cut it...
# ... fsspec filesystems just need to quack OpenFile.
return (
isinstance(obj, OpenFileLike)
and isinstance(obj.fs, fsspec.AbstractFileSystem)
and isinstance(obj.path, str)
)
def _deserialize_fsspec_json(obj: Any) -> dict[str, Any] | None:
"""try to deserialize"""
try:
json_obj = json.loads(obj)
except (json.JSONDecodeError, TypeError):
return None
if not isinstance(json_obj, dict):
# this is unexpected, so its better to raise instead of returning None
raise TypeError(f"got json {json_obj!r} of type {type(json_obj)!r}")
return json_obj
def _get_fsspec_cls_from_serialized_fs(fs_obj: str) -> type[AbstractFileSystem]:
"""try to return the filesystem cls from a json string or pickle"""
try:
fs_dct = json.loads(fs_obj)
except json.JSONDecodeError:
# json_obj["fs"] is not json ...
fs = _pado_pickle_load(fs_obj)
return type(fs)
else:
# json_obj["fs"] is json
if not isinstance(fs_dct, dict):
raise TypeError(f"expected dict, got {fs_dct!r}")
protocol = fs_dct.pop("protocol")
try:
cls = _import_class(fs_dct.pop("cls"))
except (ImportError, ValueError, RuntimeError, KeyError):
cls = get_filesystem_class(protocol)
return cls
def _get_fsspec_storage_args_options_from_serialized_fs(
fs_obj: str,
) -> tuple[tuple[Any, ...], dict[str, Any]]:
"""try to return the filesystem storage args and options from a json string or pickle"""
try:
fs_dct = json.loads(fs_obj)
except json.JSONDecodeError:
# json_obj["fs"] is not json ...
fs = _pado_pickle_load(fs_obj)
return fs.storage_args, fs.storage_options
else:
# json_obj["fs"] is json
if not isinstance(fs_dct, dict):
raise TypeError(f"expected dict, got {fs_dct!r}")
_ = fs_dct.pop("protocol")
_ = fs_dct.pop("cls")
storage_args = fs_dct.pop("args")
return storage_args, fs_dct
def _get_constructor_default_options(cls: type) -> dict[str, Any]:
"""return the default options of the class constructor"""
arg_spec = inspect.getfullargspec(cls.__init__) # type: ignore
default_options = {}
for name, value in zip(
reversed(arg_spec.args),
reversed(arg_spec.defaults or ()),
):
default_options[name] = value
if arg_spec.kwonlydefaults:
default_options.update(arg_spec.kwonlydefaults)
return default_options
def _remove_duplicate_items(d0: dict[str, Any], d1: dict[str, Any]) -> dict[str, Any]:
"""performs d0 - d1 for all items only if d0[k] == d1[k]"""
return {k: v for k, v in d0.items() if not (k in d1 and d1[k] == v)}
def _pathlike_to_string(pathlike: os.PathLike[AnyStr] | AnyStr) -> str:
"""stringify a pathlike object"""
if isinstance(pathlike, os.PathLike):
pathlike = os.fspath(pathlike)
if isinstance(pathlike, bytes):
pathlike = pathlike.decode() # type: ignore
if "~" in pathlike: # type: ignore
pathlike = os.path.expanduser(pathlike)
if isinstance(pathlike, str):
return pathlike
else:
raise TypeError(f"can't stringify: {pathlike!r} of type {type(pathlike)!r}")
def _build_uri(
cls_name,
protocol,
path,
custom_args,
custom_options,
*,
ignore_options,
repr_fallback,
):
if ignore_options is True:
custom_options.clear()
elif ignore_options:
for key in ignore_options:
if key in custom_options:
del custom_options[key]
if not custom_args and not custom_options:
return f"{protocol}://{path}"
elif not custom_args:
# todo: ... can prettify some file systems
pass
if not repr_fallback:
raise ValueError("can't serialize urlpath to a pure uri")
# provide a repr like string
fs_params = [
*custom_args,
*(f"{k}={v!r}" for k, v in custom_options.items()),
]
return f"{cls_name}({', '.join(fs_params)}).open({path!r})"
[docs]def urlpathlike_to_uri(
urlpath: UrlpathLike,
*,
repr_fallback: bool = False,
ignore_options: Collection[str] | Literal[True] = (),
) -> str:
"""convert an urlpath-like object to an fsspec URI
Parameters
----------
urlpath :
an urlpath-like object
repr_fallback :
allow falling back to a repr like representation if encoding to a uri
is impossible due to `storage_args` or `storage_options`
ignore_options :
a set of options that should be omitted from the uri representation
Returns
-------
uri :
a URI string representation of the urlpath-like
"""
if isinstance(urlpath, str) and urlpath[0] == "{" and urlpath[-1] == "}":
obj = _deserialize_fsspec_json(urlpath)
if obj:
_fs = obj["fs"]
if isinstance(_fs, str):
_fs = json.loads(_fs)
cls_name = _fs["cls"]
custom_args = _fs.pop("args")
custom_options = _fs
path = obj["path"]
protocol = _fs.pop("protocol")
return _build_uri(
cls_name,
protocol,
path,
custom_args,
custom_options,
ignore_options=ignore_options,
repr_fallback=repr_fallback,
)
if is_fsspec_open_file_like(urlpath):
fs = urlpath.fs
path = urlpath.path
cls = type(fs)
cls_name = f"{cls.__module__}.{cls.__name__}"
custom_args = fs.storage_args
default_options = _get_constructor_default_options(cls)
custom_options = _remove_duplicate_items(fs.storage_options, default_options)
protocol = fs.protocol
if isinstance(protocol, (list, tuple)):
protocol = protocol[0]
return _build_uri(
cls_name,
protocol,
path,
custom_args,
custom_options,
ignore_options=ignore_options,
repr_fallback=repr_fallback,
)
else:
return _pathlike_to_string(urlpath) # type: ignore
[docs]def urlpathlike_to_string(
urlpath: UrlpathLike,
*,
ignore_options: Collection[str] = (),
) -> str:
"""convert an urlpath-like object and stringify it"""
if is_fsspec_open_file_like(urlpath):
fs: fsspec.AbstractFileSystem = urlpath.fs
path: str = urlpath.path
try:
serialized_fs = fs.to_json()
except NotImplementedError:
if ignore_options:
warnings.warn(
"ignore_options are not handled for FS that don't support to_json",
stacklevel=2,
)
serialized_fs = repr(
pickle.dumps(fs, protocol=_PADO_FSSPEC_PICKLE_PROTOCOL)
)
else:
if ignore_options:
d = json.loads(serialized_fs)
for opt in ignore_options:
d.pop(opt, None)
serialized_fs = json.dumps(d)
return json.dumps({"fs": serialized_fs, "path": path})
else:
if ignore_options:
warnings.warn(
"ignore_options are not handled for stringified UrlpathLike",
stacklevel=2,
)
return _pathlike_to_string(urlpath) # type: ignore
[docs]def urlpathlike_to_fsspec(
obj: UrlpathLike,
*,
mode: FsspecIOMode = "rb",
storage_options: dict[str, Any] | None = None,
) -> OpenFileLike:
"""use an urlpath-like object and return an fsspec.core.OpenFile"""
if is_fsspec_open_file_like(obj):
return obj
if storage_options is None:
storage_options = {}
try:
json_obj = json.loads(obj) # type: ignore
except (json.JSONDecodeError, TypeError):
if isinstance(obj, os.PathLike):
obj = os.fspath(obj)
if not isinstance(obj, str):
raise TypeError(f"got {obj!r} of type {type(obj)!r}")
return fsspec.open(obj, mode=mode, **storage_options)
else:
if not isinstance(json_obj, dict):
raise TypeError(f"got json {json_obj!r} of type {type(json_obj)!r}")
try:
_fs = json.loads(json_obj["fs"])
except json.JSONDecodeError:
# json_obj["fs"] is not json ...
if storage_options:
raise NotImplementedError(
"pickled filesystems can't change storage_options"
)
fs = _pado_pickle_load(json_obj["fs"])
else:
# json_obj["fs"] is json
if not isinstance(_fs, dict):
raise TypeError(f"expected dict, got {_fs!r}")
_fs.update(**storage_options)
fs = fsspec.AbstractFileSystem.from_json(json.dumps(_fs))
return fsopen(fs, json_obj["path"], mode=mode)
[docs]def urlpathlike_to_fs_and_path(
obj: UrlpathLike,
*,
storage_options: dict[str, Any] | None = None,
) -> Tuple[AbstractFileSystem, str]:
"""use an urlpath-like object and return an fsspec.AbstractFileSystem and a path"""
if is_fsspec_open_file_like(obj):
return obj.fs, obj.path
if storage_options is None:
storage_options = {}
try:
json_obj = json.loads(obj) # type: ignore
except (json.JSONDecodeError, TypeError):
if isinstance(obj, os.PathLike):
obj = os.fspath(obj)
if not isinstance(obj, str):
raise TypeError(f"got {obj!r} of type {type(obj)!r}")
fs, _, (path,) = fsspec.get_fs_token_paths(obj, storage_options=storage_options)
return fs, path
else:
if not isinstance(json_obj, dict):
raise TypeError(f"got json {json_obj!r} of type {type(json_obj)!r}")
try:
_fs = json.loads(json_obj["fs"])
except json.JSONDecodeError:
# json_obj["fs"] is not json ...
if storage_options:
raise NotImplementedError(
"pickled filesystems can't change storage_options"
)
fs = _pado_pickle_load(json_obj["fs"])
else:
# json_obj["fs"] is json
if not isinstance(_fs, dict):
raise TypeError(f"expected dict, got {_fs!r}")
_fs.update(**storage_options)
fs = fsspec.AbstractFileSystem.from_json(json.dumps(_fs))
return fs, json_obj["path"]
[docs]def urlpathlike_get_fs_cls(obj: UrlpathLike) -> type[AbstractFileSystem]:
"""get the urlpathlike filesystem class"""
if is_fsspec_open_file_like(obj):
return type(obj.fs)
obj_json = _deserialize_fsspec_json(obj)
if obj_json:
return _get_fsspec_cls_from_serialized_fs(obj_json["fs"])
if isinstance(obj, os.PathLike):
obj = os.fspath(obj)
if not isinstance(obj, str):
raise TypeError(f"got {obj!r} of type {type(obj)!r}")
return get_filesystem_class(get_protocol(obj))
[docs]def urlpathlike_get_path(
obj: UrlpathLike, *, fs_cls: type[AbstractFileSystem] | None = None
) -> str:
"""get the path from the urlpath"""
if is_fsspec_open_file_like(obj):
return obj.path
obj_json = _deserialize_fsspec_json(obj)
if obj_json:
return obj_json["path"]
if isinstance(obj, os.PathLike):
obj = os.fspath(obj)
if not isinstance(obj, str):
raise TypeError(f"got {obj!r} of type {type(obj)!r}")
if fs_cls is None:
fs_cls = get_filesystem_class(get_protocol(obj))
# noinspection PyProtectedMember
return fs_cls._strip_protocol(obj)
[docs]def urlpathlike_get_storage_args_options(
obj: UrlpathLike,
) -> tuple[tuple[Any, ...], dict[str, Any]]:
"""get the path from the urlpath"""
if is_fsspec_open_file_like(obj):
fs = obj.fs
return fs.storage_args, fs.storage_options
obj_json = _deserialize_fsspec_json(obj)
if obj_json:
fs_str = obj_json["fs"]
return _get_fsspec_storage_args_options_from_serialized_fs(fs_str)
if isinstance(obj, os.PathLike):
obj = os.fspath(obj)
if not isinstance(obj, str):
raise TypeError(f"got {obj!r} of type {type(obj)!r}")
return (), infer_storage_options(obj)
[docs]def urlpathlike_local_via_fs(
obj: UrlpathLike,
fs: AbstractFileSystem,
) -> OpenFileLike:
"""take an urlpath and access it via another fs"""
fs_cls = urlpathlike_get_fs_cls(obj)
if issubclass(fs_cls, LocalFileSystem):
path = urlpathlike_get_path(obj, fs_cls=fs_cls)
return fsopen(fs, path)
else:
return urlpathlike_to_fsspec(obj)
[docs]def urlpathlike_to_path_parts(obj: UrlpathLike) -> Tuple[str, ...]:
"""take an urlpathlike object and return the path parts
this does not instantiate the fsspec.AbstractFilesystem class.
(does not open connections, etc on instantiation)
"""
if is_fsspec_open_file_like(obj):
path = obj.path
else:
try:
json_obj = json.loads(obj) # type: ignore
except (json.JSONDecodeError, TypeError):
if isinstance(obj, os.PathLike):
obj = os.fspath(obj)
if not isinstance(obj, str):
raise TypeError(f"got {obj!r} of type {type(obj)!r}")
path = strip_protocol(obj)
else:
if not isinstance(json_obj, dict):
raise TypeError(f"got json {json_obj!r} of type {type(json_obj)!r}")
path = json_obj["path"]
return _os_path_parts(path)
[docs]def urlpathlike_to_localpath(
obj: UrlpathLike,
*,
mode: FsspecIOMode = "rb",
storage_options: dict[str, Any] | None = None,
) -> str:
"""take an urlpathlike object and return a local path"""
if "r" not in mode:
raise ValueError("urlpathlike_to_localpath only works for read modes")
of = urlpathlike_to_fsspec(obj, mode=mode, storage_options=storage_options)
if not getattr(of.fs, "local_file", False):
raise ValueError("FileSystem does not have attribute .local_file=True")
with of as f:
return f.name
[docs]def update_fs_storage_options(
fs: AbstractFileSystem, *, storage_options: dict[str, Any] | None
) -> AbstractFileSystem:
"""update the storage_options of an existing filesystem"""
if not storage_options:
return fs
make_instance, (cls, fs_args, fs_so) = fs.__reduce__()
fs_so.update(storage_options)
return make_instance(cls, fs_args, fs_so)
[docs]def fsopen(
fs: AbstractFileSystem,
path: str | os.PathLike[str],
*,
mode: FsspecIOMode = "rb",
) -> OpenFile:
"""small helper to support mode 'x' for fsspec filesystems"""
if mode not in get_args(FsspecIOMode):
raise ValueError("fsspec only supports a subset of IOModes")
if "x" in mode:
if fs.exists(path):
raise FileExistsError(f"{path!r} at {fs!r}")
else:
mode = mode.replace("x", "w") # type: ignore
return OpenFile(fs, path, mode=mode)
[docs]@contextmanager
def uncompressed(
file: Union[IO[bytes], ContextManager[IO[bytes]]]
) -> Iterator[IO[bytes]]:
"""contextmanager for reading nested compressed files
supported formats: GZIP, LZMA, ZIP, TAR
"""
GZIP = {b"\x1F\x8B"}
LZMA = {b"\xFD\x37\x7A\x58\x5A\x00"}
ZIP = {b"PK\x03\x04", b"PK\x05\x06", b"PK\x07\x08"}
def is_tar(fileobj):
_pos = fileobj.tell()
try:
t = tarfile.open(fileobj=fileobj) # rewinds to 0 on failure
t.close()
return True
except tarfile.TarError:
return False
finally:
fileobj.seek(_pos)
with ExitStack() as stack:
f: IO[bytes]
if not (hasattr(file, "read") and hasattr(file, "seek")):
f = stack.enter_context(file)
else:
f = cast(IO[bytes], file)
pos = f.tell()
magic = f.read(8)
f.seek(pos)
if magic[:2] in GZIP:
fgz = cast(IO[bytes], stack.enter_context(gzip.open(f)))
yield stack.enter_context(uncompressed(fgz))
elif magic[:6] in LZMA:
fxz = cast(IO[bytes], stack.enter_context(lzma.open(f)))
yield stack.enter_context(uncompressed(fxz))
elif magic[:4] in ZIP:
fzip = stack.enter_context(zipfile.ZipFile(f))
paths = fzip.namelist()
if len(paths) != 1:
raise RuntimeError(
"zip must contain exactly one file: won't auto uncompress"
)
fzipped = stack.enter_context(fzip.open(paths[0], mode="r"))
yield stack.enter_context(uncompressed(fzipped))
elif is_tar(f):
ftar = stack.enter_context(tarfile.open(fileobj=f))
members = ftar.getmembers()
if len(members) != 1:
raise RuntimeError(
"tar must contain exactly one file: won't auto uncompress"
)
else:
ftar0 = ftar.extractfile(members[0])
if ftar0 is None:
raise RuntimeError("can't extract file from tar")
ftarred = stack.enter_context(ftar0)
yield stack.enter_context(uncompressed(ftarred))
else:
yield f
[docs]def urlpathlike_is_localfile(urlpath: UrlpathLike, must_exist: bool = True) -> bool:
"""Check whether an urlpath corresponds to a file on a local filesystem.
Parameters
----------
urlpath : UrlpathLike
URL to be checked
must_exist : bool, optional
If True, the file must exist on the local machine's filesystem. Default is True.
Returns
-------
bool
True if `urlpath` corresponds to a local file, False otherwise
"""
fs, pth = urlpathlike_to_fs_and_path(urlpath)
return isinstance(fs, LocalFileSystem) and (fs.exists(pth) or not must_exist)