1
1
Fork 0
mirror of https://github.com/pypa/pip synced 2023-12-13 21:30:23 +01:00

Merge pull request #10194 from harupy/type-annotations-internal

This commit is contained in:
Pradyun Gedam 2021-08-14 09:09:28 +01:00 committed by GitHub
commit f51895867a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 131 additions and 191 deletions

View file

@ -32,8 +32,7 @@ logger = logging.getLogger(__name__)
class _Prefix:
def __init__(self, path):
# type: (str) -> None
def __init__(self, path: str) -> None:
self.path = path
self.setup = False
self.bin_dir = get_paths(
@ -73,8 +72,7 @@ class BuildEnvironment:
"""Creates and manages an isolated environment to install build deps
"""
def __init__(self):
# type: () -> None
def __init__(self) -> None:
temp_dir = TempDirectory(
kind=tempdir_kinds.BUILD_ENV, globally_managed=True
)
@ -84,8 +82,8 @@ class BuildEnvironment:
for name in ('normal', 'overlay')
)
self._bin_dirs = [] # type: List[str]
self._lib_dirs = [] # type: List[str]
self._bin_dirs: List[str] = []
self._lib_dirs: List[str] = []
for prefix in reversed(list(self._prefixes.values())):
self._bin_dirs.append(prefix.bin_dir)
self._lib_dirs.extend(prefix.lib_dirs)
@ -127,8 +125,7 @@ class BuildEnvironment:
'''
).format(system_sites=system_sites, lib_dirs=self._lib_dirs))
def __enter__(self):
# type: () -> None
def __enter__(self) -> None:
self._save_env = {
name: os.environ.get(name, None)
for name in ('PATH', 'PYTHONNOUSERSITE', 'PYTHONPATH')
@ -149,19 +146,19 @@ class BuildEnvironment:
def __exit__(
self,
exc_type, # type: Optional[Type[BaseException]]
exc_val, # type: Optional[BaseException]
exc_tb # type: Optional[TracebackType]
):
# type: (...) -> None
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType]
) -> None:
for varname, old_value in self._save_env.items():
if old_value is None:
os.environ.pop(varname, None)
else:
os.environ[varname] = old_value
def check_requirements(self, reqs):
# type: (Iterable[str]) -> Tuple[Set[Tuple[str, str]], Set[str]]
def check_requirements(
self, reqs: Iterable[str]
) -> Tuple[Set[Tuple[str, str]], Set[str]]:
"""Return 2 sets:
- conflicting requirements: set of (installed, wanted) reqs tuples
- missing requirements: set of reqs
@ -187,12 +184,11 @@ class BuildEnvironment:
def install_requirements(
self,
finder, # type: PackageFinder
requirements, # type: Iterable[str]
prefix_as_string, # type: str
message # type: str
):
# type: (...) -> None
finder: "PackageFinder",
requirements: Iterable[str],
prefix_as_string: str,
message: str
) -> None:
prefix = self._prefixes[prefix_as_string]
assert not prefix.setup
prefix.setup = True
@ -223,11 +219,11 @@ class BuildEnvironment:
prefix: _Prefix,
message: str,
) -> None:
args = [
args: List[str] = [
sys.executable, pip_runnable, 'install',
'--ignore-installed', '--no-user', '--prefix', prefix.path,
'--no-warn-script-location',
] # type: List[str]
]
if logger.getEffectiveLevel() <= logging.DEBUG:
args.append('-v')
for format_control in ('no_binary', 'only_binary'):
@ -262,33 +258,28 @@ class NoOpBuildEnvironment(BuildEnvironment):
"""A no-op drop-in replacement for BuildEnvironment
"""
def __init__(self):
# type: () -> None
def __init__(self) -> None:
pass
def __enter__(self):
# type: () -> None
def __enter__(self) -> None:
pass
def __exit__(
self,
exc_type, # type: Optional[Type[BaseException]]
exc_val, # type: Optional[BaseException]
exc_tb # type: Optional[TracebackType]
):
# type: (...) -> None
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType]
) -> None:
pass
def cleanup(self):
# type: () -> None
def cleanup(self) -> None:
pass
def install_requirements(
self,
finder, # type: PackageFinder
requirements, # type: Iterable[str]
prefix_as_string, # type: str
message # type: str
):
# type: (...) -> None
finder: "PackageFinder",
requirements: Iterable[str],
prefix_as_string: str,
message: str
) -> None:
raise NotImplementedError()

View file

@ -20,8 +20,7 @@ from pip._internal.utils.urls import path_to_url
logger = logging.getLogger(__name__)
def _hash_dict(d):
# type: (Dict[str, str]) -> str
def _hash_dict(d: Dict[str, str]) -> str:
"""Return a stable sha224 of a dictionary."""
s = json.dumps(d, sort_keys=True, separators=(",", ":"), ensure_ascii=True)
return hashlib.sha224(s.encode("ascii")).hexdigest()
@ -38,8 +37,9 @@ class Cache:
('binary' and 'source' are the only allowed values)
"""
def __init__(self, cache_dir, format_control, allowed_formats):
# type: (str, FormatControl, Set[str]) -> None
def __init__(
self, cache_dir: str, format_control: FormatControl, allowed_formats: Set[str]
) -> None:
super().__init__()
assert not cache_dir or os.path.isabs(cache_dir)
self.cache_dir = cache_dir or None
@ -49,8 +49,7 @@ class Cache:
_valid_formats = {"source", "binary"}
assert self.allowed_formats.union(_valid_formats) == _valid_formats
def _get_cache_path_parts(self, link):
# type: (Link) -> List[str]
def _get_cache_path_parts(self, link: Link) -> List[str]:
"""Get parts of part that must be os.path.joined with cache_dir
"""
@ -84,8 +83,7 @@ class Cache:
return parts
def _get_candidates(self, link, canonical_package_name):
# type: (Link, str) -> List[Any]
def _get_candidates(self, link: Link, canonical_package_name: str) -> List[Any]:
can_not_cache = (
not self.cache_dir or
not canonical_package_name or
@ -107,19 +105,17 @@ class Cache:
candidates.append((candidate, path))
return candidates
def get_path_for_link(self, link):
# type: (Link) -> str
def get_path_for_link(self, link: Link) -> str:
"""Return a directory to store cached items in for link.
"""
raise NotImplementedError()
def get(
self,
link, # type: Link
package_name, # type: Optional[str]
supported_tags, # type: List[Tag]
):
# type: (...) -> Link
link: Link,
package_name: Optional[str],
supported_tags: List[Tag],
) -> Link:
"""Returns a link to a cached item if it exists, otherwise returns the
passed link.
"""
@ -130,12 +126,10 @@ class SimpleWheelCache(Cache):
"""A cache of wheels for future installs.
"""
def __init__(self, cache_dir, format_control):
# type: (str, FormatControl) -> None
def __init__(self, cache_dir: str, format_control: FormatControl) -> None:
super().__init__(cache_dir, format_control, {"binary"})
def get_path_for_link(self, link):
# type: (Link) -> str
def get_path_for_link(self, link: Link) -> str:
"""Return a directory to store cached wheels for link
Because there are M wheels for any one sdist, we provide a directory
@ -157,11 +151,10 @@ class SimpleWheelCache(Cache):
def get(
self,
link, # type: Link
package_name, # type: Optional[str]
supported_tags, # type: List[Tag]
):
# type: (...) -> Link
link: Link,
package_name: Optional[str],
supported_tags: List[Tag],
) -> Link:
candidates = []
if not package_name:
@ -204,8 +197,7 @@ class EphemWheelCache(SimpleWheelCache):
"""A SimpleWheelCache that creates it's own temporary cache directory
"""
def __init__(self, format_control):
# type: (FormatControl) -> None
def __init__(self, format_control: FormatControl) -> None:
self._temp_dir = TempDirectory(
kind=tempdir_kinds.EPHEM_WHEEL_CACHE,
globally_managed=True,
@ -217,8 +209,8 @@ class EphemWheelCache(SimpleWheelCache):
class CacheEntry:
def __init__(
self,
link, # type: Link
persistent, # type: bool
link: Link,
persistent: bool,
):
self.link = link
self.persistent = persistent
@ -231,27 +223,23 @@ class WheelCache(Cache):
when a certain link is not found in the simple wheel cache first.
"""
def __init__(self, cache_dir, format_control):
# type: (str, FormatControl) -> None
def __init__(self, cache_dir: str, format_control: FormatControl) -> None:
super().__init__(cache_dir, format_control, {'binary'})
self._wheel_cache = SimpleWheelCache(cache_dir, format_control)
self._ephem_cache = EphemWheelCache(format_control)
def get_path_for_link(self, link):
# type: (Link) -> str
def get_path_for_link(self, link: Link) -> str:
return self._wheel_cache.get_path_for_link(link)
def get_ephem_path_for_link(self, link):
# type: (Link) -> str
def get_ephem_path_for_link(self, link: Link) -> str:
return self._ephem_cache.get_path_for_link(link)
def get(
self,
link, # type: Link
package_name, # type: Optional[str]
supported_tags, # type: List[Tag]
):
# type: (...) -> Link
link: Link,
package_name: Optional[str],
supported_tags: List[Tag],
) -> Link:
cache_entry = self.get_cache_entry(link, package_name, supported_tags)
if cache_entry is None:
return link
@ -259,11 +247,10 @@ class WheelCache(Cache):
def get_cache_entry(
self,
link, # type: Link
package_name, # type: Optional[str]
supported_tags, # type: List[Tag]
):
# type: (...) -> Optional[CacheEntry]
link: Link,
package_name: Optional[str],
supported_tags: List[Tag],
) -> Optional[CacheEntry]:
"""Returns a CacheEntry with a link to a cached item if it exists or
None. The cache entry indicates if the item was found in the persistent
or ephemeral cache.

View file

@ -47,8 +47,7 @@ logger = logging.getLogger(__name__)
# NOTE: Maybe use the optionx attribute to normalize keynames.
def _normalize_name(name):
# type: (str) -> str
def _normalize_name(name: str) -> str:
"""Make a name consistent regardless of source (environment or file)
"""
name = name.lower().replace('_', '-')
@ -57,8 +56,7 @@ def _normalize_name(name):
return name
def _disassemble_key(name):
# type: (str) -> List[str]
def _disassemble_key(name: str) -> List[str]:
if "." not in name:
error_message = (
"Key does not contain dot separated section and key. "
@ -68,8 +66,7 @@ def _disassemble_key(name):
return name.split(".", 1)
def get_configuration_files():
# type: () -> Dict[Kind, List[str]]
def get_configuration_files() -> Dict[Kind, List[str]]:
global_config_files = [
os.path.join(path, CONFIG_BASENAME)
for path in appdirs.site_config_dirs('pip')
@ -105,8 +102,7 @@ class Configuration:
and the data stored is also nice.
"""
def __init__(self, isolated, load_only=None):
# type: (bool, Optional[Kind]) -> None
def __init__(self, isolated: bool, load_only: Optional[Kind] = None) -> None:
super().__init__()
if load_only is not None and load_only not in VALID_LOAD_ONLY:
@ -119,24 +115,22 @@ class Configuration:
self.load_only = load_only
# Because we keep track of where we got the data from
self._parsers = {
self._parsers: Dict[Kind, List[Tuple[str, RawConfigParser]]] = {
variant: [] for variant in OVERRIDE_ORDER
} # type: Dict[Kind, List[Tuple[str, RawConfigParser]]]
self._config = {
}
self._config: Dict[Kind, Dict[str, Any]] = {
variant: {} for variant in OVERRIDE_ORDER
} # type: Dict[Kind, Dict[str, Any]]
self._modified_parsers = [] # type: List[Tuple[str, RawConfigParser]]
}
self._modified_parsers: List[Tuple[str, RawConfigParser]] = []
def load(self):
# type: () -> None
def load(self) -> None:
"""Loads configuration from configuration files and environment
"""
self._load_config_files()
if not self.isolated:
self._load_environment_vars()
def get_file_to_edit(self):
# type: () -> Optional[str]
def get_file_to_edit(self) -> Optional[str]:
"""Returns the file with highest priority in configuration
"""
assert self.load_only is not None, \
@ -147,15 +141,13 @@ class Configuration:
except IndexError:
return None
def items(self):
# type: () -> Iterable[Tuple[str, Any]]
def items(self) -> Iterable[Tuple[str, Any]]:
"""Returns key-value pairs like dict.items() representing the loaded
configuration
"""
return self._dictionary.items()
def get_value(self, key):
# type: (str) -> Any
def get_value(self, key: str) -> Any:
"""Get a value from the configuration.
"""
try:
@ -163,8 +155,7 @@ class Configuration:
except KeyError:
raise ConfigurationError(f"No such key - {key}")
def set_value(self, key, value):
# type: (str, Any) -> None
def set_value(self, key: str, value: Any) -> None:
"""Modify a value in the configuration.
"""
self._ensure_have_load_only()
@ -183,8 +174,7 @@ class Configuration:
self._config[self.load_only][key] = value
self._mark_as_modified(fname, parser)
def unset_value(self, key):
# type: (str) -> None
def unset_value(self, key: str) -> None:
"""Unset a value in the configuration."""
self._ensure_have_load_only()
@ -210,8 +200,7 @@ class Configuration:
del self._config[self.load_only][key]
def save(self):
# type: () -> None
def save(self) -> None:
"""Save the current in-memory state.
"""
self._ensure_have_load_only()
@ -229,15 +218,13 @@ class Configuration:
# Private routines
#
def _ensure_have_load_only(self):
# type: () -> None
def _ensure_have_load_only(self) -> None:
if self.load_only is None:
raise ConfigurationError("Needed a specific file to be modifying.")
logger.debug("Will be working with %s variant only", self.load_only)
@property
def _dictionary(self):
# type: () -> Dict[str, Any]
def _dictionary(self) -> Dict[str, Any]:
"""A dictionary representing the loaded configuration.
"""
# NOTE: Dictionaries are not populated if not loaded. So, conditionals
@ -249,8 +236,7 @@ class Configuration:
return retval
def _load_config_files(self):
# type: () -> None
def _load_config_files(self) -> None:
"""Loads configuration from configuration files
"""
config_files = dict(self.iter_config_files())
@ -276,8 +262,7 @@ class Configuration:
# Keeping track of the parsers used
self._parsers[variant].append((fname, parser))
def _load_file(self, variant, fname):
# type: (Kind, str) -> RawConfigParser
def _load_file(self, variant: Kind, fname: str) -> RawConfigParser:
logger.debug("For variant '%s', will try loading '%s'", variant, fname)
parser = self._construct_parser(fname)
@ -287,8 +272,7 @@ class Configuration:
return parser
def _construct_parser(self, fname):
# type: (str) -> RawConfigParser
def _construct_parser(self, fname: str) -> RawConfigParser:
parser = configparser.RawConfigParser()
# If there is no such file, don't bother reading it but create the
# parser anyway, to hold the data.
@ -310,16 +294,16 @@ class Configuration:
raise ConfigurationFileCouldNotBeLoaded(error=error)
return parser
def _load_environment_vars(self):
# type: () -> None
def _load_environment_vars(self) -> None:
"""Loads configuration from environment variables
"""
self._config[kinds.ENV_VAR].update(
self._normalized_keys(":env:", self.get_environ_vars())
)
def _normalized_keys(self, section, items):
# type: (str, Iterable[Tuple[str, Any]]) -> Dict[str, Any]
def _normalized_keys(
self, section: str, items: Iterable[Tuple[str, Any]]
) -> Dict[str, Any]:
"""Normalizes items to construct a dictionary with normalized keys.
This routine is where the names become keys and are made the same
@ -331,8 +315,7 @@ class Configuration:
normalized[key] = val
return normalized
def get_environ_vars(self):
# type: () -> Iterable[Tuple[str, str]]
def get_environ_vars(self) -> Iterable[Tuple[str, str]]:
"""Returns a generator with all environmental vars with prefix PIP_"""
for key, val in os.environ.items():
if key.startswith("PIP_"):
@ -341,8 +324,7 @@ class Configuration:
yield name, val
# XXX: This is patched in the tests.
def iter_config_files(self):
# type: () -> Iterable[Tuple[Kind, List[str]]]
def iter_config_files(self) -> Iterable[Tuple[Kind, List[str]]]:
"""Yields variant and configuration files associated with it.
This should be treated like items of a dictionary.
@ -372,13 +354,11 @@ class Configuration:
# finally virtualenv configuration first trumping others
yield kinds.SITE, config_files[kinds.SITE]
def get_values_in_config(self, variant):
# type: (Kind) -> Dict[str, Any]
def get_values_in_config(self, variant: Kind) -> Dict[str, Any]:
"""Get values present in a config file"""
return self._config[variant]
def _get_parser_to_modify(self):
# type: () -> Tuple[str, RawConfigParser]
def _get_parser_to_modify(self) -> Tuple[str, RawConfigParser]:
# Determine which parser to modify
assert self.load_only
parsers = self._parsers[self.load_only]
@ -392,12 +372,10 @@ class Configuration:
return parsers[-1]
# XXX: This is patched in the tests.
def _mark_as_modified(self, fname, parser):
# type: (str, RawConfigParser) -> None
def _mark_as_modified(self, fname: str, parser: RawConfigParser) -> None:
file_parser_tuple = (fname, parser)
if file_parser_tuple not in self._modified_parsers:
self._modified_parsers.append(file_parser_tuple)
def __repr__(self):
# type: () -> str
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._dictionary!r})"

View file

@ -38,8 +38,7 @@ class NoneMetadataError(PipError):
"PKG-INFO").
"""
def __init__(self, dist, metadata_name):
# type: (Distribution, str) -> None
def __init__(self, dist: Distribution, metadata_name: str) -> None:
"""
:param dist: A Distribution object.
:param metadata_name: The name of the metadata being accessed
@ -48,8 +47,7 @@ class NoneMetadataError(PipError):
self.dist = dist
self.metadata_name = metadata_name
def __str__(self):
# type: () -> str
def __str__(self) -> str:
# Use `dist` in the error message because its stringification
# includes more information, like the version and location.
return (
@ -62,14 +60,12 @@ class NoneMetadataError(PipError):
class UserInstallationInvalid(InstallationError):
"""A --user install is requested on an environment without user site."""
def __str__(self):
# type: () -> str
def __str__(self) -> str:
return "User base directory is not specified"
class InvalidSchemeCombination(InstallationError):
def __str__(self):
# type: () -> str
def __str__(self) -> str:
before = ", ".join(str(a) for a in self.args[:-1])
return f"Cannot set {before} and {self.args[-1]} together"
@ -102,8 +98,9 @@ class PreviousBuildDirError(PipError):
class NetworkConnectionError(PipError):
"""HTTP connection error"""
def __init__(self, error_msg, response=None, request=None):
# type: (str, Response, Request) -> None
def __init__(
self, error_msg: str, response: Response = None, request: Request = None
) -> None:
"""
Initialize NetworkConnectionError with `request` and `response`
objects.
@ -116,8 +113,7 @@ class NetworkConnectionError(PipError):
self.request = self.response.request
super().__init__(error_msg, response, request)
def __str__(self):
# type: () -> str
def __str__(self) -> str:
return str(self.error_msg)
@ -136,15 +132,15 @@ class MetadataInconsistent(InstallationError):
that do not match the information previously obtained from sdist filename
or user-supplied ``#egg=`` value.
"""
def __init__(self, ireq, field, f_val, m_val):
# type: (InstallRequirement, str, str, str) -> None
def __init__(
self, ireq: "InstallRequirement", field: str, f_val: str, m_val: str
) -> None:
self.ireq = ireq
self.field = field
self.f_val = f_val
self.m_val = m_val
def __str__(self):
# type: () -> str
def __str__(self) -> str:
template = (
"Requested {} has inconsistent {}: "
"filename has {!r}, but metadata has {!r}"
@ -154,13 +150,11 @@ class MetadataInconsistent(InstallationError):
class InstallationSubprocessError(InstallationError):
"""A subprocess call failed during installation."""
def __init__(self, returncode, description):
# type: (int, str) -> None
def __init__(self, returncode: int, description: str) -> None:
self.returncode = returncode
self.description = description
def __str__(self):
# type: () -> str
def __str__(self) -> str:
return (
"Command errored out with exit status {}: {} "
"Check the logs for full command output."
@ -170,16 +164,13 @@ class InstallationSubprocessError(InstallationError):
class HashErrors(InstallationError):
"""Multiple HashError instances rolled into one for reporting"""
def __init__(self):
# type: () -> None
self.errors = [] # type: List[HashError]
def __init__(self) -> None:
self.errors: List["HashError"] = []
def append(self, error):
# type: (HashError) -> None
def append(self, error: "HashError") -> None:
self.errors.append(error)
def __str__(self):
# type: () -> str
def __str__(self) -> str:
lines = []
self.errors.sort(key=lambda e: e.order)
for cls, errors_of_cls in groupby(self.errors, lambda e: e.__class__):
@ -189,8 +180,7 @@ class HashErrors(InstallationError):
return '\n'.join(lines)
return ''
def __bool__(self):
# type: () -> bool
def __bool__(self) -> bool:
return bool(self.errors)
@ -210,12 +200,11 @@ class HashError(InstallationError):
typically available earlier.
"""
req = None # type: Optional[InstallRequirement]
req: Optional["InstallRequirement"] = None
head = ''
order = -1 # type: int
order: int = -1
def body(self):
# type: () -> str
def body(self) -> str:
"""Return a summary of me for display under the heading.
This default implementation simply prints a description of the
@ -227,12 +216,10 @@ class HashError(InstallationError):
"""
return f' {self._requirement_name()}'
def __str__(self):
# type: () -> str
def __str__(self) -> str:
return f'{self.head}\n{self.body()}'
def _requirement_name(self):
# type: () -> str
def _requirement_name(self) -> str:
"""Return a description of the requirement that triggered me.
This default implementation returns long description of the req, with
@ -272,16 +259,14 @@ class HashMissing(HashError):
'manually, note that it turns on automatically when any package '
'has a hash.)')
def __init__(self, gotten_hash):
# type: (str) -> None
def __init__(self, gotten_hash: str) -> None:
"""
:param gotten_hash: The hash of the (possibly malicious) archive we
just downloaded
"""
self.gotten_hash = gotten_hash
def body(self):
# type: () -> str
def body(self) -> str:
# Dodge circular import.
from pip._internal.utils.hashes import FAVORITE_HASH
@ -323,8 +308,7 @@ class HashMismatch(HashError):
'the hashes. Otherwise, examine the package contents carefully; '
'someone may have tampered with them.')
def __init__(self, allowed, gots):
# type: (Dict[str, List[str]], Dict[str, _Hash]) -> None
def __init__(self, allowed: Dict[str, List[str]], gots: Dict[str, "_Hash"]) -> None:
"""
:param allowed: A dict of algorithm names pointing to lists of allowed
hex digests
@ -334,13 +318,11 @@ class HashMismatch(HashError):
self.allowed = allowed
self.gots = gots
def body(self):
# type: () -> str
def body(self) -> str:
return ' {}:\n{}'.format(self._requirement_name(),
self._hash_comparison())
def _hash_comparison(self):
# type: () -> str
def _hash_comparison(self) -> str:
"""
Return a comparison of actual and expected hash values.
@ -351,13 +333,12 @@ class HashMismatch(HashError):
Got bcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdef
"""
def hash_then_or(hash_name):
# type: (str) -> chain[str]
def hash_then_or(hash_name: str) -> "chain[str]":
# For now, all the decent hashes have 6-char names, so we can get
# away with hard-coding space literals.
return chain([hash_name], repeat(' or'))
lines = [] # type: List[str]
lines: List[str] = []
for hash_name, expecteds in self.allowed.items():
prefix = hash_then_or(hash_name)
lines.extend((' Expected {} {}'.format(next(prefix), e))
@ -376,15 +357,18 @@ class ConfigurationFileCouldNotBeLoaded(ConfigurationError):
"""When there are errors while loading a configuration file
"""
def __init__(self, reason="could not be loaded", fname=None, error=None):
# type: (str, Optional[str], Optional[configparser.Error]) -> None
def __init__(
self,
reason: str = "could not be loaded",
fname: Optional[str] = None,
error: Optional[configparser.Error] = None,
) -> None:
super().__init__(error)
self.reason = reason
self.fname = fname
self.error = error
def __str__(self):
# type: () -> str
def __str__(self) -> str:
if self.fname is not None:
message_part = f" in {self.fname}."
else: