From 44a034a1315ce7f19242c089d1a686a5ac5bfed8 Mon Sep 17 00:00:00 2001 From: Jon Dufresne Date: Fri, 27 Aug 2021 18:29:39 -0700 Subject: [PATCH] Move all remaining type comments to annotations Use the com2ann tool to convert remaining comments to annotations. Now, no type comments remain. https://github.com/ilevkivskyi/com2ann Some types are not available at runtime (e.g. Literal) or require a forward reference and so were quoted. --- ...af-2178-4e27-9292-2066782aba0b.trivial.rst | 0 src/pip/_internal/utils/hashes.py | 43 ++-- src/pip/_internal/utils/misc.py | 200 ++++++------------ src/pip/_internal/utils/models.py | 24 +-- src/pip/_internal/utils/subprocess.py | 61 +++--- src/pip/_internal/vcs/git.py | 62 ++---- src/pip/_internal/vcs/subversion.py | 56 ++--- src/pip/_internal/vcs/versioncontrol.py | 184 +++++++--------- 8 files changed, 237 insertions(+), 393 deletions(-) create mode 100644 news/dba409af-2178-4e27-9292-2066782aba0b.trivial.rst diff --git a/news/dba409af-2178-4e27-9292-2066782aba0b.trivial.rst b/news/dba409af-2178-4e27-9292-2066782aba0b.trivial.rst new file mode 100644 index 000000000..e69de29bb diff --git a/src/pip/_internal/utils/hashes.py b/src/pip/_internal/utils/hashes.py index f011f6591..82eb035a0 100644 --- a/src/pip/_internal/utils/hashes.py +++ b/src/pip/_internal/utils/hashes.py @@ -28,8 +28,7 @@ class Hashes: """ - def __init__(self, hashes=None): - # type: (Dict[str, List[str]]) -> None + def __init__(self, hashes: Dict[str, List[str]] = None) -> None: """ :param hashes: A dict of algorithm names pointing to lists of allowed hex digests @@ -41,8 +40,7 @@ class Hashes: allowed[alg] = sorted(keys) self._allowed = allowed - def __and__(self, other): - # type: (Hashes) -> Hashes + def __and__(self, other: "Hashes") -> "Hashes": if not isinstance(other, Hashes): return NotImplemented @@ -62,21 +60,14 @@ class Hashes: return Hashes(new) @property - def digest_count(self): - # type: () -> int + def digest_count(self) -> int: return sum(len(digests) for digests in self._allowed.values()) - def is_hash_allowed( - self, - hash_name, # type: str - hex_digest, # type: str - ): - # type: (...) -> bool + def is_hash_allowed(self, hash_name: str, hex_digest: str) -> bool: """Return whether the given hex digest is allowed.""" return hex_digest in self._allowed.get(hash_name, []) - def check_against_chunks(self, chunks): - # type: (Iterator[bytes]) -> None + def check_against_chunks(self, chunks: Iterator[bytes]) -> None: """Check good hashes against ones built from iterable of chunks of data. @@ -99,12 +90,10 @@ class Hashes: return self._raise(gots) - def _raise(self, gots): - # type: (Dict[str, _Hash]) -> NoReturn + def _raise(self, gots: Dict[str, "_Hash"]) -> "NoReturn": raise HashMismatch(self._allowed, gots) - def check_against_file(self, file): - # type: (BinaryIO) -> None + def check_against_file(self, file: BinaryIO) -> None: """Check good hashes against a file-like object Raise HashMismatch if none match. @@ -112,24 +101,20 @@ class Hashes: """ return self.check_against_chunks(read_chunks(file)) - def check_against_path(self, path): - # type: (str) -> None + def check_against_path(self, path: str) -> None: with open(path, "rb") as file: return self.check_against_file(file) - def __bool__(self): - # type: () -> bool + def __bool__(self) -> bool: """Return whether I know any known-good hashes.""" return bool(self._allowed) - def __eq__(self, other): - # type: (object) -> bool + def __eq__(self, other: object) -> bool: if not isinstance(other, Hashes): return NotImplemented return self._allowed == other._allowed - def __hash__(self): - # type: () -> int + def __hash__(self) -> int: return hash( ",".join( sorted( @@ -149,13 +134,11 @@ class MissingHashes(Hashes): """ - def __init__(self): - # type: () -> None + def __init__(self) -> None: """Don't offer the ``hashes`` kwarg.""" # Pass our favorite hash in to generate a "gotten hash". With the # empty list, it will never match, so an error will always raise. super().__init__(hashes={FAVORITE_HASH: []}) - def _raise(self, gots): - # type: (Dict[str, _Hash]) -> NoReturn + def _raise(self, gots: Dict[str, "_Hash"]) -> "NoReturn": raise HashMissing(gots[FAVORITE_HASH].hexdigest()) diff --git a/src/pip/_internal/utils/misc.py b/src/pip/_internal/utils/misc.py index 3df0b4c0e..8b1081f29 100644 --- a/src/pip/_internal/utils/misc.py +++ b/src/pip/_internal/utils/misc.py @@ -70,8 +70,7 @@ VersionInfo = Tuple[int, int, int] NetlocTuple = Tuple[str, Tuple[Optional[str], Optional[str]]] -def get_pip_version(): - # type: () -> str +def get_pip_version() -> str: pip_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "..") pip_pkg_dir = os.path.abspath(pip_pkg_dir) @@ -82,8 +81,7 @@ def get_pip_version(): ) -def normalize_version_info(py_version_info): - # type: (Tuple[int, ...]) -> Tuple[int, int, int] +def normalize_version_info(py_version_info: Tuple[int, ...]) -> Tuple[int, int, int]: """ Convert a tuple of ints representing a Python version to one of length three. @@ -102,8 +100,7 @@ def normalize_version_info(py_version_info): return cast("VersionInfo", py_version_info) -def ensure_dir(path): - # type: (str) -> None +def ensure_dir(path: str) -> None: """os.path.makedirs without EEXIST.""" try: os.makedirs(path) @@ -113,8 +110,7 @@ def ensure_dir(path): raise -def get_prog(): - # type: () -> str +def get_prog() -> str: try: prog = os.path.basename(sys.argv[0]) if prog in ("__main__.py", "-c"): @@ -129,13 +125,11 @@ def get_prog(): # Retry every half second for up to 3 seconds # Tenacity raises RetryError by default, explicitly raise the original exception @retry(reraise=True, stop=stop_after_delay(3), wait=wait_fixed(0.5)) -def rmtree(dir, ignore_errors=False): - # type: (str, bool) -> None +def rmtree(dir: str, ignore_errors: bool = False) -> None: shutil.rmtree(dir, ignore_errors=ignore_errors, onerror=rmtree_errorhandler) -def rmtree_errorhandler(func, path, exc_info): - # type: (Callable[..., Any], str, ExcInfo) -> None +def rmtree_errorhandler(func: Callable[..., Any], path: str, exc_info: ExcInfo) -> None: """On Windows, the files in .svn are read-only, so when rmtree() tries to remove them, an exception is thrown. We catch that here, remove the read-only attribute, and hopefully continue without problems.""" @@ -155,8 +149,7 @@ def rmtree_errorhandler(func, path, exc_info): raise -def display_path(path): - # type: (str) -> str +def display_path(path: str) -> str: """Gives the display value for a given path, making it relative to cwd if possible.""" path = os.path.normcase(os.path.abspath(path)) @@ -165,8 +158,7 @@ def display_path(path): return path -def backup_dir(dir, ext=".bak"): - # type: (str, str) -> str +def backup_dir(dir: str, ext: str = ".bak") -> str: """Figure out the name of a directory to back up the given dir to (adding .bak, .bak2, etc)""" n = 1 @@ -177,16 +169,14 @@ def backup_dir(dir, ext=".bak"): return dir + extension -def ask_path_exists(message, options): - # type: (str, Iterable[str]) -> str +def ask_path_exists(message: str, options: Iterable[str]) -> str: for action in os.environ.get("PIP_EXISTS_ACTION", "").split(): if action in options: return action return ask(message, options) -def _check_no_input(message): - # type: (str) -> None +def _check_no_input(message: str) -> None: """Raise an error if no input is allowed.""" if os.environ.get("PIP_NO_INPUT"): raise Exception( @@ -194,8 +184,7 @@ def _check_no_input(message): ) -def ask(message, options): - # type: (str, Iterable[str]) -> str +def ask(message: str, options: Iterable[str]) -> str: """Ask the message interactively, with the given possible responses""" while 1: _check_no_input(message) @@ -210,22 +199,19 @@ def ask(message, options): return response -def ask_input(message): - # type: (str) -> str +def ask_input(message: str) -> str: """Ask for input interactively.""" _check_no_input(message) return input(message) -def ask_password(message): - # type: (str) -> str +def ask_password(message: str) -> str: """Ask for a password interactively.""" _check_no_input(message) return getpass.getpass(message) -def strtobool(val): - # type: (str) -> int +def strtobool(val: str) -> int: """Convert a string representation of truth to true (1) or false (0). True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values @@ -241,8 +227,7 @@ def strtobool(val): raise ValueError(f"invalid truth value {val!r}") -def format_size(bytes): - # type: (float) -> str +def format_size(bytes: float) -> str: if bytes > 1000 * 1000: return "{:.1f} MB".format(bytes / 1000.0 / 1000) elif bytes > 10 * 1000: @@ -253,8 +238,7 @@ def format_size(bytes): return "{} bytes".format(int(bytes)) -def tabulate(rows): - # type: (Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]] +def tabulate(rows: Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]]: """Return a list of formatted rows and a list of column sizes. For example:: @@ -285,8 +269,7 @@ def is_installable_dir(path: str) -> bool: return False -def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE): - # type: (BinaryIO, int) -> Iterator[bytes] +def read_chunks(file: BinaryIO, size: int = io.DEFAULT_BUFFER_SIZE) -> Iterator[bytes]: """Yield pieces of data from a file-like object until EOF.""" while True: chunk = file.read(size) @@ -295,8 +278,7 @@ def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE): yield chunk -def normalize_path(path, resolve_symlinks=True): - # type: (str, bool) -> str +def normalize_path(path: str, resolve_symlinks: bool = True) -> str: """ Convert a path to its canonical, case-normalized, absolute version. @@ -309,8 +291,7 @@ def normalize_path(path, resolve_symlinks=True): return os.path.normcase(path) -def splitext(path): - # type: (str) -> Tuple[str, str] +def splitext(path: str) -> Tuple[str, str]: """Like os.path.splitext, but take off .tar too""" base, ext = posixpath.splitext(path) if base.lower().endswith(".tar"): @@ -319,8 +300,7 @@ def splitext(path): return base, ext -def renames(old, new): - # type: (str, str) -> None +def renames(old: str, new: str) -> None: """Like os.renames(), but handles renaming across devices.""" # Implementation borrowed from os.renames(). head, tail = os.path.split(new) @@ -337,8 +317,7 @@ def renames(old, new): pass -def is_local(path): - # type: (str) -> bool +def is_local(path: str) -> bool: """ Return True if path is within sys.prefix, if we're running in a virtualenv. @@ -352,8 +331,7 @@ def is_local(path): return path.startswith(normalize_path(sys.prefix)) -def dist_is_local(dist): - # type: (Distribution) -> bool +def dist_is_local(dist: Distribution) -> bool: """ Return True if given Distribution object is installed locally (i.e. within current virtualenv). @@ -364,16 +342,14 @@ def dist_is_local(dist): return is_local(dist_location(dist)) -def dist_in_usersite(dist): - # type: (Distribution) -> bool +def dist_in_usersite(dist: Distribution) -> bool: """ Return True if given Distribution is installed in user site. """ return dist_location(dist).startswith(normalize_path(user_site)) -def dist_in_site_packages(dist): - # type: (Distribution) -> bool +def dist_in_site_packages(dist: Distribution) -> bool: """ Return True if given Distribution is installed in sysconfig.get_python_lib(). @@ -381,8 +357,7 @@ def dist_in_site_packages(dist): return dist_location(dist).startswith(normalize_path(site_packages)) -def dist_is_editable(dist): - # type: (Distribution) -> bool +def dist_is_editable(dist: Distribution) -> bool: """ Return True if given Distribution is an editable install. """ @@ -394,14 +369,13 @@ def dist_is_editable(dist): def get_installed_distributions( - local_only=True, # type: bool - skip=stdlib_pkgs, # type: Container[str] - include_editables=True, # type: bool - editables_only=False, # type: bool - user_only=False, # type: bool - paths=None, # type: Optional[List[str]] -): - # type: (...) -> List[Distribution] + local_only: bool = True, + skip: Container[str] = stdlib_pkgs, + include_editables: bool = True, + editables_only: bool = False, + user_only: bool = False, + paths: Optional[List[str]] = None, +) -> List[Distribution]: """Return a list of installed Distribution objects. Left for compatibility until direct pkg_resources uses are refactored out. @@ -423,8 +397,7 @@ def get_installed_distributions( return [cast(_Dist, dist)._dist for dist in dists] -def get_distribution(req_name): - # type: (str) -> Optional[Distribution] +def get_distribution(req_name: str) -> Optional[Distribution]: """Given a requirement name, return the installed Distribution object. This searches from *all* distributions available in the environment, to @@ -441,8 +414,7 @@ def get_distribution(req_name): return cast(_Dist, dist)._dist -def egg_link_path(dist): - # type: (Distribution) -> Optional[str] +def egg_link_path(dist: Distribution) -> Optional[str]: """ Return the path for the .egg-link file if it exists, otherwise, None. @@ -477,8 +449,7 @@ def egg_link_path(dist): return None -def dist_location(dist): - # type: (Distribution) -> str +def dist_location(dist: Distribution) -> str: """ Get the site-packages location of this distribution. Generally this is dist.location, except in the case of develop-installed @@ -493,17 +464,15 @@ def dist_location(dist): return normalize_path(dist.location) -def write_output(msg, *args): - # type: (Any, Any) -> None +def write_output(msg: Any, *args: Any) -> None: logger.info(msg, *args) class StreamWrapper(StringIO): - orig_stream = None # type: TextIO + orig_stream: TextIO = None @classmethod - def from_stream(cls, orig_stream): - # type: (TextIO) -> StreamWrapper + def from_stream(cls, orig_stream: TextIO) -> "StreamWrapper": cls.orig_stream = orig_stream return cls() @@ -515,8 +484,7 @@ class StreamWrapper(StringIO): @contextlib.contextmanager -def captured_output(stream_name): - # type: (str) -> Iterator[StreamWrapper] +def captured_output(stream_name: str) -> Iterator[StreamWrapper]: """Return a context manager used by captured_stdout/stdin/stderr that temporarily replaces the sys stream *stream_name* with a StringIO. @@ -530,8 +498,7 @@ def captured_output(stream_name): setattr(sys, stream_name, orig_stdout) -def captured_stdout(): - # type: () -> ContextManager[StreamWrapper] +def captured_stdout() -> ContextManager[StreamWrapper]: """Capture the output of sys.stdout: with captured_stdout() as stdout: @@ -543,8 +510,7 @@ def captured_stdout(): return captured_output("stdout") -def captured_stderr(): - # type: () -> ContextManager[StreamWrapper] +def captured_stderr() -> ContextManager[StreamWrapper]: """ See captured_stdout(). """ @@ -552,16 +518,14 @@ def captured_stderr(): # Simulates an enum -def enum(*sequential, **named): - # type: (*Any, **Any) -> Type[Any] +def enum(*sequential: Any, **named: Any) -> Type[Any]: enums = dict(zip(sequential, range(len(sequential))), **named) reverse = {value: key for key, value in enums.items()} enums["reverse_mapping"] = reverse return type("Enum", (), enums) -def build_netloc(host, port): - # type: (str, Optional[int]) -> str +def build_netloc(host: str, port: Optional[int]) -> str: """ Build a netloc from a host-port pair """ @@ -573,8 +537,7 @@ def build_netloc(host, port): return f"{host}:{port}" -def build_url_from_netloc(netloc, scheme="https"): - # type: (str, str) -> str +def build_url_from_netloc(netloc: str, scheme: str = "https") -> str: """ Build a full URL from a netloc. """ @@ -584,8 +547,7 @@ def build_url_from_netloc(netloc, scheme="https"): return f"{scheme}://{netloc}" -def parse_netloc(netloc): - # type: (str) -> Tuple[str, Optional[int]] +def parse_netloc(netloc: str) -> Tuple[str, Optional[int]]: """ Return the host-port pair from a netloc. """ @@ -594,8 +556,7 @@ def parse_netloc(netloc): return parsed.hostname, parsed.port -def split_auth_from_netloc(netloc): - # type: (str) -> NetlocTuple +def split_auth_from_netloc(netloc: str) -> NetlocTuple: """ Parse out and remove the auth information from a netloc. @@ -608,7 +569,7 @@ def split_auth_from_netloc(netloc): # behaves if more than one @ is present (which can be checked using # the password attribute of urlsplit()'s return value). auth, netloc = netloc.rsplit("@", 1) - pw = None # type: Optional[str] + pw: Optional[str] = None if ":" in auth: # Split from the left because that's how urllib.parse.urlsplit() # behaves if more than one : is present (which again can be checked @@ -624,8 +585,7 @@ def split_auth_from_netloc(netloc): return netloc, (user, pw) -def redact_netloc(netloc): - # type: (str) -> str +def redact_netloc(netloc: str) -> str: """ Replace the sensitive data in a netloc with "****", if it exists. @@ -647,8 +607,9 @@ def redact_netloc(netloc): ) -def _transform_url(url, transform_netloc): - # type: (str, Callable[[str], Tuple[Any, ...]]) -> Tuple[str, NetlocTuple] +def _transform_url( + url: str, transform_netloc: Callable[[str], Tuple[Any, ...]] +) -> Tuple[str, NetlocTuple]: """Transform and replace netloc in a url. transform_netloc is a function taking the netloc and returning a @@ -666,18 +627,15 @@ def _transform_url(url, transform_netloc): return surl, cast("NetlocTuple", netloc_tuple) -def _get_netloc(netloc): - # type: (str) -> NetlocTuple +def _get_netloc(netloc: str) -> NetlocTuple: return split_auth_from_netloc(netloc) -def _redact_netloc(netloc): - # type: (str) -> Tuple[str,] +def _redact_netloc(netloc: str) -> Tuple[str]: return (redact_netloc(netloc),) -def split_auth_netloc_from_url(url): - # type: (str) -> Tuple[str, str, Tuple[str, str]] +def split_auth_netloc_from_url(url: str) -> Tuple[str, str, Tuple[str, str]]: """ Parse a url into separate netloc, auth, and url with no auth. @@ -687,41 +645,31 @@ def split_auth_netloc_from_url(url): return url_without_auth, netloc, auth -def remove_auth_from_url(url): - # type: (str) -> str +def remove_auth_from_url(url: str) -> str: """Return a copy of url with 'username:password@' removed.""" # username/pass params are passed to subversion through flags # and are not recognized in the url. return _transform_url(url, _get_netloc)[0] -def redact_auth_from_url(url): - # type: (str) -> str +def redact_auth_from_url(url: str) -> str: """Replace the password in a given url with ****.""" return _transform_url(url, _redact_netloc)[0] class HiddenText: - def __init__( - self, - secret, # type: str - redacted, # type: str - ): - # type: (...) -> None + def __init__(self, secret: str, redacted: str) -> None: self.secret = secret self.redacted = redacted - def __repr__(self): - # type: (...) -> str + def __repr__(self) -> str: return "".format(str(self)) - def __str__(self): - # type: (...) -> str + def __str__(self) -> str: return self.redacted # This is useful for testing. - def __eq__(self, other): - # type: (Any) -> bool + def __eq__(self, other: Any) -> bool: if type(self) != type(other): return False @@ -730,19 +678,16 @@ class HiddenText: return self.secret == other.secret -def hide_value(value): - # type: (str) -> HiddenText +def hide_value(value: str) -> HiddenText: return HiddenText(value, redacted="****") -def hide_url(url): - # type: (str) -> HiddenText +def hide_url(url: str) -> HiddenText: redacted = redact_auth_from_url(url) return HiddenText(url, redacted=redacted) -def protect_pip_from_modification_on_windows(modifying_pip): - # type: (bool) -> None +def protect_pip_from_modification_on_windows(modifying_pip: bool) -> None: """Protection of pip.exe from modification on Windows On Windows, any operation modifying pip should be run as: @@ -768,14 +713,12 @@ def protect_pip_from_modification_on_windows(modifying_pip): ) -def is_console_interactive(): - # type: () -> bool +def is_console_interactive() -> bool: """Is this console interactive?""" return sys.stdin is not None and sys.stdin.isatty() -def hash_file(path, blocksize=1 << 20): - # type: (str, int) -> Tuple[Any, int] +def hash_file(path: str, blocksize: int = 1 << 20) -> Tuple[Any, int]: """Return (hash, length) for path using hashlib.sha256()""" h = hashlib.sha256() @@ -787,8 +730,7 @@ def hash_file(path, blocksize=1 << 20): return h, length -def is_wheel_installed(): - # type: () -> bool +def is_wheel_installed() -> bool: """ Return whether the wheel package is installed. """ @@ -800,8 +742,7 @@ def is_wheel_installed(): return True -def pairwise(iterable): - # type: (Iterable[Any]) -> Iterator[Tuple[Any, Any]] +def pairwise(iterable: Iterable[Any]) -> Iterator[Tuple[Any, Any]]: """ Return paired elements. @@ -813,10 +754,9 @@ def pairwise(iterable): def partition( - pred, # type: Callable[[T], bool] - iterable, # type: Iterable[T] -): - # type: (...) -> Tuple[Iterable[T], Iterable[T]] + pred: Callable[[T], bool], + iterable: Iterable[T], +) -> Tuple[Iterable[T], Iterable[T]]: """ Use a predicate to partition entries into false entries and true entries, like diff --git a/src/pip/_internal/utils/models.py b/src/pip/_internal/utils/models.py index 0e02bc7a5..b6bb21a8b 100644 --- a/src/pip/_internal/utils/models.py +++ b/src/pip/_internal/utils/models.py @@ -10,37 +10,29 @@ class KeyBasedCompareMixin: __slots__ = ["_compare_key", "_defining_class"] - def __init__(self, key, defining_class): - # type: (Any, Type[KeyBasedCompareMixin]) -> None + def __init__(self, key: Any, defining_class: Type["KeyBasedCompareMixin"]) -> None: self._compare_key = key self._defining_class = defining_class - def __hash__(self): - # type: () -> int + def __hash__(self) -> int: return hash(self._compare_key) - def __lt__(self, other): - # type: (Any) -> bool + def __lt__(self, other: Any) -> bool: return self._compare(other, operator.__lt__) - def __le__(self, other): - # type: (Any) -> bool + def __le__(self, other: Any) -> bool: return self._compare(other, operator.__le__) - def __gt__(self, other): - # type: (Any) -> bool + def __gt__(self, other: Any) -> bool: return self._compare(other, operator.__gt__) - def __ge__(self, other): - # type: (Any) -> bool + def __ge__(self, other: Any) -> bool: return self._compare(other, operator.__ge__) - def __eq__(self, other): - # type: (Any) -> bool + def __eq__(self, other: Any) -> bool: return self._compare(other, operator.__eq__) - def _compare(self, other, method): - # type: (Any, Callable[[Any, Any], bool]) -> bool + def _compare(self, other: Any, method: Callable[[Any, Any], bool]) -> bool: if not isinstance(other, self._defining_class): return NotImplemented diff --git a/src/pip/_internal/utils/subprocess.py b/src/pip/_internal/utils/subprocess.py index 436de4e8d..f6e8b219a 100644 --- a/src/pip/_internal/utils/subprocess.py +++ b/src/pip/_internal/utils/subprocess.py @@ -30,12 +30,11 @@ CommandArgs = List[Union[str, HiddenText]] LOG_DIVIDER = "----------------------------------------" -def make_command(*args): - # type: (Union[str, HiddenText, CommandArgs]) -> CommandArgs +def make_command(*args: Union[str, HiddenText, CommandArgs]) -> CommandArgs: """ Create a CommandArgs object. """ - command_args = [] # type: CommandArgs + command_args: CommandArgs = [] for arg in args: # Check for list instead of CommandArgs since CommandArgs is # only known during type-checking. @@ -48,8 +47,7 @@ def make_command(*args): return command_args -def format_command_args(args): - # type: (Union[List[str], CommandArgs]) -> str +def format_command_args(args: Union[List[str], CommandArgs]) -> str: """ Format command arguments for display. """ @@ -64,8 +62,7 @@ def format_command_args(args): ) -def reveal_command_args(args): - # type: (Union[List[str], CommandArgs]) -> List[str] +def reveal_command_args(args: Union[List[str], CommandArgs]) -> List[str]: """ Return the arguments in their raw, unredacted form. """ @@ -73,12 +70,11 @@ def reveal_command_args(args): def make_subprocess_output_error( - cmd_args, # type: Union[List[str], CommandArgs] - cwd, # type: Optional[str] - lines, # type: List[str] - exit_status, # type: int -): - # type: (...) -> str + cmd_args: Union[List[str], CommandArgs], + cwd: Optional[str], + lines: List[str], + exit_status: int, +) -> str: """ Create and return the error message to use to log a subprocess error with command output. @@ -109,19 +105,18 @@ def make_subprocess_output_error( def call_subprocess( - cmd, # type: Union[List[str], CommandArgs] - show_stdout=False, # type: bool - cwd=None, # type: Optional[str] - on_returncode="raise", # type: Literal["raise", "warn", "ignore"] - extra_ok_returncodes=None, # type: Optional[Iterable[int]] - command_desc=None, # type: Optional[str] - extra_environ=None, # type: Optional[Mapping[str, Any]] - unset_environ=None, # type: Optional[Iterable[str]] - spinner=None, # type: Optional[SpinnerInterface] - log_failed_cmd=True, # type: Optional[bool] - stdout_only=False, # type: Optional[bool] -): - # type: (...) -> str + cmd: Union[List[str], CommandArgs], + show_stdout: bool = False, + cwd: Optional[str] = None, + on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise", + extra_ok_returncodes: Optional[Iterable[int]] = None, + command_desc: Optional[str] = None, + extra_environ: Optional[Mapping[str, Any]] = None, + unset_environ: Optional[Iterable[str]] = None, + spinner: Optional[SpinnerInterface] = None, + log_failed_cmd: Optional[bool] = True, + stdout_only: Optional[bool] = False, +) -> str: """ Args: show_stdout: if true, use INFO to log the subprocess's stderr and @@ -206,7 +201,7 @@ def call_subprocess( proc.stdin.close() # In this mode, stdout and stderr are in the same pipe. while True: - line = proc.stdout.readline() # type: str + line: str = proc.stdout.readline() if not line: break line = line.rstrip() @@ -271,8 +266,7 @@ def call_subprocess( return output -def runner_with_spinner_message(message): - # type: (str) -> Callable[..., None] +def runner_with_spinner_message(message: str) -> Callable[..., None]: """Provide a subprocess_runner that shows a spinner message. Intended for use with for pep517's Pep517HookCaller. Thus, the runner has @@ -280,11 +274,10 @@ def runner_with_spinner_message(message): """ def runner( - cmd, # type: List[str] - cwd=None, # type: Optional[str] - extra_environ=None, # type: Optional[Mapping[str, Any]] - ): - # type: (...) -> None + cmd: List[str], + cwd: Optional[str] = None, + extra_environ: Optional[Mapping[str, Any]] = None, + ) -> None: with open_spinner(message) as spinner: call_subprocess( cmd, diff --git a/src/pip/_internal/vcs/git.py b/src/pip/_internal/vcs/git.py index 4f0025c59..3186553e7 100644 --- a/src/pip/_internal/vcs/git.py +++ b/src/pip/_internal/vcs/git.py @@ -52,8 +52,7 @@ SCP_REGEX = re.compile( ) -def looks_like_hash(sha): - # type: (str) -> bool +def looks_like_hash(sha: str) -> bool: return bool(HASH_REGEX.match(sha)) @@ -74,12 +73,10 @@ class Git(VersionControl): default_arg_rev = "HEAD" @staticmethod - def get_base_rev_args(rev): - # type: (str) -> List[str] + def get_base_rev_args(rev: str) -> List[str]: return [rev] - def is_immutable_rev_checkout(self, url, dest): - # type: (str, str) -> bool + def is_immutable_rev_checkout(self, url: str, dest: str) -> bool: _, rev_options = self.get_url_rev_options(hide_url(url)) if not rev_options.rev: return False @@ -101,8 +98,7 @@ class Git(VersionControl): return tuple(int(c) for c in match.groups()) @classmethod - def get_current_branch(cls, location): - # type: (str) -> Optional[str] + def get_current_branch(cls, location: str) -> Optional[str]: """ Return the current branch, or None if HEAD isn't at a branch (e.g. detached HEAD). @@ -127,8 +123,7 @@ class Git(VersionControl): return None @classmethod - def get_revision_sha(cls, dest, rev): - # type: (str, str) -> Tuple[Optional[str], bool] + def get_revision_sha(cls, dest: str, rev: str) -> Tuple[Optional[str], bool]: """ Return (sha_or_none, is_branch), where sha_or_none is a commit hash if the revision names a remote branch or tag, otherwise None. @@ -174,8 +169,7 @@ class Git(VersionControl): return (sha, False) @classmethod - def _should_fetch(cls, dest, rev): - # type: (str, str) -> bool + def _should_fetch(cls, dest: str, rev: str) -> bool: """ Return true if rev is a ref or is a commit that we don't have locally. @@ -198,8 +192,9 @@ class Git(VersionControl): return True @classmethod - def resolve_revision(cls, dest, url, rev_options): - # type: (str, HiddenText, RevOptions) -> RevOptions + def resolve_revision( + cls, dest: str, url: HiddenText, rev_options: RevOptions + ) -> RevOptions: """ Resolve a revision to a new RevOptions object with the SHA1 of the branch, tag, or ref if found. @@ -243,8 +238,7 @@ class Git(VersionControl): return rev_options @classmethod - def is_commit_id_equal(cls, dest, name): - # type: (str, Optional[str]) -> bool + def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool: """ Return whether the current commit hash equals the given name. @@ -258,8 +252,7 @@ class Git(VersionControl): return cls.get_revision(dest) == name - def fetch_new(self, dest, url, rev_options): - # type: (str, HiddenText, RevOptions) -> None + def fetch_new(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: rev_display = rev_options.to_display() logger.info("Cloning %s%s to %s", url, rev_display, display_path(dest)) if self.get_git_version() >= (2, 17): @@ -314,8 +307,7 @@ class Git(VersionControl): #: repo may contain submodules self.update_submodules(dest) - def switch(self, dest, url, rev_options): - # type: (str, HiddenText, RevOptions) -> None + def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: self.run_command( make_command("config", "remote.origin.url", url), cwd=dest, @@ -325,8 +317,7 @@ class Git(VersionControl): self.update_submodules(dest) - def update(self, dest, url, rev_options): - # type: (str, HiddenText, RevOptions) -> None + def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: # First fetch changes from the default remote if self.get_git_version() >= (1, 9): # fetch tags in addition to everything else @@ -341,8 +332,7 @@ class Git(VersionControl): self.update_submodules(dest) @classmethod - def get_remote_url(cls, location): - # type: (str) -> str + def get_remote_url(cls, location: str) -> str: """ Return URL of the first remote encountered. @@ -372,8 +362,7 @@ class Git(VersionControl): return cls._git_remote_to_pip_url(url.strip()) @staticmethod - def _git_remote_to_pip_url(url): - # type: (str) -> str + def _git_remote_to_pip_url(url: str) -> str: """ Convert a remote url from what git uses to what pip accepts. @@ -404,8 +393,7 @@ class Git(VersionControl): raise RemoteNotValidError(url) @classmethod - def has_commit(cls, location, rev): - # type: (str, str) -> bool + def has_commit(cls, location: str, rev: str) -> bool: """ Check if rev is a commit that is available in the local repository. """ @@ -421,8 +409,7 @@ class Git(VersionControl): return True @classmethod - def get_revision(cls, location, rev=None): - # type: (str, Optional[str]) -> str + def get_revision(cls, location: str, rev: Optional[str] = None) -> str: if rev is None: rev = "HEAD" current_rev = cls.run_command( @@ -434,8 +421,7 @@ class Git(VersionControl): return current_rev.strip() @classmethod - def get_subdirectory(cls, location): - # type: (str) -> Optional[str] + def get_subdirectory(cls, location: str) -> Optional[str]: """ Return the path to Python project root, relative to the repo root. Return None if the project root is in the repo root. @@ -453,8 +439,7 @@ class Git(VersionControl): return find_path_to_project_root_from_repo_root(location, repo_root) @classmethod - def get_url_rev_and_auth(cls, url): - # type: (str) -> Tuple[str, Optional[str], AuthInfo] + def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]: """ Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'. That's required because although they use SSH they sometimes don't @@ -485,8 +470,7 @@ class Git(VersionControl): return url, rev, user_pass @classmethod - def update_submodules(cls, location): - # type: (str) -> None + def update_submodules(cls, location: str) -> None: if not os.path.exists(os.path.join(location, ".gitmodules")): return cls.run_command( @@ -495,8 +479,7 @@ class Git(VersionControl): ) @classmethod - def get_repository_root(cls, location): - # type: (str) -> Optional[str] + def get_repository_root(cls, location: str) -> Optional[str]: loc = super().get_repository_root(location) if loc: return loc @@ -521,8 +504,7 @@ class Git(VersionControl): return os.path.normpath(r.rstrip("\r\n")) @staticmethod - def should_add_vcs_url_prefix(repo_url): - # type: (str) -> bool + def should_add_vcs_url_prefix(repo_url: str) -> bool: """In either https or ssh form, requirements must be prefixed with git+.""" return True diff --git a/src/pip/_internal/vcs/subversion.py b/src/pip/_internal/vcs/subversion.py index c95774c76..b5b6fd5c0 100644 --- a/src/pip/_internal/vcs/subversion.py +++ b/src/pip/_internal/vcs/subversion.py @@ -34,18 +34,15 @@ class Subversion(VersionControl): schemes = ("svn+ssh", "svn+http", "svn+https", "svn+svn", "svn+file") @classmethod - def should_add_vcs_url_prefix(cls, remote_url): - # type: (str) -> bool + def should_add_vcs_url_prefix(cls, remote_url: str) -> bool: return True @staticmethod - def get_base_rev_args(rev): - # type: (str) -> List[str] + def get_base_rev_args(rev: str) -> List[str]: return ["-r", rev] @classmethod - def get_revision(cls, location): - # type: (str) -> str + def get_revision(cls, location: str) -> str: """ Return the maximum revision for all files under a given location """ @@ -74,8 +71,9 @@ class Subversion(VersionControl): return str(revision) @classmethod - def get_netloc_and_auth(cls, netloc, scheme): - # type: (str, str) -> Tuple[str, Tuple[Optional[str], Optional[str]]] + def get_netloc_and_auth( + cls, netloc: str, scheme: str + ) -> Tuple[str, Tuple[Optional[str], Optional[str]]]: """ This override allows the auth information to be passed to svn via the --username and --password options instead of via the URL. @@ -88,8 +86,7 @@ class Subversion(VersionControl): return split_auth_from_netloc(netloc) @classmethod - def get_url_rev_and_auth(cls, url): - # type: (str) -> Tuple[str, Optional[str], AuthInfo] + def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]: # hotfix the URL scheme after removing svn+ from svn+ssh:// readd it url, rev, user_pass = super().get_url_rev_and_auth(url) if url.startswith("ssh://"): @@ -97,9 +94,10 @@ class Subversion(VersionControl): return url, rev, user_pass @staticmethod - def make_rev_args(username, password): - # type: (Optional[str], Optional[HiddenText]) -> CommandArgs - extra_args = [] # type: CommandArgs + def make_rev_args( + username: Optional[str], password: Optional[HiddenText] + ) -> CommandArgs: + extra_args: CommandArgs = [] if username: extra_args += ["--username", username] if password: @@ -108,8 +106,7 @@ class Subversion(VersionControl): return extra_args @classmethod - def get_remote_url(cls, location): - # type: (str) -> str + def get_remote_url(cls, location: str) -> str: # In cases where the source is in a subdirectory, we have to look up in # the location until we find a valid project root. orig_location = location @@ -133,8 +130,7 @@ class Subversion(VersionControl): return url @classmethod - def _get_svn_url_rev(cls, location): - # type: (str) -> Tuple[Optional[str], int] + def _get_svn_url_rev(cls, location: str) -> Tuple[Optional[str], int]: from pip._internal.exceptions import InstallationError entries_path = os.path.join(location, cls.dirname, "entries") @@ -184,13 +180,11 @@ class Subversion(VersionControl): return url, rev @classmethod - def is_commit_id_equal(cls, dest, name): - # type: (str, Optional[str]) -> bool + def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool: """Always assume the versions don't match""" return False - def __init__(self, use_interactive=None): - # type: (bool) -> None + def __init__(self, use_interactive: bool = None) -> None: if use_interactive is None: use_interactive = is_console_interactive() self.use_interactive = use_interactive @@ -200,12 +194,11 @@ class Subversion(VersionControl): # Special value definitions: # None: Not evaluated yet. # Empty tuple: Could not parse version. - self._vcs_version = None # type: Optional[Tuple[int, ...]] + self._vcs_version: Optional[Tuple[int, ...]] = None super().__init__() - def call_vcs_version(self): - # type: () -> Tuple[int, ...] + def call_vcs_version(self) -> Tuple[int, ...]: """Query the version of the currently installed Subversion client. :return: A tuple containing the parts of the version information or @@ -233,8 +226,7 @@ class Subversion(VersionControl): return parsed_version - def get_vcs_version(self): - # type: () -> Tuple[int, ...] + def get_vcs_version(self) -> Tuple[int, ...]: """Return the version of the currently installed Subversion client. If the version of the Subversion client has already been queried, @@ -254,8 +246,7 @@ class Subversion(VersionControl): self._vcs_version = vcs_version return vcs_version - def get_remote_call_options(self): - # type: () -> CommandArgs + def get_remote_call_options(self) -> CommandArgs: """Return options to be used on calls to Subversion that contact the server. These options are applicable for the following ``svn`` subcommands used @@ -286,8 +277,7 @@ class Subversion(VersionControl): return [] - def fetch_new(self, dest, url, rev_options): - # type: (str, HiddenText, RevOptions) -> None + def fetch_new(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: rev_display = rev_options.to_display() logger.info( "Checking out %s%s to %s", @@ -305,8 +295,7 @@ class Subversion(VersionControl): ) self.run_command(cmd_args) - def switch(self, dest, url, rev_options): - # type: (str, HiddenText, RevOptions) -> None + def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: cmd_args = make_command( "switch", self.get_remote_call_options(), @@ -316,8 +305,7 @@ class Subversion(VersionControl): ) self.run_command(cmd_args) - def update(self, dest, url, rev_options): - # type: (str, HiddenText, RevOptions) -> None + def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: cmd_args = make_command( "update", self.get_remote_call_options(), diff --git a/src/pip/_internal/vcs/versioncontrol.py b/src/pip/_internal/vcs/versioncontrol.py index 42a0c21d0..1139051f3 100644 --- a/src/pip/_internal/vcs/versioncontrol.py +++ b/src/pip/_internal/vcs/versioncontrol.py @@ -49,8 +49,7 @@ logger = logging.getLogger(__name__) AuthInfo = Tuple[Optional[str], Optional[str]] -def is_url(name): - # type: (str) -> bool +def is_url(name: str) -> bool: """ Return true if the name looks like a URL. """ @@ -60,8 +59,9 @@ def is_url(name): return scheme in ["http", "https", "file", "ftp"] + vcs.all_schemes -def make_vcs_requirement_url(repo_url, rev, project_name, subdir=None): - # type: (str, str, str, Optional[str]) -> str +def make_vcs_requirement_url( + repo_url: str, rev: str, project_name: str, subdir: Optional[str] = None +) -> str: """ Return the URL for a VCS requirement. @@ -77,8 +77,9 @@ def make_vcs_requirement_url(repo_url, rev, project_name, subdir=None): return req -def find_path_to_project_root_from_repo_root(location, repo_root): - # type: (str, str) -> Optional[str] +def find_path_to_project_root_from_repo_root( + location: str, repo_root: str +) -> Optional[str]: """ Find the the Python project's root by searching up the filesystem from `location`. Return the path to project root relative to `repo_root`. @@ -126,11 +127,10 @@ class RevOptions: def __init__( self, - vc_class, # type: Type[VersionControl] - rev=None, # type: Optional[str] - extra_args=None, # type: Optional[CommandArgs] - ): - # type: (...) -> None + vc_class: Type["VersionControl"], + rev: Optional[str] = None, + extra_args: Optional[CommandArgs] = None, + ) -> None: """ Args: vc_class: a VersionControl subclass. @@ -143,26 +143,23 @@ class RevOptions: self.extra_args = extra_args self.rev = rev self.vc_class = vc_class - self.branch_name = None # type: Optional[str] + self.branch_name: Optional[str] = None - def __repr__(self): - # type: () -> str + def __repr__(self) -> str: return f"" @property - def arg_rev(self): - # type: () -> Optional[str] + def arg_rev(self) -> Optional[str]: if self.rev is None: return self.vc_class.default_arg_rev return self.rev - def to_args(self): - # type: () -> CommandArgs + def to_args(self) -> CommandArgs: """ Return the VCS-specific command arguments. """ - args = [] # type: CommandArgs + args: CommandArgs = [] rev = self.arg_rev if rev is not None: args += self.vc_class.get_base_rev_args(rev) @@ -170,15 +167,13 @@ class RevOptions: return args - def to_display(self): - # type: () -> str + def to_display(self) -> str: if not self.rev: return "" return f" (to revision {self.rev})" - def make_new(self, rev): - # type: (str) -> RevOptions + def make_new(self, rev: str) -> "RevOptions": """ Make a copy of the current instance, but with a new rev. @@ -189,40 +184,34 @@ class RevOptions: class VcsSupport: - _registry = {} # type: Dict[str, VersionControl] + _registry: Dict[str, "VersionControl"] = {} schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"] - def __init__(self): - # type: () -> None + def __init__(self) -> None: # Register more schemes with urlparse for various version control # systems urllib.parse.uses_netloc.extend(self.schemes) super().__init__() - def __iter__(self): - # type: () -> Iterator[str] + def __iter__(self) -> Iterator[str]: return self._registry.__iter__() @property - def backends(self): - # type: () -> List[VersionControl] + def backends(self) -> List["VersionControl"]: return list(self._registry.values()) @property - def dirnames(self): - # type: () -> List[str] + def dirnames(self) -> List[str]: return [backend.dirname for backend in self.backends] @property - def all_schemes(self): - # type: () -> List[str] - schemes = [] # type: List[str] + def all_schemes(self) -> List[str]: + schemes: List[str] = [] for backend in self.backends: schemes.extend(backend.schemes) return schemes - def register(self, cls): - # type: (Type[VersionControl]) -> None + def register(self, cls: Type["VersionControl"]) -> None: if not hasattr(cls, "name"): logger.warning("Cannot register VCS %s", cls.__name__) return @@ -230,13 +219,11 @@ class VcsSupport: self._registry[cls.name] = cls() logger.debug("Registered VCS backend: %s", cls.name) - def unregister(self, name): - # type: (str) -> None + def unregister(self, name: str) -> None: if name in self._registry: del self._registry[name] - def get_backend_for_dir(self, location): - # type: (str) -> Optional[VersionControl] + def get_backend_for_dir(self, location: str) -> Optional["VersionControl"]: """ Return a VersionControl object if a repository of that type is found at the given directory. @@ -259,8 +246,7 @@ class VcsSupport: inner_most_repo_path = max(vcs_backends, key=len) return vcs_backends[inner_most_repo_path] - def get_backend_for_scheme(self, scheme): - # type: (str) -> Optional[VersionControl] + def get_backend_for_scheme(self, scheme: str) -> Optional["VersionControl"]: """ Return a VersionControl object or None. """ @@ -269,8 +255,7 @@ class VcsSupport: return vcs_backend return None - def get_backend(self, name): - # type: (str) -> Optional[VersionControl] + def get_backend(self, name: str) -> Optional["VersionControl"]: """ Return a VersionControl object or None. """ @@ -286,14 +271,13 @@ class VersionControl: dirname = "" repo_name = "" # List of supported schemes for this Version Control - schemes = () # type: Tuple[str, ...] + schemes: Tuple[str, ...] = () # Iterable of environment variable names to pass to call_subprocess(). - unset_environ = () # type: Tuple[str, ...] - default_arg_rev = None # type: Optional[str] + unset_environ: Tuple[str, ...] = () + default_arg_rev: Optional[str] = None @classmethod - def should_add_vcs_url_prefix(cls, remote_url): - # type: (str) -> bool + def should_add_vcs_url_prefix(cls, remote_url: str) -> bool: """ Return whether the vcs prefix (e.g. "git+") should be added to a repository's remote url when used in a requirement. @@ -301,8 +285,7 @@ class VersionControl: return not remote_url.lower().startswith(f"{cls.name}:") @classmethod - def get_subdirectory(cls, location): - # type: (str) -> Optional[str] + def get_subdirectory(cls, location: str) -> Optional[str]: """ Return the path to Python project root, relative to the repo root. Return None if the project root is in the repo root. @@ -310,16 +293,14 @@ class VersionControl: return None @classmethod - def get_requirement_revision(cls, repo_dir): - # type: (str) -> str + def get_requirement_revision(cls, repo_dir: str) -> str: """ Return the revision string that should be used in a requirement. """ return cls.get_revision(repo_dir) @classmethod - def get_src_requirement(cls, repo_dir, project_name): - # type: (str, str) -> str + def get_src_requirement(cls, repo_dir: str, project_name: str) -> str: """ Return the requirement string to use to redownload the files currently at the given repository directory. @@ -343,8 +324,7 @@ class VersionControl: return req @staticmethod - def get_base_rev_args(rev): - # type: (str) -> List[str] + def get_base_rev_args(rev: str) -> List[str]: """ Return the base revision arguments for a vcs command. @@ -353,8 +333,7 @@ class VersionControl: """ raise NotImplementedError - def is_immutable_rev_checkout(self, url, dest): - # type: (str, str) -> bool + def is_immutable_rev_checkout(self, url: str, dest: str) -> bool: """ Return true if the commit hash checked out at dest matches the revision in url. @@ -368,8 +347,9 @@ class VersionControl: return False @classmethod - def make_rev_options(cls, rev=None, extra_args=None): - # type: (Optional[str], Optional[CommandArgs]) -> RevOptions + def make_rev_options( + cls, rev: Optional[str] = None, extra_args: Optional[CommandArgs] = None + ) -> RevOptions: """ Return a RevOptions object. @@ -380,8 +360,7 @@ class VersionControl: return RevOptions(cls, rev, extra_args=extra_args) @classmethod - def _is_local_repository(cls, repo): - # type: (str) -> bool + def _is_local_repository(cls, repo: str) -> bool: """ posix absolute paths start with os.path.sep, win32 ones start with drive (like c:\\folder) @@ -390,8 +369,9 @@ class VersionControl: return repo.startswith(os.path.sep) or bool(drive) @classmethod - def get_netloc_and_auth(cls, netloc, scheme): - # type: (str, str) -> Tuple[str, Tuple[Optional[str], Optional[str]]] + def get_netloc_and_auth( + cls, netloc: str, scheme: str + ) -> Tuple[str, Tuple[Optional[str], Optional[str]]]: """ Parse the repository URL's netloc, and return the new netloc to use along with auth information. @@ -410,8 +390,7 @@ class VersionControl: return netloc, (None, None) @classmethod - def get_url_rev_and_auth(cls, url): - # type: (str) -> Tuple[str, Optional[str], AuthInfo] + def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]: """ Parse the repository URL to use, and return the URL, revision, and auth info to use. @@ -441,22 +420,22 @@ class VersionControl: return url, rev, user_pass @staticmethod - def make_rev_args(username, password): - # type: (Optional[str], Optional[HiddenText]) -> CommandArgs + def make_rev_args( + username: Optional[str], password: Optional[HiddenText] + ) -> CommandArgs: """ Return the RevOptions "extra arguments" to use in obtain(). """ return [] - def get_url_rev_options(self, url): - # type: (HiddenText) -> Tuple[HiddenText, RevOptions] + def get_url_rev_options(self, url: HiddenText) -> Tuple[HiddenText, RevOptions]: """ Return the URL and RevOptions object to use in obtain(), as a tuple (url, rev_options). """ secret_url, rev, user_pass = self.get_url_rev_and_auth(url.secret) username, secret_password = user_pass - password = None # type: Optional[HiddenText] + password: Optional[HiddenText] = None if secret_password is not None: password = hide_value(secret_password) extra_args = self.make_rev_args(username, password) @@ -465,8 +444,7 @@ class VersionControl: return hide_url(secret_url), rev_options @staticmethod - def normalize_url(url): - # type: (str) -> str + def normalize_url(url: str) -> str: """ Normalize a URL for comparison by unquoting it and removing any trailing slash. @@ -474,15 +452,13 @@ class VersionControl: return urllib.parse.unquote(url).rstrip("/") @classmethod - def compare_urls(cls, url1, url2): - # type: (str, str) -> bool + def compare_urls(cls, url1: str, url2: str) -> bool: """ Compare two repo URLs for identity, ignoring incidental differences. """ return cls.normalize_url(url1) == cls.normalize_url(url2) - def fetch_new(self, dest, url, rev_options): - # type: (str, HiddenText, RevOptions) -> None + def fetch_new(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: """ Fetch a revision from a repository, in the case that this is the first fetch from the repository. @@ -493,8 +469,7 @@ class VersionControl: """ raise NotImplementedError - def switch(self, dest, url, rev_options): - # type: (str, HiddenText, RevOptions) -> None + def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: """ Switch the repo at ``dest`` to point to ``URL``. @@ -503,8 +478,7 @@ class VersionControl: """ raise NotImplementedError - def update(self, dest, url, rev_options): - # type: (str, HiddenText, RevOptions) -> None + def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None: """ Update an already-existing repo to the given ``rev_options``. @@ -514,8 +488,7 @@ class VersionControl: raise NotImplementedError @classmethod - def is_commit_id_equal(cls, dest, name): - # type: (str, Optional[str]) -> bool + def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool: """ Return whether the id of the current commit equals the given name. @@ -525,8 +498,7 @@ class VersionControl: """ raise NotImplementedError - def obtain(self, dest, url): - # type: (str, HiddenText) -> None + def obtain(self, dest: str, url: HiddenText) -> None: """ Install or update in editable mode the package represented by this VersionControl object. @@ -614,8 +586,7 @@ class VersionControl: ) self.switch(dest, url, rev_options) - def unpack(self, location, url): - # type: (str, HiddenText) -> None + def unpack(self, location: str, url: HiddenText) -> None: """ Clean up current location and download the url repository (and vcs infos) into location @@ -627,8 +598,7 @@ class VersionControl: self.obtain(location, url=url) @classmethod - def get_remote_url(cls, location): - # type: (str) -> str + def get_remote_url(cls, location: str) -> str: """ Return the url used at location @@ -638,8 +608,7 @@ class VersionControl: raise NotImplementedError @classmethod - def get_revision(cls, location): - # type: (str) -> str + def get_revision(cls, location: str) -> str: """ Return the current commit id of the files at the given location. """ @@ -648,18 +617,17 @@ class VersionControl: @classmethod def run_command( cls, - cmd, # type: Union[List[str], CommandArgs] - show_stdout=True, # type: bool - cwd=None, # type: Optional[str] - on_returncode="raise", # type: Literal["raise", "warn", "ignore"] - extra_ok_returncodes=None, # type: Optional[Iterable[int]] - command_desc=None, # type: Optional[str] - extra_environ=None, # type: Optional[Mapping[str, Any]] - spinner=None, # type: Optional[SpinnerInterface] - log_failed_cmd=True, # type: bool - stdout_only=False, # type: bool - ): - # type: (...) -> str + cmd: Union[List[str], CommandArgs], + show_stdout: bool = True, + cwd: Optional[str] = None, + on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise", + extra_ok_returncodes: Optional[Iterable[int]] = None, + command_desc: Optional[str] = None, + extra_environ: Optional[Mapping[str, Any]] = None, + spinner: Optional[SpinnerInterface] = None, + log_failed_cmd: bool = True, + stdout_only: bool = False, + ) -> str: """ Run a VCS subcommand This is simply a wrapper around call_subprocess that adds the VCS @@ -701,8 +669,7 @@ class VersionControl: ) @classmethod - def is_repository_directory(cls, path): - # type: (str) -> bool + def is_repository_directory(cls, path: str) -> bool: """ Return whether a directory path is a repository directory. """ @@ -710,8 +677,7 @@ class VersionControl: return os.path.exists(os.path.join(path, cls.dirname)) @classmethod - def get_repository_root(cls, location): - # type: (str) -> Optional[str] + def get_repository_root(cls, location: str) -> Optional[str]: """ Return the "root" (top-level) directory controlled by the vcs, or `None` if the directory is not in any.