Source code for fs.wrap

"""Collection of useful `~fs.wrapfs.WrapFS` subclasses.

Here's an example that opens a filesystem then makes it *read only*::

    >>> from fs import open_fs
    >>> from fs.wrap import read_only
    >>> projects_fs = open_fs('~/projects')
    >>> read_only_projects_fs = read_only(projects_fs)
    >>> read_only_projects_fs.remove('__init__.py')
    Traceback (most recent call last):
      ...
    fs.errors.ResourceReadOnly: resource '__init__.py' is read only

"""

from __future__ import print_function
from __future__ import unicode_literals

import typing

from .wrapfs import WrapFS
from .path import abspath, normpath, split
from .errors import ResourceReadOnly, ResourceNotFound
from .info import Info
from .mode import check_writable

if False:  # typing.TYPE_CHECKING
    from datetime import datetime
    from typing import (
        Any,
        BinaryIO,
        Collection,
        Dict,
        Iterator,
        IO,
        Optional,
        Text,
        Tuple,
    )
    from .base import FS
    from .info import Info, RawInfo
    from .subfs import SubFS
    from .permissions import Permissions


# Type variables referenced by the ``# type:`` comments in this module.
# The bounds are written as strings (forward references): ``FS`` is only
# imported inside the ``if False:`` type-checking block above, so the name
# does not exist at runtime.

# The wrapper type itself; annotates ``self`` in ``makedir``/``makedirs`` so
# the declared return type is ``SubFS[_W]``.
_W = typing.TypeVar("_W", bound="WrapFS")
# The filesystem type accepted by the ``read_only`` / ``cache_directory``
# helper functions.
_T = typing.TypeVar("_T", bound="FS")
# The wrapped filesystem type; generic parameter of ``WrapCachedDir`` and
# ``WrapReadOnly`` (declared covariant).
_F = typing.TypeVar("_F", bound="FS", covariant=True)


def read_only(fs):
    # type: (_T) -> WrapReadOnly[_T]
    """Wrap a filesystem so that it cannot be modified.

    Arguments:
        fs (FS): The filesystem instance to wrap.

    Returns:
        FS: A `WrapReadOnly` wrapper around ``fs``; operations that
        would write to it raise `~fs.errors.ResourceReadOnly`.

    """
    read_only_fs = WrapReadOnly(fs)
    return read_only_fs
def cache_directory(fs):
    # type: (_T) -> WrapCachedDir[_T]
    """Wrap a filesystem so that directory information is cached.

    Arguments:
        fs (FS): The filesystem instance to wrap.

    Returns:
        FS: A `WrapCachedDir` wrapper around ``fs`` which remembers the
        results of `~FS.scandir` and answers `~FS.getinfo`,
        `~FS.isdir` and `~FS.isfile` from that cache.

    """
    cached_fs = WrapCachedDir(fs)
    return cached_fs
class WrapCachedDir(WrapFS[_F], typing.Generic[_F]):
    """Caches filesystem directory information.

    This filesystem caches directory information retrieved from a scandir
    call. This *may* speed up code that calls `~FS.isdir`, `~FS.isfile`, or
    `~FS.gettype` too frequently.

    Note:
        Using this wrap will prevent changes to directory information
        being visible to the filesystem object. Consequently it is best
        used only in a fairly limited scope where you don't expect
        anything on the filesystem to change.

    """

    wrap_name = "cached-dir"

    def __init__(self, wrap_fs):
        # type: (_F) -> None
        super(WrapCachedDir, self).__init__(wrap_fs)
        # Maps (absolute directory path, requested namespaces) to a
        # ``{resource name: Info}`` dict holding the *complete* listing.
        self._cache = {}  # type: Dict[Tuple[Text, frozenset], Dict[Text, Info]]

    def scandir(
        self,
        path,  # type: Text
        namespaces=None,  # type: Optional[Collection[Text]]
        page=None,  # type: Optional[Tuple[int, int]]
    ):
        # type: (...) -> Iterator[Info]
        """Iterate over the (cached) resource info of a directory.

        Arguments:
            path (str): Path to a directory on the filesystem.
            namespaces (list, optional): Info namespaces to retrieve;
                each distinct set of namespaces is cached separately.
            page (tuple, optional): ``(start, end)`` indexes selecting a
                slice of the listing, or `None` for the whole directory.

        Returns:
            ~collections.abc.Iterator: An iterator of `Info` objects.

        """
        _path = abspath(normpath(path))
        cache_key = (_path, frozenset(namespaces or ()))
        if cache_key not in self._cache:
            # Always fetch the whole directory: the cache key does not
            # include ``page``, so storing a paged result would make later
            # scandir/getinfo calls see a truncated listing.
            _scan_result = self._wrap_fs.scandir(
                path, namespaces=namespaces, page=None
            )
            self._cache[cache_key] = {info.name: info for info in _scan_result}
        infos = self._cache[cache_key].values()
        if page is None:
            return iter(infos)
        # Apply paging to the cached listing so that it is honoured on
        # cache hits as well as on the first call.
        start, end = page
        return iter(list(infos)[start:end])

    def getinfo(self, path, namespaces=None):
        # type: (Text, Optional[Collection[Text]]) -> Info
        """Get info for a resource from its parent's cached listing.

        Arguments:
            path (str): Path to a resource on the filesystem.
            namespaces (list, optional): Info namespaces to retrieve.

        Returns:
            Info: The resource information object.

        Raises:
            fs.errors.ResourceNotFound: If ``path`` is not present in the
                listing of its parent directory.

        """
        _path = abspath(normpath(path))
        if _path == "/":
            # The root has no parent directory to scan, so a minimal info
            # object is synthesised for it.
            return Info({"basic": {"name": "", "is_dir": True}})
        dir_path, resource_name = split(_path)
        cache_key = (dir_path, frozenset(namespaces or ()))
        if cache_key not in self._cache:
            # Called for its side effect of populating the cache.
            self.scandir(dir_path, namespaces=namespaces)
        _dir = self._cache[cache_key]
        try:
            info = _dir[resource_name]
        except KeyError:
            raise ResourceNotFound(path)
        return info

    def isdir(self, path):
        # type: (Text) -> bool
        """Return `True` only if ``path`` exists and is a directory."""
        try:
            return self.getinfo(path).is_dir
        except ResourceNotFound:
            # A missing resource is simply "not a directory".
            return False

    def isfile(self, path):
        # type: (Text) -> bool
        """Return `True` only if ``path`` exists and is not a directory."""
        try:
            return not self.getinfo(path).is_dir
        except ResourceNotFound:
            # A missing resource is simply "not a file".
            return False
class WrapReadOnly(WrapFS[_F], typing.Generic[_F]):
    """Makes a Filesystem read-only.

    Any call that would write data or modify the filesystem in any way
    will raise a `~fs.errors.ResourceReadOnly` exception.

    """

    # Every mutating method below follows the same pattern: ``check()``
    # first, then unconditionally raise `ResourceReadOnly` for the path
    # that would have been written. ``open`` and ``openbin`` are the only
    # methods that may delegate, and only for non-writable modes.

    wrap_name = "read-only"

    def appendbytes(self, path, data):
        # type: (Text, bytes) -> None
        self.check()
        raise ResourceReadOnly(path)

    def appendtext(
        self,
        path,  # type: Text
        text,  # type: Text
        encoding="utf-8",  # type: Text
        errors=None,  # type: Optional[Text]
        newline="",  # type: Text
    ):
        # type: (...) -> None
        self.check()
        raise ResourceReadOnly(path)

    def makedir(
        self,  # type: _W
        path,  # type: Text
        permissions=None,  # type: Optional[Permissions]
        recreate=False,  # type: bool
    ):
        # type: (...) -> SubFS[_W]
        self.check()
        raise ResourceReadOnly(path)

    def move(self, src_path, dst_path, overwrite=False):
        # type: (Text, Text, bool) -> None
        self.check()
        raise ResourceReadOnly(dst_path)

    def openbin(self, path, mode="r", buffering=-1, **options):
        # type: (Text, Text, int, **Any) -> BinaryIO
        self.check()
        if check_writable(mode):
            raise ResourceReadOnly(path)
        # Forward the caller's ``buffering`` value (it used to be replaced
        # by a hard-coded -1, silently discarding the argument).
        return self._wrap_fs.openbin(path, mode=mode, buffering=buffering, **options)

    def remove(self, path):
        # type: (Text) -> None
        self.check()
        raise ResourceReadOnly(path)

    def removedir(self, path):
        # type: (Text) -> None
        self.check()
        raise ResourceReadOnly(path)

    def removetree(self, dir_path):
        # type: (Text) -> None
        # NOTE(review): added so that recursive removal is rejected like
        # every other mutation. This presumes the inherited ``removetree``
        # would otherwise reach the wrapped filesystem -- confirm against
        # ``WrapFS``, which is not defined in this module.
        self.check()
        raise ResourceReadOnly(dir_path)

    def setinfo(self, path, info):
        # type: (Text, RawInfo) -> None
        self.check()
        raise ResourceReadOnly(path)

    def writetext(
        self,
        path,  # type: Text
        contents,  # type: Text
        encoding="utf-8",  # type: Text
        errors=None,  # type: Optional[Text]
        newline="",  # type: Text
    ):
        # type: (...) -> None
        self.check()
        raise ResourceReadOnly(path)

    def settimes(self, path, accessed=None, modified=None):
        # type: (Text, Optional[datetime], Optional[datetime]) -> None
        self.check()
        raise ResourceReadOnly(path)

    def copy(self, src_path, dst_path, overwrite=False):
        # type: (Text, Text, bool) -> None
        self.check()
        raise ResourceReadOnly(dst_path)

    def create(self, path, wipe=False):
        # type: (Text, bool) -> bool
        self.check()
        raise ResourceReadOnly(path)

    def makedirs(
        self,  # type: _W
        path,  # type: Text
        permissions=None,  # type: Optional[Permissions]
        recreate=False,  # type: bool
    ):
        # type: (...) -> SubFS[_W]
        self.check()
        raise ResourceReadOnly(path)

    def open(
        self,
        path,  # type: Text
        mode="r",  # type: Text
        buffering=-1,  # type: int
        encoding=None,  # type: Optional[Text]
        errors=None,  # type: Optional[Text]
        newline="",  # type: Text
        line_buffering=False,  # type: bool
        **options  # type: Any
    ):
        # type: (...) -> IO
        self.check()
        # Only modes that cannot write are passed on to the wrapped
        # filesystem.
        if check_writable(mode):
            raise ResourceReadOnly(path)
        return self._wrap_fs.open(
            path,
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            errors=errors,
            newline=newline,
            line_buffering=line_buffering,
            **options
        )

    def writebytes(self, path, contents):
        # type: (Text, bytes) -> None
        self.check()
        raise ResourceReadOnly(path)

    def upload(self, path, file, chunk_size=None, **options):
        # type: (Text, BinaryIO, Optional[int], **Any) -> None
        self.check()
        raise ResourceReadOnly(path)

    def writefile(
        self,
        path,  # type: Text
        file,  # type: IO
        encoding=None,  # type: Optional[Text]
        errors=None,  # type: Optional[Text]
        newline="",  # type: Text
    ):
        # type: (...) -> None
        self.check()
        raise ResourceReadOnly(path)

    def touch(self, path):
        # type: (Text) -> None
        self.check()
        raise ResourceReadOnly(path)