"""Utilities related archives. """ # The following comment should be removed at some point in the future. # mypy: strict-optional=False # mypy: disallow-untyped-defs=False from __future__ import absolute_import import logging import os import shutil import stat import tarfile import zipfile from pip._internal.exceptions import InstallationError from pip._internal.utils.filetypes import ( BZ2_EXTENSIONS, TAR_EXTENSIONS, XZ_EXTENSIONS, ZIP_EXTENSIONS, ) from pip._internal.utils.misc import ensure_dir from pip._internal.utils.typing import MYPY_CHECK_RUNNING if MYPY_CHECK_RUNNING: from typing import Iterable, List, Optional, Text, Union logger = logging.getLogger(__name__) SUPPORTED_EXTENSIONS = ZIP_EXTENSIONS + TAR_EXTENSIONS try: import bz2 # noqa SUPPORTED_EXTENSIONS += BZ2_EXTENSIONS except ImportError: logger.debug('bz2 module is not available') try: # Only for Python 3.3+ import lzma # noqa SUPPORTED_EXTENSIONS += XZ_EXTENSIONS except ImportError: logger.debug('lzma module is not available') def current_umask(): """Get the current umask which involves having to set it temporarily.""" mask = os.umask(0) os.umask(mask) return mask def split_leading_dir(path): # type: (Union[str, Text]) -> List[Union[str, Text]] path = path.lstrip('/').lstrip('\\') if ( '/' in path and ( ('\\' in path and path.find('/') < path.find('\\')) or '\\' not in path ) ): return path.split('/', 1) elif '\\' in path: return path.split('\\', 1) else: return [path, ''] def has_leading_dir(paths): # type: (Iterable[Union[str, Text]]) -> bool """Returns true if all the paths have the same leading path name (i.e., everything is in one subdirectory in an archive)""" common_prefix = None for path in paths: prefix, rest = split_leading_dir(path) if not prefix: return False elif common_prefix is None: common_prefix = prefix elif prefix != common_prefix: return False return True def is_within_directory(directory, target): # type: ((Union[str, Text]), (Union[str, Text])) -> bool """ Return true if the absolute path of target is within the directory """ abs_directory = os.path.abspath(directory) abs_target = os.path.abspath(target) prefix = os.path.commonprefix([abs_directory, abs_target]) return prefix == abs_directory def unzip_file(filename, location, flatten=True): # type: (str, str, bool) -> None """ Unzip the file (with path `filename`) to the destination `location`. All files are written based on system defaults and umask (i.e. permissions are not preserved), except that regular file members with any execute permissions (user, group, or world) have "chmod +x" applied after being written. Note that for windows, any execute changes using os.chmod are no-ops per the python docs. """ ensure_dir(location) zipfp = open(filename, 'rb') try: zip = zipfile.ZipFile(zipfp, allowZip64=True) leading = has_leading_dir(zip.namelist()) and flatten for info in zip.infolist(): name = info.filename fn = name if leading: fn = split_leading_dir(name)[1] fn = os.path.join(location, fn) dir = os.path.dirname(fn) if not is_within_directory(location, fn): message = ( 'The zip file ({}) has a file ({}) trying to install ' 'outside target directory ({})' ) raise InstallationError(message.format(filename, fn, location)) if fn.endswith('/') or fn.endswith('\\'): # A directory ensure_dir(fn) else: ensure_dir(dir) # Don't use read() to avoid allocating an arbitrarily large # chunk of memory for the file's content fp = zip.open(name) try: with open(fn, 'wb') as destfp: shutil.copyfileobj(fp, destfp) finally: fp.close() mode = info.external_attr >> 16 # if mode and regular file and any execute permissions for # user/group/world? if mode and stat.S_ISREG(mode) and mode & 0o111: # make dest file have execute for user/group/world # (chmod +x) no-op on windows per python docs os.chmod(fn, (0o777 - current_umask() | 0o111)) finally: zipfp.close() def untar_file(filename, location): # type: (str, str) -> None """ Untar the file (with path `filename`) to the destination `location`. All files are written based on system defaults and umask (i.e. permissions are not preserved), except that regular file members with any execute permissions (user, group, or world) have "chmod +x" applied after being written. Note that for windows, any execute changes using os.chmod are no-ops per the python docs. """ ensure_dir(location) if filename.lower().endswith('.gz') or filename.lower().endswith('.tgz'): mode = 'r:gz' elif filename.lower().endswith(BZ2_EXTENSIONS): mode = 'r:bz2' elif filename.lower().endswith(XZ_EXTENSIONS): mode = 'r:xz' elif filename.lower().endswith('.tar'): mode = 'r' else: logger.warning( 'Cannot determine compression type for file %s', filename, ) mode = 'r:*' tar = tarfile.open(filename, mode) try: leading = has_leading_dir([ member.name for member in tar.getmembers() ]) for member in tar.getmembers(): fn = member.name if leading: # https://github.com/python/mypy/issues/1174 fn = split_leading_dir(fn)[1] # type: ignore path = os.path.join(location, fn) if not is_within_directory(location, path): message = ( 'The tar file ({}) has a file ({}) trying to install ' 'outside target directory ({})' ) raise InstallationError( message.format(filename, path, location) ) if member.isdir(): ensure_dir(path) elif member.issym(): try: # https://github.com/python/typeshed/issues/2673 tar._extract_member(member, path) # type: ignore except Exception as exc: # Some corrupt tar files seem to produce this # (specifically bad symlinks) logger.warning( 'In the tar file %s the member %s is invalid: %s', filename, member.name, exc, ) continue else: try: fp = tar.extractfile(member) except (KeyError, AttributeError) as exc: # Some corrupt tar files seem to produce this # (specifically bad symlinks) logger.warning( 'In the tar file %s the member %s is invalid: %s', filename, member.name, exc, ) continue ensure_dir(os.path.dirname(path)) with open(path, 'wb') as destfp: shutil.copyfileobj(fp, destfp) fp.close() # Update the timestamp (useful for cython compiled files) # https://github.com/python/typeshed/issues/2673 tar.utime(member, path) # type: ignore # member have any execute permissions for user/group/world? if member.mode & 0o111: # make dest file have execute for user/group/world # no-op on windows per python docs os.chmod(path, (0o777 - current_umask() | 0o111)) finally: tar.close() def unpack_file( filename, # type: str location, # type: str content_type=None, # type: Optional[str] ): # type: (...) -> None filename = os.path.realpath(filename) if ( content_type == 'application/zip' or filename.lower().endswith(ZIP_EXTENSIONS) or zipfile.is_zipfile(filename) ): unzip_file( filename, location, flatten=not filename.endswith('.whl') ) elif ( content_type == 'application/x-gzip' or tarfile.is_tarfile(filename) or filename.lower().endswith( TAR_EXTENSIONS + BZ2_EXTENSIONS + XZ_EXTENSIONS ) ): untar_file(filename, location) else: # FIXME: handle? # FIXME: magic signatures? logger.critical( 'Cannot unpack file %s (downloaded from %s, content-type: %s); ' 'cannot detect archive format', filename, location, content_type, ) raise InstallationError( 'Cannot determine archive format of {}'.format(location) )