"""Manage the filesystem provided by your OS.
In essence, an `OSFS` is a thin layer over the `io` and `os` modules
of the Python standard library.
"""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import errno
import io
import itertools
import logging
import os
import platform
import shutil
import stat
import sys
import tempfile
import typing
import six
try:
from os import scandir
except ImportError:
try:
from scandir import scandir # type: ignore
except ImportError: # pragma: no cover
scandir = None # pragma: no cover
try:
from os import sendfile
except ImportError:
try:
from sendfile import sendfile # type: ignore
except ImportError:
sendfile = None # pragma: no cover
from . import errors
from .errors import FileExists
from .base import FS
from .enums import ResourceType
from ._fscompat import fsencode, fsdecode, fspath
from .info import Info
from .path import basename, dirname
from .permissions import Permissions
from .error_tools import convert_os_errors
from .mode import Mode, validate_open_mode
from .errors import FileExpected, NoURL
if False: # typing.TYPE_CHECKING
from typing import (
Any,
BinaryIO,
Callable,
Collection,
Dict,
Iterator,
IO,
List,
Optional,
SupportsInt,
Text,
Tuple,
)
from .base import _OpendirFactory
from .info import RawInfo
from .subfs import SubFS
# Type variable bound to ``OSFS``; referenced from type comments in this
# module (e.g. ``makedir`` and the ``opendir`` stub) to parameterise ``SubFS``.
_O = typing.TypeVar("_O", bound="OSFS")
# Module-level logger registered under the name "fs.osfs".
log = logging.getLogger("fs.osfs")
# True when ``platform.system()`` reports "Windows"; gates the
# platform-specific branches below.
_WINDOWS_PLATFORM = platform.system() == "Windows"
@six.python_2_unicode_compatible
class OSFS(FS):
"""Create an OSFS.
Arguments:
root_path (str or ~os.PathLike): An OS path or path-like object to
the location on your HD you wish to manage.
create (bool): Set to `True` to create the root directory if it
does not already exist, otherwise the directory should exist
prior to creating the ``OSFS`` instance (defaults to `False`).
create_mode (int): The permissions that will be used to create
the directory if ``create`` is `True` and the path doesn't
exist, defaults to ``0o777``.
Raises:
`fs.errors.CreateFailed`: If ``root_path`` does not
exist, or could not be created.
Examples:
>>> current_directory_fs = OSFS('.')
>>> home_fs = OSFS('~/')
>>> windows_system32_fs = OSFS('c://system32')
"""
def __init__(
    self,
    root_path,  # type: Text
    create=False,  # type: bool
    create_mode=0o777,  # type: SupportsInt
):
    # type: (...) -> None
    """Create an OSFS instance.

    Arguments:
        root_path (str or ~os.PathLike): OS path (text, bytes or path-like)
            of the directory to manage. Environment variables and ``~`` are
            expanded and the result is made absolute and normalised.
        create (bool): If `True`, create the directory (and parents) when
            it does not exist yet.
        create_mode (int): Mode passed to `os.makedirs` when the directory
            has to be created.

    Raises:
        `fs.errors.CreateFailed`: If the directory does not exist and
            ``create`` is `False`, or if creating it fails with `OSError`.

    """
    super(OSFS, self).__init__()
    if isinstance(root_path, bytes):
        root_path = fsdecode(root_path)
    # Keep the path as given by the caller for __repr__ / __str__.
    self.root_path = root_path

    _drive, _root_path = os.path.splitdrive(fsdecode(fspath(root_path)))
    # A bare drive (e.g. "C:") gets an explicit root so it does not resolve
    # to the drive's current directory.
    _root_path = _drive + (_root_path or "/") if _drive else _root_path
    _root_path = os.path.expanduser(os.path.expandvars(_root_path))
    _root_path = os.path.normpath(os.path.abspath(_root_path))
    self._root_path = _root_path

    if create:
        try:
            if not os.path.isdir(_root_path):
                os.makedirs(_root_path, mode=int(create_mode))
        except OSError as error:
            raise errors.CreateFailed(
                "unable to create {} ({})".format(root_path, error), error
            )
    else:
        if not os.path.isdir(_root_path):
            raise errors.CreateFailed("root path does not exist")

    _meta = self._meta = {
        "network": False,
        "read_only": False,
        "supports_rename": True,
        "thread_safe": True,
        "unicode_paths": os.path.supports_unicode_filenames,
        "virtual": False,
    }

    try:
        # https://stackoverflow.com/questions/7870041/check-if-file-system-is-case-insensitive-in-python
        # I don't know of a better way of detecting case insensitivity of a filesystem
        # NOTE: the probe file lives in the default temporary directory,
        # not under the root path.
        with tempfile.NamedTemporaryFile(prefix="TmP") as _tmp_file:
            _meta["case_insensitive"] = os.path.exists(_tmp_file.name.lower())
    except Exception:
        # Best-effort fallback; on Darwin the key is deliberately left unset.
        if platform.system() != "Darwin":
            _meta["case_insensitive"] = os.path.normcase("Aa") == "aa"

    if _WINDOWS_PLATFORM:  # pragma: no cover
        # Control characters 0 through 31 (inclusive) plus the reserved
        # punctuation; ``range(32)`` is needed to include U+001F.
        _meta["invalid_path_chars"] = (
            "".join(six.unichr(n) for n in range(32)) + '\\:*?"<>|'
        )
    else:
        _meta["invalid_path_chars"] = "\0"

        # ``os.pathconf_names`` only exists on POSIX, hence this branch.
        if "PC_PATH_MAX" in os.pathconf_names:
            try:
                _meta["max_sys_path_length"] = os.pathconf(
                    fsencode(_root_path), os.pathconf_names["PC_PATH_MAX"]
                )
            except OSError:  # pragma: no cover
                # The above fails with nfs mounts on OSX. Go figure.
                pass
def __repr__(self):
    # type: () -> str
    """Return ``ClassName(<repr of the root path as originally given>)``."""
    return "{}({!r})".format(self.__class__.__name__, self.root_path)
def __str__(self):
    # type: () -> str
    """Return ``<classname 'root_path'>`` with the class name lower-cased."""
    lowered_name = self.__class__.__name__.lower()
    return "<{} '{}'>".format(lowered_name, self.root_path)
def _to_sys_path(self, path):
    # type: (Text) -> Text
    """Convert a FS path to a path on the OS.

    Leading ``/`` characters are stripped, remaining ``/`` separators are
    replaced with ``os.sep``, and the result is joined onto the root path.
    The joined path is passed through ``fsencode`` before being returned.
    NOTE(review): the return is therefore presumably ``bytes`` rather than
    the declared ``Text`` -- confirm against ``._fscompat``.
    """
    relative = path.lstrip("/").replace("/", os.sep)
    return fsencode(os.path.join(self._root_path, relative))
@classmethod
def _make_details_from_stat(cls, stat_result):
    # type: (os.stat_result) -> Dict[Text, object]
    """Make a *details* info dict from an `os.stat_result` object.

    ``created`` is taken from ``st_birthtime`` when the stat result has it
    (otherwise `None`). ``st_ctime`` is stored as ``created`` on Windows
    and as ``metadata_changed`` elsewhere.
    """
    details = {
        "_write": ["accessed", "modified"],
        "accessed": stat_result.st_atime,
        "modified": stat_result.st_mtime,
        "size": stat_result.st_size,
        "type": int(cls._get_type_from_stat(stat_result)),
        # Only some platforms expose a birth time on the stat result.
        "created": getattr(stat_result, "st_birthtime", None),
    }
    if _WINDOWS_PLATFORM:
        details["created"] = stat_result.st_ctime
    else:
        details["metadata_changed"] = stat_result.st_ctime
    return details
@classmethod
def _make_access_from_stat(cls, stat_result):
    # type: (os.stat_result) -> Dict[Text, object]
    """Make an *access* info dict from an `os.stat_result` object.

    Always contains ``permissions``, ``gid`` and ``uid``. On non-Windows
    platforms ``group`` and ``user`` names are added when the ids can be
    resolved through the ``grp`` / ``pwd`` databases.
    """
    access = {
        "permissions": Permissions(mode=stat_result.st_mode).dump(),
        "gid": stat_result.st_gid,
        "uid": stat_result.st_uid,
    }  # type: Dict[Text, object]
    if _WINDOWS_PLATFORM:
        return access

    # ``grp`` and ``pwd`` are only imported off Windows.
    import grp
    import pwd

    try:
        access["group"] = grp.getgrgid(access["gid"]).gr_name
    except KeyError:  # pragma: no cover
        pass

    try:
        access["user"] = pwd.getpwuid(access["uid"]).pw_name
    except KeyError:  # pragma: no cover
        pass
    return access
# Maps the file-type bits of ``st_mode`` (as extracted by ``stat.S_IFMT`` in
# ``_get_type_from_stat``) to the corresponding ``ResourceType`` member.
STAT_TO_RESOURCE_TYPE = {
    stat.S_IFDIR: ResourceType.directory,
    stat.S_IFCHR: ResourceType.character,
    stat.S_IFBLK: ResourceType.block_special_file,
    stat.S_IFREG: ResourceType.file,
    stat.S_IFIFO: ResourceType.fifo,
    stat.S_IFLNK: ResourceType.symlink,
    stat.S_IFSOCK: ResourceType.socket,
}
@classmethod
def _get_type_from_stat(cls, _stat):
    # type: (os.stat_result) -> ResourceType
    """Get the resource type from an `os.stat_result` object.

    Looks the file-type bits of ``st_mode`` up in ``STAT_TO_RESOURCE_TYPE``
    and falls back to ``ResourceType.unknown`` when they are not listed.
    """
    file_type_bits = stat.S_IFMT(_stat.st_mode)
    return cls.STAT_TO_RESOURCE_TYPE.get(file_type_bits, ResourceType.unknown)
# --------------------------------------------------------
# Required Methods
# --------------------------------------------------------
def _gettarget(self, sys_path):
    # type: (Text) -> Optional[Text]
    """Return the target of the symbolic link at ``sys_path``, if any.

    Arguments:
        sys_path (str): An OS path.

    Returns:
        str or None: The link target, or `None` when ``os.readlink`` is
        unavailable or raises `OSError` (for example because the path is
        not a symbolic link).

    """
    if not hasattr(os, "readlink"):
        return None
    try:
        if _WINDOWS_PLATFORM:  # pragma: no cover
            return os.readlink(sys_path)
        # The encoded path is what gets passed to readlink, so decode the
        # result to honour the declared text return type.
        return fsdecode(os.readlink(fsencode(sys_path)))
    except OSError:
        return None
def _make_link_info(self, sys_path):
    # type: (Text) -> Dict[Text, object]
    """Make a *link* info dict whose ``target`` is ``_gettarget(sys_path)``."""
    return {"target": self._gettarget(sys_path)}
def getinfo(self, path, namespaces=None):
    # type: (Text, Optional[Collection[Text]]) -> Info
    """Build an `Info` object for the resource at ``path``.

    The ``basic`` namespace is always present. ``details``, ``stat``,
    ``lstat``, ``link`` and ``access`` are added when requested through
    ``namespaces``. ``os.stat`` (and ``os.lstat`` when requested) run
    inside the ``convert_os_errors`` context.
    """
    self.check()
    namespaces = namespaces or ()
    _path = self.validatepath(path)
    sys_path = self.getsyspath(_path)

    lstat_result = None
    with convert_os_errors("getinfo", path):
        stat_result = os.stat(fsencode(sys_path))
        if "lstat" in namespaces:
            lstat_result = os.lstat(fsencode(sys_path))

    def _st_fields(result):
        # Every ``st_*`` attribute exposed by the stat result.
        return {k: getattr(result, k) for k in dir(result) if k.startswith("st_")}

    info = {
        "basic": {
            "name": basename(_path),
            "is_dir": stat.S_ISDIR(stat_result.st_mode),
        }
    }
    if "details" in namespaces:
        info["details"] = self._make_details_from_stat(stat_result)
    if "stat" in namespaces:
        info["stat"] = _st_fields(stat_result)
    if "lstat" in namespaces:
        info["lstat"] = _st_fields(lstat_result)
    if "link" in namespaces:
        info["link"] = self._make_link_info(sys_path)
    if "access" in namespaces:
        info["access"] = self._make_access_from_stat(stat_result)

    return Info(info)
def listdir(self, path):
    # type: (Text) -> List[Text]
    """Return the names of the entries in the directory at ``path``.

    ``os.listdir`` is called on the encoded system path inside the
    ``convert_os_errors`` context; each returned name is passed through
    ``fsdecode``.
    """
    self.check()
    _path = self.validatepath(path)
    sys_path = self._to_sys_path(_path)
    with convert_os_errors("listdir", path, directory=True):
        raw_names = os.listdir(fsencode(sys_path))
    return list(map(fsdecode, raw_names))
def makedir(
    self,  # type: _O
    path,  # type: Text
    permissions=None,  # type: Optional[Permissions]
    recreate=False,  # type: bool
):
    # type: (...) -> SubFS[_O]
    """Create the directory at ``path`` and return ``self.opendir`` for it.

    Raises ``errors.ResourceNotFound`` when ``os.mkdir`` fails with
    ``ENOENT``. An ``EEXIST`` failure is ignored only when ``recreate`` is
    true; any other ``OSError`` propagates through ``convert_os_errors``.
    """
    self.check()
    mode = Permissions.get_mode(permissions)
    _path = self.validatepath(path)
    sys_path = self._to_sys_path(_path)

    with convert_os_errors("makedir", path, directory=True):
        try:
            os.mkdir(sys_path, mode)
        except OSError as error:
            if error.errno == errno.ENOENT:
                raise errors.ResourceNotFound(path)
            tolerated = error.errno == errno.EEXIST and recreate
            if not tolerated:
                raise
        return self.opendir(_path)
def openbin(self, path, mode="r", buffering=-1, **options):
    # type: (Text, Text, int, **Any) -> BinaryIO
    """Open the file at ``path`` in binary mode via ``io.open``.

    Raises ``errors.FileExpected`` when the validated path is the root.
    Extra ``options`` are forwarded to ``io.open`` unchanged.
    """
    _mode = Mode(mode)
    _mode.validate_bin()
    self.check()
    _path = self.validatepath(path)
    if _path == "/":
        raise errors.FileExpected(path)
    target = self._to_sys_path(_path)
    with convert_os_errors("openbin", path):
        if six.PY2 and _mode.exclusive:
            # On Python 2 exclusive creation goes through os.open, and the
            # resulting descriptor is what gets handed to io.open.
            target = os.open(target, os.O_RDWR | os.O_CREAT | os.O_EXCL)
        return io.open(  # type: ignore
            target, mode=_mode.to_platform_bin(), buffering=buffering, **options
        )
def remove(self, path):
    # type: (Text) -> None
    """Remove the file at ``path`` with ``os.remove``.

    Windows (``EACCES``) and macOS (``EPERM``) sometimes report those
    errors for an attempt to remove a directory; in that case
    ``errors.FileExpected`` is raised instead. Every other ``OSError`` is
    re-raised inside the ``convert_os_errors`` context.
    """
    self.check()
    _path = self.validatepath(path)
    sys_path = self._to_sys_path(_path)
    with convert_os_errors("remove", path):
        try:
            os.remove(sys_path)
        except OSError as error:
            maybe_directory = (
                # sometimes windows says this for attempts to remove a dir
                (error.errno == errno.EACCES and sys.platform == "win32")
                # sometimes OSX says this for attempts to remove a dir
                or (error.errno == errno.EPERM and sys.platform == "darwin")
            )
            if maybe_directory and os.path.isdir(sys_path):  # pragma: no cover
                raise errors.FileExpected(path)
            raise
def removedir(self, path):
    # type: (Text) -> None
    """Remove the directory at ``path`` with ``os.rmdir``.

    Arguments:
        path (str): A path on this filesystem.

    Raises:
        `fs.errors.RemoveRootError`: If the validated path is the root.

    """
    self.check()
    _path = self.validatepath(path)
    if _path == "/":
        raise errors.RemoveRootError()
    # Build the system path from the validated path, like every sibling
    # method, rather than from the raw argument.
    sys_path = self._to_sys_path(_path)
    with convert_os_errors("removedir", path, directory=True):
        os.rmdir(sys_path)
# --------------------------------------------------------
# Optional Methods
# --------------------------------------------------------
# --- Type hint for opendir ------------------------------
# Never executed: this stub exists only so type checkers see the narrowed
# signature of ``opendir`` for this class.
if False:  # typing.TYPE_CHECKING

    def opendir(self, path, factory=None):
        # type: (_O, Text, Optional[_OpendirFactory]) -> SubFS[_O]
        pass
# --- Backport of os.sendfile for Python < 3.8 -----------
def _check_copy(self, src_path, dst_path, overwrite=False):
    """Validate the arguments of a copy and return the validated paths.

    Arguments:
        src_path (str): Path of the file to copy.
        dst_path (str): Path the file is copied to.
        overwrite (bool): Whether an existing destination is acceptable.

    Returns:
        tuple: ``(validated_src_path, validated_dst_path)``.

    Raises:
        `fs.errors.FileExpected`: If the source is not a file.
        `fs.errors.DestinationExists`: If the destination exists and
            ``overwrite`` is false.
        `fs.errors.DirectoryExpected`: If the destination's parent is not
            a directory.

    """
    # validate individual paths
    _src_path = self.validatepath(src_path)
    _dst_path = self.validatepath(dst_path)
    # All checks below use the validated paths, so they test the same
    # resources that the caller goes on to copy; the raw arguments are kept
    # for the error payloads only.
    # check src_path exists and is a file
    if self.gettype(_src_path) is not ResourceType.file:
        raise errors.FileExpected(src_path)
    # check dst_path does not exist if we are not overwriting
    if not overwrite and self.exists(_dst_path):
        raise errors.DestinationExists(dst_path)
    # check parent dir of _dst_path exists and is a directory
    if self.gettype(dirname(_dst_path)) is not ResourceType.directory:
        raise errors.DirectoryExpected(dirname(dst_path))
    return _src_path, _dst_path
if sys.version_info[:2] < (3, 8) and sendfile is not None:

    # errno values treated as "sendfile cannot be used here": they trigger
    # the shutil fallback instead of propagating.
    _sendfile_error_codes = frozenset(
        (
            errno.EIO,
            errno.EINVAL,
            errno.ENOSYS,
            errno.ENOTSUP,  # type: ignore
            errno.EBADF,
            errno.ENOTSOCK,
            errno.EOPNOTSUPP,
        )
    )

    def copy(self, src_path, dst_path, overwrite=False):
        # type: (Text, Text, bool) -> None
        """Copy a file, trying ``sendfile`` first and ``shutil.copy2`` second.

        The fallback is used only when ``sendfile`` fails with an ``OSError``
        whose errno is in ``_sendfile_error_codes``; other errors propagate.
        """
        with self._lock:
            # validate and canonicalise paths
            _src_path, _dst_path = self._check_copy(src_path, dst_path, overwrite)
            src_sys = self.getsyspath(_src_path)
            dst_sys = self.getsyspath(_dst_path)
            try:
                # The files are opened only to obtain descriptors.
                with io.open(src_sys, "r") as src, io.open(dst_sys, "w") as dst:
                    in_fd = src.fileno()
                    out_fd = dst.fileno()
                    count = os.fstat(in_fd).st_size
                    position = 0
                    last_sent = count
                    # Keep sending until sendfile reports nothing was sent.
                    while last_sent > 0:
                        last_sent = sendfile(out_fd, in_fd, position, count)
                        position += last_sent
            except OSError as e:
                # the error is not a simple "sendfile not supported" error
                if e.errno not in self._sendfile_error_codes:
                    raise
                # fallback using the shutil implementation
                shutil.copy2(src_sys, dst_sys)


else:

    def copy(self, src_path, dst_path, overwrite=False):
        # type: (Text, Text, bool) -> None
        """Copy a file with ``shutil.copy2`` after validating both paths."""
        with self._lock:
            _src_path, _dst_path = self._check_copy(src_path, dst_path, overwrite)
            src_sys = self.getsyspath(_src_path)
            dst_sys = self.getsyspath(_dst_path)
            shutil.copy2(src_sys, dst_sys)
# --- Backport of os.scandir for Python < 3.5 ------------
if scandir:

    def _scandir(self, path, namespaces=None):
        # type: (Text, Optional[Collection[Text]]) -> Iterator[Info]
        """Yield an `Info` object for each entry of the directory at ``path``.

        This variant is built on the module-level ``scandir`` function. The
        ``basic`` namespace is always present; ``details``, ``stat``,
        ``lstat``, ``link`` and ``access`` are added on request. The
        directory iterator is closed when the generator finishes, fails or
        is closed early.
        """
        self.check()
        namespaces = namespaces or ()
        _path = self.validatepath(path)
        if _WINDOWS_PLATFORM:
            # Join onto the root using the validated path, as every other
            # method does.
            sys_path = os.path.join(
                self._root_path, _path.lstrip("/").replace("/", os.sep)
            )
        else:
            sys_path = self._to_sys_path(_path)
        with convert_os_errors("scandir", path, directory=True):
            scandir_iter = scandir(sys_path)
            try:
                for dir_entry in scandir_iter:
                    info = {
                        "basic": {
                            "name": fsdecode(dir_entry.name),
                            "is_dir": dir_entry.is_dir(),
                        }
                    }
                    if "details" in namespaces:
                        stat_result = dir_entry.stat()
                        info["details"] = self._make_details_from_stat(stat_result)
                    if "stat" in namespaces:
                        stat_result = dir_entry.stat()
                        info["stat"] = {
                            k: getattr(stat_result, k)
                            for k in dir(stat_result)
                            if k.startswith("st_")
                        }
                    if "lstat" in namespaces:
                        lstat_result = dir_entry.stat(follow_symlinks=False)
                        info["lstat"] = {
                            k: getattr(lstat_result, k)
                            for k in dir(lstat_result)
                            if k.startswith("st_")
                        }
                    if "link" in namespaces:
                        info["link"] = self._make_link_info(
                            os.path.join(sys_path, dir_entry.name)
                        )
                    if "access" in namespaces:
                        stat_result = dir_entry.stat()
                        info["access"] = self._make_access_from_stat(stat_result)

                    yield Info(info)
            finally:
                # Release the directory handle promptly. Not every scandir
                # implementation provides close(), so look it up first.
                _close = getattr(scandir_iter, "close", None)
                if _close is not None:
                    _close()


else:

    def _scandir(self, path, namespaces=None):
        # type: (Text, Optional[Collection[Text]]) -> Iterator[Info]
        """Yield an `Info` object for each entry of the directory at ``path``.

        Fallback used when no ``scandir`` function is available: entries
        come from ``os.listdir`` and each one is stat'ed individually.
        """
        self.check()
        namespaces = namespaces or ()
        _path = self.validatepath(path)
        sys_path = self.getsyspath(_path)
        with convert_os_errors("scandir", path, directory=True):
            for entry_name in os.listdir(sys_path):
                _entry_name = fsdecode(entry_name)
                entry_path = os.path.join(sys_path, _entry_name)
                stat_result = os.stat(fsencode(entry_path))
                info = {
                    "basic": {
                        "name": _entry_name,
                        "is_dir": stat.S_ISDIR(stat_result.st_mode),
                    }
                }  # type: Dict[Text, Dict[Text, Any]]
                if "details" in namespaces:
                    info["details"] = self._make_details_from_stat(stat_result)
                if "stat" in namespaces:
                    info["stat"] = {
                        k: getattr(stat_result, k)
                        for k in dir(stat_result)
                        if k.startswith("st_")
                    }
                if "lstat" in namespaces:
                    lstat_result = os.lstat(entry_path)
                    info["lstat"] = {
                        k: getattr(lstat_result, k)
                        for k in dir(lstat_result)
                        if k.startswith("st_")
                    }
                if "link" in namespaces:
                    info["link"] = self._make_link_info(
                        os.path.join(sys_path, entry_name)
                    )
                if "access" in namespaces:
                    info["access"] = self._make_access_from_stat(stat_result)

                yield Info(info)
def scandir(
    self,
    path,  # type: Text
    namespaces=None,  # type: Optional[Collection[Text]]
    page=None,  # type: Optional[Tuple[int, int]]
):
    # type: (...) -> Iterator[Info]
    """Return an iterator of `Info` objects for the directory at ``path``.

    When ``page`` is a ``(start, end)`` tuple, only that slice of the
    underlying ``_scandir`` iterator is returned.
    """
    entries = self._scandir(path, namespaces=namespaces)
    if page is None:
        return entries
    start, end = page
    return itertools.islice(entries, start, end)
# --- Miscellaneous --------------------------------------
def getsyspath(self, path):
    # type: (Text) -> Text
    """Return the OS path for ``path``, joined onto the root path.

    Leading ``/`` characters are stripped and the remaining ``/``
    separators are replaced with ``os.sep``.
    """
    relative = path.lstrip("/").replace("/", os.sep)
    return os.path.join(self._root_path, relative)
def geturl(self, path, purpose="download"):
    # type: (Text, Text) -> Text
    """Return ``"file://"`` followed by the system path of ``path``.

    Raises ``NoURL`` for any ``purpose`` other than ``"download"``.
    """
    if purpose == "download":
        return "file://" + self.getsyspath(path)
    raise NoURL(path, purpose)
def gettype(self, path):
    # type: (Text) -> ResourceType
    """Return the `ResourceType` of the resource at ``path``.

    Arguments:
        path (str): A path on this filesystem.

    Returns:
        ResourceType: The type derived from ``os.stat`` of the system path.

    """
    self.check()
    # Validate like every sibling method does, so an unchecked path is
    # never joined onto the root and handed to the OS.
    _path = self.validatepath(path)
    sys_path = self._to_sys_path(_path)
    with convert_os_errors("gettype", path):
        # Named ``stat_result`` to avoid shadowing the ``stat`` module.
        stat_result = os.stat(sys_path)
    return self._get_type_from_stat(stat_result)
def islink(self, path):
    # type: (Text) -> bool
    """Return whether the resource at ``path`` is a symbolic link.

    Raises ``errors.ResourceNotFound`` when ``self.exists(path)`` is false.
    """
    self.check()
    _path = self.validatepath(path)
    sys_path = self._to_sys_path(_path)
    if self.exists(path):
        with convert_os_errors("islink", path):
            return os.path.islink(sys_path)
    raise errors.ResourceNotFound(path)
def open(
    self,
    path,  # type: Text
    mode="r",  # type: Text
    buffering=-1,  # type: int
    encoding=None,  # type: Optional[Text]
    errors=None,  # type: Optional[Text]
    newline="",  # type: Text
    line_buffering=False,  # type: bool
    **options  # type: Any
):
    # type: (...) -> IO
    """Open the file at ``path`` via ``io.open``.

    In text mode the encoding defaults to ``"utf-8"``; in binary mode both
    ``encoding`` and ``newline`` are passed as `None`. Raises
    ``FileExpected`` when the validated path is the root.
    NOTE: ``line_buffering`` is accepted but not used by this body, and the
    ``errors`` parameter shadows the ``errors`` module within it.
    """
    _mode = Mode(mode)
    validate_open_mode(mode)
    self.check()
    _path = self.validatepath(path)
    if _path == "/":
        raise FileExpected(path)
    target = self._to_sys_path(_path)
    with convert_os_errors("open", path):
        if six.PY2 and _mode.exclusive:
            # On Python 2 exclusive creation goes through os.open, and the
            # resulting descriptor is what gets handed to io.open.
            target = os.open(target, os.O_RDWR | os.O_CREAT | os.O_EXCL)
        text_encoding = encoding or "utf-8"
        if _mode.binary:
            effective_encoding = None
        else:
            effective_encoding = text_encoding
        if _mode.binary:
            effective_newline = None
        else:
            effective_newline = newline
        return io.open(
            target,
            mode=_mode.to_platform(),
            buffering=buffering,
            encoding=effective_encoding,
            errors=errors,
            newline=effective_newline,
            **options
        )
def setinfo(self, path, info):
    # type: (Text, RawInfo) -> None
    """Apply the writable parts of ``info`` to the resource at ``path``.

    Only ``details.accessed`` and ``details.modified`` are used; they are
    applied with ``os.utime`` after truncation to whole seconds via
    ``int``. When only one of the two is supplied (or the other is `None`)
    the supplied value is used for both. When neither has a value nothing
    is changed.

    Arguments:
        path (str): A path on this filesystem.
        info (dict): Raw info mapping, keyed by namespace.

    Raises:
        `fs.errors.ResourceNotFound`: If the system path does not exist.

    """
    self.check()
    _path = self.validatepath(path)
    sys_path = self._to_sys_path(_path)
    if not os.path.exists(sys_path):
        raise errors.ResourceNotFound(path)
    if "details" not in info:
        return

    details = info["details"]
    accessed = details.get("accessed")
    modified = details.get("modified")
    if accessed is None and modified is None:
        # Nothing to update (keys absent or explicitly None).
        return
    # Fill a missing/None timestamp from the one that was supplied instead
    # of passing None to int().
    if accessed is None:
        accessed = modified
    if modified is None:
        modified = accessed
    times = (int(accessed), int(modified))
    with convert_os_errors("setinfo", path):
        os.utime(sys_path, times)
def validatepath(self, path):
    # type: (Text) -> Text
    """Check path may be encoded, in addition to usual checks.

    Arguments:
        path (str): A path on this filesystem.

    Returns:
        str: Whatever the parent class's ``validatepath`` returns for
        ``path``.

    Raises:
        `fs.errors.InvalidCharsInPath`: If ``fsencode(path)`` raises
            `UnicodeEncodeError`.

    """
    try:
        fsencode(path)
    except UnicodeEncodeError as error:
        raise errors.InvalidCharsInPath(
            path,
            msg="path '{path}' could not be encoded for the filesystem (check LANG env var); {error}".format(
                path=path, error=error
            ),
        )
    return super(OSFS, self).validatepath(path)