Source code for fs.memoryfs

"""Manage a volatile in-memory filesystem.
"""
from __future__ import absolute_import
from __future__ import unicode_literals

import contextlib
import io
import os
import time
import typing
from collections import OrderedDict
from threading import RLock

import six

from . import errors
from .base import FS
from .enums import ResourceType, Seek
from .info import Info
from .mode import Mode
from .path import iteratepath
from .path import normpath
from .path import split
from ._typing import overload

if False:  # typing.TYPE_CHECKING
    from typing import (
        Any,
        BinaryIO,
        Collection,
        Dict,
        Iterator,
        List,
        Optional,
        SupportsInt,
        Union,
        Text,
    )
    from .base import _OpendirFactory
    from .info import RawInfo
    from .permissions import Permissions
    from .subfs import SubFS

    _M = typing.TypeVar("_M", bound="MemoryFS")


@six.python_2_unicode_compatible
class _MemoryFile(io.RawIOBase):
    """A binary file handle onto the byte buffer of a `_DirEntry`.

    The handle does not own the data: it operates on the `io.BytesIO`
    exposed by ``dir_entry.bytes_file``. The handle keeps its own
    position in ``self.pos`` and re-seeks that buffer, while holding
    the entry's lock, before every operation (see `_seek_lock`).
    """

    def __init__(self, path, memory_fs, mode, dir_entry):
        # type: (Text, MemoryFS, Text, _DirEntry) -> None
        """Create a handle on ``dir_entry`` opened with ``mode``.

        Arguments:
            path (str): Path of the file, used by ``__str__``.
            memory_fs (MemoryFS): The filesystem that created the handle.
            mode (str): Mode string, parsed with `Mode`.
            dir_entry (_DirEntry): The entry holding the file data.

        """
        super(_MemoryFile, self).__init__()
        self._path = path
        self._memory_fs = memory_fs
        self._mode = Mode(mode)
        self._dir_entry = dir_entry

        # We are opening a file - dir_entry.bytes_file is not None
        self._bytes_io = typing.cast(io.BytesIO, dir_entry.bytes_file)

        self.accessed_time = time.time()
        self.modified_time = time.time()
        self.pos = 0

        if self._mode.truncate:
            # Discard any existing content; the handle starts at offset 0.
            with self._dir_entry.lock:
                self._bytes_io.seek(0)
                self._bytes_io.truncate()
        elif self._mode.appending:
            # Start the handle at the current end of the data.
            with self._dir_entry.lock:
                self._bytes_io.seek(0, os.SEEK_END)
                self.pos = self._bytes_io.tell()

    def __str__(self):
        # type: () -> str
        _template = "<memoryfile '{path}' '{mode}'>"
        return _template.format(path=self._path, mode=self._mode)

    @contextlib.contextmanager
    def _seek_lock(self):
        # type: () -> Iterator[None]
        """Hold the entry lock with the buffer positioned at ``self.pos``.

        When the managed block finishes normally, the buffer position
        is stored back into ``self.pos``. If the block raises,
        ``self.pos`` is left untouched.
        """
        with self._dir_entry.lock:
            self._bytes_io.seek(self.pos)
            yield
            self.pos = self._bytes_io.tell()

    def _check_reading(self):
        # type: () -> None
        """Raise `IOError` unless the handle was opened for reading.
        """
        if not self._mode.reading:
            raise IOError("File not open for reading")

    def _check_writing(self):
        # type: () -> None
        """Raise `IOError` unless the handle was opened for writing.
        """
        if not self._mode.writing:
            raise IOError("File not open for writing")

    def on_modify(self):  # noqa: D401
        # type: () -> None
        """Called when file data is modified.
        """
        self._dir_entry.modified_time = self.modified_time = time.time()

    def on_access(self):  # noqa: D401
        # type: () -> None
        """Called when file is accessed.
        """
        self._dir_entry.accessed_time = self.accessed_time = time.time()

    def flush(self):
        # type: () -> None
        """Do nothing: writes go straight to the in-memory buffer.
        """
        pass

    def __iter__(self):
        # type: () -> typing.Iterator[bytes]
        """Yield the remaining lines, advancing the handle position.

        Each line is fetched under `_seek_lock`, so ``self.pos`` stays
        in step with what has been consumed and the lock is released
        between lines.
        """
        self._check_reading()
        while True:
            with self._seek_lock():
                self.on_access()
                line = self._bytes_io.readline()
            if not line:
                return
            yield line

    def next(self):
        # type: () -> bytes
        """Return the next line, raising `StopIteration` at end of data.
        """
        self._check_reading()
        with self._seek_lock():
            self.on_access()
            return next(self._bytes_io)

    __next__ = next

    def readline(self, size=-1):
        # type: (int) -> bytes
        """Read one line (at most ``size`` bytes when ``size`` >= 0).

        Raises:
            IOError: If the file was not opened for reading.

        """
        self._check_reading()
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.readline(size)

    def close(self):
        # type: () -> None
        """Close the handle and unregister it from its directory entry.

        Calling `close` on an already closed handle does nothing.
        """
        if not self.closed:
            with self._dir_entry.lock:
                self._dir_entry.remove_open_file(self)
                super(_MemoryFile, self).close()

    def read(self, size=-1):
        # type: (Optional[int]) -> bytes
        """Read up to ``size`` bytes from the current position.

        Raises:
            IOError: If the file was not opened for reading.

        """
        self._check_reading()
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.read(size)

    def readable(self):
        # type: () -> bool
        return self._mode.reading

    def readlines(self, hint=-1):
        # type: (int) -> List[bytes]
        """Read the remaining lines into a list.

        Raises:
            IOError: If the file was not opened for reading.

        """
        self._check_reading()
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.readlines(hint)

    def seekable(self):
        # type: () -> bool
        return True

    def seek(self, pos, whence=Seek.set):
        # type: (int, SupportsInt) -> int
        """Move the handle position and return the new absolute offset.
        """
        # NOTE(@althonos): allows passing both Seek.set and os.SEEK_SET
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.seek(pos, int(whence))

    def tell(self):
        # type: () -> int
        return self.pos

    def truncate(self, size=None):
        # type: (Optional[int]) -> int
        """Resize the file to ``size`` bytes and return the new size.

        Arguments:
            size (int, optional): The new length; defaults to the
                current handle position.

        The data is cut when it is longer than ``size`` and padded
        with null bytes when it is shorter. The handle position is
        left unchanged in both cases.
        """
        with self._seek_lock():
            self.on_modify()
            # _seek_lock has just positioned the buffer at self.pos.
            position = self._bytes_io.tell()
            if size is None:
                size = position
            self._bytes_io.truncate(size)
            # BytesIO.truncate never grows the buffer, so pad by hand.
            file_size = self._bytes_io.seek(0, os.SEEK_END)
            if file_size < size:
                self._bytes_io.write(b"\0" * (size - file_size))
            # Put the position back so _seek_lock stores the original.
            self._bytes_io.seek(position)
            return size

    def writable(self):
        # type: () -> bool
        return self._mode.writing

    def write(self, data):
        # type: (bytes) -> int
        """Write ``data`` at the current position.

        Returns:
            int: The value returned by the underlying buffer's `write`.

        Raises:
            IOError: If the file was not opened for writing.

        """
        self._check_writing()
        with self._seek_lock():
            self.on_modify()
            return self._bytes_io.write(data)

    def writelines(self, sequence):  # type: ignore
        # type: (List[bytes]) -> None
        """Write every item of ``sequence`` at the current position.

        Raises:
            IOError: If the file was not opened for writing.

        """
        # FIXME(@althonos): For some reason the stub for IOBase.writelines
        #      is List[Any] ?! It should probably be Iterable[ByteString]
        self._check_writing()
        with self._seek_lock():
            self.on_modify()
            self._bytes_io.writelines(sequence)


class _DirEntry(object):
    """A single node of the in-memory tree.

    A directory node maps child names to child entries (in insertion
    order); any other node owns an `io.BytesIO` with its data. Every
    node also records its timestamps, the handles currently open on
    it, and a re-entrant lock.
    """

    def __init__(self, resource_type, name):
        # type: (ResourceType, Text) -> None
        now = time.time()
        self.created_time = now
        self.accessed_time = now
        self.modified_time = now

        self.name = name
        self.resource_type = resource_type
        self.lock = RLock()

        self._open_files = []  # type: typing.MutableSequence[_MemoryFile]
        self._dir = OrderedDict()  # type: typing.MutableMapping[Text, _DirEntry]
        # Directories carry no data buffer; everything else gets one.
        self._bytes_file = (
            None if self.is_dir else io.BytesIO()
        )  # type: Optional[io.BytesIO]

    @property
    def bytes_file(self):
        # type: () -> Optional[io.BytesIO]
        """The data buffer, or `None` for a directory."""
        return self._bytes_file

    @property
    def is_dir(self):
        # type: () -> bool
        """`True` when the resource type is a directory."""
        return self.resource_type == ResourceType.directory

    @property
    def size(self):
        # type: () -> int
        """Number of bytes of data held (always 0 for a directory)."""
        with self.lock:
            if not self.is_dir:
                data = typing.cast(io.BytesIO, self._bytes_file)
                # Seeking to the end yields the length of the buffer.
                return data.seek(0, os.SEEK_END)
            return 0

    @overload
    def get_entry(self, name, default):  # pragma: no cover
        # type: (Text, _DirEntry) -> _DirEntry
        pass

    @overload
    def get_entry(self, name):  # pragma: no cover
        # type: (Text) -> Optional[_DirEntry]
        pass

    @overload
    def get_entry(self, name, default):  # pragma: no cover
        # type: (Text, None) -> Optional[_DirEntry]
        pass

    def get_entry(self, name, default=None):
        # type: (Text, Optional[_DirEntry]) -> Optional[_DirEntry]
        """Return the child called ``name``, or ``default`` if absent."""
        assert self.is_dir, "must be a directory"
        children = self._dir
        return children.get(name, default)

    def set_entry(self, name, dir_entry):
        # type: (Text, _DirEntry) -> None
        """Store ``dir_entry`` as the child called ``name``."""
        self._dir[name] = dir_entry

    def remove_entry(self, name):
        # type: (Text) -> None
        """Delete the child called ``name`` (`KeyError` if missing)."""
        del self._dir[name]

    def __contains__(self, name):
        # type: (object) -> bool
        children = self._dir
        return name in children

    def __len__(self):
        # type: () -> int
        children = self._dir
        return len(children)

    def list(self):
        # type: () -> List[Text]
        """Return the child names as a new list, in insertion order."""
        return list(self._dir)

    def add_open_file(self, memory_file):
        # type: (_MemoryFile) -> None
        """Record ``memory_file`` as a handle open on this entry."""
        self._open_files.append(memory_file)

    def remove_open_file(self, memory_file):
        # type: (_MemoryFile) -> None
        """Forget ``memory_file`` (`ValueError` if it was not recorded)."""
        self._open_files.remove(memory_file)


@six.python_2_unicode_compatible
class MemoryFS(FS):
    """A filesystem that is stored in memory.

    Memory filesystems are useful for caches, temporary data stores,
    unit testing, etc. Since all the data is in memory, they are very
    fast, but non-permanent. The `MemoryFS` constructor takes no
    arguments.

    Example:
        >>> mem_fs = MemoryFS()

    Or via an FS URL:
        >>> import fs
        >>> mem_fs = fs.open_fs('mem://')

    """

    _meta = {
        "case_insensitive": False,
        "invalid_path_chars": "\0",
        "network": False,
        "read_only": False,
        "thread_safe": True,
        "unicode_paths": True,
        "virtual": False,
    }  # type: Dict[Text, Union[Text, int, bool, None]]

    def __init__(self):
        # type: () -> None
        """Create an in-memory filesystem.
        """
        # Give the instance its own copy of the class-level metadata.
        self._meta = self._meta.copy()
        self.root = self._make_dir_entry(ResourceType.directory, "")
        super(MemoryFS, self).__init__()

    def __repr__(self):
        # type: () -> str
        return "MemoryFS()"

    def __str__(self):
        # type: () -> str
        return "<memfs>"

    def _make_dir_entry(self, resource_type, name):
        # type: (ResourceType, Text) -> _DirEntry
        """Create a new, unattached entry of the given type and name.
        """
        return _DirEntry(resource_type, name)

    def _get_dir_entry(self, dir_path):
        # type: (Text) -> Optional[_DirEntry]
        """Get a directory entry, or `None` if one doesn't exist.
        """
        with self._lock:
            dir_path = normpath(dir_path)
            current_entry = self.root  # type: Optional[_DirEntry]
            for path_component in iteratepath(dir_path):
                if current_entry is None:
                    return None
                # A non-directory cannot have children.
                if not current_entry.is_dir:
                    return None
                current_entry = current_entry.get_entry(path_component)
            return current_entry

    def getinfo(self, path, namespaces=None):
        # type: (Text, Optional[Collection[Text]]) -> Info
        """Return an `Info` for ``path``.

        The ``basic`` namespace is always present; ``details`` is added
        when it is listed in ``namespaces``.

        Raises:
            fs.errors.ResourceNotFound: If ``path`` does not exist.

        """
        namespaces = namespaces or ()
        _path = self.validatepath(path)
        dir_entry = self._get_dir_entry(_path)
        if dir_entry is None:
            raise errors.ResourceNotFound(path)
        info = {"basic": {"name": dir_entry.name, "is_dir": dir_entry.is_dir}}
        if "details" in namespaces:
            info["details"] = {
                "_write": ["accessed", "modified"],
                "type": int(dir_entry.resource_type),
                "size": dir_entry.size,
                "accessed": dir_entry.accessed_time,
                "modified": dir_entry.modified_time,
                "created": dir_entry.created_time,
            }
        return Info(info)

    def listdir(self, path):
        # type: (Text) -> List[Text]
        """Return the names of the entries in the directory ``path``.

        Raises:
            fs.errors.ResourceNotFound: If ``path`` does not exist.
            fs.errors.DirectoryExpected: If ``path`` is not a directory.

        """
        self.check()
        _path = self.validatepath(path)
        with self._lock:
            dir_entry = self._get_dir_entry(_path)
            if dir_entry is None:
                raise errors.ResourceNotFound(path)
            if not dir_entry.is_dir:
                raise errors.DirectoryExpected(path)
            return dir_entry.list()

    if False:  # typing.TYPE_CHECKING

        def opendir(self, path, factory=None):
            # type: (_M, Text, Optional[_OpendirFactory]) -> SubFS[_M]
            pass

    def makedir(
        self,  # type: _M
        path,  # type: Text
        permissions=None,  # type: Optional[Permissions]
        recreate=False,  # type: bool
    ):
        # type: (...) -> SubFS[_M]
        """Create the directory ``path`` and return it via `opendir`.

        Arguments:
            path (str): Path of the directory to create.
            permissions: Accepted for interface compatibility; not
                used by this implementation.
            recreate (bool): When `True`, an already existing entry at
                ``path`` is not an error.

        Raises:
            fs.errors.DirectoryExists: If ``path`` exists and
                ``recreate`` is false.
            fs.errors.ResourceNotFound: If the parent of ``path`` does
                not exist or is not a directory.

        """
        _path = self.validatepath(path)
        with self._lock:
            if _path == "/":
                if recreate:
                    return self.opendir(path)
                else:
                    raise errors.DirectoryExists(path)

            dir_path, dir_name = split(_path)

            parent_dir = self._get_dir_entry(dir_path)
            # A file cannot contain a directory; without the is_dir test
            # get_entry() below would fail its "must be a directory" assert.
            if parent_dir is None or not parent_dir.is_dir:
                raise errors.ResourceNotFound(path)

            dir_entry = parent_dir.get_entry(dir_name)
            if dir_entry is not None and not recreate:
                raise errors.DirectoryExists(path)

            if dir_entry is None:
                new_dir = self._make_dir_entry(ResourceType.directory, dir_name)
                parent_dir.set_entry(dir_name, new_dir)
            return self.opendir(path)

    def openbin(self, path, mode="r", buffering=-1, **options):
        # type: (Text, Text, int, **Any) -> BinaryIO
        """Open ``path`` as a binary file and return the file object.

        Arguments:
            path (str): Path of the file to open.
            mode (str): Mode string, validated with `Mode.validate_bin`.
            buffering (int): Accepted for interface compatibility; not
                used by this implementation.
            **options: Accepted for interface compatibility; not used.

        Raises:
            fs.errors.FileExpected: If ``path`` has no file name
                component or refers to a directory.
            fs.errors.ResourceNotFound: If the parent is missing or not
                a directory, or the file is missing and the mode does
                not create it.
            fs.errors.FileExists: If the mode is exclusive and ``path``
                already exists.

        """
        _mode = Mode(mode)
        _mode.validate_bin()
        _path = self.validatepath(path)
        dir_path, file_name = split(_path)

        if not file_name:
            raise errors.FileExpected(path)

        with self._lock:
            parent_dir_entry = self._get_dir_entry(dir_path)
            if parent_dir_entry is None or not parent_dir_entry.is_dir:
                raise errors.ResourceNotFound(path)

            file_dir_entry = parent_dir_entry.get_entry(file_name)
            if file_dir_entry is None:
                if not _mode.create:
                    raise errors.ResourceNotFound(path)
                file_dir_entry = self._make_dir_entry(ResourceType.file, file_name)
                parent_dir_entry.set_entry(file_name, file_dir_entry)
            elif _mode.create and _mode.exclusive:
                # Checked before the is_dir test so that an exclusive
                # open of any existing entry reports FileExists.
                raise errors.FileExists(path)

            if file_dir_entry.is_dir:
                raise errors.FileExpected(path)

            mem_file = _MemoryFile(
                path=_path, memory_fs=self, mode=mode, dir_entry=file_dir_entry
            )
            file_dir_entry.add_open_file(mem_file)
            return mem_file  # type: ignore

    def remove(self, path):
        # type: (Text) -> None
        """Remove the file at ``path``.

        Raises:
            fs.errors.ResourceNotFound: If ``path`` does not exist.
            fs.errors.FileExpected: If ``path`` is a directory.

        """
        _path = self.validatepath(path)
        with self._lock:
            dir_path, file_name = split(_path)
            parent_dir_entry = self._get_dir_entry(dir_path)

            if parent_dir_entry is None or file_name not in parent_dir_entry:
                raise errors.ResourceNotFound(path)

            file_dir_entry = typing.cast(_DirEntry, self._get_dir_entry(_path))
            if file_dir_entry.is_dir:
                raise errors.FileExpected(path)

            parent_dir_entry.remove_entry(file_name)

    def removedir(self, path):
        # type: (Text) -> None
        """Remove the empty directory at ``path``.

        Raises:
            fs.errors.RemoveRootError: If ``path`` is the root.
            fs.errors.ResourceNotFound: If ``path`` does not exist.
            fs.errors.DirectoryExpected: If ``path`` is not a directory.
            fs.errors.DirectoryNotEmpty: If the directory has entries.

        """
        _path = self.validatepath(path)

        if _path == "/":
            raise errors.RemoveRootError()

        with self._lock:
            dir_path, file_name = split(_path)
            parent_dir_entry = self._get_dir_entry(dir_path)

            if parent_dir_entry is None or file_name not in parent_dir_entry:
                raise errors.ResourceNotFound(path)

            dir_dir_entry = typing.cast(_DirEntry, self._get_dir_entry(_path))
            if not dir_dir_entry.is_dir:
                raise errors.DirectoryExpected(path)

            if len(dir_dir_entry):
                raise errors.DirectoryNotEmpty(path)

            parent_dir_entry.remove_entry(file_name)

    def setinfo(self, path, info):
        # type: (Text, RawInfo) -> None
        """Update the accessed/modified times of ``path`` from ``info``.

        Only the ``accessed`` and ``modified`` keys of the ``details``
        namespace are applied; everything else in ``info`` is ignored.

        Raises:
            fs.errors.ResourceNotFound: If ``path`` does not exist.

        """
        _path = self.validatepath(path)
        with self._lock:
            # Resolve the full path directly so that the root directory
            # (which has no name inside a parent) can be updated too.
            resource_entry = self._get_dir_entry(_path)
            if resource_entry is None:
                raise errors.ResourceNotFound(path)

            if "details" in info:
                details = info["details"]
                if "accessed" in details:
                    resource_entry.accessed_time = details["accessed"]  # type: ignore
                if "modified" in details:
                    resource_entry.modified_time = details["modified"]  # type: ignore