# Source code for fs.zipfs

"""Manage the filesystem in a Zip archive.
"""

from __future__ import print_function, unicode_literals

import sys
import typing

import six
import zipfile
from datetime import datetime

from . import errors
from ._url_tools import url_quote
from .base import FS
from .compress import write_zip
from .enums import ResourceType, Seek
from .info import Info
from .iotools import RawWrapper
from .memoryfs import MemoryFS
from .opener import open_fs
from .path import dirname, forcedir, normpath, relpath
from .permissions import Permissions
from .time import datetime_to_epoch
from .wrapfs import WrapFS

if typing.TYPE_CHECKING:
    from typing import (
        Any,
        BinaryIO,
        Collection,
        Dict,
        List,
        Optional,
        SupportsInt,
        Text,
        Tuple,
        Union,
    )

    from .info import RawInfo
    from .subfs import SubFS

    R = typing.TypeVar("R", bound="ReadZipFS")


class _ZipExtFile(RawWrapper):
    """A seekable binary read handle on a single member of a zip archive.

    The member is opened through the `zipfile.ZipFile` owned by the
    `ReadZipFS` passed to the constructor. On Python 3.7+ seeking is
    delegated to the underlying zip stream; on older interpreters it is
    emulated by re-opening the member and/or reading and discarding data.
    """

    def __init__(self, fs, name):  # noqa: D107
        # type: (ReadZipFS, Text) -> None
        self._zip = _zip = fs._zip
        # Uncompressed size of the member, used to resolve ``Seek.end``.
        self._end = _zip.getinfo(name).file_size
        # Cursor maintained by the emulated seek/tell of the legacy branch.
        self._pos = 0
        super(_ZipExtFile, self).__init__(_zip.open(name), "r", name)

    # NOTE(@althonos): Starting from Python 3.7, files inside a Zip archive are
    #                  seekable provided they were opened from a seekable file
    #                  handle. Before that, we can emulate a seek using the
    #                  read method, although it adds a ton of overhead and is
    #                  way less efficient than extracting once to a BytesIO.
    if sys.version_info < (3, 7):

        def read(self, size=-1):
            # type: (int) -> bytes
            """Read up to ``size`` bytes (everything if negative or `None`)."""
            buf = self._f.read(-1 if size is None else size)
            # Track the cursor ourselves: the wrapped stream cannot tell().
            self._pos += len(buf)
            return buf

        def read1(self, size=-1):
            # type: (int) -> bytes
            """Like `read`, but delegating to the wrapped stream's ``read1``."""
            buf = self._f.read1(-1 if size is None else size)  # type: ignore
            self._pos += len(buf)
            return buf

        def tell(self):
            # type: () -> int
            """Return the current (emulated) stream position."""
            return self._pos

        def seekable(self):
            """Return `True`: seeking is always available through emulation."""
            return True

        def seek(self, offset, whence=Seek.set):
            # type: (int, SupportsInt) -> int
            """Change stream position.

            Change the stream position to the given byte offset. The
            offset is interpreted relative to the position indicated by
            ``whence``.

            Arguments:
                offset (int): the offset to the new position, in bytes.
                whence (int): the position reference. Possible values are:
                    * `Seek.set`: start of stream (the default).
                    * `Seek.current`: current position; offset may be negative.
                    * `Seek.end`: end of stream; offset must not be positive.
                      A target before the start of the stream is clamped
                      to position 0.

            Returns:
                int: the new absolute position.

            Raises:
                ValueError: when ``whence`` is not known, or ``offset``
                    is invalid.

            Note:
                Zip compression does not support seeking, so the seeking
                is emulated. Seeking somewhere else than the current position
                will need to either:
                    * reopen the file and restart decompression
                    * read and discard data to advance in the file

            """
            _whence = int(whence)
            if _whence == Seek.current:
                # Convert to an absolute target so both cases share one check.
                offset += self._pos
            if _whence == Seek.current or _whence == Seek.set:
                if offset < 0:
                    raise ValueError("Negative seek position {}".format(offset))
            elif _whence == Seek.end:
                if offset > 0:
                    raise ValueError("Positive seek position {}".format(offset))
                # Clamp to the start of the stream. A negative target would
                # otherwise reach ``read`` below as a negative size, which
                # means "read everything" and would leave the cursor at the
                # end of the member instead of the beginning.
                offset = max(offset + self._end, 0)
            else:
                raise ValueError(
                    "Invalid whence ({}, should be {}, {} or {})".format(
                        _whence, Seek.set, Seek.current, Seek.end
                    )
                )

            if offset < self._pos:
                # Cannot move backwards in the stream: restart from the top.
                self._f = self._zip.open(self.name)  # type: ignore
                self._pos = 0
            # Advance by reading and discarding; ``read`` updates ``_pos``.
            self.read(offset - self._pos)
            return self._pos

    else:

        def seek(self, offset, whence=Seek.set):
            # type: (int, SupportsInt) -> int
            """Change stream position.

            Change the stream position to the given byte offset. The
            offset is interpreted relative to the position indicated by
            ``whence``.

            Arguments:
                offset (int): the offset to the new position, in bytes.
                whence (int): the position reference. Possible values are:
                    * `Seek.set`: start of stream (the default).
                    * `Seek.current`: current position; offset may be negative.
                    * `Seek.end`: end of stream; offset must not be positive.

            Returns:
                int: the new absolute position.

            Raises:
                ValueError: when ``whence`` is not known, or ``offset``
                    is invalid.

            """
            _whence = int(whence)
            _pos = self.tell()
            if _whence == Seek.set:
                if offset < 0:
                    raise ValueError("Negative seek position {}".format(offset))
            elif _whence == Seek.current:
                if _pos + offset < 0:
                    # Report the resulting absolute position, as the
                    # emulated implementation for older interpreters does.
                    raise ValueError(
                        "Negative seek position {}".format(_pos + offset)
                    )
            elif _whence == Seek.end:
                if offset > 0:
                    raise ValueError("Positive seek position {}".format(offset))
            else:
                raise ValueError(
                    "Invalid whence ({}, should be {}, {} or {})".format(
                        _whence, Seek.set, Seek.current, Seek.end
                    )
                )

            # Validation done; the wrapped zip stream performs the real seek.
            return self._f.seek(offset, _whence)


class ZipFS(WrapFS):
    """Read and write zip files.

    There are two ways to open a `ZipFS` for the use cases of reading
    a zip file, and creating a new one.

    If you open the `ZipFS` with ``write`` set to `False` (the default)
    then the filesystem will be a read-only filesystem which maps to
    the files and directories within the zip file. Files are
    decompressed on the fly when you open them.

    Here's how you might extract and print a readme from a zip file::

        with ZipFS('foo.zip') as zip_fs:
            readme = zip_fs.readtext('readme.txt')
            print(readme)

    If you open the `ZipFS` with ``write`` set to `True`, then the `ZipFS`
    will be an empty temporary filesystem. Any files / directories you
    create in the `ZipFS` will be written in to a zip file when the `ZipFS`
    is closed.

    Here's how you might write a new zip file containing a ``readme.txt``
    file::

        with ZipFS('foo.zip', write=True) as new_zip:
            new_zip.writetext(
                'readme.txt',
                'This zip file was written by PyFilesystem'
            )

    Arguments:
        file (str or io.IOBase): An OS filename, or an open file object.
        write (bool): Set to `True` to write a new zip file, or `False`
            (default) to read an existing zip file.
        compression (int): Compression to use (one of the constants
            defined in the `zipfile` module in the stdlib). Only used
            when ``write`` is `True`.
        encoding (str): The character encoding applied to file names
            stored in the archive (defaults to ``utf-8``).
        temp_fs (str or FS): An FS URL or an FS instance to use to
            store data prior to zipping. Defaults to creating a new
            `~fs.tempfs.TempFS`. Only used when ``write`` is `True`.

    """

    # TODO: __new__ returning different types may be too 'magical'
    def __new__(  # type: ignore
        cls,
        file,  # type: Union[Text, BinaryIO]
        write=False,  # type: bool
        compression=zipfile.ZIP_DEFLATED,  # type: int
        encoding="utf-8",  # type: Text
        temp_fs="temp://__ziptemp__",  # type: Union[Text, FS]
    ):
        # type: (...) -> FS
        # This magic returns a different class instance based on the
        # value of the ``write`` parameter.
        if write:
            return WriteZipFS(
                file, compression=compression, encoding=encoding, temp_fs=temp_fs
            )
        else:
            return ReadZipFS(file, encoding=encoding)

    if typing.TYPE_CHECKING:

        # Typing-only stub so that checkers know the constructor signature;
        # it is kept in sync with ``__new__`` above and never runs.
        def __init__(
            self,
            file,  # type: Union[Text, BinaryIO]
            write=False,  # type: bool
            compression=zipfile.ZIP_DEFLATED,  # type: int
            encoding="utf-8",  # type: Text
            temp_fs="temp://__ziptemp__",  # type: Union[Text, FS]
        ):  # noqa: D107
            # type: (...) -> None
            pass
@six.python_2_unicode_compatible
class WriteZipFS(WrapFS):
    """A writable zip file.

    Resources are created in a staging filesystem (``temp_fs``); its
    contents are written out as a zip archive when the filesystem is
    closed, or on demand through `write_zip`.
    """

    def __init__(
        self,
        file,  # type: Union[Text, BinaryIO]
        compression=zipfile.ZIP_DEFLATED,  # type: int
        encoding="utf-8",  # type: Text
        temp_fs="temp://__ziptemp__",  # type: Union[Text, FS]
    ):  # noqa: D107
        # type: (...) -> None
        self._file = file
        self.compression = compression
        self.encoding = encoding
        # Keep the value as given so that ``__repr__`` can echo it back.
        self._temp_fs_url = temp_fs
        self._temp_fs = open_fs(temp_fs)
        # Expose the meta information of the staging filesystem as our own.
        self._meta = dict(self._temp_fs.getmeta())  # type: ignore
        super(WriteZipFS, self).__init__(self._temp_fs)

    def __repr__(self):
        # type: () -> Text
        t = "WriteZipFS({!r}, compression={!r}, encoding={!r}, temp_fs={!r})"
        return t.format(self._file, self.compression, self.encoding, self._temp_fs_url)

    def __str__(self):
        # type: () -> Text
        return "<zipfs-write '{}'>".format(self._file)

    def delegate_path(self, path):
        # type: (Text) -> Tuple[FS, Text]
        """Map ``path`` to the staging filesystem, leaving it unchanged."""
        return self._temp_fs, path

    def delegate_fs(self):
        # type: () -> FS
        """Return the staging filesystem that holds the data to be zipped."""
        return self._temp_fs

    def close(self):
        # type: () -> None
        """Write the zip archive, then release the staging filesystem.

        Does nothing if the filesystem is already closed.
        """
        if not self.isclosed():
            try:
                self.write_zip()
            finally:
                # Always release the staging filesystem and mark this
                # filesystem closed, even if writing the archive failed.
                self._temp_fs.close()
                super(WriteZipFS, self).close()

    def write_zip(
        self,
        file=None,  # type: Union[Text, BinaryIO, None]
        compression=None,  # type: Optional[int]
        encoding=None,  # type: Optional[Text]
    ):
        # type: (...) -> None
        """Write zip to a file.

        Arguments:
            file (str or io.IOBase, optional): Destination file, may be
                a file name or an open file handle.
            compression (int, optional): Compression to use (one of the
                constants defined in the `zipfile` module in the stdlib).
                Defaults to the compression defined in
                `~WriteZipFS.__init__`.
            encoding (str, optional): The character encoding to use
                (default uses the encoding defined in
                `~WriteZipFS.__init__`).

        Note:
            This is called automatically when the ZipFS is closed.
            Nothing is written if the filesystem is already closed.

        """
        if not self.isclosed():
            # ``zipfile.ZIP_STORED`` is 0, so the fallback must test for
            # `None` explicitly: a truthiness test would silently replace
            # an explicit "no compression" request with the default.
            if compression is None:
                compression = self.compression
            write_zip(
                self._temp_fs,
                file or self._file,
                compression=compression,
                encoding=encoding or self.encoding,
            )
@six.python_2_unicode_compatible
class ReadZipFS(FS):
    """A readable zip file.

    The archive is exposed as a read-only filesystem: every method that
    would modify it raises `~fs.errors.ResourceReadOnly`.
    """

    _meta = {
        "case_insensitive": False,
        "network": False,
        "read_only": True,
        "supports_rename": False,
        "thread_safe": True,
        "unicode_paths": True,
        "virtual": False,
    }

    @errors.CreateFailed.catch_all
    def __init__(self, file, encoding="utf-8"):  # noqa: D107
        # type: (Union[BinaryIO, Text], Text) -> None
        super(ReadZipFS, self).__init__()
        self._file = file
        self.encoding = encoding
        self._zip = zipfile.ZipFile(file, "r")
        # Built lazily by the ``_directory`` property on first access.
        self._directory_fs = None  # type: Optional[MemoryFS]

    def __repr__(self):
        # type: () -> Text
        return "ReadZipFS({!r})".format(self._file)

    def __str__(self):
        # type: () -> Text
        return "<zipfs '{}'>".format(self._file)

    def _path_to_zip_name(self, path):
        # type: (Text) -> str
        """Convert a path to a zip file name."""
        path = relpath(normpath(path))
        # Directory entries carry a trailing slash in the archive.
        if self._directory.isdir(path):
            path = forcedir(path)
        if six.PY2:
            return path.encode(self.encoding)
        return path

    @property
    def _directory(self):
        # type: () -> MemoryFS
        """`MemoryFS`: a filesystem with the same folder hierarchy as the zip."""
        self.check()
        with self._lock:
            if self._directory_fs is None:
                self._directory_fs = _fs = MemoryFS()
                for zip_name in self._zip.namelist():
                    resource_name = zip_name
                    if six.PY2:
                        resource_name = resource_name.decode(self.encoding, "replace")
                    if resource_name.endswith("/"):
                        _fs.makedirs(resource_name, recreate=True)
                    else:
                        # Parent directories may only be implied by the
                        # member name, so create them before the file.
                        _fs.makedirs(dirname(resource_name), recreate=True)
                        _fs.create(resource_name)
            return self._directory_fs

    def getinfo(self, path, namespaces=None):
        # type: (Text, Optional[Collection[Text]]) -> Info
        """Get information about a resource in the archive.

        Arguments:
            path (str): A path to a resource in the archive.
            namespaces (collection of str, optional): Extra info
                namespaces to fill in; ``details``, ``access`` and
                ``zip`` are recognised in addition to ``basic``.

        Returns:
            ~fs.info.Info: the resource information.

        """
        _path = self.validatepath(path)
        namespaces = namespaces or ()
        raw_info = {}  # type: Dict[Text, Dict[Text, object]]

        if _path == "/":
            # The root has no entry of its own in the archive.
            raw_info["basic"] = {"name": "", "is_dir": True}
            if "details" in namespaces:
                raw_info["details"] = {"type": int(ResourceType.directory)}

        else:
            basic_info = self._directory.getinfo(_path)
            raw_info["basic"] = {"name": basic_info.name, "is_dir": basic_info.is_dir}

            # Only look the member up when a namespace actually needs it.
            if not {"details", "access", "zip"}.isdisjoint(namespaces):
                zip_name = self._path_to_zip_name(path)
                try:
                    zip_info = self._zip.getinfo(zip_name)
                except KeyError:
                    # Can occur if there is an implied directory in the zip
                    pass
                else:
                    if "details" in namespaces:
                        raw_info["details"] = {
                            "size": zip_info.file_size,
                            "type": int(
                                ResourceType.directory
                                if basic_info.is_dir
                                else ResourceType.file
                            ),
                            "modified": datetime_to_epoch(
                                datetime(*zip_info.date_time)
                            ),
                        }
                    if "zip" in namespaces:
                        # Every public attribute of the `zipfile.ZipInfo`.
                        raw_info["zip"] = {
                            k: getattr(zip_info, k)
                            for k in zip_info.__slots__  # type: ignore
                            if not k.startswith("_")
                        }
                    if "access" in namespaces:
                        # check the zip was created on UNIX to get permissions
                        if zip_info.external_attr and zip_info.create_system == 3:
                            raw_info["access"] = {
                                "permissions": Permissions(
                                    mode=zip_info.external_attr >> 16 & 0xFFF
                                ).dump()
                            }

        return Info(raw_info)

    def setinfo(self, path, info):
        # type: (Text, RawInfo) -> None
        """Always raise `~fs.errors.ResourceReadOnly`."""
        self.check()
        raise errors.ResourceReadOnly(path)

    def listdir(self, path):
        # type: (Text) -> List[Text]
        """List the names of the resources in the directory at ``path``."""
        self.check()
        return self._directory.listdir(path)

    def makedir(
        self,  # type: R
        path,  # type: Text
        permissions=None,  # type: Optional[Permissions]
        recreate=False,  # type: bool
    ):
        # type: (...) -> SubFS[R]
        """Always raise `~fs.errors.ResourceReadOnly`."""
        self.check()
        raise errors.ResourceReadOnly(path)

    def openbin(self, path, mode="r", buffering=-1, **kwargs):
        # type: (Text, Text, int, **Any) -> BinaryIO
        """Open a file of the archive for binary reading.

        Arguments:
            path (str): A path to a file in the archive.
            mode (str): Mode to open the file with; only reading is
                supported.
            buffering (int): Accepted for interface compatibility and
                not used.
            **kwargs: Accepted for interface compatibility and not used.

        Returns:
            io.IOBase: a seekable, read-only binary file.

        Raises:
            fs.errors.ResourceReadOnly: if ``mode`` requests writing.
            fs.errors.ResourceNotFound: if ``path`` does not exist.
            fs.errors.FileExpected: if ``path`` is a directory.

        """
        self.check()
        # "x" (exclusive creation) is a write mode too, like "w", "a"
        # and the "+" update flag.
        if any(flag in mode for flag in "wax+"):
            raise errors.ResourceReadOnly(path)

        if not self._directory.exists(path):
            raise errors.ResourceNotFound(path)
        elif self._directory.isdir(path):
            raise errors.FileExpected(path)

        zip_name = self._path_to_zip_name(path)
        return _ZipExtFile(self, zip_name)  # type: ignore

    def remove(self, path):
        # type: (Text) -> None
        """Always raise `~fs.errors.ResourceReadOnly`."""
        self.check()
        raise errors.ResourceReadOnly(path)

    def removedir(self, path):
        # type: (Text) -> None
        """Always raise `~fs.errors.ResourceReadOnly`."""
        self.check()
        raise errors.ResourceReadOnly(path)

    def close(self):
        # type: () -> None
        """Close the filesystem and the underlying zip file."""
        super(ReadZipFS, self).close()
        # ``_zip`` is missing when opening the archive failed in __init__.
        if hasattr(self, "_zip"):
            self._zip.close()

    def readbytes(self, path):
        # type: (Text) -> bytes
        """Return the whole (decompressed) contents of a file as bytes.

        Raises:
            fs.errors.ResourceNotFound: if ``path`` is not an existing file.

        """
        self.check()
        # NOTE(review): a directory also yields ResourceNotFound here,
        # whereas ``openbin`` raises FileExpected for it; kept as is so
        # callers catching ResourceNotFound are unaffected.
        if not self._directory.isfile(path):
            raise errors.ResourceNotFound(path)
        zip_name = self._path_to_zip_name(path)
        zip_bytes = self._zip.read(zip_name)
        return zip_bytes

    def geturl(self, path, purpose="download"):
        # type: (Text, Text) -> Text
        """Get a ``zip://`` FS URL for a resource.

        Only the ``fs`` purpose is supported, and only when the archive
        was opened from a file name rather than from a file object.

        Raises:
            fs.errors.NoURL: if no URL can be produced for the given
                ``purpose`` or archive.

        """
        if purpose == "fs" and isinstance(self._file, six.string_types):
            quoted_file = url_quote(self._file)
            quoted_path = url_quote(path)
            return "zip://{}!/{}".format(quoted_file, quoted_path)
        else:
            raise errors.NoURL(path, purpose)