Source code for fs.compress

"""Functions to compress the contents of a filesystem.

Currently zip and tar are supported, using the `zipfile` and
`tarfile` modules from the standard library.
"""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals

import time
import tarfile
import typing
import zipfile
from datetime import datetime

import six

from .enums import ResourceType
from .path import relpath
from .time import datetime_to_epoch
from .errors import NoSysPath, MissingInfoNamespace
from .walk import Walker

if False:  # typing.TYPE_CHECKING
    from typing import BinaryIO, Optional, Text, Tuple, Type, Union
    from .base import FS

    ZipTime = Tuple[int, int, int, int, int, int]


def write_zip(
    src_fs,  # type: FS
    file,  # type: Union[Text, BinaryIO]
    compression=zipfile.ZIP_DEFLATED,  # type: int
    encoding="utf-8",  # type: Text
    walker=None,  # type: Optional[Walker]
):
    # type: (...) -> None
    """Write the contents of a filesystem to a zip file.

    Arguments:
        src_fs (~fs.base.FS): The source filesystem to compress.
        file (str or io.IOBase): Destination file, may be a file name
            or an open file object.
        compression (int): Compression to use (one of the constants
            defined in the `zipfile` module in the stdlib). Defaults
            to `zipfile.ZIP_DEFLATED`. Applied to every file member;
            directory entries are always written empty.
        encoding (str): The encoding to use for filenames. The default
            is ``"utf-8"``, use ``"CP437"`` if compatibility with WinZip
            is desired. Only consulted when ``six.PY3`` is false, where
            member names are encoded to bytes.
        walker (~fs.walk.Walker, optional): A `Walker` instance, or
            `None` to use default walker. You can use this to specify
            which files you want to compress.

    Note:
        Members built from a `zipfile.ZipInfo` (directories, and files
        of filesystems without system paths) have their timestamp
        clamped to the 1980-2107 range a zip entry can store. Files
        copied from a system path are added with `zipfile.ZipFile.write`
        and take their metadata from that call.

    """
    _zip = zipfile.ZipFile(file, mode="w", compression=compression, allowZip64=True)
    walker = walker or Walker()
    with _zip:
        gen_walk = walker.info(src_fs, namespaces=["details", "stat", "access"])
        for path, info in gen_walk:
            # Zip names must be relative, directory names must end
            # with a slash.
            zip_name = relpath(path + "/" if info.is_dir else path)
            if not six.PY3:
                # Python2 expects bytes filenames
                zip_name = zip_name.encode(encoding, "replace")

            if info.has_namespace("stat"):
                # If the file has a stat namespace, get the
                # zip time directly from the stat structure.
                # `time.localtime(None)` falls back to the current time.
                st_mtime = info.get("stat", "st_mtime", None)
                _mtime = time.localtime(st_mtime)
                zip_time = _mtime[0:6]  # type: ZipTime
            else:
                # Otherwise, use the modified time from details
                # namespace.
                mt = info.modified or datetime.utcnow()
                zip_time = (mt.year, mt.month, mt.day, mt.hour, mt.minute, mt.second)

            # The zip format cannot store years outside 1980-2107 and
            # `ZipInfo` rejects earlier dates with ValueError; clamp the
            # same way the stdlib does with `strict_timestamps=False`.
            if zip_time[0] < 1980:
                zip_time = (1980, 1, 1, 0, 0, 0)
            elif zip_time[0] > 2107:
                zip_time = (2107, 12, 31, 23, 59, 59)

            # NOTE(@althonos): typeshed's `zipfile.py` only declares
            #                  ZipInfo.__init__ for Python < 3 ?!
            zip_info = zipfile.ZipInfo(zip_name, zip_time)  # type: ignore

            try:
                if info.permissions is not None:
                    # Unix permission bits live in the high 16 bits.
                    zip_info.external_attr = info.permissions.mode << 16
            except MissingInfoNamespace:
                pass

            if info.is_dir:
                # 0x10 is the MS-DOS directory attribute flag.
                zip_info.external_attr |= 0x10
                # This is how to record directories with zipfile
                _zip.writestr(zip_info, b"")
            else:
                # Get a syspath if possible
                try:
                    sys_path = src_fs.getsyspath(path)
                except NoSysPath:
                    # A hand-built ZipInfo defaults to ZIP_STORED and
                    # `writestr` honours the ZipInfo over the archive
                    # default, so the requested compression must be set
                    # explicitly or the data is stored uncompressed.
                    zip_info.compress_type = compression
                    # Write from bytes
                    _zip.writestr(zip_info, src_fs.readbytes(path))
                else:
                    # Write from a file which is (presumably)
                    # more memory efficient
                    _zip.write(sys_path, zip_name)


def write_tar(
    src_fs,  # type: FS
    file,  # type: Union[Text, BinaryIO]
    compression=None,  # type: Optional[Text]
    encoding="utf-8",  # type: Text
    walker=None,  # type: Optional[Walker]
):
    # type: (...) -> None
    """Archive the contents of a filesystem into a tar file.

    Arguments:
        src_fs (~fs.base.FS): The source filesystem to compress.
        file (str or io.IOBase): Destination file, may be a file
            name or an open file object.
        compression (str, optional): Compression to use, or `None`
            for a plain Tar archive without compression.
        encoding(str): The encoding to use for filenames. The
            default is ``"utf-8"``. Only consulted when ``six.PY3``
            is false, where member names are encoded to bytes.
        walker (~fs.walk.Walker, optional): A `Walker` instance, or
            `None` to use default walker. You can use this to specify
            which files you want to compress.

    """
    # Resource type -> tar member type; kinds tar cannot express fall
    # back to AREGTYPE.
    tar_types = {
        ResourceType.block_special_file: tarfile.BLKTYPE,
        ResourceType.character: tarfile.CHRTYPE,
        ResourceType.directory: tarfile.DIRTYPE,
        ResourceType.fifo: tarfile.FIFOTYPE,
        ResourceType.file: tarfile.REGTYPE,
        ResourceType.socket: tarfile.AREGTYPE,  # no type for socket
        ResourceType.symlink: tarfile.SYMTYPE,
        ResourceType.unknown: tarfile.AREGTYPE,  # no type for unknown
    }
    # (TarInfo attribute, Info attribute) pairs copied when available.
    ownership_fields = (
        ("uid", "uid"),
        ("gid", "gid"),
        ("uname", "user"),
        ("gname", "group"),
    )

    open_mode = "w:{}".format(compression if compression else "")
    if not isinstance(file, (six.text_type, six.binary_type)):
        # Anything that is not a text/bytes path is treated as an
        # already-open file object.
        archive = tarfile.open(fileobj=file, mode=open_mode)
    else:
        archive = tarfile.open(file, mode=open_mode)

    # Fallback modification time for resources that do not report one.
    now = time.time()
    walker = walker or Walker()

    with archive:
        for path, info in walker.info(
            src_fs, namespaces=["details", "stat", "access"]
        ):
            # Tar names must be relative; Python2 expects bytes filenames.
            member_name = (
                relpath(path)
                if six.PY3
                else relpath(path).encode(encoding, "replace")
            )
            member = tarfile.TarInfo(member_name)

            # Prefer the raw stat timestamp, else the "modified" detail.
            stamp = (
                info.get("stat", "st_mtime", now)
                if info.has_namespace("stat")
                else info.modified or now
            )
            if isinstance(stamp, datetime):
                stamp = datetime_to_epoch(stamp)
            if isinstance(stamp, float):
                stamp = int(stamp)
            member.mtime = stamp

            for tar_field, info_field in ownership_fields:
                value = getattr(info, info_field, None)
                if value is not None:
                    setattr(member, tar_field, value)

            if info.has_namespace("access"):
                member.mode = getattr(info.permissions, "mode", 0o420)

            if info.is_dir:
                # Directories are header-only members.
                member.type = tarfile.DIRTYPE
                archive.addfile(member)
                continue

            member.type = tar_types.get(info.type, tarfile.REGTYPE)
            member.size = info.size
            with src_fs.openbin(path) as source:
                archive.addfile(member, source)