Source code for fs.mountfs

"""Manage other filesystems as a folder hierarchy.
"""

from __future__ import absolute_import, print_function, unicode_literals

import typing

from six import text_type

from . import errors
from .base import FS
from .memoryfs import MemoryFS
from .mode import validate_open_mode, validate_openbin_mode
from .path import abspath, forcedir, normpath

if typing.TYPE_CHECKING:
    from typing import (
        IO,
        Any,
        BinaryIO,
        Collection,
        Iterator,
        List,
        MutableSequence,
        Optional,
        Text,
        Tuple,
        Union,
    )

    from .enums import ResourceType
    from .info import Info, RawInfo
    from .permissions import Permissions
    from .subfs import SubFS

    M = typing.TypeVar("M", bound="MountFS")


[docs]class MountError(Exception): """Thrown when mounts conflict."""
[docs]class MountFS(FS): """A virtual filesystem that maps directories on to other file-systems.""" _meta = { "virtual": True, "read_only": False, "unicode_paths": True, "case_insensitive": False, "invalid_path_chars": "\0", }
[docs] def __init__(self, auto_close=True): # type: (bool) -> None """Create a new `MountFS` instance. Arguments: auto_close (bool): If `True` (the default), the child filesystems will be closed when `MountFS` is closed. """ super(MountFS, self).__init__() self.auto_close = auto_close self.default_fs = MemoryFS() # type: FS self.mounts = [] # type: MutableSequence[Tuple[Text, FS]]
def __repr__(self): # type: () -> str return "MountFS(auto_close={!r})".format(self.auto_close) def __str__(self): # type: () -> str return "<mountfs>" def _delegate(self, path): # type: (Text) -> Tuple[FS, Text] """Get the delegate FS for a given path. Arguments: path (str): A path. Returns: (FS, str): a tuple of ``(<fs>, <path>)`` for a mounted filesystem, or ``(None, None)`` if no filesystem is mounted on the given ``path``. """ _path = forcedir(abspath(normpath(path))) is_mounted = _path.startswith for mount_path, fs in self.mounts: if is_mounted(mount_path): return fs, _path[len(mount_path) :].rstrip("/") return self.default_fs, path
[docs] def mount(self, path, fs): # type: (Text, Union[FS, Text]) -> None """Mounts a host FS object on a given path. Arguments: path (str): A path within the MountFS. fs (FS or str): A filesystem (instance or URL) to mount. """ if isinstance(fs, text_type): from .opener import open_fs fs = open_fs(fs) if not isinstance(fs, FS): raise TypeError("fs argument must be an FS object or a FS URL") if fs is self: raise ValueError("Unable to mount self") _path = forcedir(abspath(normpath(path))) for mount_path, _ in self.mounts: if _path.startswith(mount_path): raise MountError("mount point overlaps existing mount") self.mounts.append((_path, fs)) self.default_fs.makedirs(_path, recreate=True)
[docs] def close(self): # type: () -> None # Explicitly closes children if requested if self.auto_close: for _path, fs in self.mounts: fs.close() del self.mounts[:] self.default_fs.close() super(MountFS, self).close()
[docs] def desc(self, path): # type: (Text) -> Text if not self.exists(path): raise errors.ResourceNotFound(path) fs, delegate_path = self._delegate(path) if fs is self.default_fs: fs = self return "{path} on {fs}".format(fs=fs, path=delegate_path)
[docs] def getinfo(self, path, namespaces=None): # type: (Text, Optional[Collection[Text]]) -> Info self.check() fs, _path = self._delegate(path) return fs.getinfo(_path, namespaces=namespaces)
[docs] def listdir(self, path): # type: (Text) -> List[Text] self.check() fs, _path = self._delegate(path) return fs.listdir(_path)
[docs] def makedir(self, path, permissions=None, recreate=False): # type: (Text, Optional[Permissions], bool) -> SubFS[FS] self.check() fs, _path = self._delegate(path) return fs.makedir(_path, permissions=permissions, recreate=recreate)
[docs] def openbin(self, path, mode="r", buffering=-1, **kwargs): # type: (Text, Text, int, **Any) -> BinaryIO validate_openbin_mode(mode) self.check() fs, _path = self._delegate(path) return fs.openbin(_path, mode=mode, buffering=-1, **kwargs)
[docs] def remove(self, path): # type: (Text) -> None self.check() fs, _path = self._delegate(path) return fs.remove(_path)
[docs] def removedir(self, path): # type: (Text) -> None self.check() path = normpath(path) if path in ("", "/"): raise errors.RemoveRootError(path) fs, _path = self._delegate(path) return fs.removedir(_path)
[docs] def readbytes(self, path): # type: (Text) -> bytes self.check() fs, _path = self._delegate(path) return fs.readbytes(_path)
[docs] def download(self, path, file, chunk_size=None, **options): # type: (Text, BinaryIO, Optional[int], **Any) -> None fs, _path = self._delegate(path) return fs.download(_path, file, chunk_size=chunk_size, **options)
[docs] def readtext( self, path, # type: Text encoding=None, # type: Optional[Text] errors=None, # type: Optional[Text] newline="", # type: Text ): # type: (...) -> Text self.check() fs, _path = self._delegate(path) return fs.readtext(_path, encoding=encoding, errors=errors, newline=newline)
[docs] def getsize(self, path): # type: (Text) -> int self.check() fs, _path = self._delegate(path) return fs.getsize(_path)
[docs] def getsyspath(self, path): # type: (Text) -> Text self.check() fs, _path = self._delegate(path) return fs.getsyspath(_path)
[docs] def gettype(self, path): # type: (Text) -> ResourceType self.check() fs, _path = self._delegate(path) return fs.gettype(_path)
[docs] def geturl(self, path, purpose="download"): # type: (Text, Text) -> Text self.check() fs, _path = self._delegate(path) return fs.geturl(_path, purpose=purpose)
[docs] def hasurl(self, path, purpose="download"): # type: (Text, Text) -> bool self.check() fs, _path = self._delegate(path) return fs.hasurl(_path, purpose=purpose)
[docs] def isdir(self, path): # type: (Text) -> bool self.check() fs, _path = self._delegate(path) return fs.isdir(_path)
[docs] def isfile(self, path): # type: (Text) -> bool self.check() fs, _path = self._delegate(path) return fs.isfile(_path)
[docs] def scandir( self, path, # type: Text namespaces=None, # type: Optional[Collection[Text]] page=None, # type: Optional[Tuple[int, int]] ): # type: (...) -> Iterator[Info] self.check() fs, _path = self._delegate(path) return fs.scandir(_path, namespaces=namespaces, page=page)
[docs] def setinfo(self, path, info): # type: (Text, RawInfo) -> None self.check() fs, _path = self._delegate(path) return fs.setinfo(_path, info)
[docs] def validatepath(self, path): # type: (Text) -> Text self.check() fs, _path = self._delegate(path) fs.validatepath(_path) path = abspath(normpath(path)) return path
[docs] def open( self, path, # type: Text mode="r", # type: Text buffering=-1, # type: int encoding=None, # type: Optional[Text] errors=None, # type: Optional[Text] newline="", # type: Text **options # type: Any ): # type: (...) -> IO validate_open_mode(mode) self.check() fs, _path = self._delegate(path) return fs.open( _path, mode=mode, buffering=buffering, encoding=encoding, errors=errors, newline=newline, **options )
[docs] def upload(self, path, file, chunk_size=None, **options): # type: (Text, BinaryIO, Optional[int], **Any) -> None self.check() fs, _path = self._delegate(path) return fs.upload(_path, file, chunk_size=chunk_size, **options)
[docs] def writebytes(self, path, contents): # type: (Text, bytes) -> None self.check() fs, _path = self._delegate(path) return fs.writebytes(_path, contents)
[docs] def writetext( self, path, # type: Text contents, # type: Text encoding="utf-8", # type: Text errors=None, # type: Optional[Text] newline="", # type: Text ): # type: (...) -> None fs, _path = self._delegate(path) return fs.writetext( _path, contents, encoding=encoding, errors=errors, newline=newline )