1
1
Fork 0
mirror of https://github.com/pypa/pip synced 2023-12-13 21:30:23 +01:00

Move all remaining type comments to annotations

Use the com2ann tool to convert remaining comments to annotations. Now,
no type comments remain.

https://github.com/ilevkivskyi/com2ann

Some types are not available at runtime (e.g. Literal) or require a
forward reference and so were quoted.
This commit is contained in:
Jon Dufresne 2021-08-27 18:29:39 -07:00
parent 7aaea4e218
commit 44a034a131
8 changed files with 237 additions and 393 deletions

View file

@ -28,8 +28,7 @@ class Hashes:
"""
def __init__(self, hashes=None):
# type: (Dict[str, List[str]]) -> None
def __init__(self, hashes: Dict[str, List[str]] = None) -> None:
"""
:param hashes: A dict of algorithm names pointing to lists of allowed
hex digests
@ -41,8 +40,7 @@ class Hashes:
allowed[alg] = sorted(keys)
self._allowed = allowed
def __and__(self, other):
# type: (Hashes) -> Hashes
def __and__(self, other: "Hashes") -> "Hashes":
if not isinstance(other, Hashes):
return NotImplemented
@ -62,21 +60,14 @@ class Hashes:
return Hashes(new)
@property
def digest_count(self):
# type: () -> int
def digest_count(self) -> int:
return sum(len(digests) for digests in self._allowed.values())
def is_hash_allowed(
self,
hash_name, # type: str
hex_digest, # type: str
):
# type: (...) -> bool
def is_hash_allowed(self, hash_name: str, hex_digest: str) -> bool:
"""Return whether the given hex digest is allowed."""
return hex_digest in self._allowed.get(hash_name, [])
def check_against_chunks(self, chunks):
# type: (Iterator[bytes]) -> None
def check_against_chunks(self, chunks: Iterator[bytes]) -> None:
"""Check good hashes against ones built from iterable of chunks of
data.
@ -99,12 +90,10 @@ class Hashes:
return
self._raise(gots)
def _raise(self, gots):
# type: (Dict[str, _Hash]) -> NoReturn
def _raise(self, gots: Dict[str, "_Hash"]) -> "NoReturn":
raise HashMismatch(self._allowed, gots)
def check_against_file(self, file):
# type: (BinaryIO) -> None
def check_against_file(self, file: BinaryIO) -> None:
"""Check good hashes against a file-like object
Raise HashMismatch if none match.
@ -112,24 +101,20 @@ class Hashes:
"""
return self.check_against_chunks(read_chunks(file))
def check_against_path(self, path):
# type: (str) -> None
def check_against_path(self, path: str) -> None:
with open(path, "rb") as file:
return self.check_against_file(file)
def __bool__(self):
# type: () -> bool
def __bool__(self) -> bool:
"""Return whether I know any known-good hashes."""
return bool(self._allowed)
def __eq__(self, other):
# type: (object) -> bool
def __eq__(self, other: object) -> bool:
if not isinstance(other, Hashes):
return NotImplemented
return self._allowed == other._allowed
def __hash__(self):
# type: () -> int
def __hash__(self) -> int:
return hash(
",".join(
sorted(
@ -149,13 +134,11 @@ class MissingHashes(Hashes):
"""
def __init__(self):
# type: () -> None
def __init__(self) -> None:
"""Don't offer the ``hashes`` kwarg."""
# Pass our favorite hash in to generate a "gotten hash". With the
# empty list, it will never match, so an error will always raise.
super().__init__(hashes={FAVORITE_HASH: []})
def _raise(self, gots):
# type: (Dict[str, _Hash]) -> NoReturn
def _raise(self, gots: Dict[str, "_Hash"]) -> "NoReturn":
raise HashMissing(gots[FAVORITE_HASH].hexdigest())

View file

@ -70,8 +70,7 @@ VersionInfo = Tuple[int, int, int]
NetlocTuple = Tuple[str, Tuple[Optional[str], Optional[str]]]
def get_pip_version():
# type: () -> str
def get_pip_version() -> str:
pip_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "..")
pip_pkg_dir = os.path.abspath(pip_pkg_dir)
@ -82,8 +81,7 @@ def get_pip_version():
)
def normalize_version_info(py_version_info):
# type: (Tuple[int, ...]) -> Tuple[int, int, int]
def normalize_version_info(py_version_info: Tuple[int, ...]) -> Tuple[int, int, int]:
"""
Convert a tuple of ints representing a Python version to one of length
three.
@ -102,8 +100,7 @@ def normalize_version_info(py_version_info):
return cast("VersionInfo", py_version_info)
def ensure_dir(path):
# type: (str) -> None
def ensure_dir(path: str) -> None:
"""os.path.makedirs without EEXIST."""
try:
os.makedirs(path)
@ -113,8 +110,7 @@ def ensure_dir(path):
raise
def get_prog():
# type: () -> str
def get_prog() -> str:
try:
prog = os.path.basename(sys.argv[0])
if prog in ("__main__.py", "-c"):
@ -129,13 +125,11 @@ def get_prog():
# Retry every half second for up to 3 seconds
# Tenacity raises RetryError by default, explicitly raise the original exception
@retry(reraise=True, stop=stop_after_delay(3), wait=wait_fixed(0.5))
def rmtree(dir, ignore_errors=False):
# type: (str, bool) -> None
def rmtree(dir: str, ignore_errors: bool = False) -> None:
shutil.rmtree(dir, ignore_errors=ignore_errors, onerror=rmtree_errorhandler)
def rmtree_errorhandler(func, path, exc_info):
# type: (Callable[..., Any], str, ExcInfo) -> None
def rmtree_errorhandler(func: Callable[..., Any], path: str, exc_info: ExcInfo) -> None:
"""On Windows, the files in .svn are read-only, so when rmtree() tries to
remove them, an exception is thrown. We catch that here, remove the
read-only attribute, and hopefully continue without problems."""
@ -155,8 +149,7 @@ def rmtree_errorhandler(func, path, exc_info):
raise
def display_path(path):
# type: (str) -> str
def display_path(path: str) -> str:
"""Gives the display value for a given path, making it relative to cwd
if possible."""
path = os.path.normcase(os.path.abspath(path))
@ -165,8 +158,7 @@ def display_path(path):
return path
def backup_dir(dir, ext=".bak"):
# type: (str, str) -> str
def backup_dir(dir: str, ext: str = ".bak") -> str:
"""Figure out the name of a directory to back up the given dir to
(adding .bak, .bak2, etc)"""
n = 1
@ -177,16 +169,14 @@ def backup_dir(dir, ext=".bak"):
return dir + extension
def ask_path_exists(message, options):
# type: (str, Iterable[str]) -> str
def ask_path_exists(message: str, options: Iterable[str]) -> str:
for action in os.environ.get("PIP_EXISTS_ACTION", "").split():
if action in options:
return action
return ask(message, options)
def _check_no_input(message):
# type: (str) -> None
def _check_no_input(message: str) -> None:
"""Raise an error if no input is allowed."""
if os.environ.get("PIP_NO_INPUT"):
raise Exception(
@ -194,8 +184,7 @@ def _check_no_input(message):
)
def ask(message, options):
# type: (str, Iterable[str]) -> str
def ask(message: str, options: Iterable[str]) -> str:
"""Ask the message interactively, with the given possible responses"""
while 1:
_check_no_input(message)
@ -210,22 +199,19 @@ def ask(message, options):
return response
def ask_input(message):
# type: (str) -> str
def ask_input(message: str) -> str:
"""Ask for input interactively."""
_check_no_input(message)
return input(message)
def ask_password(message):
# type: (str) -> str
def ask_password(message: str) -> str:
"""Ask for a password interactively."""
_check_no_input(message)
return getpass.getpass(message)
def strtobool(val):
# type: (str) -> int
def strtobool(val: str) -> int:
"""Convert a string representation of truth to true (1) or false (0).
True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
@ -241,8 +227,7 @@ def strtobool(val):
raise ValueError(f"invalid truth value {val!r}")
def format_size(bytes):
# type: (float) -> str
def format_size(bytes: float) -> str:
if bytes > 1000 * 1000:
return "{:.1f} MB".format(bytes / 1000.0 / 1000)
elif bytes > 10 * 1000:
@ -253,8 +238,7 @@ def format_size(bytes):
return "{} bytes".format(int(bytes))
def tabulate(rows):
# type: (Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]]
def tabulate(rows: Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]]:
"""Return a list of formatted rows and a list of column sizes.
For example::
@ -285,8 +269,7 @@ def is_installable_dir(path: str) -> bool:
return False
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
# type: (BinaryIO, int) -> Iterator[bytes]
def read_chunks(file: BinaryIO, size: int = io.DEFAULT_BUFFER_SIZE) -> Iterator[bytes]:
"""Yield pieces of data from a file-like object until EOF."""
while True:
chunk = file.read(size)
@ -295,8 +278,7 @@ def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
yield chunk
def normalize_path(path, resolve_symlinks=True):
# type: (str, bool) -> str
def normalize_path(path: str, resolve_symlinks: bool = True) -> str:
"""
Convert a path to its canonical, case-normalized, absolute version.
@ -309,8 +291,7 @@ def normalize_path(path, resolve_symlinks=True):
return os.path.normcase(path)
def splitext(path):
# type: (str) -> Tuple[str, str]
def splitext(path: str) -> Tuple[str, str]:
"""Like os.path.splitext, but take off .tar too"""
base, ext = posixpath.splitext(path)
if base.lower().endswith(".tar"):
@ -319,8 +300,7 @@ def splitext(path):
return base, ext
def renames(old, new):
# type: (str, str) -> None
def renames(old: str, new: str) -> None:
"""Like os.renames(), but handles renaming across devices."""
# Implementation borrowed from os.renames().
head, tail = os.path.split(new)
@ -337,8 +317,7 @@ def renames(old, new):
pass
def is_local(path):
# type: (str) -> bool
def is_local(path: str) -> bool:
"""
Return True if path is within sys.prefix, if we're running in a virtualenv.
@ -352,8 +331,7 @@ def is_local(path):
return path.startswith(normalize_path(sys.prefix))
def dist_is_local(dist):
# type: (Distribution) -> bool
def dist_is_local(dist: Distribution) -> bool:
"""
Return True if given Distribution object is installed locally
(i.e. within current virtualenv).
@ -364,16 +342,14 @@ def dist_is_local(dist):
return is_local(dist_location(dist))
def dist_in_usersite(dist):
# type: (Distribution) -> bool
def dist_in_usersite(dist: Distribution) -> bool:
"""
Return True if given Distribution is installed in user site.
"""
return dist_location(dist).startswith(normalize_path(user_site))
def dist_in_site_packages(dist):
# type: (Distribution) -> bool
def dist_in_site_packages(dist: Distribution) -> bool:
"""
Return True if given Distribution is installed in
sysconfig.get_python_lib().
@ -381,8 +357,7 @@ def dist_in_site_packages(dist):
return dist_location(dist).startswith(normalize_path(site_packages))
def dist_is_editable(dist):
# type: (Distribution) -> bool
def dist_is_editable(dist: Distribution) -> bool:
"""
Return True if given Distribution is an editable install.
"""
@ -394,14 +369,13 @@ def dist_is_editable(dist):
def get_installed_distributions(
local_only=True, # type: bool
skip=stdlib_pkgs, # type: Container[str]
include_editables=True, # type: bool
editables_only=False, # type: bool
user_only=False, # type: bool
paths=None, # type: Optional[List[str]]
):
# type: (...) -> List[Distribution]
local_only: bool = True,
skip: Container[str] = stdlib_pkgs,
include_editables: bool = True,
editables_only: bool = False,
user_only: bool = False,
paths: Optional[List[str]] = None,
) -> List[Distribution]:
"""Return a list of installed Distribution objects.
Left for compatibility until direct pkg_resources uses are refactored out.
@ -423,8 +397,7 @@ def get_installed_distributions(
return [cast(_Dist, dist)._dist for dist in dists]
def get_distribution(req_name):
# type: (str) -> Optional[Distribution]
def get_distribution(req_name: str) -> Optional[Distribution]:
"""Given a requirement name, return the installed Distribution object.
This searches from *all* distributions available in the environment, to
@ -441,8 +414,7 @@ def get_distribution(req_name):
return cast(_Dist, dist)._dist
def egg_link_path(dist):
# type: (Distribution) -> Optional[str]
def egg_link_path(dist: Distribution) -> Optional[str]:
"""
Return the path for the .egg-link file if it exists, otherwise, None.
@ -477,8 +449,7 @@ def egg_link_path(dist):
return None
def dist_location(dist):
# type: (Distribution) -> str
def dist_location(dist: Distribution) -> str:
"""
Get the site-packages location of this distribution. Generally
this is dist.location, except in the case of develop-installed
@ -493,17 +464,15 @@ def dist_location(dist):
return normalize_path(dist.location)
def write_output(msg, *args):
# type: (Any, Any) -> None
def write_output(msg: Any, *args: Any) -> None:
logger.info(msg, *args)
class StreamWrapper(StringIO):
orig_stream = None # type: TextIO
orig_stream: TextIO = None
@classmethod
def from_stream(cls, orig_stream):
# type: (TextIO) -> StreamWrapper
def from_stream(cls, orig_stream: TextIO) -> "StreamWrapper":
cls.orig_stream = orig_stream
return cls()
@ -515,8 +484,7 @@ class StreamWrapper(StringIO):
@contextlib.contextmanager
def captured_output(stream_name):
# type: (str) -> Iterator[StreamWrapper]
def captured_output(stream_name: str) -> Iterator[StreamWrapper]:
"""Return a context manager used by captured_stdout/stdin/stderr
that temporarily replaces the sys stream *stream_name* with a StringIO.
@ -530,8 +498,7 @@ def captured_output(stream_name):
setattr(sys, stream_name, orig_stdout)
def captured_stdout():
# type: () -> ContextManager[StreamWrapper]
def captured_stdout() -> ContextManager[StreamWrapper]:
"""Capture the output of sys.stdout:
with captured_stdout() as stdout:
@ -543,8 +510,7 @@ def captured_stdout():
return captured_output("stdout")
def captured_stderr():
# type: () -> ContextManager[StreamWrapper]
def captured_stderr() -> ContextManager[StreamWrapper]:
"""
See captured_stdout().
"""
@ -552,16 +518,14 @@ def captured_stderr():
# Simulates an enum
def enum(*sequential, **named):
# type: (*Any, **Any) -> Type[Any]
def enum(*sequential: Any, **named: Any) -> Type[Any]:
enums = dict(zip(sequential, range(len(sequential))), **named)
reverse = {value: key for key, value in enums.items()}
enums["reverse_mapping"] = reverse
return type("Enum", (), enums)
def build_netloc(host, port):
# type: (str, Optional[int]) -> str
def build_netloc(host: str, port: Optional[int]) -> str:
"""
Build a netloc from a host-port pair
"""
@ -573,8 +537,7 @@ def build_netloc(host, port):
return f"{host}:{port}"
def build_url_from_netloc(netloc, scheme="https"):
# type: (str, str) -> str
def build_url_from_netloc(netloc: str, scheme: str = "https") -> str:
"""
Build a full URL from a netloc.
"""
@ -584,8 +547,7 @@ def build_url_from_netloc(netloc, scheme="https"):
return f"{scheme}://{netloc}"
def parse_netloc(netloc):
# type: (str) -> Tuple[str, Optional[int]]
def parse_netloc(netloc: str) -> Tuple[str, Optional[int]]:
"""
Return the host-port pair from a netloc.
"""
@ -594,8 +556,7 @@ def parse_netloc(netloc):
return parsed.hostname, parsed.port
def split_auth_from_netloc(netloc):
# type: (str) -> NetlocTuple
def split_auth_from_netloc(netloc: str) -> NetlocTuple:
"""
Parse out and remove the auth information from a netloc.
@ -608,7 +569,7 @@ def split_auth_from_netloc(netloc):
# behaves if more than one @ is present (which can be checked using
# the password attribute of urlsplit()'s return value).
auth, netloc = netloc.rsplit("@", 1)
pw = None # type: Optional[str]
pw: Optional[str] = None
if ":" in auth:
# Split from the left because that's how urllib.parse.urlsplit()
# behaves if more than one : is present (which again can be checked
@ -624,8 +585,7 @@ def split_auth_from_netloc(netloc):
return netloc, (user, pw)
def redact_netloc(netloc):
# type: (str) -> str
def redact_netloc(netloc: str) -> str:
"""
Replace the sensitive data in a netloc with "****", if it exists.
@ -647,8 +607,9 @@ def redact_netloc(netloc):
)
def _transform_url(url, transform_netloc):
# type: (str, Callable[[str], Tuple[Any, ...]]) -> Tuple[str, NetlocTuple]
def _transform_url(
url: str, transform_netloc: Callable[[str], Tuple[Any, ...]]
) -> Tuple[str, NetlocTuple]:
"""Transform and replace netloc in a url.
transform_netloc is a function taking the netloc and returning a
@ -666,18 +627,15 @@ def _transform_url(url, transform_netloc):
return surl, cast("NetlocTuple", netloc_tuple)
def _get_netloc(netloc):
# type: (str) -> NetlocTuple
def _get_netloc(netloc: str) -> NetlocTuple:
return split_auth_from_netloc(netloc)
def _redact_netloc(netloc):
# type: (str) -> Tuple[str,]
def _redact_netloc(netloc: str) -> Tuple[str]:
return (redact_netloc(netloc),)
def split_auth_netloc_from_url(url):
# type: (str) -> Tuple[str, str, Tuple[str, str]]
def split_auth_netloc_from_url(url: str) -> Tuple[str, str, Tuple[str, str]]:
"""
Parse a url into separate netloc, auth, and url with no auth.
@ -687,41 +645,31 @@ def split_auth_netloc_from_url(url):
return url_without_auth, netloc, auth
def remove_auth_from_url(url):
# type: (str) -> str
def remove_auth_from_url(url: str) -> str:
"""Return a copy of url with 'username:password@' removed."""
# username/pass params are passed to subversion through flags
# and are not recognized in the url.
return _transform_url(url, _get_netloc)[0]
def redact_auth_from_url(url):
# type: (str) -> str
def redact_auth_from_url(url: str) -> str:
"""Replace the password in a given url with ****."""
return _transform_url(url, _redact_netloc)[0]
class HiddenText:
def __init__(
self,
secret, # type: str
redacted, # type: str
):
# type: (...) -> None
def __init__(self, secret: str, redacted: str) -> None:
self.secret = secret
self.redacted = redacted
def __repr__(self):
# type: (...) -> str
def __repr__(self) -> str:
return "<HiddenText {!r}>".format(str(self))
def __str__(self):
# type: (...) -> str
def __str__(self) -> str:
return self.redacted
# This is useful for testing.
def __eq__(self, other):
# type: (Any) -> bool
def __eq__(self, other: Any) -> bool:
if type(self) != type(other):
return False
@ -730,19 +678,16 @@ class HiddenText:
return self.secret == other.secret
def hide_value(value):
# type: (str) -> HiddenText
def hide_value(value: str) -> HiddenText:
return HiddenText(value, redacted="****")
def hide_url(url):
# type: (str) -> HiddenText
def hide_url(url: str) -> HiddenText:
redacted = redact_auth_from_url(url)
return HiddenText(url, redacted=redacted)
def protect_pip_from_modification_on_windows(modifying_pip):
# type: (bool) -> None
def protect_pip_from_modification_on_windows(modifying_pip: bool) -> None:
"""Protection of pip.exe from modification on Windows
On Windows, any operation modifying pip should be run as:
@ -768,14 +713,12 @@ def protect_pip_from_modification_on_windows(modifying_pip):
)
def is_console_interactive():
# type: () -> bool
def is_console_interactive() -> bool:
"""Is this console interactive?"""
return sys.stdin is not None and sys.stdin.isatty()
def hash_file(path, blocksize=1 << 20):
# type: (str, int) -> Tuple[Any, int]
def hash_file(path: str, blocksize: int = 1 << 20) -> Tuple[Any, int]:
"""Return (hash, length) for path using hashlib.sha256()"""
h = hashlib.sha256()
@ -787,8 +730,7 @@ def hash_file(path, blocksize=1 << 20):
return h, length
def is_wheel_installed():
# type: () -> bool
def is_wheel_installed() -> bool:
"""
Return whether the wheel package is installed.
"""
@ -800,8 +742,7 @@ def is_wheel_installed():
return True
def pairwise(iterable):
# type: (Iterable[Any]) -> Iterator[Tuple[Any, Any]]
def pairwise(iterable: Iterable[Any]) -> Iterator[Tuple[Any, Any]]:
"""
Return paired elements.
@ -813,10 +754,9 @@ def pairwise(iterable):
def partition(
pred, # type: Callable[[T], bool]
iterable, # type: Iterable[T]
):
# type: (...) -> Tuple[Iterable[T], Iterable[T]]
pred: Callable[[T], bool],
iterable: Iterable[T],
) -> Tuple[Iterable[T], Iterable[T]]:
"""
Use a predicate to partition entries into false entries and true entries,
like

View file

@ -10,37 +10,29 @@ class KeyBasedCompareMixin:
__slots__ = ["_compare_key", "_defining_class"]
def __init__(self, key, defining_class):
# type: (Any, Type[KeyBasedCompareMixin]) -> None
def __init__(self, key: Any, defining_class: Type["KeyBasedCompareMixin"]) -> None:
self._compare_key = key
self._defining_class = defining_class
def __hash__(self):
# type: () -> int
def __hash__(self) -> int:
return hash(self._compare_key)
def __lt__(self, other):
# type: (Any) -> bool
def __lt__(self, other: Any) -> bool:
return self._compare(other, operator.__lt__)
def __le__(self, other):
# type: (Any) -> bool
def __le__(self, other: Any) -> bool:
return self._compare(other, operator.__le__)
def __gt__(self, other):
# type: (Any) -> bool
def __gt__(self, other: Any) -> bool:
return self._compare(other, operator.__gt__)
def __ge__(self, other):
# type: (Any) -> bool
def __ge__(self, other: Any) -> bool:
return self._compare(other, operator.__ge__)
def __eq__(self, other):
# type: (Any) -> bool
def __eq__(self, other: Any) -> bool:
return self._compare(other, operator.__eq__)
def _compare(self, other, method):
# type: (Any, Callable[[Any, Any], bool]) -> bool
def _compare(self, other: Any, method: Callable[[Any, Any], bool]) -> bool:
if not isinstance(other, self._defining_class):
return NotImplemented

View file

@ -30,12 +30,11 @@ CommandArgs = List[Union[str, HiddenText]]
LOG_DIVIDER = "----------------------------------------"
def make_command(*args):
# type: (Union[str, HiddenText, CommandArgs]) -> CommandArgs
def make_command(*args: Union[str, HiddenText, CommandArgs]) -> CommandArgs:
"""
Create a CommandArgs object.
"""
command_args = [] # type: CommandArgs
command_args: CommandArgs = []
for arg in args:
# Check for list instead of CommandArgs since CommandArgs is
# only known during type-checking.
@ -48,8 +47,7 @@ def make_command(*args):
return command_args
def format_command_args(args):
# type: (Union[List[str], CommandArgs]) -> str
def format_command_args(args: Union[List[str], CommandArgs]) -> str:
"""
Format command arguments for display.
"""
@ -64,8 +62,7 @@ def format_command_args(args):
)
def reveal_command_args(args):
# type: (Union[List[str], CommandArgs]) -> List[str]
def reveal_command_args(args: Union[List[str], CommandArgs]) -> List[str]:
"""
Return the arguments in their raw, unredacted form.
"""
@ -73,12 +70,11 @@ def reveal_command_args(args):
def make_subprocess_output_error(
cmd_args, # type: Union[List[str], CommandArgs]
cwd, # type: Optional[str]
lines, # type: List[str]
exit_status, # type: int
):
# type: (...) -> str
cmd_args: Union[List[str], CommandArgs],
cwd: Optional[str],
lines: List[str],
exit_status: int,
) -> str:
"""
Create and return the error message to use to log a subprocess error
with command output.
@ -109,19 +105,18 @@ def make_subprocess_output_error(
def call_subprocess(
cmd, # type: Union[List[str], CommandArgs]
show_stdout=False, # type: bool
cwd=None, # type: Optional[str]
on_returncode="raise", # type: Literal["raise", "warn", "ignore"]
extra_ok_returncodes=None, # type: Optional[Iterable[int]]
command_desc=None, # type: Optional[str]
extra_environ=None, # type: Optional[Mapping[str, Any]]
unset_environ=None, # type: Optional[Iterable[str]]
spinner=None, # type: Optional[SpinnerInterface]
log_failed_cmd=True, # type: Optional[bool]
stdout_only=False, # type: Optional[bool]
):
# type: (...) -> str
cmd: Union[List[str], CommandArgs],
show_stdout: bool = False,
cwd: Optional[str] = None,
on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
extra_ok_returncodes: Optional[Iterable[int]] = None,
command_desc: Optional[str] = None,
extra_environ: Optional[Mapping[str, Any]] = None,
unset_environ: Optional[Iterable[str]] = None,
spinner: Optional[SpinnerInterface] = None,
log_failed_cmd: Optional[bool] = True,
stdout_only: Optional[bool] = False,
) -> str:
"""
Args:
show_stdout: if true, use INFO to log the subprocess's stderr and
@ -206,7 +201,7 @@ def call_subprocess(
proc.stdin.close()
# In this mode, stdout and stderr are in the same pipe.
while True:
line = proc.stdout.readline() # type: str
line: str = proc.stdout.readline()
if not line:
break
line = line.rstrip()
@ -271,8 +266,7 @@ def call_subprocess(
return output
def runner_with_spinner_message(message):
# type: (str) -> Callable[..., None]
def runner_with_spinner_message(message: str) -> Callable[..., None]:
"""Provide a subprocess_runner that shows a spinner message.
Intended for use with for pep517's Pep517HookCaller. Thus, the runner has
@ -280,11 +274,10 @@ def runner_with_spinner_message(message):
"""
def runner(
cmd, # type: List[str]
cwd=None, # type: Optional[str]
extra_environ=None, # type: Optional[Mapping[str, Any]]
):
# type: (...) -> None
cmd: List[str],
cwd: Optional[str] = None,
extra_environ: Optional[Mapping[str, Any]] = None,
) -> None:
with open_spinner(message) as spinner:
call_subprocess(
cmd,

View file

@ -52,8 +52,7 @@ SCP_REGEX = re.compile(
)
def looks_like_hash(sha):
# type: (str) -> bool
def looks_like_hash(sha: str) -> bool:
return bool(HASH_REGEX.match(sha))
@ -74,12 +73,10 @@ class Git(VersionControl):
default_arg_rev = "HEAD"
@staticmethod
def get_base_rev_args(rev):
# type: (str) -> List[str]
def get_base_rev_args(rev: str) -> List[str]:
return [rev]
def is_immutable_rev_checkout(self, url, dest):
# type: (str, str) -> bool
def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
_, rev_options = self.get_url_rev_options(hide_url(url))
if not rev_options.rev:
return False
@ -101,8 +98,7 @@ class Git(VersionControl):
return tuple(int(c) for c in match.groups())
@classmethod
def get_current_branch(cls, location):
# type: (str) -> Optional[str]
def get_current_branch(cls, location: str) -> Optional[str]:
"""
Return the current branch, or None if HEAD isn't at a branch
(e.g. detached HEAD).
@ -127,8 +123,7 @@ class Git(VersionControl):
return None
@classmethod
def get_revision_sha(cls, dest, rev):
# type: (str, str) -> Tuple[Optional[str], bool]
def get_revision_sha(cls, dest: str, rev: str) -> Tuple[Optional[str], bool]:
"""
Return (sha_or_none, is_branch), where sha_or_none is a commit hash
if the revision names a remote branch or tag, otherwise None.
@ -174,8 +169,7 @@ class Git(VersionControl):
return (sha, False)
@classmethod
def _should_fetch(cls, dest, rev):
# type: (str, str) -> bool
def _should_fetch(cls, dest: str, rev: str) -> bool:
"""
Return true if rev is a ref or is a commit that we don't have locally.
@ -198,8 +192,9 @@ class Git(VersionControl):
return True
@classmethod
def resolve_revision(cls, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> RevOptions
def resolve_revision(
cls, dest: str, url: HiddenText, rev_options: RevOptions
) -> RevOptions:
"""
Resolve a revision to a new RevOptions object with the SHA1 of the
branch, tag, or ref if found.
@ -243,8 +238,7 @@ class Git(VersionControl):
return rev_options
@classmethod
def is_commit_id_equal(cls, dest, name):
# type: (str, Optional[str]) -> bool
def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
"""
Return whether the current commit hash equals the given name.
@ -258,8 +252,7 @@ class Git(VersionControl):
return cls.get_revision(dest) == name
def fetch_new(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
def fetch_new(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
rev_display = rev_options.to_display()
logger.info("Cloning %s%s to %s", url, rev_display, display_path(dest))
if self.get_git_version() >= (2, 17):
@ -314,8 +307,7 @@ class Git(VersionControl):
#: repo may contain submodules
self.update_submodules(dest)
def switch(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
self.run_command(
make_command("config", "remote.origin.url", url),
cwd=dest,
@ -325,8 +317,7 @@ class Git(VersionControl):
self.update_submodules(dest)
def update(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
# First fetch changes from the default remote
if self.get_git_version() >= (1, 9):
# fetch tags in addition to everything else
@ -341,8 +332,7 @@ class Git(VersionControl):
self.update_submodules(dest)
@classmethod
def get_remote_url(cls, location):
# type: (str) -> str
def get_remote_url(cls, location: str) -> str:
"""
Return URL of the first remote encountered.
@ -372,8 +362,7 @@ class Git(VersionControl):
return cls._git_remote_to_pip_url(url.strip())
@staticmethod
def _git_remote_to_pip_url(url):
# type: (str) -> str
def _git_remote_to_pip_url(url: str) -> str:
"""
Convert a remote url from what git uses to what pip accepts.
@ -404,8 +393,7 @@ class Git(VersionControl):
raise RemoteNotValidError(url)
@classmethod
def has_commit(cls, location, rev):
# type: (str, str) -> bool
def has_commit(cls, location: str, rev: str) -> bool:
"""
Check if rev is a commit that is available in the local repository.
"""
@ -421,8 +409,7 @@ class Git(VersionControl):
return True
@classmethod
def get_revision(cls, location, rev=None):
# type: (str, Optional[str]) -> str
def get_revision(cls, location: str, rev: Optional[str] = None) -> str:
if rev is None:
rev = "HEAD"
current_rev = cls.run_command(
@ -434,8 +421,7 @@ class Git(VersionControl):
return current_rev.strip()
@classmethod
def get_subdirectory(cls, location):
# type: (str) -> Optional[str]
def get_subdirectory(cls, location: str) -> Optional[str]:
"""
Return the path to Python project root, relative to the repo root.
Return None if the project root is in the repo root.
@ -453,8 +439,7 @@ class Git(VersionControl):
return find_path_to_project_root_from_repo_root(location, repo_root)
@classmethod
def get_url_rev_and_auth(cls, url):
# type: (str) -> Tuple[str, Optional[str], AuthInfo]
def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
"""
Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
That's required because although they use SSH they sometimes don't
@ -485,8 +470,7 @@ class Git(VersionControl):
return url, rev, user_pass
@classmethod
def update_submodules(cls, location):
# type: (str) -> None
def update_submodules(cls, location: str) -> None:
if not os.path.exists(os.path.join(location, ".gitmodules")):
return
cls.run_command(
@ -495,8 +479,7 @@ class Git(VersionControl):
)
@classmethod
def get_repository_root(cls, location):
# type: (str) -> Optional[str]
def get_repository_root(cls, location: str) -> Optional[str]:
loc = super().get_repository_root(location)
if loc:
return loc
@ -521,8 +504,7 @@ class Git(VersionControl):
return os.path.normpath(r.rstrip("\r\n"))
@staticmethod
def should_add_vcs_url_prefix(repo_url):
# type: (str) -> bool
def should_add_vcs_url_prefix(repo_url: str) -> bool:
"""In either https or ssh form, requirements must be prefixed with git+."""
return True

View file

@ -34,18 +34,15 @@ class Subversion(VersionControl):
schemes = ("svn+ssh", "svn+http", "svn+https", "svn+svn", "svn+file")
@classmethod
def should_add_vcs_url_prefix(cls, remote_url):
# type: (str) -> bool
def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
return True
@staticmethod
def get_base_rev_args(rev):
# type: (str) -> List[str]
def get_base_rev_args(rev: str) -> List[str]:
return ["-r", rev]
@classmethod
def get_revision(cls, location):
# type: (str) -> str
def get_revision(cls, location: str) -> str:
"""
Return the maximum revision for all files under a given location
"""
@ -74,8 +71,9 @@ class Subversion(VersionControl):
return str(revision)
@classmethod
def get_netloc_and_auth(cls, netloc, scheme):
# type: (str, str) -> Tuple[str, Tuple[Optional[str], Optional[str]]]
def get_netloc_and_auth(
cls, netloc: str, scheme: str
) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
"""
This override allows the auth information to be passed to svn via the
--username and --password options instead of via the URL.
@ -88,8 +86,7 @@ class Subversion(VersionControl):
return split_auth_from_netloc(netloc)
@classmethod
def get_url_rev_and_auth(cls, url):
# type: (str) -> Tuple[str, Optional[str], AuthInfo]
def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
# hotfix the URL scheme after removing svn+ from svn+ssh:// readd it
url, rev, user_pass = super().get_url_rev_and_auth(url)
if url.startswith("ssh://"):
@ -97,9 +94,10 @@ class Subversion(VersionControl):
return url, rev, user_pass
@staticmethod
def make_rev_args(username, password):
# type: (Optional[str], Optional[HiddenText]) -> CommandArgs
extra_args = [] # type: CommandArgs
def make_rev_args(
username: Optional[str], password: Optional[HiddenText]
) -> CommandArgs:
extra_args: CommandArgs = []
if username:
extra_args += ["--username", username]
if password:
@ -108,8 +106,7 @@ class Subversion(VersionControl):
return extra_args
@classmethod
def get_remote_url(cls, location):
# type: (str) -> str
def get_remote_url(cls, location: str) -> str:
# In cases where the source is in a subdirectory, we have to look up in
# the location until we find a valid project root.
orig_location = location
@ -133,8 +130,7 @@ class Subversion(VersionControl):
return url
@classmethod
def _get_svn_url_rev(cls, location):
# type: (str) -> Tuple[Optional[str], int]
def _get_svn_url_rev(cls, location: str) -> Tuple[Optional[str], int]:
from pip._internal.exceptions import InstallationError
entries_path = os.path.join(location, cls.dirname, "entries")
@ -184,13 +180,11 @@ class Subversion(VersionControl):
return url, rev
@classmethod
def is_commit_id_equal(cls, dest, name):
# type: (str, Optional[str]) -> bool
def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
"""Always assume the versions don't match"""
return False
def __init__(self, use_interactive=None):
# type: (bool) -> None
def __init__(self, use_interactive: bool = None) -> None:
if use_interactive is None:
use_interactive = is_console_interactive()
self.use_interactive = use_interactive
@ -200,12 +194,11 @@ class Subversion(VersionControl):
# Special value definitions:
# None: Not evaluated yet.
# Empty tuple: Could not parse version.
self._vcs_version = None # type: Optional[Tuple[int, ...]]
self._vcs_version: Optional[Tuple[int, ...]] = None
super().__init__()
def call_vcs_version(self):
# type: () -> Tuple[int, ...]
def call_vcs_version(self) -> Tuple[int, ...]:
"""Query the version of the currently installed Subversion client.
:return: A tuple containing the parts of the version information or
@ -233,8 +226,7 @@ class Subversion(VersionControl):
return parsed_version
def get_vcs_version(self):
# type: () -> Tuple[int, ...]
def get_vcs_version(self) -> Tuple[int, ...]:
"""Return the version of the currently installed Subversion client.
If the version of the Subversion client has already been queried,
@ -254,8 +246,7 @@ class Subversion(VersionControl):
self._vcs_version = vcs_version
return vcs_version
def get_remote_call_options(self):
# type: () -> CommandArgs
def get_remote_call_options(self) -> CommandArgs:
"""Return options to be used on calls to Subversion that contact the server.
These options are applicable for the following ``svn`` subcommands used
@ -286,8 +277,7 @@ class Subversion(VersionControl):
return []
def fetch_new(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
def fetch_new(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
rev_display = rev_options.to_display()
logger.info(
"Checking out %s%s to %s",
@ -305,8 +295,7 @@ class Subversion(VersionControl):
)
self.run_command(cmd_args)
def switch(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
cmd_args = make_command(
"switch",
self.get_remote_call_options(),
@ -316,8 +305,7 @@ class Subversion(VersionControl):
)
self.run_command(cmd_args)
def update(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
cmd_args = make_command(
"update",
self.get_remote_call_options(),

View file

@ -49,8 +49,7 @@ logger = logging.getLogger(__name__)
AuthInfo = Tuple[Optional[str], Optional[str]]
def is_url(name):
# type: (str) -> bool
def is_url(name: str) -> bool:
"""
Return true if the name looks like a URL.
"""
@ -60,8 +59,9 @@ def is_url(name):
return scheme in ["http", "https", "file", "ftp"] + vcs.all_schemes
def make_vcs_requirement_url(repo_url, rev, project_name, subdir=None):
# type: (str, str, str, Optional[str]) -> str
def make_vcs_requirement_url(
repo_url: str, rev: str, project_name: str, subdir: Optional[str] = None
) -> str:
"""
Return the URL for a VCS requirement.
@ -77,8 +77,9 @@ def make_vcs_requirement_url(repo_url, rev, project_name, subdir=None):
return req
def find_path_to_project_root_from_repo_root(location, repo_root):
# type: (str, str) -> Optional[str]
def find_path_to_project_root_from_repo_root(
location: str, repo_root: str
) -> Optional[str]:
"""
Find the the Python project's root by searching up the filesystem from
`location`. Return the path to project root relative to `repo_root`.
@ -126,11 +127,10 @@ class RevOptions:
def __init__(
self,
vc_class, # type: Type[VersionControl]
rev=None, # type: Optional[str]
extra_args=None, # type: Optional[CommandArgs]
):
# type: (...) -> None
vc_class: Type["VersionControl"],
rev: Optional[str] = None,
extra_args: Optional[CommandArgs] = None,
) -> None:
"""
Args:
vc_class: a VersionControl subclass.
@ -143,26 +143,23 @@ class RevOptions:
self.extra_args = extra_args
self.rev = rev
self.vc_class = vc_class
self.branch_name = None # type: Optional[str]
self.branch_name: Optional[str] = None
def __repr__(self):
# type: () -> str
def __repr__(self) -> str:
return f"<RevOptions {self.vc_class.name}: rev={self.rev!r}>"
@property
def arg_rev(self):
# type: () -> Optional[str]
def arg_rev(self) -> Optional[str]:
if self.rev is None:
return self.vc_class.default_arg_rev
return self.rev
def to_args(self):
# type: () -> CommandArgs
def to_args(self) -> CommandArgs:
"""
Return the VCS-specific command arguments.
"""
args = [] # type: CommandArgs
args: CommandArgs = []
rev = self.arg_rev
if rev is not None:
args += self.vc_class.get_base_rev_args(rev)
@ -170,15 +167,13 @@ class RevOptions:
return args
def to_display(self):
# type: () -> str
def to_display(self) -> str:
if not self.rev:
return ""
return f" (to revision {self.rev})"
def make_new(self, rev):
# type: (str) -> RevOptions
def make_new(self, rev: str) -> "RevOptions":
"""
Make a copy of the current instance, but with a new rev.
@ -189,40 +184,34 @@ class RevOptions:
class VcsSupport:
_registry = {} # type: Dict[str, VersionControl]
_registry: Dict[str, "VersionControl"] = {}
schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]
def __init__(self):
# type: () -> None
def __init__(self) -> None:
# Register more schemes with urlparse for various version control
# systems
urllib.parse.uses_netloc.extend(self.schemes)
super().__init__()
def __iter__(self):
# type: () -> Iterator[str]
def __iter__(self) -> Iterator[str]:
return self._registry.__iter__()
@property
def backends(self):
# type: () -> List[VersionControl]
def backends(self) -> List["VersionControl"]:
return list(self._registry.values())
@property
def dirnames(self):
# type: () -> List[str]
def dirnames(self) -> List[str]:
return [backend.dirname for backend in self.backends]
@property
def all_schemes(self):
# type: () -> List[str]
schemes = [] # type: List[str]
def all_schemes(self) -> List[str]:
schemes: List[str] = []
for backend in self.backends:
schemes.extend(backend.schemes)
return schemes
def register(self, cls):
# type: (Type[VersionControl]) -> None
def register(self, cls: Type["VersionControl"]) -> None:
if not hasattr(cls, "name"):
logger.warning("Cannot register VCS %s", cls.__name__)
return
@ -230,13 +219,11 @@ class VcsSupport:
self._registry[cls.name] = cls()
logger.debug("Registered VCS backend: %s", cls.name)
def unregister(self, name):
# type: (str) -> None
def unregister(self, name: str) -> None:
if name in self._registry:
del self._registry[name]
def get_backend_for_dir(self, location):
# type: (str) -> Optional[VersionControl]
def get_backend_for_dir(self, location: str) -> Optional["VersionControl"]:
"""
Return a VersionControl object if a repository of that type is found
at the given directory.
@ -259,8 +246,7 @@ class VcsSupport:
inner_most_repo_path = max(vcs_backends, key=len)
return vcs_backends[inner_most_repo_path]
def get_backend_for_scheme(self, scheme):
# type: (str) -> Optional[VersionControl]
def get_backend_for_scheme(self, scheme: str) -> Optional["VersionControl"]:
"""
Return a VersionControl object or None.
"""
@ -269,8 +255,7 @@ class VcsSupport:
return vcs_backend
return None
def get_backend(self, name):
# type: (str) -> Optional[VersionControl]
def get_backend(self, name: str) -> Optional["VersionControl"]:
"""
Return a VersionControl object or None.
"""
@ -286,14 +271,13 @@ class VersionControl:
dirname = ""
repo_name = ""
# List of supported schemes for this Version Control
schemes = () # type: Tuple[str, ...]
schemes: Tuple[str, ...] = ()
# Iterable of environment variable names to pass to call_subprocess().
unset_environ = () # type: Tuple[str, ...]
default_arg_rev = None # type: Optional[str]
unset_environ: Tuple[str, ...] = ()
default_arg_rev: Optional[str] = None
@classmethod
def should_add_vcs_url_prefix(cls, remote_url):
# type: (str) -> bool
def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
"""
Return whether the vcs prefix (e.g. "git+") should be added to a
repository's remote url when used in a requirement.
@ -301,8 +285,7 @@ class VersionControl:
return not remote_url.lower().startswith(f"{cls.name}:")
@classmethod
def get_subdirectory(cls, location):
# type: (str) -> Optional[str]
def get_subdirectory(cls, location: str) -> Optional[str]:
"""
Return the path to Python project root, relative to the repo root.
Return None if the project root is in the repo root.
@ -310,16 +293,14 @@ class VersionControl:
return None
@classmethod
def get_requirement_revision(cls, repo_dir):
# type: (str) -> str
def get_requirement_revision(cls, repo_dir: str) -> str:
"""
Return the revision string that should be used in a requirement.
"""
return cls.get_revision(repo_dir)
@classmethod
def get_src_requirement(cls, repo_dir, project_name):
# type: (str, str) -> str
def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
"""
Return the requirement string to use to redownload the files
currently at the given repository directory.
@ -343,8 +324,7 @@ class VersionControl:
return req
@staticmethod
def get_base_rev_args(rev):
# type: (str) -> List[str]
def get_base_rev_args(rev: str) -> List[str]:
"""
Return the base revision arguments for a vcs command.
@ -353,8 +333,7 @@ class VersionControl:
"""
raise NotImplementedError
def is_immutable_rev_checkout(self, url, dest):
# type: (str, str) -> bool
def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
"""
Return true if the commit hash checked out at dest matches
the revision in url.
@ -368,8 +347,9 @@ class VersionControl:
return False
@classmethod
def make_rev_options(cls, rev=None, extra_args=None):
# type: (Optional[str], Optional[CommandArgs]) -> RevOptions
def make_rev_options(
cls, rev: Optional[str] = None, extra_args: Optional[CommandArgs] = None
) -> RevOptions:
"""
Return a RevOptions object.
@ -380,8 +360,7 @@ class VersionControl:
return RevOptions(cls, rev, extra_args=extra_args)
@classmethod
def _is_local_repository(cls, repo):
# type: (str) -> bool
def _is_local_repository(cls, repo: str) -> bool:
"""
posix absolute paths start with os.path.sep,
win32 ones start with drive (like c:\\folder)
@ -390,8 +369,9 @@ class VersionControl:
return repo.startswith(os.path.sep) or bool(drive)
@classmethod
def get_netloc_and_auth(cls, netloc, scheme):
# type: (str, str) -> Tuple[str, Tuple[Optional[str], Optional[str]]]
def get_netloc_and_auth(
cls, netloc: str, scheme: str
) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
"""
Parse the repository URL's netloc, and return the new netloc to use
along with auth information.
@ -410,8 +390,7 @@ class VersionControl:
return netloc, (None, None)
@classmethod
def get_url_rev_and_auth(cls, url):
# type: (str) -> Tuple[str, Optional[str], AuthInfo]
def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
"""
Parse the repository URL to use, and return the URL, revision,
and auth info to use.
@ -441,22 +420,22 @@ class VersionControl:
return url, rev, user_pass
@staticmethod
def make_rev_args(username, password):
# type: (Optional[str], Optional[HiddenText]) -> CommandArgs
def make_rev_args(
username: Optional[str], password: Optional[HiddenText]
) -> CommandArgs:
"""
Return the RevOptions "extra arguments" to use in obtain().
"""
return []
def get_url_rev_options(self, url):
# type: (HiddenText) -> Tuple[HiddenText, RevOptions]
def get_url_rev_options(self, url: HiddenText) -> Tuple[HiddenText, RevOptions]:
"""
Return the URL and RevOptions object to use in obtain(),
as a tuple (url, rev_options).
"""
secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret)
username, secret_password = user_pass
password = None # type: Optional[HiddenText]
password: Optional[HiddenText] = None
if secret_password is not None:
password = hide_value(secret_password)
extra_args = self.make_rev_args(username, password)
@ -465,8 +444,7 @@ class VersionControl:
return hide_url(secret_url), rev_options
@staticmethod
def normalize_url(url):
# type: (str) -> str
def normalize_url(url: str) -> str:
"""
Normalize a URL for comparison by unquoting it and removing any
trailing slash.
@ -474,15 +452,13 @@ class VersionControl:
return urllib.parse.unquote(url).rstrip("/")
@classmethod
def compare_urls(cls, url1, url2):
# type: (str, str) -> bool
def compare_urls(cls, url1: str, url2: str) -> bool:
"""
Compare two repo URLs for identity, ignoring incidental differences.
"""
return cls.normalize_url(url1) == cls.normalize_url(url2)
def fetch_new(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
def fetch_new(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
"""
Fetch a revision from a repository, in the case that this is the
first fetch from the repository.
@ -493,8 +469,7 @@ class VersionControl:
"""
raise NotImplementedError
def switch(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
"""
Switch the repo at ``dest`` to point to ``URL``.
@ -503,8 +478,7 @@ class VersionControl:
"""
raise NotImplementedError
def update(self, dest, url, rev_options):
# type: (str, HiddenText, RevOptions) -> None
def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
"""
Update an already-existing repo to the given ``rev_options``.
@ -514,8 +488,7 @@ class VersionControl:
raise NotImplementedError
@classmethod
def is_commit_id_equal(cls, dest, name):
# type: (str, Optional[str]) -> bool
def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
"""
Return whether the id of the current commit equals the given name.
@ -525,8 +498,7 @@ class VersionControl:
"""
raise NotImplementedError
def obtain(self, dest, url):
# type: (str, HiddenText) -> None
def obtain(self, dest: str, url: HiddenText) -> None:
"""
Install or update in editable mode the package represented by this
VersionControl object.
@ -614,8 +586,7 @@ class VersionControl:
)
self.switch(dest, url, rev_options)
def unpack(self, location, url):
# type: (str, HiddenText) -> None
def unpack(self, location: str, url: HiddenText) -> None:
"""
Clean up current location and download the url repository
(and vcs infos) into location
@ -627,8 +598,7 @@ class VersionControl:
self.obtain(location, url=url)
@classmethod
def get_remote_url(cls, location):
# type: (str) -> str
def get_remote_url(cls, location: str) -> str:
"""
Return the url used at location
@ -638,8 +608,7 @@ class VersionControl:
raise NotImplementedError
@classmethod
def get_revision(cls, location):
# type: (str) -> str
def get_revision(cls, location: str) -> str:
"""
Return the current commit id of the files at the given location.
"""
@ -648,18 +617,17 @@ class VersionControl:
@classmethod
def run_command(
cls,
cmd, # type: Union[List[str], CommandArgs]
show_stdout=True, # type: bool
cwd=None, # type: Optional[str]
on_returncode="raise", # type: Literal["raise", "warn", "ignore"]
extra_ok_returncodes=None, # type: Optional[Iterable[int]]
command_desc=None, # type: Optional[str]
extra_environ=None, # type: Optional[Mapping[str, Any]]
spinner=None, # type: Optional[SpinnerInterface]
log_failed_cmd=True, # type: bool
stdout_only=False, # type: bool
):
# type: (...) -> str
cmd: Union[List[str], CommandArgs],
show_stdout: bool = True,
cwd: Optional[str] = None,
on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
extra_ok_returncodes: Optional[Iterable[int]] = None,
command_desc: Optional[str] = None,
extra_environ: Optional[Mapping[str, Any]] = None,
spinner: Optional[SpinnerInterface] = None,
log_failed_cmd: bool = True,
stdout_only: bool = False,
) -> str:
"""
Run a VCS subcommand
This is simply a wrapper around call_subprocess that adds the VCS
@ -701,8 +669,7 @@ class VersionControl:
)
@classmethod
def is_repository_directory(cls, path):
# type: (str) -> bool
def is_repository_directory(cls, path: str) -> bool:
"""
Return whether a directory path is a repository directory.
"""
@ -710,8 +677,7 @@ class VersionControl:
return os.path.exists(os.path.join(path, cls.dirname))
@classmethod
def get_repository_root(cls, location):
# type: (str) -> Optional[str]
def get_repository_root(cls, location: str) -> Optional[str]:
"""
Return the "root" (top-level) directory controlled by the vcs,
or `None` if the directory is not in any.