Source code for fs.copy

"""Functions for copying resources *between* filesystems.
"""

from __future__ import print_function, unicode_literals

import typing

import warnings

from .errors import ResourceNotFound
from .opener import manage_fs
from .path import abspath, combine, frombase, normpath
from .tools import is_thread_safe
from .walk import Walker

if typing.TYPE_CHECKING:
    from typing import Callable, Optional, Text, Union

    from .base import FS

    _OnCopy = Callable[[FS, Text, FS, Text], object]


def copy_fs(
    src_fs,  # type: Union[FS, Text]
    dst_fs,  # type: Union[FS, Text]
    walker=None,  # type: Optional[Walker]
    on_copy=None,  # type: Optional[_OnCopy]
    workers=0,  # type: int
    preserve_time=False,  # type: bool
):
    # type: (...) -> None
    """Copy everything in one filesystem to another, unconditionally.

    Thin wrapper that delegates to `~fs.copy.copy_fs_if` with the
    ``"always"`` condition.

    Arguments:
        src_fs (FS or str): Source filesystem (URL or instance).
        dst_fs (FS or str): Destination filesystem (URL or instance).
        walker (~fs.walk.Walker, optional): Walker used to scan ``src_fs``
            for files. Supply one to restrict the copy to a sub-set of
            the resources in ``src_fs``.
        on_copy (callable, optional): Callback invoked for each copied
            file with ``(src_fs, src_path, dst_fs, dst_path)``.
        workers (int): Number of worker threads used to copy data, or
            ``0`` (default) for a single-threaded copy.
        preserve_time (bool): If `True`, try to preserve the mtime of
            the resources (defaults to `False`).

    """
    return copy_fs_if(
        src_fs,
        dst_fs,
        condition="always",
        walker=walker,
        on_copy=on_copy,
        workers=workers,
        preserve_time=preserve_time,
    )
def copy_fs_if_newer(
    src_fs,  # type: Union[FS, Text]
    dst_fs,  # type: Union[FS, Text]
    walker=None,  # type: Optional[Walker]
    on_copy=None,  # type: Optional[_OnCopy]
    workers=0,  # type: int
    preserve_time=False,  # type: bool
):
    # type: (...) -> None
    """Copy the contents of one filesystem to another, checking times.

    .. deprecated:: 2.5.0
       Use `~fs.copy.copy_fs_if` with ``condition="newer"`` instead.

    Arguments:
        src_fs (FS or str): Source filesystem (URL or instance).
        dst_fs (FS or str): Destination filesystem (URL or instance).
        walker (~fs.walk.Walker, optional): Walker used to scan ``src_fs``
            for files.
        on_copy (callable, optional): Callback invoked for each copied
            file with ``(src_fs, src_path, dst_fs, dst_path)``.
        workers (int): Number of worker threads used to copy data, or
            ``0`` (default) for a single-threaded copy.
        preserve_time (bool): If `True`, try to preserve the mtime of
            the resources (defaults to `False`).

    """
    # stacklevel=2 attributes the warning to the caller's line instead of
    # this module, so it is reported where the deprecated call was made.
    warnings.warn(
        "copy_fs_if_newer is deprecated. Use copy_fs_if instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return copy_fs_if(
        src_fs, dst_fs, "newer", walker, on_copy, workers, preserve_time=preserve_time
    )
def copy_fs_if(
    src_fs,  # type: Union[FS, Text]
    dst_fs,  # type: Union[FS, Text]
    condition="always",  # type: Text
    walker=None,  # type: Optional[Walker]
    on_copy=None,  # type: Optional[_OnCopy]
    workers=0,  # type: int
    preserve_time=False,  # type: bool
):
    # type: (...) -> None
    """Conditionally copy the contents of one filesystem to another.

    Equivalent to calling `~fs.copy.copy_dir_if` with the root directory
    (``"/"``) as both the source and the destination path.

    Arguments:
        src_fs (FS or str): Source filesystem (URL or instance).
        dst_fs (FS or str): Destination filesystem (URL or instance).
        condition (str): Name of the condition to check for each file.
        walker (~fs.walk.Walker, optional): Walker used to scan ``src_fs``
            for files. Supply one to restrict the copy to a sub-set of
            the resources in ``src_fs``.
        on_copy (callable, optional): Callback invoked for each copied
            file with ``(src_fs, src_path, dst_fs, dst_path)``.
        workers (int): Number of worker threads used to copy data, or
            ``0`` (default) for a single-threaded copy.
        preserve_time (bool): If `True`, try to preserve the mtime of
            the resources (defaults to `False`).

    See Also:
        `~fs.copy.copy_file_if` for the full list of supported values
        for the ``condition`` argument.

    """
    return copy_dir_if(
        src_fs=src_fs,
        src_path="/",
        dst_fs=dst_fs,
        dst_path="/",
        condition=condition,
        walker=walker,
        on_copy=on_copy,
        workers=workers,
        preserve_time=preserve_time,
    )
def copy_file(
    src_fs,  # type: Union[FS, Text]
    src_path,  # type: Text
    dst_fs,  # type: Union[FS, Text]
    dst_path,  # type: Text
    preserve_time=False,  # type: bool
):
    # type: (...) -> None
    """Copy a single file from one filesystem to another.

    An existing destination file is truncated first. The work is done by
    `~fs.copy.copy_file_if` with the ``"always"`` condition; its boolean
    result is discarded.

    Arguments:
        src_fs (FS or str): Source filesystem (instance or URL).
        src_path (str): Path to a file on the source filesystem.
        dst_fs (FS or str): Destination filesystem (instance or URL).
        dst_path (str): Path to a file on the destination filesystem.
        preserve_time (bool): If `True`, try to preserve the mtime of
            the resource (defaults to `False`).

    """
    copy_file_if(
        src_fs,
        src_path,
        dst_fs,
        dst_path,
        condition="always",
        preserve_time=preserve_time,
    )
def copy_file_if_newer(
    src_fs,  # type: Union[FS, Text]
    src_path,  # type: Text
    dst_fs,  # type: Union[FS, Text]
    dst_path,  # type: Text
    preserve_time=False,  # type: bool
):
    # type: (...) -> bool
    """Copy a file from one filesystem to another, checking times.

    .. deprecated:: 2.5.0
       Use `~fs.copy.copy_file_if` with ``condition="newer"`` instead.

    Arguments:
        src_fs (FS or str): Source filesystem (instance or URL).
        src_path (str): Path to a file on the source filesystem.
        dst_fs (FS or str): Destination filesystem (instance or URL).
        dst_path (str): Path to a file on the destination filesystem.
        preserve_time (bool): If `True`, try to preserve the mtime of
            the resource (defaults to `False`).

    Returns:
        bool: The value returned by `~fs.copy.copy_file_if`, i.e. `True`
        if the file copy was executed, `False` otherwise.

    """
    # stacklevel=2 attributes the warning to the caller's line instead of
    # this module, so it is reported where the deprecated call was made.
    warnings.warn(
        "copy_file_if_newer is deprecated. Use copy_file_if instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return copy_file_if(
        src_fs, src_path, dst_fs, dst_path, "newer", preserve_time=preserve_time
    )
def copy_file_if(
    src_fs,  # type: Union[FS, Text]
    src_path,  # type: Text
    dst_fs,  # type: Union[FS, Text]
    dst_path,  # type: Text
    condition,  # type: Text
    preserve_time=False,  # type: bool
):
    # type: (...) -> bool
    """Copy a file between filesystems when a named condition holds.

    The ``condition`` argument selects the requirement that must be met
    before the file is copied to ``dst_fs``. Supported values:

    ``"always"``
        The source file is always copied.
    ``"newer"``
        The source file's last modification time must be newer than the
        destination file's. If either file has no modification time,
        the copy is always performed.
    ``"older"``
        The source file's last modification time must be older than the
        destination file's. If either file has no modification time,
        the copy is always performed.
    ``"exists"``
        The source file is copied only if a file of the same path
        already exists in ``dst_fs``.
    ``"not_exists"``
        The source file is copied only if no file of the same path
        already exists in ``dst_fs``.

    Arguments:
        src_fs (FS or str): Source filesystem (instance or URL).
        src_path (str): Path to a file on the source filesystem.
        dst_fs (FS or str): Destination filesystem (instance or URL).
        dst_path (str): Path to a file on the destination filesystem.
        condition (str): Name of the condition to check for each file.
        preserve_time (bool): If `True`, try to preserve the mtime of
            the resource (defaults to `False`).

    Returns:
        bool: `True` if the file copy was executed, `False` otherwise.

    """
    with manage_fs(src_fs, writeable=False) as _src_fs, manage_fs(
        dst_fs, create=True
    ) as _dst_fs:
        should_copy = _copy_is_necessary(
            _src_fs, src_path, _dst_fs, dst_path, condition
        )
        if should_copy:
            # Both filesystems are locked for the duration of the transfer.
            copy_file_internal(
                _src_fs,
                src_path,
                _dst_fs,
                dst_path,
                preserve_time=preserve_time,
                lock=True,
            )
        return should_copy
def copy_file_internal(
    src_fs,  # type: FS
    src_path,  # type: Text
    dst_fs,  # type: FS
    dst_path,  # type: Text
    preserve_time=False,  # type: bool
    lock=False,  # type: bool
):
    # type: (...) -> None
    """Low-level file copy that does not call `manage_fs`.

    An existing destination file is truncated first. This function is
    meant for copying in loops, where the filesystems are already open;
    in general `copy_file` should be preferred.

    Arguments:
        src_fs (FS): Source filesystem.
        src_path (str): Path to a file on the source filesystem.
        dst_fs (FS): Destination filesystem.
        dst_path (str): Path to a file on the destination filesystem.
        preserve_time (bool): If `True`, try to preserve the mtime of
            the resource (defaults to `False`).
        lock (bool): Lock both filesystems before copying. Not consulted
            when ``src_fs`` and ``dst_fs`` are the same object.

    """
    if src_fs is dst_fs:
        # Same filesystem on both sides: delegate to its own copy method,
        # which may be optimized.
        src_fs.copy(src_path, dst_path, overwrite=True, preserve_time=preserve_time)
        return

    def _transfer():
        # type: () -> None
        if dst_fs.hassyspath(dst_path):
            # Destination has a system path: open it for writing and let
            # the source download straight into it.
            with dst_fs.openbin(dst_path, "w") as sink:
                src_fs.download(src_path, sink)
        else:
            # Otherwise read from the source and let the destination upload.
            with src_fs.openbin(src_path) as source:
                dst_fs.upload(dst_path, source)
        if preserve_time:
            copy_modified_time(src_fs, src_path, dst_fs, dst_path)

    if not lock:
        _transfer()
        return

    with src_fs.lock(), dst_fs.lock():
        _transfer()
def copy_structure(
    src_fs,  # type: Union[FS, Text]
    dst_fs,  # type: Union[FS, Text]
    walker=None,  # type: Optional[Walker]
    src_root="/",  # type: Text
    dst_root="/",  # type: Text
):
    # type: (...) -> None
    """Copy directories (but not files) from ``src_fs`` to ``dst_fs``.

    Arguments:
        src_fs (FS or str): Source filesystem (instance or URL).
        dst_fs (FS or str): Destination filesystem (instance or URL).
        walker (~fs.walk.Walker, optional): A walker object that will be
            used to scan for directories in ``src_fs``. Set this if you
            only want to consider a sub-set of the resources in ``src_fs``.
        src_root (str): Path of the base directory to consider as the
            root of the tree structure to copy.
        dst_root (str): Path to the target root of the tree structure.

    """
    walker = walker or Walker()
    # Normalize both roots the same way ``copy_dir_if`` does before it
    # calls ``frombase``: the prefix is stripped from paths reported by
    # the walker, so a relative or un-normalized root (e.g. ``"foo"``)
    # would not line up with them.
    # NOTE(review): assumes the walker yields absolute, normalized paths.
    _src_root = abspath(normpath(src_root))
    _dst_root = abspath(normpath(dst_root))
    with manage_fs(src_fs) as _src_fs:
        with manage_fs(dst_fs, create=True) as _dst_fs:
            with _src_fs.lock(), _dst_fs.lock():
                # Make sure the target root exists even if the source
                # tree contains no sub-directories.
                _dst_fs.makedirs(_dst_root, recreate=True)
                for dir_path in walker.dirs(_src_fs, _src_root):
                    _dst_fs.makedir(
                        combine(_dst_root, frombase(_src_root, dir_path)),
                        recreate=True,
                    )
def copy_dir(
    src_fs,  # type: Union[FS, Text]
    src_path,  # type: Text
    dst_fs,  # type: Union[FS, Text]
    dst_path,  # type: Text
    walker=None,  # type: Optional[Walker]
    on_copy=None,  # type: Optional[_OnCopy]
    workers=0,  # type: int
    preserve_time=False,  # type: bool
):
    # type: (...) -> None
    """Copy a directory tree from one filesystem to another.

    Thin wrapper that delegates to `~fs.copy.copy_dir_if` with the
    ``"always"`` condition.

    Arguments:
        src_fs (FS or str): Source filesystem (instance or URL).
        src_path (str): Path to a directory on the source filesystem.
        dst_fs (FS or str): Destination filesystem (instance or URL).
        dst_path (str): Path to a directory on the destination filesystem.
        walker (~fs.walk.Walker, optional): Walker used to scan ``src_fs``
            for files. Supply one to restrict the copy to a sub-set of
            the resources in ``src_fs``.
        on_copy (callable, optional): Callback invoked for each copied
            file with ``(src_fs, src_path, dst_fs, dst_path)``.
        workers (int): Number of worker threads used to copy data, or
            ``0`` (default) for a single-threaded copy.
        preserve_time (bool): If `True`, try to preserve the mtime of
            the resources (defaults to `False`).

    """
    copy_dir_if(
        src_fs,
        src_path,
        dst_fs,
        dst_path,
        condition="always",
        walker=walker,
        on_copy=on_copy,
        workers=workers,
        preserve_time=preserve_time,
    )
def copy_dir_if_newer(
    src_fs,  # type: Union[FS, Text]
    src_path,  # type: Text
    dst_fs,  # type: Union[FS, Text]
    dst_path,  # type: Text
    walker=None,  # type: Optional[Walker]
    on_copy=None,  # type: Optional[_OnCopy]
    workers=0,  # type: int
    preserve_time=False,  # type: bool
):
    # type: (...) -> None
    """Copy a directory from one filesystem to another, checking times.

    .. deprecated:: 2.5.0
       Use `~fs.copy.copy_dir_if` with ``condition="newer"`` instead.

    Arguments:
        src_fs (FS or str): Source filesystem (instance or URL).
        src_path (str): Path to a directory on the source filesystem.
        dst_fs (FS or str): Destination filesystem (instance or URL).
        dst_path (str): Path to a directory on the destination filesystem.
        walker (~fs.walk.Walker, optional): Walker used to scan ``src_fs``
            for files.
        on_copy (callable, optional): Callback invoked for each copied
            file with ``(src_fs, src_path, dst_fs, dst_path)``.
        workers (int): Number of worker threads used to copy data, or
            ``0`` (default) for a single-threaded copy.
        preserve_time (bool): If `True`, try to preserve the mtime of
            the resources (defaults to `False`).

    """
    # stacklevel=2 attributes the warning to the caller's line instead of
    # this module, so it is reported where the deprecated call was made.
    warnings.warn(
        "copy_dir_if_newer is deprecated. Use copy_dir_if instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    copy_dir_if(
        src_fs,
        src_path,
        dst_fs,
        dst_path,
        "newer",
        walker,
        on_copy,
        workers,
        preserve_time=preserve_time,
    )
def copy_dir_if(
    src_fs,  # type: Union[FS, Text]
    src_path,  # type: Text
    dst_fs,  # type: Union[FS, Text]
    dst_path,  # type: Text
    condition,  # type: Text
    walker=None,  # type: Optional[Walker]
    on_copy=None,  # type: Optional[_OnCopy]
    workers=0,  # type: int
    preserve_time=False,  # type: bool
):
    # type: (...) -> None
    """Copy a directory from one filesystem to another, depending on a condition.

    The directory structure is created first, then every file reported
    by the walker is checked against ``condition`` and handed to a
    ``Copier`` when the check passes.

    Arguments:
        src_fs (FS or str): Source filesystem (instance or URL).
        src_path (str): Path to a directory on the source filesystem.
        dst_fs (FS or str): Destination filesystem (instance or URL).
        dst_path (str): Path to a directory on the destination filesystem.
        condition (str): Name of the condition to check for each file.
        walker (~fs.walk.Walker, optional): A walker object that will be
            used to scan for files in ``src_fs``. Set this if you only want
            to consider a sub-set of the resources in ``src_fs``.
        on_copy (callable, optional): A function callback invoked for each
            file handed to the copier. Expected signature is
            ``(src_fs, src_path, dst_fs, dst_path)``.
        workers (int): Use ``worker`` threads to copy data, or ``0``
            (default) for a single-threaded copy. Worker threads are only
            used when ``is_thread_safe`` reports both filesystems as
            thread safe.
        preserve_time (bool): If `True`, try to preserve mtime of the
            resources (defaults to `False`).

    See Also:
        `~fs.copy.copy_file_if` for the full list of supported values
        for the ``condition`` argument.

    """
    on_copy = on_copy or (lambda *args: None)
    walker = walker or Walker()
    _src_path = abspath(normpath(src_path))
    _dst_path = abspath(normpath(dst_path))

    from ._bulk import Copier

    with manage_fs(src_fs, writeable=False) as _src_fs, manage_fs(
        dst_fs, create=True
    ) as _dst_fs:
        # Build the directory skeleton on the filesystems opened above, so
        # that URL arguments are opened only once and directories and
        # files land in the same destination instance. The normalized
        # paths keep both passes working from identical roots.
        copy_structure(_src_fs, _dst_fs, walker, _src_path, _dst_path)
        with _src_fs.lock(), _dst_fs.lock():
            # Fall back to a single-threaded copy if either filesystem is
            # not reported as thread safe.
            _thread_safe = is_thread_safe(_src_fs, _dst_fs)
            with Copier(
                num_workers=workers if _thread_safe else 0, preserve_time=preserve_time
            ) as copier:
                for file_path in walker.files(_src_fs, _src_path):
                    # Re-root the source path under the destination directory.
                    copy_path = combine(_dst_path, frombase(_src_path, file_path))
                    if _copy_is_necessary(
                        _src_fs, file_path, _dst_fs, copy_path, condition
                    ):
                        copier.copy(_src_fs, file_path, _dst_fs, copy_path)
                        on_copy(_src_fs, file_path, _dst_fs, copy_path)
def _copy_is_necessary(
    src_fs,  # type: FS
    src_path,  # type: Text
    dst_fs,  # type: FS
    dst_path,  # type: Text
    condition,  # type: Text
):
    # type: (...) -> bool
    """Decide whether ``condition`` allows copying the source file.

    Arguments:
        src_fs (FS): Source filesystem.
        src_path (str): Path to a file on the source filesystem.
        dst_fs (FS): Destination filesystem.
        dst_path (str): Path to a file on the destination filesystem.
        condition (str): One of ``"always"``, ``"newer"``, ``"older"``,
            ``"exists"`` or ``"not_exists"``.

    Returns:
        bool: `True` if the copy should be performed.

    Raises:
        ValueError: If ``condition`` is not one of the supported names.

    """
    if condition == "always":
        return True
    if condition == "exists":
        return dst_fs.exists(dst_path)
    if condition == "not_exists":
        return not dst_fs.exists(dst_path)
    if condition not in ("newer", "older"):
        raise ValueError("{} is not a valid copy condition.".format(condition))

    # Only the two time-based conditions reach this point.
    try:
        src_stamp = src_fs.getmodified(src_path)
        dst_stamp = dst_fs.getmodified(dst_path)
    except ResourceNotFound:
        # A missing resource on either side means there is nothing to compare.
        return True

    if src_stamp is None or dst_stamp is None:
        # Without both modification times the copy is always performed.
        return True
    if condition == "newer":
        return src_stamp > dst_stamp
    return src_stamp < dst_stamp
def copy_modified_time(
    src_fs,  # type: Union[FS, Text]
    src_path,  # type: Text
    dst_fs,  # type: Union[FS, Text]
    dst_path,  # type: Text
):
    # type: (...) -> None
    """Copy modified time metadata from one resource to another.

    Reads the ``details`` namespace of the source resource and writes its
    ``metadata_changed`` and ``modified`` entries (whichever are present)
    to the destination resource.

    Arguments:
        src_fs (FS or str): Source filesystem (instance or URL).
        src_path (str): Path to the resource on the source filesystem.
        dst_fs (FS or str): Destination filesystem (instance or URL).
        dst_path (str): Path to the resource on the destination filesystem.

    """
    wanted_namespaces = ("details",)
    time_keys = ("metadata_changed", "modified")
    with manage_fs(src_fs, writeable=False) as _src_fs, manage_fs(
        dst_fs, create=True
    ) as _dst_fs:
        source_info = _src_fs.getinfo(src_path, wanted_namespaces)
        source_details = source_info.raw.get("details", {})
        # Carry over only the time-related entries the source provides.
        carried = {
            key: source_details[key] for key in time_keys if key in source_details
        }
        _dst_fs.setinfo(dst_path, {"details": carried})