Move supporting functions to utils.unpacking.

This commit is contained in:
Chris Hunt 2019-09-17 22:34:54 -04:00
parent 6a8b47d20b
commit 5656364112
3 changed files with 72 additions and 87 deletions

View File

@ -26,9 +26,10 @@ from pip._internal.models.selection_prefs import SelectionPreferences
from pip._internal.models.target_python import TargetPython
from pip._internal.utils.filetypes import WHEEL_EXTENSION
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import SUPPORTED_EXTENSIONS, build_netloc
from pip._internal.utils.misc import build_netloc
from pip._internal.utils.packaging import check_requires_python
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.unpacking import SUPPORTED_EXTENSIONS
from pip._internal.utils.urls import url_to_path
from pip._internal.wheel import Wheel

View File

@ -10,7 +10,6 @@ import io
import logging
import os
import posixpath
import re
import shutil
import stat
import subprocess
@ -41,12 +40,6 @@ from pip._internal.utils.compat import (
stdlib_pkgs,
str_to_display,
)
from pip._internal.utils.filetypes import (
BZ2_EXTENSIONS,
TAR_EXTENSIONS,
XZ_EXTENSIONS,
ZIP_EXTENSIONS,
)
from pip._internal.utils.marker_files import write_delete_marker_file
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.virtualenv import (
@ -61,7 +54,7 @@ else:
if MYPY_CHECK_RUNNING:
from typing import (
Any, AnyStr, Container, Iterable, List, Mapping, Match, Optional, Text,
Any, AnyStr, Container, Iterable, List, Mapping, Optional, Text,
Tuple, Union, cast,
)
from pip._vendor.pkg_resources import Distribution
@ -79,13 +72,10 @@ else:
__all__ = ['rmtree', 'display_path', 'backup_dir',
'ask', 'splitext',
'format_size', 'is_installable_dir',
'is_svn_page', 'file_contents',
'split_leading_dir', 'has_leading_dir',
'normalize_path',
'renames', 'get_prog',
'call_subprocess',
'captured_stdout', 'ensure_dir',
'SUPPORTED_EXTENSIONS',
'get_installed_version', 'remove_auth_from_url']
@ -94,21 +84,6 @@ subprocess_logger = logging.getLogger('pip.subprocessor')
LOG_DIVIDER = '----------------------------------------'
SUPPORTED_EXTENSIONS = ZIP_EXTENSIONS + TAR_EXTENSIONS
try:
import bz2 # noqa
SUPPORTED_EXTENSIONS += BZ2_EXTENSIONS
except ImportError:
logger.debug('bz2 module is not available')
try:
# Only for Python 3.3+
import lzma # noqa
SUPPORTED_EXTENSIONS += XZ_EXTENSIONS
except ImportError:
logger.debug('lzma module is not available')
def get_pip_version():
# type: () -> str
@ -327,21 +302,6 @@ def is_installable_dir(path):
return False
def is_svn_page(html):
# type: (Union[str, Text]) -> Optional[Match[Union[str, Text]]]
"""
Returns true if the page appears to be the index page of an svn repository
"""
return (re.search(r'<title>[^<]*Revision \d+:', html) and
re.search(r'Powered by (?:<a[^>]*?>)?Subversion', html, re.I))
def file_contents(filename):
# type: (str) -> Text
with open(filename, 'rb') as fp:
return fp.read().decode('utf-8')
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
"""Yield pieces of data from a file-like object until EOF."""
while True:
@ -351,34 +311,6 @@ def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
yield chunk
def split_leading_dir(path):
# type: (Union[str, Text]) -> List[Union[str, Text]]
path = path.lstrip('/').lstrip('\\')
if '/' in path and (('\\' in path and path.find('/') < path.find('\\')) or
'\\' not in path):
return path.split('/', 1)
elif '\\' in path:
return path.split('\\', 1)
else:
return [path, '']
def has_leading_dir(paths):
# type: (Iterable[Union[str, Text]]) -> bool
"""Returns true if all the paths have the same leading path name
(i.e., everything is in one subdirectory in an archive)"""
common_prefix = None
for path in paths:
prefix, rest = split_leading_dir(path)
if not prefix:
return False
elif common_prefix is None:
common_prefix = prefix
elif prefix != common_prefix:
return False
return True
def normalize_path(path, resolve_symlinks=True):
# type: (str, bool) -> str
"""
@ -601,13 +533,6 @@ def dist_location(dist):
return normalize_path(dist.location)
def current_umask():
"""Get the current umask which involves having to set it temporarily."""
mask = os.umask(0)
os.umask(mask)
return mask
def make_command(*args):
# type: (Union[str, HiddenText, CommandArgs]) -> CommandArgs
"""

View File

@ -8,6 +8,7 @@ from __future__ import absolute_import
import logging
import os
import re
import shutil
import stat
import tarfile
@ -20,19 +21,11 @@ from pip._internal.utils.filetypes import (
XZ_EXTENSIONS,
ZIP_EXTENSIONS,
)
from pip._internal.utils.misc import (
current_umask,
ensure_dir,
file_contents,
has_leading_dir,
hide_url,
is_svn_page,
split_leading_dir,
)
from pip._internal.utils.misc import ensure_dir, hide_url
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Optional
from typing import Iterable, List, Optional, Match, Text, Union
from pip._internal.models.link import Link
@ -40,6 +33,72 @@ if MYPY_CHECK_RUNNING:
logger = logging.getLogger(__name__)
SUPPORTED_EXTENSIONS = ZIP_EXTENSIONS + TAR_EXTENSIONS
try:
import bz2 # noqa
SUPPORTED_EXTENSIONS += BZ2_EXTENSIONS
except ImportError:
logger.debug('bz2 module is not available')
try:
# Only for Python 3.3+
import lzma # noqa
SUPPORTED_EXTENSIONS += XZ_EXTENSIONS
except ImportError:
logger.debug('lzma module is not available')
def current_umask():
"""Get the current umask which involves having to set it temporarily."""
mask = os.umask(0)
os.umask(mask)
return mask
def file_contents(filename):
# type: (str) -> Text
with open(filename, 'rb') as fp:
return fp.read().decode('utf-8')
def is_svn_page(html):
# type: (Union[str, Text]) -> Optional[Match[Union[str, Text]]]
"""
Returns true if the page appears to be the index page of an svn repository
"""
return (re.search(r'<title>[^<]*Revision \d+:', html) and
re.search(r'Powered by (?:<a[^>]*?>)?Subversion', html, re.I))
def split_leading_dir(path):
# type: (Union[str, Text]) -> List[Union[str, Text]]
path = path.lstrip('/').lstrip('\\')
if '/' in path and (('\\' in path and path.find('/') < path.find('\\')) or
'\\' not in path):
return path.split('/', 1)
elif '\\' in path:
return path.split('\\', 1)
else:
return [path, '']
def has_leading_dir(paths):
# type: (Iterable[Union[str, Text]]) -> bool
"""Returns true if all the paths have the same leading path name
(i.e., everything is in one subdirectory in an archive)"""
common_prefix = None
for path in paths:
prefix, rest = split_leading_dir(path)
if not prefix:
return False
elif common_prefix is None:
common_prefix = prefix
elif prefix != common_prefix:
return False
return True
def unzip_file(filename, location, flatten=True):
# type: (str, str, bool) -> None
"""