1
1
Fork 0
mirror of https://github.com/pypa/pip synced 2023-12-13 21:30:23 +01:00

Merge pull request #10196 from pradyunsg/blacken/req

This commit is contained in:
Pradyun Gedam 2021-07-28 11:05:14 +01:00 committed by GitHub
commit 174e0f6962
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 328 additions and 334 deletions

View file

@ -26,7 +26,6 @@ repos:
^src/pip/_internal/index|
^src/pip/_internal/models|
^src/pip/_internal/operations|
^src/pip/_internal/req|
^src/pip/_internal/vcs|
^src/pip/_internal/\w+\.py$|
# Tests

View file

@ -9,8 +9,10 @@ from .req_install import InstallRequirement
from .req_set import RequirementSet
__all__ = [
"RequirementSet", "InstallRequirement",
"parse_requirements", "install_given_reqs",
"RequirementSet",
"InstallRequirement",
"parse_requirements",
"install_given_reqs",
]
logger = logging.getLogger(__name__)
@ -52,8 +54,8 @@ def install_given_reqs(
if to_install:
logger.info(
'Installing collected packages: %s',
', '.join(to_install.keys()),
"Installing collected packages: %s",
", ".join(to_install.keys()),
)
installed = []
@ -61,11 +63,9 @@ def install_given_reqs(
with indent_log():
for req_name, requirement in to_install.items():
if requirement.should_reinstall:
logger.info('Attempting uninstall: %s', req_name)
logger.info("Attempting uninstall: %s", req_name)
with indent_log():
uninstalled_pathset = requirement.uninstall(
auto_confirm=True
)
uninstalled_pathset = requirement.uninstall(auto_confirm=True)
else:
uninstalled_pathset = None

View file

@ -31,8 +31,9 @@ from pip._internal.utils.urls import path_to_url
from pip._internal.vcs import is_url, vcs
__all__ = [
"install_req_from_editable", "install_req_from_line",
"parse_editable"
"install_req_from_editable",
"install_req_from_line",
"parse_editable",
]
logger = logging.getLogger(__name__)
@ -40,7 +41,7 @@ operators = Specifier._operators.keys()
def _strip_extras(path: str) -> Tuple[str, Optional[str]]:
m = re.match(r'^(.+)(\[[^\]]+\])$', path)
m = re.match(r"^(.+)(\[[^\]]+\])$", path)
extras = None
if m:
path_no_extras = m.group(1)
@ -74,26 +75,25 @@ def parse_editable(editable_req: str) -> Tuple[Optional[str], str, Set[str]]:
url_no_extras, extras = _strip_extras(url)
if os.path.isdir(url_no_extras):
setup_py = os.path.join(url_no_extras, 'setup.py')
setup_cfg = os.path.join(url_no_extras, 'setup.cfg')
setup_py = os.path.join(url_no_extras, "setup.py")
setup_cfg = os.path.join(url_no_extras, "setup.cfg")
if not os.path.exists(setup_py) and not os.path.exists(setup_cfg):
msg = (
'File "setup.py" or "setup.cfg" not found. Directory cannot be '
'installed in editable mode: {}'
.format(os.path.abspath(url_no_extras))
"installed in editable mode: {}".format(os.path.abspath(url_no_extras))
)
pyproject_path = make_pyproject_path(url_no_extras)
if os.path.isfile(pyproject_path):
msg += (
'\n(A "pyproject.toml" file was found, but editable '
'mode currently requires a setuptools-based build.)'
"mode currently requires a setuptools-based build.)"
)
raise InstallationError(msg)
# Treating it as code that has already been checked out
url_no_extras = path_to_url(url_no_extras)
if url_no_extras.lower().startswith('file:'):
if url_no_extras.lower().startswith("file:"):
package_name = Link(url_no_extras).egg_fragment
if extras:
return (
@ -105,8 +105,8 @@ def parse_editable(editable_req: str) -> Tuple[Optional[str], str, Set[str]]:
return package_name, url_no_extras, set()
for version_control in vcs:
if url.lower().startswith(f'{version_control}:'):
url = f'{version_control}+{url}'
if url.lower().startswith(f"{version_control}:"):
url = f"{version_control}+{url}"
break
link = Link(url)
@ -114,9 +114,9 @@ def parse_editable(editable_req: str) -> Tuple[Optional[str], str, Set[str]]:
if not link.is_vcs:
backends = ", ".join(vcs.all_schemes)
raise InstallationError(
f'{editable_req} is not a valid editable requirement. '
f'It should either be a path to a local project or a VCS URL '
f'(beginning with {backends}).'
f"{editable_req} is not a valid editable requirement. "
f"It should either be a path to a local project or a VCS URL "
f"(beginning with {backends})."
)
package_name = link.egg_fragment
@ -150,9 +150,7 @@ def deduce_helpful_msg(req: str) -> str:
" the packages specified within it."
).format(req)
except RequirementParseError:
logger.debug(
"Cannot parse '%s' as requirements file", req, exc_info=True
)
logger.debug("Cannot parse '%s' as requirements file", req, exc_info=True)
else:
msg += f" File '{req}' does not exist."
return msg
@ -160,11 +158,11 @@ def deduce_helpful_msg(req: str) -> str:
class RequirementParts:
def __init__(
self,
requirement: Optional[Requirement],
link: Optional[Link],
markers: Optional[Marker],
extras: Set[str],
self,
requirement: Optional[Requirement],
link: Optional[Link],
markers: Optional[Marker],
extras: Set[str],
):
self.requirement = requirement
self.link = link
@ -258,24 +256,23 @@ def _get_url_from_path(path: str, name: str) -> Optional[str]:
return None
if os.path.isfile(path):
return path_to_url(path)
urlreq_parts = name.split('@', 1)
urlreq_parts = name.split("@", 1)
if len(urlreq_parts) >= 2 and not _looks_like_path(urlreq_parts[0]):
# If the path contains '@' and the part before it does not look
# like a path, try to treat it as a PEP 440 URL req instead.
return None
logger.warning(
'Requirement %r looks like a filename, but the '
'file does not exist',
name
"Requirement %r looks like a filename, but the file does not exist",
name,
)
return path_to_url(path)
def parse_req_from_line(name: str, line_source: Optional[str]) -> RequirementParts:
if is_url(name):
marker_sep = '; '
marker_sep = "; "
else:
marker_sep = ';'
marker_sep = ";"
if marker_sep in name:
name, markers_as_string = name.split(marker_sep, 1)
markers_as_string = markers_as_string.strip()
@ -302,9 +299,8 @@ def parse_req_from_line(name: str, line_source: Optional[str]) -> RequirementPar
# it's a local file, dir, or url
if link:
# Handle relative file URLs
if link.scheme == 'file' and re.search(r'\.\./', link.url):
link = Link(
path_to_url(os.path.normpath(os.path.abspath(link.path))))
if link.scheme == "file" and re.search(r"\.\./", link.url):
link = Link(path_to_url(os.path.normpath(os.path.abspath(link.path))))
# wheel file
if link.is_wheel:
wheel = Wheel(link.filename) # can raise InvalidWheelFilename
@ -323,7 +319,7 @@ def parse_req_from_line(name: str, line_source: Optional[str]) -> RequirementPar
def with_source(text: str) -> str:
if not line_source:
return text
return f'{text} (from {line_source})'
return f"{text} (from {line_source})"
def _parse_req_string(req_as_string: str) -> Requirement:
try:
@ -332,16 +328,15 @@ def parse_req_from_line(name: str, line_source: Optional[str]) -> RequirementPar
if os.path.sep in req_as_string:
add_msg = "It looks like a path."
add_msg += deduce_helpful_msg(req_as_string)
elif ('=' in req_as_string and
not any(op in req_as_string for op in operators)):
elif "=" in req_as_string and not any(
op in req_as_string for op in operators
):
add_msg = "= is not a valid operator. Did you mean == ?"
else:
add_msg = ''
msg = with_source(
f'Invalid requirement: {req_as_string!r}'
)
add_msg = ""
msg = with_source(f"Invalid requirement: {req_as_string!r}")
if add_msg:
msg += f'\nHint: {add_msg}'
msg += f"\nHint: {add_msg}"
raise InstallationError(msg)
else:
# Deprecate extras after specifiers: "name>=1.0[extras]"
@ -350,7 +345,7 @@ def parse_req_from_line(name: str, line_source: Optional[str]) -> RequirementPar
# RequirementParts
for spec in req.specifier:
spec_str = str(spec)
if spec_str.endswith(']'):
if spec_str.endswith("]"):
msg = f"Extras after version '{spec_str}'."
raise InstallationError(msg)
return req
@ -382,8 +377,12 @@ def install_req_from_line(
parts = parse_req_from_line(name, line_source)
return InstallRequirement(
parts.requirement, comes_from, link=parts.link, markers=parts.markers,
use_pep517=use_pep517, isolated=isolated,
parts.requirement,
comes_from,
link=parts.link,
markers=parts.markers,
use_pep517=use_pep517,
isolated=isolated,
install_options=options.get("install_options", []) if options else [],
global_options=options.get("global_options", []) if options else [],
hash_options=options.get("hashes", {}) if options else {},
@ -409,8 +408,12 @@ def install_req_from_req_string(
PyPI.file_storage_domain,
TestPyPI.file_storage_domain,
]
if (req.url and comes_from and comes_from.link and
comes_from.link.netloc in domains_not_allowed):
if (
req.url
and comes_from
and comes_from.link
and comes_from.link.netloc in domains_not_allowed
):
# Explicitly disallow pypi packages that depend on external urls
raise InstallationError(
"Packages installed from PyPI cannot depend on packages "

View file

@ -25,20 +25,20 @@ if TYPE_CHECKING:
from pip._internal.index.package_finder import PackageFinder
__all__ = ['parse_requirements']
__all__ = ["parse_requirements"]
ReqFileLines = Iterator[Tuple[int, str]]
LineParser = Callable[[str], Tuple[str, Values]]
SCHEME_RE = re.compile(r'^(http|https|file):', re.I)
COMMENT_RE = re.compile(r'(^|\s+)#.*$')
SCHEME_RE = re.compile(r"^(http|https|file):", re.I)
COMMENT_RE = re.compile(r"(^|\s+)#.*$")
# Matches environment variable-style values in '${MY_VARIABLE_1}' with the
# variable name consisting of only uppercase letters, digits or the '_'
# (underscore). This follows the POSIX standard defined in IEEE Std 1003.1,
# 2013 Edition.
ENV_VAR_RE = re.compile(r'(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})')
ENV_VAR_RE = re.compile(r"(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})")
SUPPORTED_OPTIONS: List[Callable[..., optparse.Option]] = [
cmdoptions.index_url,
@ -134,10 +134,7 @@ def parse_requirements(
for parsed_line in parser.parse(filename, constraint):
parsed_req = handle_line(
parsed_line,
options=options,
finder=finder,
session=session
parsed_line, options=options, finder=finder, session=session
)
if parsed_req is not None:
yield parsed_req
@ -161,8 +158,10 @@ def handle_requirement_line(
) -> ParsedRequirement:
# preserve for the nested code path
line_comes_from = '{} {} (line {})'.format(
'-c' if line.constraint else '-r', line.filename, line.lineno,
line_comes_from = "{} {} (line {})".format(
"-c" if line.constraint else "-r",
line.filename,
line.lineno,
)
assert line.is_requirement
@ -187,7 +186,7 @@ def handle_requirement_line(
if dest in line.opts.__dict__ and line.opts.__dict__[dest]:
req_options[dest] = line.opts.__dict__[dest]
line_source = f'line {line.lineno} of {line.filename}'
line_source = f"line {line.lineno} of {line.filename}"
return ParsedRequirement(
requirement=line.requirement,
is_editable=line.is_editable,
@ -213,8 +212,7 @@ def handle_option_line(
options.require_hashes = opts.require_hashes
if opts.features_enabled:
options.features_enabled.extend(
f for f in opts.features_enabled
if f not in options.features_enabled
f for f in opts.features_enabled if f not in options.features_enabled
)
# set finder options
@ -256,7 +254,7 @@ def handle_option_line(
if session:
for host in opts.trusted_hosts or []:
source = f'line {lineno} of {filename}'
source = f"line {lineno} of {filename}"
session.add_trusted_host(host, source=source)
@ -314,17 +312,15 @@ class RequirementsFileParser:
self._line_parser = line_parser
def parse(self, filename: str, constraint: bool) -> Iterator[ParsedLine]:
"""Parse a given file, yielding parsed lines.
"""
"""Parse a given file, yielding parsed lines."""
yield from self._parse_and_recurse(filename, constraint)
def _parse_and_recurse(
self, filename: str, constraint: bool
) -> Iterator[ParsedLine]:
for line in self._parse_file(filename, constraint):
if (
not line.is_requirement and
(line.opts.requirements or line.opts.constraints)
if not line.is_requirement and (
line.opts.requirements or line.opts.constraints
):
# parse a nested requirements file
if line.opts.requirements:
@ -342,7 +338,8 @@ class RequirementsFileParser:
elif not SCHEME_RE.search(req_path):
# do a join so relative paths work
req_path = os.path.join(
os.path.dirname(filename), req_path,
os.path.dirname(filename),
req_path,
)
yield from self._parse_and_recurse(req_path, nested_constraint)
@ -359,7 +356,7 @@ class RequirementsFileParser:
args_str, opts = self._line_parser(line)
except OptionParsingError as e:
# add offending line
msg = f'Invalid requirement: {line}\n{e.msg}'
msg = f"Invalid requirement: {line}\n{e.msg}"
raise RequirementsFileParseError(msg)
yield ParsedLine(
@ -395,16 +392,16 @@ def break_args_options(line: str) -> Tuple[str, str]:
(and then optparse) the options, not the args. args can contain markers
which are corrupted by shlex.
"""
tokens = line.split(' ')
tokens = line.split(" ")
args = []
options = tokens[:]
for token in tokens:
if token.startswith('-') or token.startswith('--'):
if token.startswith("-") or token.startswith("--"):
break
else:
args.append(token)
options.pop(0)
return ' '.join(args), ' '.join(options)
return " ".join(args), " ".join(options)
class OptionParsingError(Exception):
@ -427,6 +424,7 @@ def build_parser() -> optparse.OptionParser:
# that in our own exception.
def parser_exit(self: Any, msg: str) -> "NoReturn":
raise OptionParsingError(msg)
# NOTE: mypy disallows assigning to a method
# https://github.com/python/mypy/issues/2427
parser.exit = parser_exit # type: ignore
@ -441,26 +439,26 @@ def join_lines(lines_enum: ReqFileLines) -> ReqFileLines:
primary_line_number = None
new_line: List[str] = []
for line_number, line in lines_enum:
if not line.endswith('\\') or COMMENT_RE.match(line):
if not line.endswith("\\") or COMMENT_RE.match(line):
if COMMENT_RE.match(line):
# this ensures comments are always matched later
line = ' ' + line
line = " " + line
if new_line:
new_line.append(line)
assert primary_line_number is not None
yield primary_line_number, ''.join(new_line)
yield primary_line_number, "".join(new_line)
new_line = []
else:
yield line_number, line
else:
if not new_line:
primary_line_number = line_number
new_line.append(line.strip('\\'))
new_line.append(line.strip("\\"))
# last line contains \
if new_line:
assert primary_line_number is not None
yield primary_line_number, ''.join(new_line)
yield primary_line_number, "".join(new_line)
# TODO: handle space after '\'.
@ -470,7 +468,7 @@ def ignore_comments(lines_enum: ReqFileLines) -> ReqFileLines:
Strips comments and filter empty lines.
"""
for line_number, line in lines_enum:
line = COMMENT_RE.sub('', line)
line = COMMENT_RE.sub("", line)
line = line.strip()
if line:
yield line_number, line
@ -514,15 +512,15 @@ def get_file_content(url: str, session: PipSession) -> Tuple[str, str]:
scheme = get_url_scheme(url)
# Pip has special support for file:// URLs (LocalFSAdapter).
if scheme in ['http', 'https', 'file']:
if scheme in ["http", "https", "file"]:
resp = session.get(url)
raise_for_status(resp)
return resp.url, resp.text
# Assume this is a bare path.
try:
with open(url, 'rb') as f:
with open(url, "rb") as f:
content = auto_decode(f.read())
except OSError as exc:
raise InstallationError(f'Could not open requirements file: {exc}')
raise InstallationError(f"Could not open requirements file: {exc}")
return url, content

View file

@ -122,9 +122,7 @@ class InstallRequirement:
if self.editable:
assert link
if link.is_file:
self.source_dir = os.path.normpath(
os.path.abspath(link.file_path)
)
self.source_dir = os.path.normpath(os.path.abspath(link.file_path))
if link is None and req and req.url:
# PEP 508 URL requirement
@ -140,9 +138,7 @@ class InstallRequirement:
if extras:
self.extras = extras
elif req:
self.extras = {
pkg_resources.safe_extra(extra) for extra in req.extras
}
self.extras = {pkg_resources.safe_extra(extra) for extra in req.extras}
else:
self.extras = set()
if markers is None and req:
@ -202,36 +198,34 @@ class InstallRequirement:
if self.req:
s = str(self.req)
if self.link:
s += ' from {}'.format(redact_auth_from_url(self.link.url))
s += " from {}".format(redact_auth_from_url(self.link.url))
elif self.link:
s = redact_auth_from_url(self.link.url)
else:
s = '<InstallRequirement>'
s = "<InstallRequirement>"
if self.satisfied_by is not None:
s += ' in {}'.format(display_path(self.satisfied_by.location))
s += " in {}".format(display_path(self.satisfied_by.location))
if self.comes_from:
if isinstance(self.comes_from, str):
comes_from: Optional[str] = self.comes_from
else:
comes_from = self.comes_from.from_path()
if comes_from:
s += f' (from {comes_from})'
s += f" (from {comes_from})"
return s
def __repr__(self) -> str:
return '<{} object: {} editable={!r}>'.format(
self.__class__.__name__, str(self), self.editable)
return "<{} object: {} editable={!r}>".format(
self.__class__.__name__, str(self), self.editable
)
def format_debug(self) -> str:
"""An un-tested helper for getting state, for debugging.
"""
"""An un-tested helper for getting state, for debugging."""
attributes = vars(self)
names = sorted(attributes)
state = (
"{}={!r}".format(attr, attributes[attr]) for attr in sorted(names)
)
return '<{name} object: {{{state}}}>'.format(
state = ("{}={!r}".format(attr, attributes[attr]) for attr in sorted(names))
return "<{name} object: {{{state}}}>".format(
name=self.__class__.__name__,
state=", ".join(state),
)
@ -254,18 +248,17 @@ class InstallRequirement:
For example, some-package==1.2 is pinned; some-package>1.2 is not.
"""
specifiers = self.specifier
return (len(specifiers) == 1 and
next(iter(specifiers)).operator in {'==', '==='})
return len(specifiers) == 1 and next(iter(specifiers)).operator in {"==", "==="}
def match_markers(self, extras_requested: Optional[Iterable[str]] = None) -> bool:
if not extras_requested:
# Provide an extra to safely evaluate the markers
# without matching any extra
extras_requested = ('',)
extras_requested = ("",)
if self.markers is not None:
return any(
self.markers.evaluate({'extra': extra})
for extra in extras_requested)
self.markers.evaluate({"extra": extra}) for extra in extras_requested
)
else:
return True
@ -301,8 +294,7 @@ class InstallRequirement:
return Hashes(good_hashes)
def from_path(self) -> Optional[str]:
"""Format a nice indicator to show where this "comes from"
"""
"""Format a nice indicator to show where this "comes from" """
if self.req is None:
return None
s = str(self.req)
@ -312,7 +304,7 @@ class InstallRequirement:
else:
comes_from = self.comes_from.from_path()
if comes_from:
s += '->' + comes_from
s += "->" + comes_from
return s
def ensure_build_location(
@ -345,7 +337,7 @@ class InstallRequirement:
# FIXME: Is there a better place to create the build_dir? (hg and bzr
# need this)
if not os.path.exists(build_dir):
logger.debug('Creating directory %s', build_dir)
logger.debug("Creating directory %s", build_dir)
os.makedirs(build_dir)
actual_build_dir = os.path.join(build_dir, dir_name)
# `None` indicates that we respect the globally-configured deletion
@ -359,8 +351,7 @@ class InstallRequirement:
).path
def _set_requirement(self) -> None:
"""Set requirement after generating metadata.
"""
"""Set requirement after generating metadata."""
assert self.req is None
assert self.metadata is not None
assert self.source_dir is not None
@ -372,11 +363,13 @@ class InstallRequirement:
op = "==="
self.req = Requirement(
"".join([
self.metadata["Name"],
op,
self.metadata["Version"],
])
"".join(
[
self.metadata["Name"],
op,
self.metadata["Version"],
]
)
)
def warn_on_mismatching_name(self) -> None:
@ -387,10 +380,12 @@ class InstallRequirement:
# If we're here, there's a mismatch. Log a warning about it.
logger.warning(
'Generating metadata for package %s '
'produced metadata for project name %s. Fix your '
'#egg=%s fragments.',
self.name, metadata_name, self.name
"Generating metadata for package %s "
"produced metadata for project name %s. Fix your "
"#egg=%s fragments.",
self.name,
metadata_name,
self.name,
)
self.req = Requirement(metadata_name)
@ -411,20 +406,22 @@ class InstallRequirement:
# parses the version instead.
existing_version = existing_dist.version
version_compatible = (
existing_version is not None and
self.req.specifier.contains(existing_version, prereleases=True)
existing_version is not None
and self.req.specifier.contains(existing_version, prereleases=True)
)
if not version_compatible:
self.satisfied_by = None
if use_user_site:
if dist_in_usersite(existing_dist):
self.should_reinstall = True
elif (running_under_virtualenv() and
dist_in_site_packages(existing_dist)):
elif running_under_virtualenv() and dist_in_site_packages(
existing_dist
):
raise InstallationError(
"Will not install to the user site because it will "
"lack sys.path precedence to {} in {}".format(
existing_dist.project_name, existing_dist.location)
existing_dist.project_name, existing_dist.location
)
)
else:
self.should_reinstall = True
@ -448,13 +445,13 @@ class InstallRequirement:
@property
def unpacked_source_directory(self) -> str:
return os.path.join(
self.source_dir,
self.link and self.link.subdirectory_fragment or '')
self.source_dir, self.link and self.link.subdirectory_fragment or ""
)
@property
def setup_py_path(self) -> str:
assert self.source_dir, f"No source dir for {self}"
setup_py = os.path.join(self.unpacked_source_directory, 'setup.py')
setup_py = os.path.join(self.unpacked_source_directory, "setup.py")
return setup_py
@ -472,10 +469,7 @@ class InstallRequirement:
follow the PEP 517 or legacy (setup.py) code path.
"""
pyproject_toml_data = load_pyproject_toml(
self.use_pep517,
self.pyproject_toml_path,
self.setup_py_path,
str(self)
self.use_pep517, self.pyproject_toml_path, self.setup_py_path, str(self)
)
if pyproject_toml_data is None:
@ -487,12 +481,13 @@ class InstallRequirement:
self.requirements_to_check = check
self.pyproject_requires = requires
self.pep517_backend = Pep517HookCaller(
self.unpacked_source_directory, backend, backend_path=backend_path,
self.unpacked_source_directory,
backend,
backend_path=backend_path,
)
def _generate_metadata(self) -> str:
"""Invokes metadata generator functions, with the required arguments.
"""
"""Invokes metadata generator functions, with the required arguments."""
if not self.use_pep517:
assert self.unpacked_source_directory
@ -506,7 +501,7 @@ class InstallRequirement:
setup_py_path=self.setup_py_path,
source_dir=self.unpacked_source_directory,
isolated=self.isolated,
details=self.name or f"from {self.link}"
details=self.name or f"from {self.link}",
)
assert self.pep517_backend is not None
@ -537,7 +532,7 @@ class InstallRequirement:
@property
def metadata(self) -> Any:
if not hasattr(self, '_metadata'):
if not hasattr(self, "_metadata"):
self._metadata = get_metadata(self.get_dist())
return self._metadata
@ -547,16 +542,16 @@ class InstallRequirement:
def assert_source_matches_version(self) -> None:
assert self.source_dir
version = self.metadata['version']
version = self.metadata["version"]
if self.req.specifier and version not in self.req.specifier:
logger.warning(
'Requested %s, but installing version %s',
"Requested %s, but installing version %s",
self,
version,
)
else:
logger.debug(
'Source in %s has version %s, which satisfies requirement %s',
"Source in %s has version %s, which satisfies requirement %s",
display_path(self.source_dir),
version,
self,
@ -589,14 +584,13 @@ class InstallRequirement:
def update_editable(self) -> None:
if not self.link:
logger.debug(
"Cannot update repository at %s; repository location is "
"unknown",
"Cannot update repository at %s; repository location is unknown",
self.source_dir,
)
return
assert self.editable
assert self.source_dir
if self.link.scheme == 'file':
if self.link.scheme == "file":
# Static paths don't get updated
return
vcs_backend = vcs.get_backend_for_scheme(self.link.scheme)
@ -627,25 +621,24 @@ class InstallRequirement:
if not dist:
logger.warning("Skipping %s as it is not installed.", self.name)
return None
logger.info('Found existing installation: %s', dist)
logger.info("Found existing installation: %s", dist)
uninstalled_pathset = UninstallPathSet.from_dist(dist)
uninstalled_pathset.remove(auto_confirm, verbose)
return uninstalled_pathset
def _get_archive_name(self, path: str, parentdir: str, rootdir: str) -> str:
def _clean_zip_name(name: str, prefix: str) -> str:
assert name.startswith(prefix + os.path.sep), (
f"name {name!r} doesn't start with prefix {prefix!r}"
)
name = name[len(prefix) + 1:]
name = name.replace(os.path.sep, '/')
assert name.startswith(
prefix + os.path.sep
), f"name {name!r} doesn't start with prefix {prefix!r}"
name = name[len(prefix) + 1 :]
name = name.replace(os.path.sep, "/")
return name
path = os.path.join(parentdir, path)
name = _clean_zip_name(path, rootdir)
return self.name + '/' + name
return self.name + "/" + name
def archive(self, build_dir: Optional[str]) -> None:
"""Saves archive to provided build_dir.
@ -657,57 +650,62 @@ class InstallRequirement:
return
create_archive = True
archive_name = '{}-{}.zip'.format(self.name, self.metadata["version"])
archive_name = "{}-{}.zip".format(self.name, self.metadata["version"])
archive_path = os.path.join(build_dir, archive_name)
if os.path.exists(archive_path):
response = ask_path_exists(
'The file {} exists. (i)gnore, (w)ipe, '
'(b)ackup, (a)bort '.format(
display_path(archive_path)),
('i', 'w', 'b', 'a'))
if response == 'i':
"The file {} exists. (i)gnore, (w)ipe, "
"(b)ackup, (a)bort ".format(display_path(archive_path)),
("i", "w", "b", "a"),
)
if response == "i":
create_archive = False
elif response == 'w':
logger.warning('Deleting %s', display_path(archive_path))
elif response == "w":
logger.warning("Deleting %s", display_path(archive_path))
os.remove(archive_path)
elif response == 'b':
elif response == "b":
dest_file = backup_dir(archive_path)
logger.warning(
'Backing up %s to %s',
"Backing up %s to %s",
display_path(archive_path),
display_path(dest_file),
)
shutil.move(archive_path, dest_file)
elif response == 'a':
elif response == "a":
sys.exit(-1)
if not create_archive:
return
zip_output = zipfile.ZipFile(
archive_path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True,
archive_path,
"w",
zipfile.ZIP_DEFLATED,
allowZip64=True,
)
with zip_output:
dir = os.path.normcase(
os.path.abspath(self.unpacked_source_directory)
)
dir = os.path.normcase(os.path.abspath(self.unpacked_source_directory))
for dirpath, dirnames, filenames in os.walk(dir):
for dirname in dirnames:
dir_arcname = self._get_archive_name(
dirname, parentdir=dirpath, rootdir=dir,
dirname,
parentdir=dirpath,
rootdir=dir,
)
zipdir = zipfile.ZipInfo(dir_arcname + '/')
zipdir = zipfile.ZipInfo(dir_arcname + "/")
zipdir.external_attr = 0x1ED << 16 # 0o755
zip_output.writestr(zipdir, '')
zip_output.writestr(zipdir, "")
for filename in filenames:
file_arcname = self._get_archive_name(
filename, parentdir=dirpath, rootdir=dir,
filename,
parentdir=dirpath,
rootdir=dir,
)
filename = os.path.join(dirpath, filename)
zip_output.write(filename, file_arcname)
logger.info('Saved %s', display_path(archive_path))
logger.info("Saved %s", display_path(archive_path))
def install(
self,
@ -718,7 +716,7 @@ class InstallRequirement:
prefix: Optional[str] = None,
warn_script_location: bool = True,
use_user_site: bool = False,
pycompile: bool = True
pycompile: bool = True,
) -> None:
scheme = get_scheme(
self.name,
@ -808,8 +806,9 @@ class InstallRequirement:
deprecated(
reason=(
"{} was installed using the legacy 'setup.py install' "
"method, because a wheel could not be built for it.".
format(self.name)
"method, because a wheel could not be built for it.".format(
self.name
)
),
replacement="to fix the wheel build issue reported above",
gone_in=None,
@ -837,12 +836,10 @@ def check_invalid_constraint_type(req: InstallRequirement) -> str:
"undocumented. The new implementation of the resolver no "
"longer supports these forms."
),
replacement=(
"replacing the constraint with a requirement."
),
replacement="replacing the constraint with a requirement.",
# No plan yet for when the new resolver becomes default
gone_in=None,
issue=8210
issue=8210,
)
return problem

View file

@ -13,10 +13,8 @@ logger = logging.getLogger(__name__)
class RequirementSet:
def __init__(self, check_supported_wheels: bool = True) -> None:
"""Create a RequirementSet.
"""
"""Create a RequirementSet."""
self.requirements: Dict[str, InstallRequirement] = OrderedDict()
self.check_supported_wheels = check_supported_wheels
@ -28,7 +26,7 @@ class RequirementSet:
(req for req in self.requirements.values() if not req.comes_from),
key=lambda req: canonicalize_name(req.name or ""),
)
return ' '.join(str(req.req) for req in requirements)
return " ".join(str(req.req) for req in requirements)
def __repr__(self) -> str:
requirements = sorted(
@ -36,11 +34,11 @@ class RequirementSet:
key=lambda req: canonicalize_name(req.name or ""),
)
format_string = '<{classname} object; {count} requirement(s): {reqs}>'
format_string = "<{classname} object; {count} requirement(s): {reqs}>"
return format_string.format(
classname=self.__class__.__name__,
count=len(requirements),
reqs=', '.join(str(req.req) for req in requirements),
reqs=", ".join(str(req.req) for req in requirements),
)
def add_unnamed_requirement(self, install_req: InstallRequirement) -> None:
@ -57,7 +55,7 @@ class RequirementSet:
self,
install_req: InstallRequirement,
parent_req_name: Optional[str] = None,
extras_requested: Optional[Iterable[str]] = None
extras_requested: Optional[Iterable[str]] = None,
) -> Tuple[List[InstallRequirement], Optional[InstallRequirement]]:
"""Add install_req as a requirement to install.
@ -77,7 +75,8 @@ class RequirementSet:
if not install_req.match_markers(extras_requested):
logger.info(
"Ignoring %s: markers '%s' don't match your environment",
install_req.name, install_req.markers,
install_req.name,
install_req.markers,
)
return [], None
@ -88,16 +87,17 @@ class RequirementSet:
if install_req.link and install_req.link.is_wheel:
wheel = Wheel(install_req.link.filename)
tags = compatibility_tags.get_supported()
if (self.check_supported_wheels and not wheel.supported(tags)):
if self.check_supported_wheels and not wheel.supported(tags):
raise InstallationError(
"{} is not a supported wheel on this platform.".format(
wheel.filename)
wheel.filename
)
)
# This next bit is really a sanity check.
assert not install_req.user_supplied or parent_req_name is None, (
"a user supplied req shouldn't have a parent"
)
assert (
not install_req.user_supplied or parent_req_name is None
), "a user supplied req shouldn't have a parent"
# Unnamed requirements are scanned again and the requirement won't be
# added as a dependency until after scanning.
@ -107,23 +107,25 @@ class RequirementSet:
try:
existing_req: Optional[InstallRequirement] = self.get_requirement(
install_req.name)
install_req.name
)
except KeyError:
existing_req = None
has_conflicting_requirement = (
parent_req_name is None and
existing_req and
not existing_req.constraint and
existing_req.extras == install_req.extras and
existing_req.req and
install_req.req and
existing_req.req.specifier != install_req.req.specifier
parent_req_name is None
and existing_req
and not existing_req.constraint
and existing_req.extras == install_req.extras
and existing_req.req
and install_req.req
and existing_req.req.specifier != install_req.req.specifier
)
if has_conflicting_requirement:
raise InstallationError(
"Double requirement given: {} (already in {}, name={!r})"
.format(install_req, existing_req, install_req.name)
"Double requirement given: {} (already in {}, name={!r})".format(
install_req, existing_req, install_req.name
)
)
# When no existing requirement exists, add the requirement as a
@ -138,12 +140,8 @@ class RequirementSet:
if install_req.constraint or not existing_req.constraint:
return [], existing_req
does_not_satisfy_constraint = (
install_req.link and
not (
existing_req.link and
install_req.link.path == existing_req.link.path
)
does_not_satisfy_constraint = install_req.link and not (
existing_req.link and install_req.link.path == existing_req.link.path
)
if does_not_satisfy_constraint:
raise InstallationError(
@ -158,12 +156,13 @@ class RequirementSet:
# mark the existing object as such.
if install_req.user_supplied:
existing_req.user_supplied = True
existing_req.extras = tuple(sorted(
set(existing_req.extras) | set(install_req.extras)
))
existing_req.extras = tuple(
sorted(set(existing_req.extras) | set(install_req.extras))
)
logger.debug(
"Setting %s extras to: %s",
existing_req, existing_req.extras,
existing_req,
existing_req.extras,
)
# Return the existing requirement for addition to the parent and
# scanning again.
@ -173,8 +172,8 @@ class RequirementSet:
project_name = canonicalize_name(name)
return (
project_name in self.requirements and
not self.requirements[project_name].constraint
project_name in self.requirements
and not self.requirements[project_name].constraint
)
def get_requirement(self, name: str) -> InstallRequirement:

View file

@ -40,12 +40,10 @@ def update_env_context_manager(**changes: str) -> Iterator[None]:
@contextlib.contextmanager
def get_requirement_tracker() -> Iterator["RequirementTracker"]:
root = os.environ.get('PIP_REQ_TRACKER')
root = os.environ.get("PIP_REQ_TRACKER")
with contextlib.ExitStack() as ctx:
if root is None:
root = ctx.enter_context(
TempDirectory(kind='req-tracker')
).path
root = ctx.enter_context(TempDirectory(kind="req-tracker")).path
ctx.enter_context(update_env_context_manager(PIP_REQ_TRACKER=root))
logger.debug("Initialized build tracking at %s", root)
@ -54,7 +52,6 @@ def get_requirement_tracker() -> Iterator["RequirementTracker"]:
class RequirementTracker:
def __init__(self, root: str) -> None:
self._root = root
self._entries: Set[InstallRequirement] = set()
@ -68,7 +65,7 @@ class RequirementTracker:
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType]
exc_tb: Optional[TracebackType],
) -> None:
self.cleanup()
@ -77,8 +74,7 @@ class RequirementTracker:
return os.path.join(self._root, hashed)
def add(self, req: InstallRequirement) -> None:
"""Add an InstallRequirement to build tracking.
"""
"""Add an InstallRequirement to build tracking."""
assert req.link
# Get the file to write information about this requirement.
@ -92,30 +88,28 @@ class RequirementTracker:
except FileNotFoundError:
pass
else:
message = '{} is already being built: {}'.format(
req.link, contents)
message = "{} is already being built: {}".format(req.link, contents)
raise LookupError(message)
# If we're here, req should really not be building already.
assert req not in self._entries
# Start tracking this requirement.
with open(entry_path, 'w', encoding="utf-8") as fp:
with open(entry_path, "w", encoding="utf-8") as fp:
fp.write(str(req))
self._entries.add(req)
logger.debug('Added %s to build tracker %r', req, self._root)
logger.debug("Added %s to build tracker %r", req, self._root)
def remove(self, req: InstallRequirement) -> None:
"""Remove an InstallRequirement from build tracking.
"""
"""Remove an InstallRequirement from build tracking."""
assert req.link
# Delete the created file and the corresponding entries.
os.unlink(self._entry_path(req.link))
self._entries.remove(req)
logger.debug('Removed %s from build tracker %r', req, self._root)
logger.debug("Removed %s from build tracker %r", req, self._root)
def cleanup(self) -> None:
for req in set(self._entries):

View file

@ -40,12 +40,12 @@ def _script_names(dist: Distribution, script_name: str, is_gui: bool) -> List[st
exe_name = os.path.join(bin_dir, script_name)
paths_to_remove = [exe_name]
if WINDOWS:
paths_to_remove.append(exe_name + '.exe')
paths_to_remove.append(exe_name + '.exe.manifest')
paths_to_remove.append(exe_name + ".exe")
paths_to_remove.append(exe_name + ".exe.manifest")
if is_gui:
paths_to_remove.append(exe_name + '-script.pyw')
paths_to_remove.append(exe_name + "-script.pyw")
else:
paths_to_remove.append(exe_name + '-script.py')
paths_to_remove.append(exe_name + "-script.py")
return paths_to_remove
@ -57,6 +57,7 @@ def _unique(fn: Callable[..., Iterator[Any]]) -> Callable[..., Iterator[Any]]:
if item not in seen:
seen.add(item)
yield item
return unique
@ -76,29 +77,31 @@ def uninstallation_paths(dist: Distribution) -> Iterator[str]:
https://packaging.python.org/specifications/recording-installed-packages/
"""
try:
r = csv.reader(dist.get_metadata_lines('RECORD'))
r = csv.reader(dist.get_metadata_lines("RECORD"))
except FileNotFoundError as missing_record_exception:
msg = 'Cannot uninstall {dist}, RECORD file not found.'.format(dist=dist)
msg = "Cannot uninstall {dist}, RECORD file not found.".format(dist=dist)
try:
installer = next(dist.get_metadata_lines('INSTALLER'))
if not installer or installer == 'pip':
installer = next(dist.get_metadata_lines("INSTALLER"))
if not installer or installer == "pip":
raise ValueError()
except (OSError, StopIteration, ValueError):
dep = '{}=={}'.format(dist.project_name, dist.version)
msg += (" You might be able to recover from this via: "
"'pip install --force-reinstall --no-deps {}'.".format(dep))
dep = "{}=={}".format(dist.project_name, dist.version)
msg += (
" You might be able to recover from this via: "
"'pip install --force-reinstall --no-deps {}'.".format(dep)
)
else:
msg += ' Hint: The package was installed by {}.'.format(installer)
msg += " Hint: The package was installed by {}.".format(installer)
raise UninstallationError(msg) from missing_record_exception
for row in r:
path = os.path.join(dist.location, row[0])
yield path
if path.endswith('.py'):
if path.endswith(".py"):
dn, fn = os.path.split(path)
base = fn[:-3]
path = os.path.join(dn, base + '.pyc')
path = os.path.join(dn, base + ".pyc")
yield path
path = os.path.join(dn, base + '.pyo')
path = os.path.join(dn, base + ".pyo")
yield path
@ -112,8 +115,8 @@ def compact(paths: Iterable[str]) -> Set[str]:
short_paths: Set[str] = set()
for path in sorted(paths, key=len):
should_skip = any(
path.startswith(shortpath.rstrip("*")) and
path[len(shortpath.rstrip("*").rstrip(sep))] == sep
path.startswith(shortpath.rstrip("*"))
and path[len(shortpath.rstrip("*").rstrip(sep))] == sep
for shortpath in short_paths
)
if not should_skip:
@ -136,18 +139,15 @@ def compress_for_rename(paths: Iterable[str]) -> Set[str]:
return os.path.normcase(os.path.join(*a))
for root in unchecked:
if any(os.path.normcase(root).startswith(w)
for w in wildcards):
if any(os.path.normcase(root).startswith(w) for w in wildcards):
# This directory has already been handled.
continue
all_files: Set[str] = set()
all_subdirs: Set[str] = set()
for dirname, subdirs, files in os.walk(root):
all_subdirs.update(norm_join(root, dirname, d)
for d in subdirs)
all_files.update(norm_join(root, dirname, f)
for f in files)
all_subdirs.update(norm_join(root, dirname, d) for d in subdirs)
all_files.update(norm_join(root, dirname, f) for f in files)
# If all the files we found are in our remaining set of files to
# remove, then remove them from the latter set and add a wildcard
# for the directory.
@ -196,14 +196,14 @@ def compress_for_output_listing(paths: Iterable[str]) -> Tuple[Set[str], Set[str
continue
file_ = os.path.join(dirpath, fname)
if (os.path.isfile(file_) and
os.path.normcase(file_) not in _normcased_files):
if (
os.path.isfile(file_)
and os.path.normcase(file_) not in _normcased_files
):
# We are skipping this file. Add it to the set.
will_skip.add(file_)
will_remove = files | {
os.path.join(folder, "*") for folder in folders
}
will_remove = files | {os.path.join(folder, "*") for folder in folders}
return will_remove, will_skip
@ -211,6 +211,7 @@ def compress_for_output_listing(paths: Iterable[str]) -> Tuple[Set[str], Set[str
class StashedUninstallPathSet:
"""A set of file rename operations to stash files while
tentatively uninstalling them."""
def __init__(self) -> None:
# Mapping from source file root to [Adjacent]TempDirectory
# for files under that directory.
@ -252,7 +253,7 @@ class StashedUninstallPathSet:
else:
# Did not find any suitable root
head = os.path.dirname(path)
save_dir = TempDirectory(kind='uninstall')
save_dir = TempDirectory(kind="uninstall")
self._save_dirs[head] = save_dir
relpath = os.path.relpath(path, head)
@ -271,7 +272,7 @@ class StashedUninstallPathSet:
new_path = self._get_file_stash(path)
self._moves.append((path, new_path))
if (path_is_dir and os.path.isdir(new_path)):
if path_is_dir and os.path.isdir(new_path):
# If we're moving a directory, we need to
# remove the destination first or else it will be
# moved to inside the existing directory.
@ -295,7 +296,7 @@ class StashedUninstallPathSet:
for new_path, path in self._moves:
try:
logger.debug('Replacing %s from %s', new_path, path)
logger.debug("Replacing %s from %s", new_path, path)
if os.path.isfile(new_path) or os.path.islink(new_path):
os.unlink(new_path)
elif os.path.isdir(new_path):
@ -315,6 +316,7 @@ class StashedUninstallPathSet:
class UninstallPathSet:
"""A set of file paths to be removed in the uninstallation of a
requirement."""
def __init__(self, dist: Distribution) -> None:
self.paths: Set[str] = set()
self._refuse: Set[str] = set()
@ -346,7 +348,7 @@ class UninstallPathSet:
# __pycache__ files can show up after 'installed-files.txt' is created,
# due to imports
if os.path.splitext(path)[1] == '.py':
if os.path.splitext(path)[1] == ".py":
self.add(cache_from_source(path))
def add_pth(self, pth_file: str, entry: str) -> None:
@ -369,10 +371,8 @@ class UninstallPathSet:
)
return
dist_name_version = (
self.dist.project_name + "-" + self.dist.version
)
logger.info('Uninstalling %s:', dist_name_version)
dist_name_version = self.dist.project_name + "-" + self.dist.version
logger.info("Uninstalling %s:", dist_name_version)
with indent_log():
if auto_confirm or self._allowed_to_proceed(verbose):
@ -382,16 +382,15 @@ class UninstallPathSet:
for path in sorted(compact(for_rename)):
moved.stash(path)
logger.verbose('Removing file or directory %s', path)
logger.verbose("Removing file or directory %s", path)
for pth in self.pth.values():
pth.remove()
logger.info('Successfully uninstalled %s', dist_name_version)
logger.info("Successfully uninstalled %s", dist_name_version)
def _allowed_to_proceed(self, verbose: bool) -> bool:
"""Display which files would be deleted and prompt for confirmation
"""
"""Display which files would be deleted and prompt for confirmation"""
def _display(msg: str, paths: Iterable[str]) -> None:
if not paths:
@ -410,13 +409,13 @@ class UninstallPathSet:
will_remove = set(self.paths)
will_skip = set()
_display('Would remove:', will_remove)
_display('Would not remove (might be manually added):', will_skip)
_display('Would not remove (outside of prefix):', self._refuse)
_display("Would remove:", will_remove)
_display("Would not remove (might be manually added):", will_skip)
_display("Would not remove (outside of prefix):", self._refuse)
if verbose:
_display('Will actually move:', compress_for_rename(self.paths))
_display("Will actually move:", compress_for_rename(self.paths))
return ask('Proceed (Y/n)? ', ('y', 'n', '')) != 'n'
return ask("Proceed (Y/n)? ", ("y", "n", "")) != "n"
def rollback(self) -> None:
"""Rollback the changes previously made by remove()."""
@ -426,7 +425,7 @@ class UninstallPathSet:
self.dist.project_name,
)
return
logger.info('Rolling back uninstall of %s', self.dist.project_name)
logger.info("Rolling back uninstall of %s", self.dist.project_name)
self._moved_paths.rollback()
for pth in self.pth.values():
pth.rollback()
@ -447,9 +446,11 @@ class UninstallPathSet:
)
return cls(dist)
if dist_path in {p for p in {sysconfig.get_path("stdlib"),
sysconfig.get_path("platstdlib")}
if p}:
if dist_path in {
p
for p in {sysconfig.get_path("stdlib"), sysconfig.get_path("platstdlib")}
if p
}:
logger.info(
"Not uninstalling %s at %s, as it is in the standard library.",
dist.key,
@ -459,43 +460,47 @@ class UninstallPathSet:
paths_to_remove = cls(dist)
develop_egg_link = egg_link_path(dist)
develop_egg_link_egg_info = '{}.egg-info'.format(
pkg_resources.to_filename(dist.project_name))
develop_egg_link_egg_info = "{}.egg-info".format(
pkg_resources.to_filename(dist.project_name)
)
egg_info_exists = dist.egg_info and os.path.exists(dist.egg_info)
# Special case for distutils installed package
distutils_egg_info = getattr(dist._provider, 'path', None)
distutils_egg_info = getattr(dist._provider, "path", None)
# Uninstall cases order do matter as in the case of 2 installs of the
# same package, pip needs to uninstall the currently detected version
if (egg_info_exists and dist.egg_info.endswith('.egg-info') and
not dist.egg_info.endswith(develop_egg_link_egg_info)):
if (
egg_info_exists
and dist.egg_info.endswith(".egg-info")
and not dist.egg_info.endswith(develop_egg_link_egg_info)
):
# if dist.egg_info.endswith(develop_egg_link_egg_info), we
# are in fact in the develop_egg_link case
paths_to_remove.add(dist.egg_info)
if dist.has_metadata('installed-files.txt'):
if dist.has_metadata("installed-files.txt"):
for installed_file in dist.get_metadata(
'installed-files.txt').splitlines():
path = os.path.normpath(
os.path.join(dist.egg_info, installed_file)
)
"installed-files.txt"
).splitlines():
path = os.path.normpath(os.path.join(dist.egg_info, installed_file))
paths_to_remove.add(path)
# FIXME: need a test for this elif block
# occurs with --single-version-externally-managed/--record outside
# of pip
elif dist.has_metadata('top_level.txt'):
if dist.has_metadata('namespace_packages.txt'):
namespaces = dist.get_metadata('namespace_packages.txt')
elif dist.has_metadata("top_level.txt"):
if dist.has_metadata("namespace_packages.txt"):
namespaces = dist.get_metadata("namespace_packages.txt")
else:
namespaces = []
for top_level_pkg in [
p for p
in dist.get_metadata('top_level.txt').splitlines()
if p and p not in namespaces]:
p
for p in dist.get_metadata("top_level.txt").splitlines()
if p and p not in namespaces
]:
path = os.path.join(dist.location, top_level_pkg)
paths_to_remove.add(path)
paths_to_remove.add(path + '.py')
paths_to_remove.add(path + '.pyc')
paths_to_remove.add(path + '.pyo')
paths_to_remove.add(path + ".py")
paths_to_remove.add(path + ".pyc")
paths_to_remove.add(path + ".pyo")
elif distutils_egg_info:
raise UninstallationError(
@ -506,17 +511,18 @@ class UninstallPathSet:
)
)
elif dist.location.endswith('.egg'):
elif dist.location.endswith(".egg"):
# package installed by easy_install
# We cannot match on dist.egg_name because it can slightly vary
# i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg
paths_to_remove.add(dist.location)
easy_install_egg = os.path.split(dist.location)[1]
easy_install_pth = os.path.join(os.path.dirname(dist.location),
'easy-install.pth')
paths_to_remove.add_pth(easy_install_pth, './' + easy_install_egg)
easy_install_pth = os.path.join(
os.path.dirname(dist.location), "easy-install.pth"
)
paths_to_remove.add_pth(easy_install_pth, "./" + easy_install_egg)
elif egg_info_exists and dist.egg_info.endswith('.dist-info'):
elif egg_info_exists and dist.egg_info.endswith(".dist-info"):
for path in uninstallation_paths(dist):
paths_to_remove.add(path)
@ -524,40 +530,42 @@ class UninstallPathSet:
# develop egg
with open(develop_egg_link) as fh:
link_pointer = os.path.normcase(fh.readline().strip())
assert (link_pointer == dist.location), (
'Egg-link {} does not match installed location of {} '
'(at {})'.format(
link_pointer, dist.project_name, dist.location)
assert (
link_pointer == dist.location
), "Egg-link {} does not match installed location of {} (at {})".format(
link_pointer, dist.project_name, dist.location
)
paths_to_remove.add(develop_egg_link)
easy_install_pth = os.path.join(os.path.dirname(develop_egg_link),
'easy-install.pth')
easy_install_pth = os.path.join(
os.path.dirname(develop_egg_link), "easy-install.pth"
)
paths_to_remove.add_pth(easy_install_pth, dist.location)
else:
logger.debug(
'Not sure how to uninstall: %s - Check: %s',
dist, dist.location,
"Not sure how to uninstall: %s - Check: %s",
dist,
dist.location,
)
# find distutils scripts= scripts
if dist.has_metadata('scripts') and dist.metadata_isdir('scripts'):
for script in dist.metadata_listdir('scripts'):
if dist.has_metadata("scripts") and dist.metadata_isdir("scripts"):
for script in dist.metadata_listdir("scripts"):
if dist_in_usersite(dist):
bin_dir = get_bin_user()
else:
bin_dir = get_bin_prefix()
paths_to_remove.add(os.path.join(bin_dir, script))
if WINDOWS:
paths_to_remove.add(os.path.join(bin_dir, script) + '.bat')
paths_to_remove.add(os.path.join(bin_dir, script) + ".bat")
# find console_scripts
_scripts_to_remove = []
console_scripts = dist.get_entry_map(group='console_scripts')
console_scripts = dist.get_entry_map(group="console_scripts")
for name in console_scripts.keys():
_scripts_to_remove.extend(_script_names(dist, name, False))
# find gui_scripts
gui_scripts = dist.get_entry_map(group='gui_scripts')
gui_scripts = dist.get_entry_map(group="gui_scripts")
for name in gui_scripts.keys():
_scripts_to_remove.extend(_script_names(dist, name, True))
@ -585,45 +593,41 @@ class UninstallPthEntries:
# have more than "\\sever\share". Valid examples: "\\server\share\" or
# "\\server\share\folder".
if WINDOWS and not os.path.splitdrive(entry)[0]:
entry = entry.replace('\\', '/')
entry = entry.replace("\\", "/")
self.entries.add(entry)
def remove(self) -> None:
logger.verbose('Removing pth entries from %s:', self.file)
logger.verbose("Removing pth entries from %s:", self.file)
# If the file doesn't exist, log a warning and return
if not os.path.isfile(self.file):
logger.warning(
"Cannot remove entries from nonexistent file %s", self.file
)
logger.warning("Cannot remove entries from nonexistent file %s", self.file)
return
with open(self.file, 'rb') as fh:
with open(self.file, "rb") as fh:
# windows uses '\r\n' with py3k, but uses '\n' with py2.x
lines = fh.readlines()
self._saved_lines = lines
if any(b'\r\n' in line for line in lines):
endline = '\r\n'
if any(b"\r\n" in line for line in lines):
endline = "\r\n"
else:
endline = '\n'
endline = "\n"
# handle missing trailing newline
if lines and not lines[-1].endswith(endline.encode("utf-8")):
lines[-1] = lines[-1] + endline.encode("utf-8")
for entry in self.entries:
try:
logger.verbose('Removing entry: %s', entry)
logger.verbose("Removing entry: %s", entry)
lines.remove((entry + endline).encode("utf-8"))
except ValueError:
pass
with open(self.file, 'wb') as fh:
with open(self.file, "wb") as fh:
fh.writelines(lines)
def rollback(self) -> bool:
if self._saved_lines is None:
logger.error(
'Cannot roll back changes to %s, none were made', self.file
)
logger.error("Cannot roll back changes to %s, none were made", self.file)
return False
logger.debug('Rolling %s back to previous state', self.file)
with open(self.file, 'wb') as fh:
logger.debug("Rolling %s back to previous state", self.file)
with open(self.file, "wb") as fh:
fh.writelines(self._saved_lines)
return True