convert type comments

This commit is contained in:
harupy 2021-07-23 23:29:29 +09:00
parent 2de3af1747
commit 93db97c383
10 changed files with 236 additions and 281 deletions

View File

@ -10,8 +10,7 @@ from pip._internal.utils.subprocess import runner_with_spinner_message
from pip._internal.utils.temp_dir import TempDirectory
def generate_metadata(build_env, backend):
# type: (BuildEnvironment, Pep517HookCaller) -> str
def generate_metadata(build_env: BuildEnvironment, backend: Pep517HookCaller) -> str:
"""Generate metadata using mechanisms described in PEP 517.
Returns the generated metadata directory.

View File

@ -13,8 +13,7 @@ from pip._internal.utils.temp_dir import TempDirectory
logger = logging.getLogger(__name__)
def _find_egg_info(directory):
# type: (str) -> str
def _find_egg_info(directory: str) -> str:
"""Find an .egg-info subdirectory in `directory`.
"""
filenames = [
@ -37,13 +36,12 @@ def _find_egg_info(directory):
def generate_metadata(
build_env, # type: BuildEnvironment
setup_py_path, # type: str
source_dir, # type: str
isolated, # type: bool
details, # type: str
):
# type: (...) -> str
build_env: BuildEnvironment,
setup_py_path: str,
source_dir: str,
isolated: bool,
details: str,
) -> str:
"""Generate metadata using setup.py-based defacto mechanisms.
Returns the generated metadata directory.

View File

@ -10,12 +10,11 @@ logger = logging.getLogger(__name__)
def build_wheel_pep517(
name, # type: str
backend, # type: Pep517HookCaller
metadata_directory, # type: str
tempd, # type: str
):
# type: (...) -> Optional[str]
name: str,
backend: Pep517HookCaller,
metadata_directory: str,
tempd: str,
) -> Optional[str]:
"""Build one InstallRequirement using the PEP 517 build process.
Returns path to wheel if successfully built. Otherwise, returns None.

View File

@ -14,10 +14,9 @@ logger = logging.getLogger(__name__)
def format_command_result(
command_args, # type: List[str]
command_output, # type: str
):
# type: (...) -> str
command_args: List[str],
command_output: str,
) -> str:
"""Format command information for logging."""
command_desc = format_command_args(command_args)
text = f'Command arguments: {command_desc}\n'
@ -35,13 +34,12 @@ def format_command_result(
def get_legacy_build_wheel_path(
names, # type: List[str]
temp_dir, # type: str
name, # type: str
command_args, # type: List[str]
command_output, # type: str
):
# type: (...) -> Optional[str]
names: List[str],
temp_dir: str,
name: str,
command_args: List[str],
command_output: str,
) -> Optional[str]:
"""Return the path to the wheel in the temporary build directory."""
# Sort for determinism.
names = sorted(names)
@ -65,14 +63,13 @@ def get_legacy_build_wheel_path(
def build_wheel_legacy(
name, # type: str
setup_py_path, # type: str
source_dir, # type: str
global_options, # type: List[str]
build_options, # type: List[str]
tempd, # type: str
):
# type: (...) -> Optional[str]
name: str,
setup_py_path: str,
source_dir: str,
global_options: List[str],
build_options: List[str],
tempd: str,
) -> Optional[str]:
"""Build one unpacked package using the "legacy" build process.
Returns path to wheel if successfully built. Otherwise, returns None.

View File

@ -51,8 +51,9 @@ def create_package_set_from_installed() -> Tuple[PackageSet, bool]:
return package_set, problems
def check_package_set(package_set, should_ignore=None):
# type: (PackageSet, Optional[Callable[[str], bool]]) -> CheckResult
def check_package_set(
package_set: PackageSet, should_ignore: Optional[Callable[[str], bool]] = None
) -> CheckResult:
"""Check if a package set is consistent
If should_ignore is passed, it should be a callable that takes a
@ -64,8 +65,8 @@ def check_package_set(package_set, should_ignore=None):
for package_name, package_detail in package_set.items():
# Info about dependencies of package_name
missing_deps = set() # type: Set[Missing]
conflicting_deps = set() # type: Set[Conflicting]
missing_deps: Set[Missing] = set()
conflicting_deps: Set[Conflicting] = set()
if should_ignore and should_ignore(package_name):
continue
@ -95,8 +96,7 @@ def check_package_set(package_set, should_ignore=None):
return missing, conflicting
def check_install_conflicts(to_install):
# type: (List[InstallRequirement]) -> ConflictDetails
def check_install_conflicts(to_install: List[InstallRequirement]) -> ConflictDetails:
"""For checking if the dependency graph would be consistent after \
installing given requirements
"""
@ -116,8 +116,9 @@ def check_install_conflicts(to_install):
)
def _simulate_installation_of(to_install, package_set):
# type: (List[InstallRequirement], PackageSet) -> Set[NormalizedName]
def _simulate_installation_of(
to_install: List[InstallRequirement], package_set: PackageSet
) -> Set["NormalizedName"]:
"""Computes the version of packages after installing to_install.
"""
# Keep track of packages that were installed
@ -137,8 +138,9 @@ def _simulate_installation_of(to_install, package_set):
return installed
def _create_whitelist(would_be_installed, package_set):
# type: (Set[NormalizedName], PackageSet) -> Set[NormalizedName]
def _create_whitelist(
would_be_installed: Set["NormalizedName"], package_set: PackageSet
) -> Set["NormalizedName"]:
packages_affected = set(would_be_installed)
for package_name in package_set:

View File

@ -36,16 +36,15 @@ class _EditableInfo(NamedTuple):
def freeze(
requirement=None, # type: Optional[List[str]]
local_only=False, # type: bool
user_only=False, # type: bool
paths=None, # type: Optional[List[str]]
isolated=False, # type: bool
exclude_editable=False, # type: bool
skip=() # type: Container[str]
):
# type: (...) -> Iterator[str]
installations = {} # type: Dict[str, FrozenRequirement]
requirement: Optional[List[str]] = None,
local_only: bool = False,
user_only: bool = False,
paths: Optional[List[str]] = None,
isolated: bool = False,
exclude_editable: bool = False,
skip: Container[str] = ()
) -> Iterator[str]:
installations: Dict[str, FrozenRequirement] = {}
dists = get_environment(paths).iter_installed_distributions(
local_only=local_only,
@ -63,10 +62,10 @@ def freeze(
# should only be emitted once, even if the same option is in multiple
# requirements files, so we need to keep track of what has been emitted
# so that we don't emit it again if it's seen again
emitted_options = set() # type: Set[str]
emitted_options: Set[str] = set()
# keep track of which files a requirement is in so that we can
# give an accurate warning if a requirement appears multiple times.
req_files = collections.defaultdict(list) # type: Dict[str, List[str]]
req_files: Dict[str, List[str]] = collections.defaultdict(list)
for req_file_path in requirement:
with open(req_file_path) as req_file:
for line in req_file:
@ -241,8 +240,13 @@ def _get_editable_info(dist: BaseDistribution) -> _EditableInfo:
class FrozenRequirement:
def __init__(self, name, req, editable, comments=()):
# type: (str, Union[str, Requirement], bool, Iterable[str]) -> None
def __init__(
self,
name: str,
req: Union[str, Requirement],
editable: bool,
comments: Iterable[str] = (),
) -> None:
self.name = name
self.canonical_name = canonicalize_name(name)
self.req = req
@ -269,8 +273,7 @@ class FrozenRequirement:
return cls(dist.raw_name, req, editable, comments=comments)
def __str__(self):
# type: () -> str
def __str__(self) -> str:
req = self.req
if self.editable:
req = f'-e {req}'

View File

@ -12,18 +12,17 @@ logger = logging.getLogger(__name__)
def install_editable(
install_options, # type: List[str]
global_options, # type: Sequence[str]
prefix, # type: Optional[str]
home, # type: Optional[str]
use_user_site, # type: bool
name, # type: str
setup_py_path, # type: str
isolated, # type: bool
build_env, # type: BuildEnvironment
unpacked_source_directory, # type: str
):
# type: (...) -> None
install_options: List[str],
global_options: Sequence[str],
prefix: Optional[str],
home: Optional[str],
use_user_site: bool,
name: str,
setup_py_path: str,
isolated: bool,
build_env: BuildEnvironment,
unpacked_source_directory: str,
) -> None:
"""Install a package in editable mode. Most arguments are pass-through
to setuptools.
"""

View File

@ -20,28 +20,26 @@ logger = logging.getLogger(__name__)
class LegacyInstallFailure(Exception):
def __init__(self):
# type: () -> None
def __init__(self) -> None:
self.parent = sys.exc_info()
def install(
install_options, # type: List[str]
global_options, # type: Sequence[str]
root, # type: Optional[str]
home, # type: Optional[str]
prefix, # type: Optional[str]
use_user_site, # type: bool
pycompile, # type: bool
scheme, # type: Scheme
setup_py_path, # type: str
isolated, # type: bool
req_name, # type: str
build_env, # type: BuildEnvironment
unpacked_source_directory, # type: str
req_description, # type: str
):
# type: (...) -> bool
install_options: List[str],
global_options: Sequence[str],
root: Optional[str],
home: Optional[str],
prefix: Optional[str],
use_user_site: bool,
pycompile: bool,
scheme: Scheme,
setup_py_path: str,
isolated: bool,
req_name: str,
build_env: BuildEnvironment,
unpacked_source_directory: str,
req_description: str,
) -> bool:
header_dir = scheme.headers
@ -88,8 +86,7 @@ def install(
with open(record_filename) as f:
record_lines = f.read().splitlines()
def prepend_root(path):
# type: (str) -> str
def prepend_root(path: str) -> str:
if root is None or not os.path.isabs(path):
return path
else:

View File

@ -59,12 +59,11 @@ if TYPE_CHECKING:
from typing import Protocol
class File(Protocol):
src_record_path = None # type: RecordPath
dest_path = None # type: str
changed = None # type: bool
src_record_path: "RecordPath"
dest_path: str
changed: bool
def save(self):
# type: () -> None
def save(self) -> None:
pass
@ -74,8 +73,7 @@ RecordPath = NewType('RecordPath', str)
InstalledCSVRow = Tuple[RecordPath, str, Union[int, str]]
def rehash(path, blocksize=1 << 20):
# type: (str, int) -> Tuple[str, str]
def rehash(path: str, blocksize: int = 1 << 20) -> Tuple[str, str]:
"""Return (encoded_digest, length) for path using hashlib.sha256()"""
h, length = hash_file(path, blocksize)
digest = 'sha256=' + urlsafe_b64encode(
@ -84,16 +82,14 @@ def rehash(path, blocksize=1 << 20):
return (digest, str(length))
def csv_io_kwargs(mode):
# type: (str) -> Dict[str, Any]
def csv_io_kwargs(mode: str) -> Dict[str, Any]:
"""Return keyword arguments to properly open a CSV file
in the given mode.
"""
return {'mode': mode, 'newline': '', 'encoding': 'utf-8'}
def fix_script(path):
# type: (str) -> bool
def fix_script(path: str) -> bool:
"""Replace #!python with #!/path/to/python
Return True if file was changed.
"""
@ -113,8 +109,7 @@ def fix_script(path):
return True
def wheel_root_is_purelib(metadata):
# type: (Message) -> bool
def wheel_root_is_purelib(metadata: Message) -> bool:
return metadata.get("Root-Is-Purelib", "").lower() == "true"
@ -129,8 +124,7 @@ def get_entrypoints(dist: BaseDistribution) -> Tuple[Dict[str, str], Dict[str, s
return console_scripts, gui_scripts
def message_about_scripts_not_on_PATH(scripts):
# type: (Sequence[str]) -> Optional[str]
def message_about_scripts_not_on_PATH(scripts: Sequence[str]) -> Optional[str]:
"""Determine if any scripts are not on PATH and format a warning.
Returns a warning message if one or more scripts are not on PATH,
otherwise None.
@ -139,7 +133,7 @@ def message_about_scripts_not_on_PATH(scripts):
return None
# Group scripts by the path they were installed in
grouped_by_dir = collections.defaultdict(set) # type: Dict[str, Set[str]]
grouped_by_dir: Dict[str, Set[str]] = collections.defaultdict(set)
for destfile in scripts:
parent_dir = os.path.dirname(destfile)
script_name = os.path.basename(destfile)
@ -153,17 +147,17 @@ def message_about_scripts_not_on_PATH(scripts):
# If an executable sits with sys.executable, we don't warn for it.
# This covers the case of venv invocations without activating the venv.
not_warn_dirs.append(os.path.normcase(os.path.dirname(sys.executable)))
warn_for = {
warn_for: Dict[str, Set[str]] = {
parent_dir: scripts for parent_dir, scripts in grouped_by_dir.items()
if os.path.normcase(parent_dir) not in not_warn_dirs
} # type: Dict[str, Set[str]]
}
if not warn_for:
return None
# Format a message
msg_lines = []
for parent_dir, dir_scripts in warn_for.items():
sorted_scripts = sorted(dir_scripts) # type: List[str]
sorted_scripts: List[str] = sorted(dir_scripts)
if len(sorted_scripts) == 1:
start_text = "script {} is".format(sorted_scripts[0])
else:
@ -200,8 +194,9 @@ def message_about_scripts_not_on_PATH(scripts):
return "\n".join(msg_lines)
def _normalized_outrows(outrows):
# type: (Iterable[InstalledCSVRow]) -> List[Tuple[str, str, str]]
def _normalized_outrows(
outrows: Iterable[InstalledCSVRow]
) -> List[Tuple[str, str, str]]:
"""Normalize the given rows of a RECORD file.
Items in each row are converted into str. Rows are then sorted to make
@ -226,13 +221,11 @@ def _normalized_outrows(outrows):
)
def _record_to_fs_path(record_path):
# type: (RecordPath) -> str
def _record_to_fs_path(record_path: RecordPath) -> str:
return record_path
def _fs_to_record_path(path, relative_to=None):
# type: (str, Optional[str]) -> RecordPath
def _fs_to_record_path(path: str, relative_to: Optional[str] = None) -> RecordPath:
if relative_to is not None:
# On Windows, do not handle relative paths if they belong to different
# logical disks
@ -243,25 +236,23 @@ def _fs_to_record_path(path, relative_to=None):
return cast('RecordPath', path)
def _parse_record_path(record_column):
# type: (str) -> RecordPath
def _parse_record_path(record_column: str) -> RecordPath:
p = ensure_text(record_column, encoding='utf-8')
return cast('RecordPath', p)
def get_csv_rows_for_installed(
old_csv_rows, # type: List[List[str]]
installed, # type: Dict[RecordPath, RecordPath]
changed, # type: Set[RecordPath]
generated, # type: List[str]
lib_dir, # type: str
):
# type: (...) -> List[InstalledCSVRow]
old_csv_rows: List[List[str]],
installed: Dict[RecordPath, RecordPath],
changed: Set[RecordPath],
generated: List[str],
lib_dir: str,
) -> List[InstalledCSVRow]:
"""
:param installed: A map from archive RECORD path to installation RECORD
path.
"""
installed_rows = [] # type: List[InstalledCSVRow]
installed_rows: List[InstalledCSVRow] = []
for row in old_csv_rows:
if len(row) > 3:
logger.warning('RECORD line has more than three elements: %s', row)
@ -282,8 +273,7 @@ def get_csv_rows_for_installed(
return installed_rows
def get_console_script_specs(console):
# type: (Dict[str, str]) -> List[str]
def get_console_script_specs(console: Dict[str, str]) -> List[str]:
"""
Given the mapping from entrypoint name to callable, return the relevant
console script specs.
@ -369,19 +359,18 @@ def get_console_script_specs(console):
class ZipBackedFile:
def __init__(self, src_record_path, dest_path, zip_file):
# type: (RecordPath, str, ZipFile) -> None
def __init__(
self, src_record_path: RecordPath, dest_path: str, zip_file: ZipFile
) -> None:
self.src_record_path = src_record_path
self.dest_path = dest_path
self._zip_file = zip_file
self.changed = False
def _getinfo(self):
# type: () -> ZipInfo
def _getinfo(self) -> ZipInfo:
return self._zip_file.getinfo(self.src_record_path)
def save(self):
# type: () -> None
def save(self) -> None:
# directory creation is lazy and after file filtering
# to ensure we don't install empty dirs; empty dirs can't be
# uninstalled.
@ -410,22 +399,19 @@ class ZipBackedFile:
class ScriptFile:
def __init__(self, file):
# type: (File) -> None
def __init__(self, file: "File") -> None:
self._file = file
self.src_record_path = self._file.src_record_path
self.dest_path = self._file.dest_path
self.changed = False
def save(self):
# type: () -> None
def save(self) -> None:
self._file.save()
self.changed = fix_script(self.dest_path)
class MissingCallableSuffix(InstallationError):
def __init__(self, entry_point):
# type: (str) -> None
def __init__(self, entry_point: str) -> None:
super().__init__(
"Invalid script entry point: {} - A callable "
"suffix is required. Cf https://packaging.python.org/"
@ -434,31 +420,28 @@ class MissingCallableSuffix(InstallationError):
)
def _raise_for_invalid_entrypoint(specification):
# type: (str) -> None
def _raise_for_invalid_entrypoint(specification: str) -> None:
entry = get_export_entry(specification)
if entry is not None and entry.suffix is None:
raise MissingCallableSuffix(str(entry))
class PipScriptMaker(ScriptMaker):
def make(self, specification, options=None):
# type: (str, Dict[str, Any]) -> List[str]
def make(self, specification: str, options: Dict[str, Any] = None) -> List[str]:
_raise_for_invalid_entrypoint(specification)
return super().make(specification, options)
def _install_wheel(
name, # type: str
wheel_zip, # type: ZipFile
wheel_path, # type: str
scheme, # type: Scheme
pycompile=True, # type: bool
warn_script_location=True, # type: bool
direct_url=None, # type: Optional[DirectUrl]
requested=False, # type: bool
):
# type: (...) -> None
name: str,
wheel_zip: ZipFile,
wheel_path: str,
scheme: Scheme,
pycompile: bool = True,
warn_script_location: bool = True,
direct_url: Optional[DirectUrl] = None,
requested: bool = False,
) -> None:
"""Install a wheel.
:param name: Name of the project to install
@ -485,20 +468,20 @@ def _install_wheel(
# installed = files copied from the wheel to the destination
# changed = files changed while installing (scripts #! line typically)
# generated = files newly generated during the install (script wrappers)
installed = {} # type: Dict[RecordPath, RecordPath]
changed = set() # type: Set[RecordPath]
generated = [] # type: List[str]
installed: Dict[RecordPath, RecordPath] = {}
changed: Set[RecordPath] = set()
generated: List[str] = []
def record_installed(srcfile, destfile, modified=False):
# type: (RecordPath, str, bool) -> None
def record_installed(
srcfile: RecordPath, destfile: str, modified: bool = False
) -> None:
"""Map archive RECORD paths to installation RECORD paths."""
newpath = _fs_to_record_path(destfile, lib_dir)
installed[srcfile] = newpath
if modified:
changed.add(_fs_to_record_path(destfile))
def all_paths():
# type: () -> Iterable[RecordPath]
def all_paths() -> Iterable[RecordPath]:
names = wheel_zip.namelist()
# If a flag is set, names may be unicode in Python 2. We convert to
# text explicitly so these are valid for lookup in RECORD.
@ -506,12 +489,10 @@ def _install_wheel(
for name in decoded_names:
yield cast("RecordPath", name)
def is_dir_path(path):
# type: (RecordPath) -> bool
def is_dir_path(path: RecordPath) -> bool:
return path.endswith("/")
def assert_no_path_traversal(dest_dir_path, target_path):
# type: (str, str) -> None
def assert_no_path_traversal(dest_dir_path: str, target_path: str) -> None:
if not is_within_directory(dest_dir_path, target_path):
message = (
"The wheel {!r} has a file {!r} trying to install"
@ -521,10 +502,10 @@ def _install_wheel(
message.format(wheel_path, target_path, dest_dir_path)
)
def root_scheme_file_maker(zip_file, dest):
# type: (ZipFile, str) -> Callable[[RecordPath], File]
def make_root_scheme_file(record_path):
# type: (RecordPath) -> File
def root_scheme_file_maker(
zip_file: ZipFile, dest: str
) -> Callable[[RecordPath], "File"]:
def make_root_scheme_file(record_path: RecordPath) -> "File":
normed_path = os.path.normpath(record_path)
dest_path = os.path.join(dest, normed_path)
assert_no_path_traversal(dest, dest_path)
@ -532,8 +513,9 @@ def _install_wheel(
return make_root_scheme_file
def data_scheme_file_maker(zip_file, scheme):
# type: (ZipFile, Scheme) -> Callable[[RecordPath], File]
def data_scheme_file_maker(
zip_file: ZipFile, scheme: Scheme
) -> Callable[[RecordPath], "File"]:
scheme_paths = {}
for key in SCHEME_KEYS:
encoded_key = ensure_text(key)
@ -541,8 +523,7 @@ def _install_wheel(
getattr(scheme, key), encoding=sys.getfilesystemencoding()
)
def make_data_scheme_file(record_path):
# type: (RecordPath) -> File
def make_data_scheme_file(record_path: RecordPath) -> "File":
normed_path = os.path.normpath(record_path)
try:
_, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2)
@ -572,8 +553,7 @@ def _install_wheel(
return make_data_scheme_file
def is_data_scheme_path(path):
# type: (RecordPath) -> bool
def is_data_scheme_path(path: RecordPath) -> bool:
return path.split("/", 1)[0].endswith(".data")
paths = all_paths()
@ -588,8 +568,7 @@ def _install_wheel(
)
files = map(make_root_scheme_file, root_scheme_paths)
def is_script_scheme_path(path):
# type: (RecordPath) -> bool
def is_script_scheme_path(path: RecordPath) -> bool:
parts = path.split("/", 2)
return (
len(parts) > 2 and
@ -609,8 +588,7 @@ def _install_wheel(
distribution = get_wheel_distribution(wheel_path, canonicalize_name(name))
console, gui = get_entrypoints(distribution)
def is_entrypoint_wrapper(file):
# type: (File) -> bool
def is_entrypoint_wrapper(file: "File") -> bool:
# EP, EP.exe and EP-script.py are scripts generated for
# entry point EP by setuptools
path = file.dest_path
@ -637,8 +615,7 @@ def _install_wheel(
file.save()
record_installed(file.src_record_path, file.dest_path, file.changed)
def pyc_source_file_paths():
# type: () -> Iterator[str]
def pyc_source_file_paths() -> Iterator[str]:
# We de-duplicate installation paths, since there can be overlap (e.g.
# file in .data maps to same location as file in wheel root).
# Sorting installation paths makes it easier to reproduce and debug
@ -651,8 +628,7 @@ def _install_wheel(
continue
yield full_installed_path
def pyc_output_path(path):
# type: (str) -> str
def pyc_output_path(path: str) -> str:
"""Return the path the pyc file would have been written to.
"""
return importlib.util.cache_from_source(path)
@ -716,8 +692,7 @@ def _install_wheel(
generated_file_mode = 0o666 & ~current_umask()
@contextlib.contextmanager
def _generate_file(path, **kwargs):
# type: (str, **Any) -> Iterator[BinaryIO]
def _generate_file(path: str, **kwargs: Any) -> Iterator[BinaryIO]:
with adjacent_tmp_file(path, **kwargs) as f:
yield f
os.chmod(f.name, generated_file_mode)
@ -767,8 +742,7 @@ def _install_wheel(
@contextlib.contextmanager
def req_error_context(req_description):
# type: (str) -> Iterator[None]
def req_error_context(req_description: str) -> Iterator[None]:
try:
yield
except InstallationError as e:
@ -779,16 +753,15 @@ def req_error_context(req_description):
def install_wheel(
name, # type: str
wheel_path, # type: str
scheme, # type: Scheme
req_description, # type: str
pycompile=True, # type: bool
warn_script_location=True, # type: bool
direct_url=None, # type: Optional[DirectUrl]
requested=False, # type: bool
):
# type: (...) -> None
name: str,
wheel_path: str,
scheme: Scheme,
req_description: str,
pycompile: bool = True,
warn_script_location: bool = True,
direct_url: Optional[DirectUrl] = None,
requested: bool = False,
) -> None:
with ZipFile(wheel_path, allowZip64=True) as z:
with req_error_context(req_description):
_install_wheel(

View File

@ -48,12 +48,11 @@ logger = logging.getLogger(__name__)
def _get_prepared_distribution(
req, # type: InstallRequirement
req_tracker, # type: RequirementTracker
finder, # type: PackageFinder
build_isolation, # type: bool
):
# type: (...) -> Distribution
req: InstallRequirement,
req_tracker: RequirementTracker,
finder: PackageFinder,
build_isolation: bool,
) -> Distribution:
"""Prepare a distribution for installation."""
abstract_dist = make_distribution_for_install_requirement(req)
with req_tracker.track(req):
@ -61,8 +60,7 @@ def _get_prepared_distribution(
return abstract_dist.get_pkg_resources_distribution()
def unpack_vcs_link(link, location):
# type: (Link, str) -> None
def unpack_vcs_link(link: Link, location: str) -> None:
vcs_backend = vcs.get_backend_for_scheme(link.scheme)
assert vcs_backend is not None
vcs_backend.unpack(location, url=hide_url(link.url))
@ -70,8 +68,7 @@ def unpack_vcs_link(link, location):
class File:
def __init__(self, path, content_type):
# type: (str, Optional[str]) -> None
def __init__(self, path: str, content_type: Optional[str]) -> None:
self.path = path
if content_type is None:
self.content_type = mimetypes.guess_type(path)[0]
@ -80,12 +77,11 @@ class File:
def get_http_url(
link, # type: Link
download, # type: Downloader
download_dir=None, # type: Optional[str]
hashes=None, # type: Optional[Hashes]
):
# type: (...) -> File
link: Link,
download: Downloader,
download_dir: Optional[str] = None,
hashes: Optional[Hashes] = None,
) -> File:
temp_dir = TempDirectory(kind="unpack", globally_managed=True)
# If a download dir is specified, is the file already downloaded there?
already_downloaded_path = None
@ -106,8 +102,7 @@ def get_http_url(
return File(from_path, content_type)
def _copy2_ignoring_special_files(src, dest):
# type: (str, str) -> None
def _copy2_ignoring_special_files(src: str, dest: str) -> None:
"""Copying special files is not supported, but as a convenience to users
we skip errors copying them. This supports tools that may create e.g.
socket files in the project source directory.
@ -127,15 +122,13 @@ def _copy2_ignoring_special_files(src, dest):
)
def _copy_source_tree(source, target):
# type: (str, str) -> None
def _copy_source_tree(source: str, target: str) -> None:
target_abspath = os.path.abspath(target)
target_basename = os.path.basename(target_abspath)
target_dirname = os.path.dirname(target_abspath)
def ignore(d, names):
# type: (str, List[str]) -> List[str]
skipped = [] # type: List[str]
def ignore(d: str, names: List[str]) -> List[str]:
skipped: List[str] = []
if d == source:
# Pulling in those directories can potentially be very slow,
# exclude the following directories if they appear in the top
@ -159,11 +152,10 @@ def _copy_source_tree(source, target):
def get_file_url(
link, # type: Link
download_dir=None, # type: Optional[str]
hashes=None # type: Optional[Hashes]
):
# type: (...) -> File
link: Link,
download_dir: Optional[str] = None,
hashes: Optional[Hashes] = None
) -> File:
"""Get file and optionally check its hash.
"""
# If a download dir is specified, is the file already there and valid?
@ -189,13 +181,12 @@ def get_file_url(
def unpack_url(
link, # type: Link
location, # type: str
download, # type: Downloader
download_dir=None, # type: Optional[str]
hashes=None, # type: Optional[Hashes]
):
# type: (...) -> Optional[File]
link: Link,
location: str,
download: Downloader,
download_dir: Optional[str] = None,
hashes: Optional[Hashes] = None,
) -> Optional[File]:
"""Unpack link into location, downloading if required.
:param hashes: A Hashes object, one of whose embedded hashes must match,
@ -251,8 +242,9 @@ def unpack_url(
return file
def _check_download_dir(link, download_dir, hashes):
# type: (Link, str, Optional[Hashes]) -> Optional[str]
def _check_download_dir(
link: Link, download_dir: str, hashes: Optional[Hashes]
) -> Optional[str]:
""" Check download_dir for previously downloaded file with correct hash
If a correct file is found return its path else None
"""
@ -283,20 +275,19 @@ class RequirementPreparer:
def __init__(
self,
build_dir, # type: str
download_dir, # type: Optional[str]
src_dir, # type: str
build_isolation, # type: bool
req_tracker, # type: RequirementTracker
session, # type: PipSession
progress_bar, # type: str
finder, # type: PackageFinder
require_hashes, # type: bool
use_user_site, # type: bool
lazy_wheel, # type: bool
in_tree_build, # type: bool
):
# type: (...) -> None
build_dir: str,
download_dir: Optional[str],
src_dir: str,
build_isolation: bool,
req_tracker: RequirementTracker,
session: PipSession,
progress_bar: str,
finder: PackageFinder,
require_hashes: bool,
use_user_site: bool,
lazy_wheel: bool,
in_tree_build: bool,
) -> None:
super().__init__()
self.src_dir = src_dir
@ -327,13 +318,12 @@ class RequirementPreparer:
self.in_tree_build = in_tree_build
# Memoized downloaded files, as mapping of url: (path, mime type)
self._downloaded = {} # type: Dict[str, Tuple[str, str]]
self._downloaded: Dict[str, Tuple[str, str]] = {}
# Previous "header" printed for a link-based InstallRequirement
self._previous_requirement_header = ("", "")
def _log_preparing_link(self, req):
# type: (InstallRequirement) -> None
def _log_preparing_link(self, req: InstallRequirement) -> None:
"""Provide context for the requirement being prepared."""
if req.link.is_file and not req.original_link_is_in_wheel_cache:
message = "Processing %s"
@ -350,8 +340,9 @@ class RequirementPreparer:
with indent_log():
logger.info("Using cached %s", req.link.filename)
def _ensure_link_req_src_dir(self, req, parallel_builds):
# type: (InstallRequirement, bool) -> None
def _ensure_link_req_src_dir(
self, req: InstallRequirement, parallel_builds: bool
) -> None:
"""Ensure source_dir of a linked InstallRequirement."""
# Since source_dir is only set for editable requirements.
if req.link.is_wheel:
@ -385,8 +376,7 @@ class RequirementPreparer:
"Please delete it and try again.".format(req, req.source_dir)
)
def _get_linked_req_hashes(self, req):
# type: (InstallRequirement) -> Hashes
def _get_linked_req_hashes(self, req: InstallRequirement) -> Hashes:
# By the time this is called, the requirement's link should have
# been checked so we can tell what kind of requirements req is
# and raise some more informative errors than otherwise.
@ -418,8 +408,7 @@ class RequirementPreparer:
# showing the user what the hash should be.
return req.hashes(trust_internet=False) or MissingHashes()
def _fetch_metadata_using_lazy_wheel(self, link):
# type: (Link) -> Optional[Distribution]
def _fetch_metadata_using_lazy_wheel(self, link: Link) -> Optional[Distribution]:
"""Fetch metadata using lazy wheel, if possible."""
if not self.use_lazy_wheel:
return None
@ -449,10 +438,9 @@ class RequirementPreparer:
def _complete_partial_requirements(
self,
partially_downloaded_reqs, # type: Iterable[InstallRequirement]
parallel_builds=False, # type: bool
):
# type: (...) -> None
partially_downloaded_reqs: Iterable[InstallRequirement],
parallel_builds: bool = False,
) -> None:
"""Download any requirements which were only fetched by metadata."""
# Download to a temporary directory. These will be copied over as
# needed for downstream 'download', 'wheel', and 'install' commands.
@ -461,7 +449,7 @@ class RequirementPreparer:
# Map each link to the requirement that owns it. This allows us to set
# `req.local_file_path` on the appropriate requirement after passing
# all the links at once into BatchDownloader.
links_to_fully_download = {} # type: Dict[Link, InstallRequirement]
links_to_fully_download: Dict[Link, InstallRequirement] = {}
for req in partially_downloaded_reqs:
assert req.link
links_to_fully_download[req.link] = req
@ -480,8 +468,9 @@ class RequirementPreparer:
for req in partially_downloaded_reqs:
self._prepare_linked_requirement(req, parallel_builds)
def prepare_linked_requirement(self, req, parallel_builds=False):
# type: (InstallRequirement, bool) -> Distribution
def prepare_linked_requirement(
self, req: InstallRequirement, parallel_builds: bool = False
) -> Distribution:
"""Prepare a requirement to be obtained from req.link."""
assert req.link
link = req.link
@ -507,8 +496,9 @@ class RequirementPreparer:
# None of the optimizations worked, fully prepare the requirement
return self._prepare_linked_requirement(req, parallel_builds)
def prepare_linked_requirements_more(self, reqs, parallel_builds=False):
# type: (Iterable[InstallRequirement], bool) -> None
def prepare_linked_requirements_more(
self, reqs: Iterable[InstallRequirement], parallel_builds: bool = False
) -> None:
"""Prepare linked requirements more, if needed."""
reqs = [req for req in reqs if req.needs_more_preparation]
for req in reqs:
@ -522,7 +512,7 @@ class RequirementPreparer:
# Prepare requirements we found were already downloaded for some
# reason. The other downloads will be completed separately.
partially_downloaded_reqs = [] # type: List[InstallRequirement]
partially_downloaded_reqs: List[InstallRequirement] = []
for req in reqs:
if req.needs_more_preparation:
partially_downloaded_reqs.append(req)
@ -535,8 +525,9 @@ class RequirementPreparer:
partially_downloaded_reqs, parallel_builds=parallel_builds,
)
def _prepare_linked_requirement(self, req, parallel_builds):
# type: (InstallRequirement, bool) -> Distribution
def _prepare_linked_requirement(
self, req: InstallRequirement, parallel_builds: bool
) -> Distribution:
assert req.link
link = req.link
@ -572,8 +563,7 @@ class RequirementPreparer:
)
return dist
def save_linked_requirement(self, req):
# type: (InstallRequirement) -> None
def save_linked_requirement(self, req: InstallRequirement) -> None:
assert self.download_dir is not None
assert req.link is not None
link = req.link
@ -600,9 +590,8 @@ class RequirementPreparer:
def prepare_editable_requirement(
self,
req, # type: InstallRequirement
):
# type: (...) -> Distribution
req: InstallRequirement,
) -> Distribution:
"""Prepare an editable requirement
"""
assert req.editable, "cannot prepare a non-editable req as editable"
@ -629,10 +618,9 @@ class RequirementPreparer:
def prepare_installed_requirement(
self,
req, # type: InstallRequirement
skip_reason # type: str
):
# type: (...) -> Distribution
req: InstallRequirement,
skip_reason: str
) -> Distribution:
"""Prepare an already-installed requirement
"""
assert req.satisfied_by, "req should have been satisfied but isn't"