Source code for fs.memoryfs

"""Manage a volatile in-memory filesystem.
"""
from __future__ import absolute_import, unicode_literals

import typing

import contextlib
import io
import os
import six
import time
from collections import OrderedDict
from threading import RLock

from . import errors
from ._typing import overload
from .base import FS
from .copy import copy_modified_time
from .enums import ResourceType, Seek
from .info import Info
from .mode import Mode
from .path import isbase, iteratepath, normpath, split

if typing.TYPE_CHECKING:
    from typing import (
        Any,
        BinaryIO,
        Collection,
        Dict,
        Iterable,
        Iterator,
        List,
        Optional,
        SupportsInt,
        Text,
        Tuple,
        Union,
    )

    import array
    import mmap

    from .base import _OpendirFactory
    from .info import RawInfo
    from .permissions import Permissions
    from .subfs import SubFS

    _M = typing.TypeVar("_M", bound="MemoryFS")


@six.python_2_unicode_compatible
class _MemoryFile(io.RawIOBase):
    def __init__(self, path, memory_fs, mode, dir_entry):
        # type: (Text, MemoryFS, Text, _DirEntry) -> None
        super(_MemoryFile, self).__init__()
        self._path = path
        self._memory_fs = memory_fs
        self._mode = Mode(mode)
        self._dir_entry = dir_entry

        # We are opening a file - dir_entry.bytes_file is not None
        self._bytes_io = typing.cast(io.BytesIO, dir_entry.bytes_file)

        self.accessed_time = time.time()
        self.modified_time = time.time()
        self.pos = 0

        if self._mode.truncate:
            with self._dir_entry.lock:
                self._bytes_io.seek(0)
                self._bytes_io.truncate()
        elif self._mode.appending:
            with self._dir_entry.lock:
                self._bytes_io.seek(0, os.SEEK_END)
                self.pos = self._bytes_io.tell()

    def __str__(self):
        # type: () -> str
        _template = "<memoryfile '{path}' '{mode}'>"
        return _template.format(path=self._path, mode=self._mode)

    @property
    def mode(self):
        # type: () -> Text
        return self._mode.to_platform_bin()

    @contextlib.contextmanager
    def _seek_lock(self):
        # type: () -> Iterator[None]
        with self._dir_entry.lock:
            self._bytes_io.seek(self.pos)
            yield
            self.pos = self._bytes_io.tell()

    def on_modify(self):  # noqa: D401
        # type: () -> None
        """Called when file data is modified."""
        self._dir_entry.modified_time = self.modified_time = time.time()

    def on_access(self):  # noqa: D401
        # type: () -> None
        """Called when file is accessed."""
        self._dir_entry.accessed_time = self.accessed_time = time.time()

    def flush(self):
        # type: () -> None
        pass

    def __iter__(self):
        # type: () -> typing.Iterator[bytes]
        self._bytes_io.seek(self.pos)
        for line in self._bytes_io:
            yield line

    def next(self):
        # type: () -> bytes
        with self._seek_lock():
            self.on_access()
            return next(self._bytes_io)

    __next__ = next

    def readline(self, size=None):
        # type: (Optional[int]) -> bytes
        if not self._mode.reading:
            raise IOError("File not open for reading")
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.readline(size)

    def close(self):
        # type: () -> None
        if not self.closed:
            with self._dir_entry.lock:
                self._dir_entry.remove_open_file(self)
                super(_MemoryFile, self).close()

    def read(self, size=None):
        # type: (Optional[int]) -> bytes
        if not self._mode.reading:
            raise IOError("File not open for reading")
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.read(size)

    def readable(self):
        # type: () -> bool
        return self._mode.reading

    def readinto(self, buffer):
        # type (bytearray) -> Optional[int]
        if not self._mode.reading:
            raise IOError("File not open for reading")
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.readinto(buffer)

    def readlines(self, hint=-1):
        # type: (int) -> List[bytes]
        if not self._mode.reading:
            raise IOError("File not open for reading")
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.readlines(hint)

    def seekable(self):
        # type: () -> bool
        return True

    def seek(self, pos, whence=Seek.set):
        # type: (int, SupportsInt) -> int
        # NOTE(@althonos): allows passing both Seek.set and os.SEEK_SET
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.seek(pos, int(whence))

    def tell(self):
        # type: () -> int
        return self.pos

    def truncate(self, size=None):
        # type: (Optional[int]) -> int
        with self._seek_lock():
            self.on_modify()
            new_size = self._bytes_io.truncate(size)
            if size is not None and self._bytes_io.tell() < size:
                file_size = self._bytes_io.seek(0, os.SEEK_END)
                self._bytes_io.write(b"\0" * (size - file_size))
                self._bytes_io.seek(-size + file_size, os.SEEK_END)
            return size or new_size

    def writable(self):
        # type: () -> bool
        return self._mode.writing

    def write(self, data):
        # type: (Union[bytes, memoryview, array.array[Any], mmap.mmap]) -> int
        if not self._mode.writing:
            raise IOError("File not open for writing")
        with self._seek_lock():
            self.on_modify()
            return self._bytes_io.write(data)

    def writelines(self, sequence):
        # type: (Iterable[Union[bytes, memoryview, array.array[Any], mmap.mmap]]) -> None  # noqa: E501
        with self._seek_lock():
            self.on_modify()
            self._bytes_io.writelines(sequence)


class _DirEntry(object):
    def __init__(self, resource_type, name):
        # type: (ResourceType, Text) -> None
        self.resource_type = resource_type
        self.name = name
        self._dir = OrderedDict()  # type: typing.MutableMapping[Text, _DirEntry]
        self._open_files = []  # type: typing.MutableSequence[_MemoryFile]
        self._bytes_file = None  # type: Optional[io.BytesIO]
        self.lock = RLock()

        current_time = time.time()
        self.created_time = current_time
        self.accessed_time = current_time
        self.modified_time = current_time

        if not self.is_dir:
            self._bytes_file = io.BytesIO()

    @property
    def bytes_file(self):
        # type: () -> Optional[io.BytesIO]
        return self._bytes_file

    @property
    def is_dir(self):
        # type: () -> bool
        return self.resource_type == ResourceType.directory

    @property
    def size(self):
        # type: () -> int
        with self.lock:
            if self.is_dir:
                return 0
            else:
                _bytes_file = typing.cast(io.BytesIO, self._bytes_file)
                _bytes_file.seek(0, os.SEEK_END)
                return _bytes_file.tell()

    @overload
    def get_entry(self, name, default):  # noqa: F811
        # type: (Text, _DirEntry) -> _DirEntry
        pass

    @overload
    def get_entry(self, name):  # noqa: F811
        # type: (Text) -> Optional[_DirEntry]
        pass

    @overload
    def get_entry(self, name, default):  # noqa: F811
        # type: (Text, None) -> Optional[_DirEntry]
        pass

    def get_entry(self, name, default=None):  # noqa: F811
        # type: (Text, Optional[_DirEntry]) -> Optional[_DirEntry]
        assert self.is_dir, "must be a directory"
        return self._dir.get(name, default)

    def set_entry(self, name, dir_entry):
        # type: (Text, _DirEntry) -> None
        self._dir[name] = dir_entry

    def remove_entry(self, name):
        # type: (Text) -> None
        del self._dir[name]

    def clear(self):
        # type: () -> None
        self._dir.clear()

    def __contains__(self, name):
        # type: (object) -> bool
        return name in self._dir

    def __len__(self):
        # type: () -> int
        return len(self._dir)

    def list(self):
        # type: () -> List[Text]
        return list(self._dir.keys())

    def add_open_file(self, memory_file):
        # type: (_MemoryFile) -> None
        self._open_files.append(memory_file)

    def remove_open_file(self, memory_file):
        # type: (_MemoryFile) -> None
        self._open_files.remove(memory_file)

    def to_info(self, namespaces=None):
        # type: (Optional[Collection[Text]]) -> Info
        namespaces = namespaces or ()
        info = {"basic": {"name": self.name, "is_dir": self.is_dir}}
        if "details" in namespaces:
            info["details"] = {
                "_write": ["accessed", "modified"],
                "type": int(self.resource_type),
                "size": self.size,
                "accessed": self.accessed_time,
                "modified": self.modified_time,
                "created": self.created_time,
            }
        return Info(info)


[docs]@six.python_2_unicode_compatible class MemoryFS(FS): """A filesystem that stored in memory. Memory filesystems are useful for caches, temporary data stores, unit testing, etc. Since all the data is in memory, they are very fast, but non-permanent. The `MemoryFS` constructor takes no arguments. Examples: Create with the constructor:: >>> from fs.memoryfs import MemoryFS >>> mem_fs = MemoryFS() Or via an FS URL:: >>> import fs >>> mem_fs = fs.open_fs('mem://') """ _meta = { "case_insensitive": False, "invalid_path_chars": "\0", "network": False, "read_only": False, "thread_safe": True, "unicode_paths": True, "virtual": False, } # type: Dict[Text, Union[Text, int, bool, None]]
[docs] def __init__(self): # type: () -> None """Create an in-memory filesystem.""" self._meta = self._meta.copy() self.root = self._make_dir_entry(ResourceType.directory, "") super(MemoryFS, self).__init__()
def __repr__(self): # type: () -> str return "MemoryFS()" def __str__(self): # type: () -> str return "<memfs>" def _make_dir_entry(self, resource_type, name): # type: (ResourceType, Text) -> _DirEntry return _DirEntry(resource_type, name) def _get_dir_entry(self, dir_path): # type: (Text) -> Optional[_DirEntry] """Get a directory entry, or `None` if one doesn't exist.""" with self._lock: dir_path = normpath(dir_path) current_entry = self.root # type: Optional[_DirEntry] for path_component in iteratepath(dir_path): if current_entry is None: return None if not current_entry.is_dir: return None current_entry = current_entry.get_entry(path_component) return current_entry
[docs] def close(self): # type: () -> None if not self._closed: del self.root return super(MemoryFS, self).close()
[docs] def getinfo(self, path, namespaces=None): # type: (Text, Optional[Collection[Text]]) -> Info _path = self.validatepath(path) dir_entry = self._get_dir_entry(_path) if dir_entry is None: raise errors.ResourceNotFound(path) return dir_entry.to_info(namespaces=namespaces)
[docs] def listdir(self, path): # type: (Text) -> List[Text] self.check() _path = self.validatepath(path) with self._lock: # locate and validate the entry corresponding to the given path dir_entry = self._get_dir_entry(_path) if dir_entry is None: raise errors.ResourceNotFound(path) if not dir_entry.is_dir: raise errors.DirectoryExpected(path) # return the filenames in the order they were created return dir_entry.list()
if typing.TYPE_CHECKING: def opendir(self, path, factory=None): # type: (_M, Text, Optional[_OpendirFactory]) -> SubFS[_M] pass
[docs] def makedir( self, # type: _M path, # type: Text permissions=None, # type: Optional[Permissions] recreate=False, # type: bool ): # type: (...) -> SubFS[_M] _path = self.validatepath(path) with self._lock: if _path == "/": if recreate: return self.opendir(path) else: raise errors.DirectoryExists(path) dir_path, dir_name = split(_path) parent_dir = self._get_dir_entry(dir_path) if parent_dir is None: raise errors.ResourceNotFound(path) dir_entry = parent_dir.get_entry(dir_name) if dir_entry is not None and not recreate: raise errors.DirectoryExists(path) if dir_entry is None: new_dir = self._make_dir_entry(ResourceType.directory, dir_name) parent_dir.set_entry(dir_name, new_dir) return self.opendir(path)
[docs] def move(self, src_path, dst_path, overwrite=False, preserve_time=False): src_dir, src_name = split(self.validatepath(src_path)) dst_dir, dst_name = split(self.validatepath(dst_path)) with self._lock: src_dir_entry = self._get_dir_entry(src_dir) if src_dir_entry is None or src_name not in src_dir_entry: raise errors.ResourceNotFound(src_path) src_entry = src_dir_entry.get_entry(src_name) if src_entry.is_dir: raise errors.FileExpected(src_path) dst_dir_entry = self._get_dir_entry(dst_dir) if dst_dir_entry is None: raise errors.ResourceNotFound(dst_path) elif not overwrite and dst_name in dst_dir_entry: raise errors.DestinationExists(dst_path) # handle moving a file onto itself if src_dir == dst_dir and src_name == dst_name: if overwrite: return raise errors.DestinationExists(dst_path) # move the entry from the src folder to the dst folder dst_dir_entry.set_entry(dst_name, src_entry) src_dir_entry.remove_entry(src_name) # make sure to update the entry name itself (see #509) src_entry.name = dst_name if preserve_time: copy_modified_time(self, src_path, self, dst_path)
[docs] def movedir(self, src_path, dst_path, create=False, preserve_time=False): _src_path = self.validatepath(src_path) _dst_path = self.validatepath(dst_path) dst_dir, dst_name = split(_dst_path) src_dir, src_name = split(_src_path) # move a dir onto itself if _src_path == _dst_path: return # move a dir into itself if isbase(_src_path, _dst_path): raise errors.IllegalDestination(dst_path) with self._lock: src_dir_entry = self._get_dir_entry(src_dir) if src_dir_entry is None or src_name not in src_dir_entry: raise errors.ResourceNotFound(src_path) src_entry = src_dir_entry.get_entry(src_name) if not src_entry.is_dir: raise errors.DirectoryExpected(src_path) # move the entry from the src folder to the dst folder dst_dir_entry = self._get_dir_entry(dst_dir) if dst_dir_entry is None or (not create and dst_name not in dst_dir_entry): raise errors.ResourceNotFound(dst_path) # move the entry from the src folder to the dst folder dst_dir_entry.set_entry(dst_name, src_entry) src_dir_entry.remove_entry(src_name) # make sure to update the entry name itself (see #509) src_entry.name = dst_name if preserve_time: copy_modified_time(self, src_path, self, dst_path)
[docs] def openbin(self, path, mode="r", buffering=-1, **options): # type: (Text, Text, int, **Any) -> BinaryIO _mode = Mode(mode) _mode.validate_bin() _path = self.validatepath(path) dir_path, file_name = split(_path) if not file_name: raise errors.FileExpected(path) with self._lock: parent_dir_entry = self._get_dir_entry(dir_path) if parent_dir_entry is None or not parent_dir_entry.is_dir: raise errors.ResourceNotFound(path) if _mode.create: if file_name not in parent_dir_entry: file_dir_entry = self._make_dir_entry(ResourceType.file, file_name) parent_dir_entry.set_entry(file_name, file_dir_entry) else: file_dir_entry = self._get_dir_entry(_path) # type: ignore if _mode.exclusive: raise errors.FileExists(path) if file_dir_entry.is_dir: raise errors.FileExpected(path) mem_file = _MemoryFile( path=_path, memory_fs=self, mode=mode, dir_entry=file_dir_entry ) file_dir_entry.add_open_file(mem_file) return mem_file # type: ignore if file_name not in parent_dir_entry: raise errors.ResourceNotFound(path) file_dir_entry = parent_dir_entry.get_entry(file_name) # type: ignore if file_dir_entry.is_dir: raise errors.FileExpected(path) mem_file = _MemoryFile( path=_path, memory_fs=self, mode=mode, dir_entry=file_dir_entry ) file_dir_entry.add_open_file(mem_file) return mem_file # type: ignore
[docs] def remove(self, path): # type: (Text) -> None _path = self.validatepath(path) with self._lock: dir_path, file_name = split(_path) parent_dir_entry = self._get_dir_entry(dir_path) if parent_dir_entry is None or file_name not in parent_dir_entry: raise errors.ResourceNotFound(path) file_dir_entry = typing.cast(_DirEntry, self._get_dir_entry(_path)) if file_dir_entry.is_dir: raise errors.FileExpected(path) parent_dir_entry.remove_entry(file_name)
[docs] def removedir(self, path): # type: (Text) -> None # make sure we are not removing root _path = self.validatepath(path) if _path == "/": raise errors.RemoveRootError() # make sure the directory is empty if not self.isempty(path): raise errors.DirectoryNotEmpty(path) # we can now delegate to removetree since we confirmed that # * path exists (isempty) # * path is a folder (isempty) # * path is not root self.removetree(_path)
[docs] def removetree(self, path): # type: (Text) -> None _path = self.validatepath(path) with self._lock: if _path == "/": self.root.clear() return dir_path, file_name = split(_path) parent_dir_entry = self._get_dir_entry(dir_path) if parent_dir_entry is None or file_name not in parent_dir_entry: raise errors.ResourceNotFound(path) dir_dir_entry = typing.cast(_DirEntry, self._get_dir_entry(_path)) if not dir_dir_entry.is_dir: raise errors.DirectoryExpected(path) parent_dir_entry.remove_entry(file_name)
[docs] def scandir( self, path, # type: Text namespaces=None, # type: Optional[Collection[Text]] page=None, # type: Optional[Tuple[int, int]] ): # type: (...) -> Iterator[Info] self.check() _path = self.validatepath(path) with self._lock: # locate and validate the entry corresponding to the given path dir_entry = self._get_dir_entry(_path) if dir_entry is None: raise errors.ResourceNotFound(path) if not dir_entry.is_dir: raise errors.DirectoryExpected(path) # if paging was requested, slice the filenames filenames = dir_entry.list() if page is not None: start, end = page filenames = filenames[start:end] # yield info with the right namespaces for name in filenames: entry = typing.cast(_DirEntry, dir_entry.get_entry(name)) yield entry.to_info(namespaces=namespaces)
[docs] def setinfo(self, path, info): # type: (Text, RawInfo) -> None _path = self.validatepath(path) with self._lock: dir_path, file_name = split(_path) parent_dir_entry = self._get_dir_entry(dir_path) if parent_dir_entry is None or file_name not in parent_dir_entry: raise errors.ResourceNotFound(path) resource_entry = typing.cast( _DirEntry, parent_dir_entry.get_entry(file_name) ) if "details" in info: details = info["details"] if "accessed" in details: resource_entry.accessed_time = details["accessed"] # type: ignore if "modified" in details: resource_entry.modified_time = details["modified"] # type: ignore