"""Manage a volatile in-memory filesystem.
"""
from __future__ import absolute_import, unicode_literals
import typing
import contextlib
import io
import os
import six
import time
from collections import OrderedDict
from threading import RLock
from . import errors
from ._typing import overload
from .base import FS
from .copy import copy_modified_time
from .enums import ResourceType, Seek
from .info import Info
from .mode import Mode
from .path import isbase, iteratepath, normpath, split
if typing.TYPE_CHECKING:
from typing import (
Any,
BinaryIO,
Collection,
Dict,
Iterable,
Iterator,
List,
Optional,
SupportsInt,
Text,
Tuple,
Union,
)
import array
import mmap
from .base import _OpendirFactory
from .info import RawInfo
from .permissions import Permissions
from .subfs import SubFS
_M = typing.TypeVar("_M", bound="MemoryFS")
@six.python_2_unicode_compatible
class _MemoryFile(io.RawIOBase):
def __init__(self, path, memory_fs, mode, dir_entry):
# type: (Text, MemoryFS, Text, _DirEntry) -> None
super(_MemoryFile, self).__init__()
self._path = path
self._memory_fs = memory_fs
self._mode = Mode(mode)
self._dir_entry = dir_entry
# We are opening a file - dir_entry.bytes_file is not None
self._bytes_io = typing.cast(io.BytesIO, dir_entry.bytes_file)
self.accessed_time = time.time()
self.modified_time = time.time()
self.pos = 0
if self._mode.truncate:
with self._dir_entry.lock:
self._bytes_io.seek(0)
self._bytes_io.truncate()
elif self._mode.appending:
with self._dir_entry.lock:
self._bytes_io.seek(0, os.SEEK_END)
self.pos = self._bytes_io.tell()
def __str__(self):
# type: () -> str
_template = "<memoryfile '{path}' '{mode}'>"
return _template.format(path=self._path, mode=self._mode)
@property
def mode(self):
# type: () -> Text
return self._mode.to_platform_bin()
@contextlib.contextmanager
def _seek_lock(self):
# type: () -> Iterator[None]
with self._dir_entry.lock:
self._bytes_io.seek(self.pos)
yield
self.pos = self._bytes_io.tell()
def on_modify(self): # noqa: D401
# type: () -> None
"""Called when file data is modified."""
self._dir_entry.modified_time = self.modified_time = time.time()
def on_access(self): # noqa: D401
# type: () -> None
"""Called when file is accessed."""
self._dir_entry.accessed_time = self.accessed_time = time.time()
def flush(self):
# type: () -> None
pass
def __iter__(self):
# type: () -> typing.Iterator[bytes]
self._bytes_io.seek(self.pos)
for line in self._bytes_io:
yield line
def next(self):
# type: () -> bytes
with self._seek_lock():
self.on_access()
return next(self._bytes_io)
__next__ = next
def readline(self, size=None):
# type: (Optional[int]) -> bytes
if not self._mode.reading:
raise IOError("File not open for reading")
with self._seek_lock():
self.on_access()
return self._bytes_io.readline(size)
def close(self):
# type: () -> None
if not self.closed:
with self._dir_entry.lock:
self._dir_entry.remove_open_file(self)
super(_MemoryFile, self).close()
def read(self, size=None):
# type: (Optional[int]) -> bytes
if not self._mode.reading:
raise IOError("File not open for reading")
with self._seek_lock():
self.on_access()
return self._bytes_io.read(size)
def readable(self):
# type: () -> bool
return self._mode.reading
def readinto(self, buffer):
# type (bytearray) -> Optional[int]
if not self._mode.reading:
raise IOError("File not open for reading")
with self._seek_lock():
self.on_access()
return self._bytes_io.readinto(buffer)
def readlines(self, hint=-1):
# type: (int) -> List[bytes]
if not self._mode.reading:
raise IOError("File not open for reading")
with self._seek_lock():
self.on_access()
return self._bytes_io.readlines(hint)
def seekable(self):
# type: () -> bool
return True
def seek(self, pos, whence=Seek.set):
# type: (int, SupportsInt) -> int
# NOTE(@althonos): allows passing both Seek.set and os.SEEK_SET
with self._seek_lock():
self.on_access()
return self._bytes_io.seek(pos, int(whence))
def tell(self):
# type: () -> int
return self.pos
def truncate(self, size=None):
# type: (Optional[int]) -> int
with self._seek_lock():
self.on_modify()
new_size = self._bytes_io.truncate(size)
if size is not None and self._bytes_io.tell() < size:
file_size = self._bytes_io.seek(0, os.SEEK_END)
self._bytes_io.write(b"\0" * (size - file_size))
self._bytes_io.seek(-size + file_size, os.SEEK_END)
return size or new_size
def writable(self):
# type: () -> bool
return self._mode.writing
def write(self, data):
# type: (Union[bytes, memoryview, array.array[Any], mmap.mmap]) -> int
if not self._mode.writing:
raise IOError("File not open for writing")
with self._seek_lock():
self.on_modify()
return self._bytes_io.write(data)
def writelines(self, sequence):
# type: (Iterable[Union[bytes, memoryview, array.array[Any], mmap.mmap]]) -> None # noqa: E501
with self._seek_lock():
self.on_modify()
self._bytes_io.writelines(sequence)
class _DirEntry(object):
def __init__(self, resource_type, name):
# type: (ResourceType, Text) -> None
self.resource_type = resource_type
self.name = name
self._dir = OrderedDict() # type: typing.MutableMapping[Text, _DirEntry]
self._open_files = [] # type: typing.MutableSequence[_MemoryFile]
self._bytes_file = None # type: Optional[io.BytesIO]
self.lock = RLock()
current_time = time.time()
self.created_time = current_time
self.accessed_time = current_time
self.modified_time = current_time
if not self.is_dir:
self._bytes_file = io.BytesIO()
@property
def bytes_file(self):
# type: () -> Optional[io.BytesIO]
return self._bytes_file
@property
def is_dir(self):
# type: () -> bool
return self.resource_type == ResourceType.directory
@property
def size(self):
# type: () -> int
with self.lock:
if self.is_dir:
return 0
else:
_bytes_file = typing.cast(io.BytesIO, self._bytes_file)
_bytes_file.seek(0, os.SEEK_END)
return _bytes_file.tell()
@overload
def get_entry(self, name, default): # noqa: F811
# type: (Text, _DirEntry) -> _DirEntry
pass
@overload
def get_entry(self, name): # noqa: F811
# type: (Text) -> Optional[_DirEntry]
pass
@overload
def get_entry(self, name, default): # noqa: F811
# type: (Text, None) -> Optional[_DirEntry]
pass
def get_entry(self, name, default=None): # noqa: F811
# type: (Text, Optional[_DirEntry]) -> Optional[_DirEntry]
assert self.is_dir, "must be a directory"
return self._dir.get(name, default)
def set_entry(self, name, dir_entry):
# type: (Text, _DirEntry) -> None
self._dir[name] = dir_entry
def remove_entry(self, name):
# type: (Text) -> None
del self._dir[name]
def clear(self):
# type: () -> None
self._dir.clear()
def __contains__(self, name):
# type: (object) -> bool
return name in self._dir
def __len__(self):
# type: () -> int
return len(self._dir)
def list(self):
# type: () -> List[Text]
return list(self._dir.keys())
def add_open_file(self, memory_file):
# type: (_MemoryFile) -> None
self._open_files.append(memory_file)
def remove_open_file(self, memory_file):
# type: (_MemoryFile) -> None
self._open_files.remove(memory_file)
def to_info(self, namespaces=None):
# type: (Optional[Collection[Text]]) -> Info
namespaces = namespaces or ()
info = {"basic": {"name": self.name, "is_dir": self.is_dir}}
if "details" in namespaces:
info["details"] = {
"_write": ["accessed", "modified"],
"type": int(self.resource_type),
"size": self.size,
"accessed": self.accessed_time,
"modified": self.modified_time,
"created": self.created_time,
}
return Info(info)
[docs]@six.python_2_unicode_compatible
class MemoryFS(FS):
"""A filesystem that stored in memory.
Memory filesystems are useful for caches, temporary data stores,
unit testing, etc. Since all the data is in memory, they are very
fast, but non-permanent. The `MemoryFS` constructor takes no
arguments.
Examples:
Create with the constructor::
>>> from fs.memoryfs import MemoryFS
>>> mem_fs = MemoryFS()
Or via an FS URL::
>>> import fs
>>> mem_fs = fs.open_fs('mem://')
"""
_meta = {
"case_insensitive": False,
"invalid_path_chars": "\0",
"network": False,
"read_only": False,
"thread_safe": True,
"unicode_paths": True,
"virtual": False,
} # type: Dict[Text, Union[Text, int, bool, None]]
[docs] def __init__(self):
# type: () -> None
"""Create an in-memory filesystem."""
self._meta = self._meta.copy()
self.root = self._make_dir_entry(ResourceType.directory, "")
super(MemoryFS, self).__init__()
def __repr__(self):
# type: () -> str
return "MemoryFS()"
def __str__(self):
# type: () -> str
return "<memfs>"
def _make_dir_entry(self, resource_type, name):
# type: (ResourceType, Text) -> _DirEntry
return _DirEntry(resource_type, name)
def _get_dir_entry(self, dir_path):
# type: (Text) -> Optional[_DirEntry]
"""Get a directory entry, or `None` if one doesn't exist."""
with self._lock:
dir_path = normpath(dir_path)
current_entry = self.root # type: Optional[_DirEntry]
for path_component in iteratepath(dir_path):
if current_entry is None:
return None
if not current_entry.is_dir:
return None
current_entry = current_entry.get_entry(path_component)
return current_entry
[docs] def close(self):
# type: () -> None
if not self._closed:
del self.root
return super(MemoryFS, self).close()
[docs] def getinfo(self, path, namespaces=None):
# type: (Text, Optional[Collection[Text]]) -> Info
_path = self.validatepath(path)
dir_entry = self._get_dir_entry(_path)
if dir_entry is None:
raise errors.ResourceNotFound(path)
return dir_entry.to_info(namespaces=namespaces)
[docs] def listdir(self, path):
# type: (Text) -> List[Text]
self.check()
_path = self.validatepath(path)
with self._lock:
# locate and validate the entry corresponding to the given path
dir_entry = self._get_dir_entry(_path)
if dir_entry is None:
raise errors.ResourceNotFound(path)
if not dir_entry.is_dir:
raise errors.DirectoryExpected(path)
# return the filenames in the order they were created
return dir_entry.list()
if typing.TYPE_CHECKING:
def opendir(self, path, factory=None):
# type: (_M, Text, Optional[_OpendirFactory]) -> SubFS[_M]
pass
[docs] def makedir(
self, # type: _M
path, # type: Text
permissions=None, # type: Optional[Permissions]
recreate=False, # type: bool
):
# type: (...) -> SubFS[_M]
_path = self.validatepath(path)
with self._lock:
if _path == "/":
if recreate:
return self.opendir(path)
else:
raise errors.DirectoryExists(path)
dir_path, dir_name = split(_path)
parent_dir = self._get_dir_entry(dir_path)
if parent_dir is None:
raise errors.ResourceNotFound(path)
dir_entry = parent_dir.get_entry(dir_name)
if dir_entry is not None and not recreate:
raise errors.DirectoryExists(path)
if dir_entry is None:
new_dir = self._make_dir_entry(ResourceType.directory, dir_name)
parent_dir.set_entry(dir_name, new_dir)
return self.opendir(path)
[docs] def move(self, src_path, dst_path, overwrite=False, preserve_time=False):
src_dir, src_name = split(self.validatepath(src_path))
dst_dir, dst_name = split(self.validatepath(dst_path))
with self._lock:
src_dir_entry = self._get_dir_entry(src_dir)
if src_dir_entry is None or src_name not in src_dir_entry:
raise errors.ResourceNotFound(src_path)
src_entry = src_dir_entry.get_entry(src_name)
if src_entry.is_dir:
raise errors.FileExpected(src_path)
dst_dir_entry = self._get_dir_entry(dst_dir)
if dst_dir_entry is None:
raise errors.ResourceNotFound(dst_path)
elif not overwrite and dst_name in dst_dir_entry:
raise errors.DestinationExists(dst_path)
# handle moving a file onto itself
if src_dir == dst_dir and src_name == dst_name:
if overwrite:
return
raise errors.DestinationExists(dst_path)
# move the entry from the src folder to the dst folder
dst_dir_entry.set_entry(dst_name, src_entry)
src_dir_entry.remove_entry(src_name)
# make sure to update the entry name itself (see #509)
src_entry.name = dst_name
if preserve_time:
copy_modified_time(self, src_path, self, dst_path)
[docs] def movedir(self, src_path, dst_path, create=False, preserve_time=False):
_src_path = self.validatepath(src_path)
_dst_path = self.validatepath(dst_path)
dst_dir, dst_name = split(_dst_path)
src_dir, src_name = split(_src_path)
# move a dir onto itself
if _src_path == _dst_path:
return
# move a dir into itself
if isbase(_src_path, _dst_path):
raise errors.IllegalDestination(dst_path)
with self._lock:
src_dir_entry = self._get_dir_entry(src_dir)
if src_dir_entry is None or src_name not in src_dir_entry:
raise errors.ResourceNotFound(src_path)
src_entry = src_dir_entry.get_entry(src_name)
if not src_entry.is_dir:
raise errors.DirectoryExpected(src_path)
# move the entry from the src folder to the dst folder
dst_dir_entry = self._get_dir_entry(dst_dir)
if dst_dir_entry is None or (not create and dst_name not in dst_dir_entry):
raise errors.ResourceNotFound(dst_path)
# move the entry from the src folder to the dst folder
dst_dir_entry.set_entry(dst_name, src_entry)
src_dir_entry.remove_entry(src_name)
# make sure to update the entry name itself (see #509)
src_entry.name = dst_name
if preserve_time:
copy_modified_time(self, src_path, self, dst_path)
[docs] def openbin(self, path, mode="r", buffering=-1, **options):
# type: (Text, Text, int, **Any) -> BinaryIO
_mode = Mode(mode)
_mode.validate_bin()
_path = self.validatepath(path)
dir_path, file_name = split(_path)
if not file_name:
raise errors.FileExpected(path)
with self._lock:
parent_dir_entry = self._get_dir_entry(dir_path)
if parent_dir_entry is None or not parent_dir_entry.is_dir:
raise errors.ResourceNotFound(path)
if _mode.create:
if file_name not in parent_dir_entry:
file_dir_entry = self._make_dir_entry(ResourceType.file, file_name)
parent_dir_entry.set_entry(file_name, file_dir_entry)
else:
file_dir_entry = self._get_dir_entry(_path) # type: ignore
if _mode.exclusive:
raise errors.FileExists(path)
if file_dir_entry.is_dir:
raise errors.FileExpected(path)
mem_file = _MemoryFile(
path=_path, memory_fs=self, mode=mode, dir_entry=file_dir_entry
)
file_dir_entry.add_open_file(mem_file)
return mem_file # type: ignore
if file_name not in parent_dir_entry:
raise errors.ResourceNotFound(path)
file_dir_entry = parent_dir_entry.get_entry(file_name) # type: ignore
if file_dir_entry.is_dir:
raise errors.FileExpected(path)
mem_file = _MemoryFile(
path=_path, memory_fs=self, mode=mode, dir_entry=file_dir_entry
)
file_dir_entry.add_open_file(mem_file)
return mem_file # type: ignore
[docs] def remove(self, path):
# type: (Text) -> None
_path = self.validatepath(path)
with self._lock:
dir_path, file_name = split(_path)
parent_dir_entry = self._get_dir_entry(dir_path)
if parent_dir_entry is None or file_name not in parent_dir_entry:
raise errors.ResourceNotFound(path)
file_dir_entry = typing.cast(_DirEntry, self._get_dir_entry(_path))
if file_dir_entry.is_dir:
raise errors.FileExpected(path)
parent_dir_entry.remove_entry(file_name)
[docs] def removedir(self, path):
# type: (Text) -> None
# make sure we are not removing root
_path = self.validatepath(path)
if _path == "/":
raise errors.RemoveRootError()
# make sure the directory is empty
if not self.isempty(path):
raise errors.DirectoryNotEmpty(path)
# we can now delegate to removetree since we confirmed that
# * path exists (isempty)
# * path is a folder (isempty)
# * path is not root
self.removetree(_path)
[docs] def removetree(self, path):
# type: (Text) -> None
_path = self.validatepath(path)
with self._lock:
if _path == "/":
self.root.clear()
return
dir_path, file_name = split(_path)
parent_dir_entry = self._get_dir_entry(dir_path)
if parent_dir_entry is None or file_name not in parent_dir_entry:
raise errors.ResourceNotFound(path)
dir_dir_entry = typing.cast(_DirEntry, self._get_dir_entry(_path))
if not dir_dir_entry.is_dir:
raise errors.DirectoryExpected(path)
parent_dir_entry.remove_entry(file_name)
[docs] def scandir(
self,
path, # type: Text
namespaces=None, # type: Optional[Collection[Text]]
page=None, # type: Optional[Tuple[int, int]]
):
# type: (...) -> Iterator[Info]
self.check()
_path = self.validatepath(path)
with self._lock:
# locate and validate the entry corresponding to the given path
dir_entry = self._get_dir_entry(_path)
if dir_entry is None:
raise errors.ResourceNotFound(path)
if not dir_entry.is_dir:
raise errors.DirectoryExpected(path)
# if paging was requested, slice the filenames
filenames = dir_entry.list()
if page is not None:
start, end = page
filenames = filenames[start:end]
# yield info with the right namespaces
for name in filenames:
entry = typing.cast(_DirEntry, dir_entry.get_entry(name))
yield entry.to_info(namespaces=namespaces)
[docs] def setinfo(self, path, info):
# type: (Text, RawInfo) -> None
_path = self.validatepath(path)
with self._lock:
dir_path, file_name = split(_path)
parent_dir_entry = self._get_dir_entry(dir_path)
if parent_dir_entry is None or file_name not in parent_dir_entry:
raise errors.ResourceNotFound(path)
resource_entry = typing.cast(
_DirEntry, parent_dir_entry.get_entry(file_name)
)
if "details" in info:
details = info["details"]
if "accessed" in details:
resource_entry.accessed_time = details["accessed"] # type: ignore
if "modified" in details:
resource_entry.modified_time = details["modified"] # type: ignore