Handle several mypy TODO comments and exceptions

Remove mypy exceptions that are straightforward to remove.
This commit is contained in:
Jon Dufresne 2020-12-25 07:54:37 -08:00
parent 8ba9917c0f
commit 63ea2e8c08
12 changed files with 165 additions and 55 deletions

View File

@ -1,6 +1,3 @@
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
import os import os
import sys import sys
@ -8,6 +5,7 @@ from setuptools import find_packages, setup
def read(rel_path): def read(rel_path):
# type: (str) -> str
here = os.path.abspath(os.path.dirname(__file__)) here = os.path.abspath(os.path.dirname(__file__))
# intentionally *not* adding an encoding option to open, See: # intentionally *not* adding an encoding option to open, See:
# https://github.com/pypa/virtualenv/issues/201#issuecomment-3145690 # https://github.com/pypa/virtualenv/issues/201#issuecomment-3145690
@ -16,6 +14,7 @@ def read(rel_path):
def get_version(rel_path): def get_version(rel_path):
# type: (str) -> str
for line in read(rel_path).splitlines(): for line in read(rel_path).splitlines():
if line.startswith('__version__'): if line.startswith('__version__'):
# __version__ = "0.9" # __version__ = "0.9"

View File

@ -1,15 +1,12 @@
"""Base option parser setup""" """Base option parser setup"""
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
import logging import logging
import optparse import optparse
import shutil import shutil
import sys import sys
import textwrap import textwrap
from contextlib import suppress from contextlib import suppress
from typing import Any from typing import Any, Dict, Iterator, List, Tuple
from pip._internal.cli.status_codes import UNKNOWN_ERROR from pip._internal.cli.status_codes import UNKNOWN_ERROR
from pip._internal.configuration import Configuration, ConfigurationError from pip._internal.configuration import Configuration, ConfigurationError
@ -22,6 +19,7 @@ class PrettyHelpFormatter(optparse.IndentedHelpFormatter):
"""A prettier/less verbose help formatter for optparse.""" """A prettier/less verbose help formatter for optparse."""
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
# type: (*Any, **Any) -> None
# help position must be aligned with __init__.parseopts.description # help position must be aligned with __init__.parseopts.description
kwargs["max_help_position"] = 30 kwargs["max_help_position"] = 30
kwargs["indent_increment"] = 1 kwargs["indent_increment"] = 1
@ -29,9 +27,11 @@ class PrettyHelpFormatter(optparse.IndentedHelpFormatter):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def format_option_strings(self, option): def format_option_strings(self, option):
# type: (optparse.Option) -> str
return self._format_option_strings(option) return self._format_option_strings(option)
def _format_option_strings(self, option, mvarfmt=" <{}>", optsep=", "): def _format_option_strings(self, option, mvarfmt=" <{}>", optsep=", "):
# type: (optparse.Option, str, str) -> str
""" """
Return a comma-separated list of option strings and metavars. Return a comma-separated list of option strings and metavars.
@ -49,17 +49,20 @@ class PrettyHelpFormatter(optparse.IndentedHelpFormatter):
opts.insert(1, optsep) opts.insert(1, optsep)
if option.takes_value(): if option.takes_value():
assert option.dest is not None
metavar = option.metavar or option.dest.lower() metavar = option.metavar or option.dest.lower()
opts.append(mvarfmt.format(metavar.lower())) opts.append(mvarfmt.format(metavar.lower()))
return "".join(opts) return "".join(opts)
def format_heading(self, heading): def format_heading(self, heading):
# type: (str) -> str
if heading == "Options": if heading == "Options":
return "" return ""
return heading + ":\n" return heading + ":\n"
def format_usage(self, usage): def format_usage(self, usage):
# type: (str) -> str
""" """
Ensure there is only one newline between usage and the first heading Ensure there is only one newline between usage and the first heading
if there is no description. if there is no description.
@ -68,6 +71,7 @@ class PrettyHelpFormatter(optparse.IndentedHelpFormatter):
return msg return msg
def format_description(self, description): def format_description(self, description):
# type: (str) -> str
# leave full control over description to us # leave full control over description to us
if description: if description:
if hasattr(self.parser, "main"): if hasattr(self.parser, "main"):
@ -86,6 +90,7 @@ class PrettyHelpFormatter(optparse.IndentedHelpFormatter):
return "" return ""
def format_epilog(self, epilog): def format_epilog(self, epilog):
# type: (str) -> str
# leave full control over epilog to us # leave full control over epilog to us
if epilog: if epilog:
return epilog return epilog
@ -93,6 +98,7 @@ class PrettyHelpFormatter(optparse.IndentedHelpFormatter):
return "" return ""
def indent_lines(self, text, indent): def indent_lines(self, text, indent):
# type: (str, str) -> str
new_lines = [indent + line for line in text.split("\n")] new_lines = [indent + line for line in text.split("\n")]
return "\n".join(new_lines) return "\n".join(new_lines)
@ -107,9 +113,12 @@ class UpdatingDefaultsHelpFormatter(PrettyHelpFormatter):
""" """
def expand_default(self, option): def expand_default(self, option):
# type: (optparse.Option) -> str
default_values = None default_values = None
if self.parser is not None: if self.parser is not None:
assert isinstance(self.parser, ConfigOptionParser)
self.parser._update_defaults(self.parser.defaults) self.parser._update_defaults(self.parser.defaults)
assert option.dest is not None
default_values = self.parser.defaults.get(option.dest) default_values = self.parser.defaults.get(option.dest)
help_text = super().expand_default(option) help_text = super().expand_default(option)
@ -129,6 +138,7 @@ class UpdatingDefaultsHelpFormatter(PrettyHelpFormatter):
class CustomOptionParser(optparse.OptionParser): class CustomOptionParser(optparse.OptionParser):
def insert_option_group(self, idx, *args, **kwargs): def insert_option_group(self, idx, *args, **kwargs):
# type: (int, Any, Any) -> optparse.OptionGroup
"""Insert an OptionGroup at a given position.""" """Insert an OptionGroup at a given position."""
group = self.add_option_group(*args, **kwargs) group = self.add_option_group(*args, **kwargs)
@ -139,6 +149,7 @@ class CustomOptionParser(optparse.OptionParser):
@property @property
def option_list_all(self): def option_list_all(self):
# type: () -> List[optparse.Option]
"""Get a list of all options, including those in option groups.""" """Get a list of all options, including those in option groups."""
res = self.option_list[:] res = self.option_list[:]
for i in self.option_groups: for i in self.option_groups:
@ -166,6 +177,7 @@ class ConfigOptionParser(CustomOptionParser):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def check_default(self, option, key, val): def check_default(self, option, key, val):
# type: (optparse.Option, str, Any) -> Any
try: try:
return option.check_value(key, val) return option.check_value(key, val)
except optparse.OptionValueError as exc: except optparse.OptionValueError as exc:
@ -173,11 +185,14 @@ class ConfigOptionParser(CustomOptionParser):
sys.exit(3) sys.exit(3)
def _get_ordered_configuration_items(self): def _get_ordered_configuration_items(self):
# type: () -> Iterator[Tuple[str, Any]]
# Configuration gives keys in an unordered manner. Order them. # Configuration gives keys in an unordered manner. Order them.
override_order = ["global", self.name, ":env:"] override_order = ["global", self.name, ":env:"]
# Pool the options into different groups # Pool the options into different groups
section_items = {name: [] for name in override_order} section_items = {
name: [] for name in override_order
} # type: Dict[str, List[Tuple[str, Any]]]
for section_key, val in self.config.items(): for section_key, val in self.config.items():
# ignore empty values # ignore empty values
if not val: if not val:
@ -197,6 +212,7 @@ class ConfigOptionParser(CustomOptionParser):
yield key, val yield key, val
def _update_defaults(self, defaults): def _update_defaults(self, defaults):
# type: (Dict[str, Any]) -> Dict[str, Any]
"""Updates the given defaults with values from the config files and """Updates the given defaults with values from the config files and
the environ. Does a little special handling for certain types of the environ. Does a little special handling for certain types of
options (lists).""" options (lists)."""
@ -215,6 +231,8 @@ class ConfigOptionParser(CustomOptionParser):
if option is None: if option is None:
continue continue
assert option.dest is not None
if option.action in ("store_true", "store_false"): if option.action in ("store_true", "store_false"):
try: try:
val = strtobool(val) val = strtobool(val)
@ -240,6 +258,7 @@ class ConfigOptionParser(CustomOptionParser):
val = val.split() val = val.split()
val = [self.check_default(option, key, v) for v in val] val = [self.check_default(option, key, v) for v in val]
elif option.action == "callback": elif option.action == "callback":
assert option.callback is not None
late_eval.add(option.dest) late_eval.add(option.dest)
opt_str = option.get_opt_string() opt_str = option.get_opt_string()
val = option.convert_value(opt_str, val) val = option.convert_value(opt_str, val)
@ -258,6 +277,7 @@ class ConfigOptionParser(CustomOptionParser):
return defaults return defaults
def get_default_values(self): def get_default_values(self):
# type: () -> optparse.Values
"""Overriding to make updating the defaults after instantiation of """Overriding to make updating the defaults after instantiation of
the option parser possible, _update_defaults() does the dirty work.""" the option parser possible, _update_defaults() does the dirty work."""
if not self.process_default_values: if not self.process_default_values:
@ -272,6 +292,7 @@ class ConfigOptionParser(CustomOptionParser):
defaults = self._update_defaults(self.defaults.copy()) # ours defaults = self._update_defaults(self.defaults.copy()) # ours
for option in self._get_all_options(): for option in self._get_all_options():
assert option.dest is not None
default = defaults.get(option.dest) default = defaults.get(option.dest)
if isinstance(default, str): if isinstance(default, str):
opt_str = option.get_opt_string() opt_str = option.get_opt_string()
@ -279,5 +300,6 @@ class ConfigOptionParser(CustomOptionParser):
return optparse.Values(defaults) return optparse.Values(defaults)
def error(self, msg): def error(self, msg):
# type: (str) -> None
self.print_usage(sys.stderr) self.print_usage(sys.stderr)
self.exit(UNKNOWN_ERROR, f"{msg}\n") self.exit(UNKNOWN_ERROR, f"{msg}\n")

View File

@ -37,7 +37,7 @@ except Exception as exc:
def get_keyring_auth(url, username): def get_keyring_auth(url, username):
# type: (str, str) -> Optional[AuthInfo] # type: (Optional[str], Optional[str]) -> Optional[AuthInfo]
"""Return the tuple auth for a given url from keyring.""" """Return the tuple auth for a given url from keyring."""
global keyring global keyring
if not url or not keyring: if not url or not keyring:

View File

@ -2,8 +2,15 @@
network request configuration and behavior. network request configuration and behavior.
""" """
# The following comment should be removed at some point in the future. # When mypy runs on Windows the call to distro.linux_distribution() is skipped
# mypy: disallow-untyped-defs=False # resulting in the failure:
#
# error: unused 'type: ignore' comment
#
# If the upstream module adds typing, this comment should be removed. See
# https://github.com/nir0s/distro/pull/269
#
# mypy: warn-unused-ignores=False
import email.utils import email.utils
import ipaddress import ipaddress
@ -15,13 +22,14 @@ import platform
import sys import sys
import urllib.parse import urllib.parse
import warnings import warnings
from typing import Any, Iterator, List, Optional, Sequence, Tuple, Union from typing import Any, Dict, Iterator, List, Mapping, Optional, Sequence, Tuple, Union
from pip._vendor import requests, urllib3 from pip._vendor import requests, urllib3
from pip._vendor.cachecontrol import CacheControlAdapter from pip._vendor.cachecontrol import CacheControlAdapter
from pip._vendor.requests.adapters import BaseAdapter, HTTPAdapter from pip._vendor.requests.adapters import BaseAdapter, HTTPAdapter
from pip._vendor.requests.models import Response from pip._vendor.requests.models import PreparedRequest, Response
from pip._vendor.requests.structures import CaseInsensitiveDict from pip._vendor.requests.structures import CaseInsensitiveDict
from pip._vendor.urllib3.connectionpool import ConnectionPool
from pip._vendor.urllib3.exceptions import InsecureRequestWarning from pip._vendor.urllib3.exceptions import InsecureRequestWarning
from pip import __version__ from pip import __version__
@ -89,6 +97,7 @@ def looks_like_ci():
def user_agent(): def user_agent():
# type: () -> str
""" """
Return a string representing the user agent. Return a string representing the user agent.
""" """
@ -98,15 +107,14 @@ def user_agent():
"implementation": { "implementation": {
"name": platform.python_implementation(), "name": platform.python_implementation(),
}, },
} } # type: Dict[str, Any]
if data["implementation"]["name"] == 'CPython': if data["implementation"]["name"] == 'CPython':
data["implementation"]["version"] = platform.python_version() data["implementation"]["version"] = platform.python_version()
elif data["implementation"]["name"] == 'PyPy': elif data["implementation"]["name"] == 'PyPy':
if sys.pypy_version_info.releaselevel == 'final': pypy_version_info = sys.pypy_version_info # type: ignore
pypy_version_info = sys.pypy_version_info[:3] if pypy_version_info.releaselevel == 'final':
else: pypy_version_info = pypy_version_info[:3]
pypy_version_info = sys.pypy_version_info
data["implementation"]["version"] = ".".join( data["implementation"]["version"] = ".".join(
[str(x) for x in pypy_version_info] [str(x) for x in pypy_version_info]
) )
@ -119,9 +127,12 @@ def user_agent():
if sys.platform.startswith("linux"): if sys.platform.startswith("linux"):
from pip._vendor import distro from pip._vendor import distro
# https://github.com/nir0s/distro/pull/269
linux_distribution = distro.linux_distribution() # type: ignore
distro_infos = dict(filter( distro_infos = dict(filter(
lambda x: x[1], lambda x: x[1],
zip(["name", "version", "id"], distro.linux_distribution()), zip(["name", "version", "id"], linux_distribution),
)) ))
libc = dict(filter( libc = dict(filter(
lambda x: x[1], lambda x: x[1],
@ -170,8 +181,16 @@ def user_agent():
class LocalFSAdapter(BaseAdapter): class LocalFSAdapter(BaseAdapter):
def send(self, request, stream=None, timeout=None, verify=None, cert=None, def send(
proxies=None): self,
request, # type: PreparedRequest
stream=False, # type: bool
timeout=None, # type: Optional[Union[float, Tuple[float, float]]]
verify=True, # type: Union[bool, str]
cert=None, # type: Optional[Union[str, Tuple[str, str]]]
proxies=None, # type:Optional[Mapping[str, str]]
):
# type: (...) -> Response
pathname = url_to_path(request.url) pathname = url_to_path(request.url)
resp = Response() resp = Response()
@ -198,18 +217,33 @@ class LocalFSAdapter(BaseAdapter):
return resp return resp
def close(self): def close(self):
# type: () -> None
pass pass
class InsecureHTTPAdapter(HTTPAdapter): class InsecureHTTPAdapter(HTTPAdapter):
def cert_verify(self, conn, url, verify, cert): def cert_verify(
self,
conn, # type: ConnectionPool
url, # type: str
verify, # type: Union[bool, str]
cert, # type: Optional[Union[str, Tuple[str, str]]]
):
# type: (...) -> None
super().cert_verify(conn=conn, url=url, verify=False, cert=cert) super().cert_verify(conn=conn, url=url, verify=False, cert=cert)
class InsecureCacheControlAdapter(CacheControlAdapter): class InsecureCacheControlAdapter(CacheControlAdapter):
def cert_verify(self, conn, url, verify, cert): def cert_verify(
self,
conn, # type: ConnectionPool
url, # type: str
verify, # type: Union[bool, str]
cert, # type: Optional[Union[str, Tuple[str, str]]]
):
# type: (...) -> None
super().cert_verify(conn=conn, url=url, verify=False, cert=cert) super().cert_verify(conn=conn, url=url, verify=False, cert=cert)
@ -407,6 +441,7 @@ class PipSession(requests.Session):
return False return False
def request(self, method, url, *args, **kwargs): def request(self, method, url, *args, **kwargs):
# type: (str, str, *Any, **Any) -> Response
# Allow setting a default timeout on a session # Allow setting a default timeout on a session
kwargs.setdefault("timeout", self.timeout) kwargs.setdefault("timeout", self.timeout)

View File

@ -12,13 +12,12 @@ for sub-dependencies
# The following comment should be removed at some point in the future. # The following comment should be removed at some point in the future.
# mypy: strict-optional=False # mypy: strict-optional=False
# mypy: disallow-untyped-defs=False
import logging import logging
import sys import sys
from collections import defaultdict from collections import defaultdict
from itertools import chain from itertools import chain
from typing import DefaultDict, List, Optional, Set, Tuple from typing import DefaultDict, Iterable, List, Optional, Set, Tuple
from pip._vendor.packaging import specifiers from pip._vendor.packaging import specifiers
from pip._vendor.pkg_resources import Distribution from pip._vendor.pkg_resources import Distribution
@ -388,6 +387,7 @@ class Resolver(BaseResolver):
more_reqs = [] # type: List[InstallRequirement] more_reqs = [] # type: List[InstallRequirement]
def add_req(subreq, extras_requested): def add_req(subreq, extras_requested):
# type: (Distribution, Iterable[str]) -> None
sub_install_req = self._make_install_req( sub_install_req = self._make_install_req(
str(subreq), str(subreq),
req_to_install, req_to_install,
@ -447,6 +447,7 @@ class Resolver(BaseResolver):
ordered_reqs = set() # type: Set[InstallRequirement] ordered_reqs = set() # type: Set[InstallRequirement]
def schedule(req): def schedule(req):
# type: (InstallRequirement) -> None
if req.satisfied_by or req in ordered_reqs: if req.satisfied_by or req in ordered_reqs:
return return
if req.constraint: if req.constraint:

View File

@ -1,9 +1,6 @@
"""Stuff that differs in different Python versions and platform """Stuff that differs in different Python versions and platform
distributions.""" distributions."""
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
import logging import logging
import os import os
import sys import sys

View File

@ -2,12 +2,9 @@
A module that implements tooling to enable easy warnings about deprecations. A module that implements tooling to enable easy warnings about deprecations.
""" """
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
import logging import logging
import warnings import warnings
from typing import Any, Optional from typing import Any, Optional, TextIO, Type, Union
from pip._vendor.packaging.version import parse from pip._vendor.packaging.version import parse
@ -24,7 +21,15 @@ _original_showwarning = None # type: Any
# Warnings <-> Logging Integration # Warnings <-> Logging Integration
def _showwarning(message, category, filename, lineno, file=None, line=None): def _showwarning(
message, # type: Union[Warning, str]
category, # type: Type[Warning]
filename, # type: str
lineno, # type: int
file=None, # type: Optional[TextIO]
line=None, # type: Optional[str]
):
# type: (...) -> None
if file is not None: if file is not None:
if _original_showwarning is not None: if _original_showwarning is not None:
_original_showwarning(message, category, filename, lineno, file, line) _original_showwarning(message, category, filename, lineno, file, line)

View File

@ -1,6 +1,3 @@
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
import contextlib import contextlib
import errno import errno
import logging import logging
@ -8,7 +5,7 @@ import logging.handlers
import os import os
import sys import sys
from logging import Filter, getLogger from logging import Filter, getLogger
from typing import Any from typing import IO, Any, Callable, Iterator, Optional, TextIO, Type, cast
from pip._internal.utils.compat import WINDOWS from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX
@ -46,15 +43,17 @@ if WINDOWS:
# https://bugs.python.org/issue19612 # https://bugs.python.org/issue19612
# https://bugs.python.org/issue30418 # https://bugs.python.org/issue30418
def _is_broken_pipe_error(exc_class, exc): def _is_broken_pipe_error(exc_class, exc):
# type: (Type[BaseException], BaseException) -> bool
"""See the docstring for non-Windows below.""" """See the docstring for non-Windows below."""
return (exc_class is BrokenPipeError) or ( return (exc_class is BrokenPipeError) or (
exc_class is OSError and exc.errno in (errno.EINVAL, errno.EPIPE) isinstance(exc, OSError) and exc.errno in (errno.EINVAL, errno.EPIPE)
) )
else: else:
# Then we are in the non-Windows case. # Then we are in the non-Windows case.
def _is_broken_pipe_error(exc_class, exc): def _is_broken_pipe_error(exc_class, exc):
# type: (Type[BaseException], BaseException) -> bool
""" """
Return whether an exception is a broken pipe error. Return whether an exception is a broken pipe error.
@ -67,6 +66,7 @@ else:
@contextlib.contextmanager @contextlib.contextmanager
def indent_log(num=2): def indent_log(num=2):
# type: (int) -> Iterator[None]
""" """
A context manager which will cause the log output to be indented for any A context manager which will cause the log output to be indented for any
log messages emitted inside it. log messages emitted inside it.
@ -81,6 +81,7 @@ def indent_log(num=2):
def get_indentation(): def get_indentation():
# type: () -> int
return getattr(_log_state, "indentation", 0) return getattr(_log_state, "indentation", 0)
@ -104,6 +105,7 @@ class IndentingFormatter(logging.Formatter):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def get_message_start(self, formatted, levelno): def get_message_start(self, formatted, levelno):
# type: (str, int) -> str
""" """
Return the start of the formatted log message (not counting the Return the start of the formatted log message (not counting the
prefix to add to each line). prefix to add to each line).
@ -120,6 +122,7 @@ class IndentingFormatter(logging.Formatter):
return "ERROR: " return "ERROR: "
def format(self, record): def format(self, record):
# type: (logging.LogRecord) -> str
""" """
Calls the standard formatter, but will indent all of the log message Calls the standard formatter, but will indent all of the log message
lines by our current indentation level. lines by our current indentation level.
@ -137,7 +140,9 @@ class IndentingFormatter(logging.Formatter):
def _color_wrap(*colors): def _color_wrap(*colors):
# type: (*str) -> Callable[[str], str]
def wrapped(inp): def wrapped(inp):
# type: (str) -> str
return "".join(list(colors) + [inp, colorama.Style.RESET_ALL]) return "".join(list(colors) + [inp, colorama.Style.RESET_ALL])
return wrapped return wrapped
@ -156,6 +161,7 @@ class ColorizedStreamHandler(logging.StreamHandler):
COLORS = [] COLORS = []
def __init__(self, stream=None, no_color=None): def __init__(self, stream=None, no_color=None):
# type: (Optional[TextIO], bool) -> None
super().__init__(stream) super().__init__(stream)
self._no_color = no_color self._no_color = no_color
@ -163,16 +169,19 @@ class ColorizedStreamHandler(logging.StreamHandler):
self.stream = colorama.AnsiToWin32(self.stream) self.stream = colorama.AnsiToWin32(self.stream)
def _using_stdout(self): def _using_stdout(self):
# type: () -> bool
""" """
Return whether the handler is using sys.stdout. Return whether the handler is using sys.stdout.
""" """
if WINDOWS and colorama: if WINDOWS and colorama:
# Then self.stream is an AnsiToWin32 object. # Then self.stream is an AnsiToWin32 object.
return self.stream.wrapped is sys.stdout stream = cast(colorama.AnsiToWin32, self.stream)
return stream.wrapped is sys.stdout
return self.stream is sys.stdout return self.stream is sys.stdout
def should_color(self): def should_color(self):
# type: () -> bool
# Don't colorize things if we do not have colorama or if told not to # Don't colorize things if we do not have colorama or if told not to
if not colorama or self._no_color: if not colorama or self._no_color:
return False return False
@ -195,6 +204,7 @@ class ColorizedStreamHandler(logging.StreamHandler):
return False return False
def format(self, record): def format(self, record):
# type: (logging.LogRecord) -> str
msg = logging.StreamHandler.format(self, record) msg = logging.StreamHandler.format(self, record)
if self.should_color(): if self.should_color():
@ -207,12 +217,18 @@ class ColorizedStreamHandler(logging.StreamHandler):
# The logging module says handleError() can be customized. # The logging module says handleError() can be customized.
def handleError(self, record): def handleError(self, record):
# type: (logging.LogRecord) -> None
exc_class, exc = sys.exc_info()[:2] exc_class, exc = sys.exc_info()[:2]
# If a broken pipe occurred while calling write() or flush() on the # If a broken pipe occurred while calling write() or flush() on the
# stdout stream in logging's Handler.emit(), then raise our special # stdout stream in logging's Handler.emit(), then raise our special
# exception so we can handle it in main() instead of logging the # exception so we can handle it in main() instead of logging the
# broken pipe error and continuing. # broken pipe error and continuing.
if exc_class and self._using_stdout() and _is_broken_pipe_error(exc_class, exc): if (
exc_class
and exc
and self._using_stdout()
and _is_broken_pipe_error(exc_class, exc)
):
raise BrokenStdoutLoggingError() raise BrokenStdoutLoggingError()
return super().handleError(record) return super().handleError(record)
@ -220,15 +236,18 @@ class ColorizedStreamHandler(logging.StreamHandler):
class BetterRotatingFileHandler(logging.handlers.RotatingFileHandler): class BetterRotatingFileHandler(logging.handlers.RotatingFileHandler):
def _open(self): def _open(self):
# type: () -> IO[Any]
ensure_dir(os.path.dirname(self.baseFilename)) ensure_dir(os.path.dirname(self.baseFilename))
return logging.handlers.RotatingFileHandler._open(self) return logging.handlers.RotatingFileHandler._open(self)
class MaxLevelFilter(Filter): class MaxLevelFilter(Filter):
def __init__(self, level): def __init__(self, level):
# type: (int) -> None
self.level = level self.level = level
def filter(self, record): def filter(self, record):
# type: (logging.LogRecord) -> bool
return record.levelno < self.level return record.levelno < self.level
@ -239,12 +258,14 @@ class ExcludeLoggerFilter(Filter):
""" """
def filter(self, record): def filter(self, record):
# type: (logging.LogRecord) -> bool
# The base Filter class allows only records from a logger (or its # The base Filter class allows only records from a logger (or its
# children). # children).
return not super().filter(record) return not super().filter(record)
def setup_logging(verbosity, no_color, user_log_file): def setup_logging(verbosity, no_color, user_log_file):
# type: (int, bool, Optional[str]) -> int
"""Configures and sets up all of the logging """Configures and sets up all of the logging
Returns the requested logging level, as its integer value. Returns the requested logging level, as its integer value.

View File

@ -1,6 +1,5 @@
# The following comment should be removed at some point in the future. # The following comment should be removed at some point in the future.
# mypy: strict-optional=False # mypy: strict-optional=False
# mypy: disallow-untyped-defs=False
import contextlib import contextlib
import errno import errno
@ -16,16 +15,21 @@ import sys
import urllib.parse import urllib.parse
from io import StringIO from io import StringIO
from itertools import filterfalse, tee, zip_longest from itertools import filterfalse, tee, zip_longest
from types import TracebackType
from typing import ( from typing import (
Any, Any,
AnyStr, AnyStr,
BinaryIO,
Callable, Callable,
Container, Container,
ContextManager,
Iterable, Iterable,
Iterator, Iterator,
List, List,
Optional, Optional,
TextIO,
Tuple, Tuple,
Type,
TypeVar, TypeVar,
cast, cast,
) )
@ -64,8 +68,10 @@ __all__ = [
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
VersionInfo = Tuple[int, int, int]
T = TypeVar("T") T = TypeVar("T")
ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
VersionInfo = Tuple[int, int, int]
NetlocTuple = Tuple[str, Tuple[Optional[str], Optional[str]]]
def get_pip_version(): def get_pip_version():
@ -132,6 +138,7 @@ def rmtree(dir, ignore_errors=False):
def rmtree_errorhandler(func, path, exc_info): def rmtree_errorhandler(func, path, exc_info):
# type: (Callable[..., Any], str, ExcInfo) -> None
"""On Windows, the files in .svn are read-only, so when rmtree() tries to """On Windows, the files in .svn are read-only, so when rmtree() tries to
remove them, an exception is thrown. We catch that here, remove the remove them, an exception is thrown. We catch that here, remove the
read-only attribute, and hopefully continue without problems.""" read-only attribute, and hopefully continue without problems."""
@ -279,6 +286,7 @@ def is_installable_dir(path):
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE): def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
# type: (BinaryIO, int) -> Iterator[bytes]
"""Yield pieces of data from a file-like object until EOF.""" """Yield pieces of data from a file-like object until EOF."""
while True: while True:
chunk = file.read(size) chunk = file.read(size)
@ -491,19 +499,24 @@ def write_output(msg, *args):
class StreamWrapper(StringIO): class StreamWrapper(StringIO):
orig_stream = None # type: TextIO
@classmethod @classmethod
def from_stream(cls, orig_stream): def from_stream(cls, orig_stream):
# type: (TextIO) -> StreamWrapper
cls.orig_stream = orig_stream cls.orig_stream = orig_stream
return cls() return cls()
# compileall.compile_dir() needs stdout.encoding to print to stdout # compileall.compile_dir() needs stdout.encoding to print to stdout
# https://github.com/python/mypy/issues/4125
@property @property
def encoding(self): def encoding(self): # type: ignore
return self.orig_stream.encoding return self.orig_stream.encoding
@contextlib.contextmanager @contextlib.contextmanager
def captured_output(stream_name): def captured_output(stream_name):
# type: (str) -> Iterator[StreamWrapper]
"""Return a context manager used by captured_stdout/stdin/stderr """Return a context manager used by captured_stdout/stdin/stderr
that temporarily replaces the sys stream *stream_name* with a StringIO. that temporarily replaces the sys stream *stream_name* with a StringIO.
@ -518,6 +531,7 @@ def captured_output(stream_name):
def captured_stdout(): def captured_stdout():
# type: () -> ContextManager[StreamWrapper]
"""Capture the output of sys.stdout: """Capture the output of sys.stdout:
with captured_stdout() as stdout: with captured_stdout() as stdout:
@ -530,6 +544,7 @@ def captured_stdout():
def captured_stderr(): def captured_stderr():
# type: () -> ContextManager[StreamWrapper]
""" """
See captured_stdout(). See captured_stdout().
""" """
@ -538,6 +553,7 @@ def captured_stderr():
# Simulates an enum # Simulates an enum
def enum(*sequential, **named): def enum(*sequential, **named):
# type: (*Any, **Any) -> Type[Any]
enums = dict(zip(sequential, range(len(sequential))), **named) enums = dict(zip(sequential, range(len(sequential))), **named)
reverse = {value: key for key, value in enums.items()} reverse = {value: key for key, value in enums.items()}
enums["reverse_mapping"] = reverse enums["reverse_mapping"] = reverse
@ -579,6 +595,7 @@ def parse_netloc(netloc):
def split_auth_from_netloc(netloc): def split_auth_from_netloc(netloc):
# type: (str) -> NetlocTuple
""" """
Parse out and remove the auth information from a netloc. Parse out and remove the auth information from a netloc.
@ -591,17 +608,20 @@ def split_auth_from_netloc(netloc):
# behaves if more than one @ is present (which can be checked using # behaves if more than one @ is present (which can be checked using
# the password attribute of urlsplit()'s return value). # the password attribute of urlsplit()'s return value).
auth, netloc = netloc.rsplit("@", 1) auth, netloc = netloc.rsplit("@", 1)
pw = None # type: Optional[str]
if ":" in auth: if ":" in auth:
# Split from the left because that's how urllib.parse.urlsplit() # Split from the left because that's how urllib.parse.urlsplit()
# behaves if more than one : is present (which again can be checked # behaves if more than one : is present (which again can be checked
# using the password attribute of the return value) # using the password attribute of the return value)
user_pass = auth.split(":", 1) user, pw = auth.split(":", 1)
else: else:
user_pass = auth, None user, pw = auth, None
user_pass = tuple(None if x is None else urllib.parse.unquote(x) for x in user_pass) user = urllib.parse.unquote(user)
if pw is not None:
pw = urllib.parse.unquote(pw)
return netloc, user_pass return netloc, (user, pw)
def redact_netloc(netloc): def redact_netloc(netloc):
@ -628,6 +648,7 @@ def redact_netloc(netloc):
def _transform_url(url, transform_netloc): def _transform_url(url, transform_netloc):
# type: (str, Callable[[str], Tuple[Any, ...]]) -> Tuple[str, NetlocTuple]
"""Transform and replace netloc in a url. """Transform and replace netloc in a url.
transform_netloc is a function taking the netloc and returning a transform_netloc is a function taking the netloc and returning a
@ -642,14 +663,16 @@ def _transform_url(url, transform_netloc):
# stripped url # stripped url
url_pieces = (purl.scheme, netloc_tuple[0], purl.path, purl.query, purl.fragment) url_pieces = (purl.scheme, netloc_tuple[0], purl.path, purl.query, purl.fragment)
surl = urllib.parse.urlunsplit(url_pieces) surl = urllib.parse.urlunsplit(url_pieces)
return surl, netloc_tuple return surl, cast("NetlocTuple", netloc_tuple)
def _get_netloc(netloc): def _get_netloc(netloc):
# type: (str) -> NetlocTuple
return split_auth_from_netloc(netloc) return split_auth_from_netloc(netloc)
def _redact_netloc(netloc): def _redact_netloc(netloc):
# type: (str) -> Tuple[str,]
return (redact_netloc(netloc),) return (redact_netloc(netloc),)
@ -765,6 +788,7 @@ def hash_file(path, blocksize=1 << 20):
def is_wheel_installed(): def is_wheel_installed():
# type: () -> bool
""" """
Return whether the wheel package is installed. Return whether the wheel package is installed.
""" """

View File

@ -1,9 +1,8 @@
"""Utilities for defining models """Utilities for defining models
""" """
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
import operator import operator
from typing import Any, Callable, Type
class KeyBasedCompareMixin: class KeyBasedCompareMixin:
@ -12,28 +11,36 @@ class KeyBasedCompareMixin:
__slots__ = ["_compare_key", "_defining_class"] __slots__ = ["_compare_key", "_defining_class"]
def __init__(self, key, defining_class): def __init__(self, key, defining_class):
# type: (Any, Type[KeyBasedCompareMixin]) -> None
self._compare_key = key self._compare_key = key
self._defining_class = defining_class self._defining_class = defining_class
def __hash__(self): def __hash__(self):
# type: () -> int
return hash(self._compare_key) return hash(self._compare_key)
def __lt__(self, other): def __lt__(self, other):
# type: (Any) -> bool
return self._compare(other, operator.__lt__) return self._compare(other, operator.__lt__)
def __le__(self, other): def __le__(self, other):
# type: (Any) -> bool
return self._compare(other, operator.__le__) return self._compare(other, operator.__le__)
def __gt__(self, other): def __gt__(self, other):
# type: (Any) -> bool
return self._compare(other, operator.__gt__) return self._compare(other, operator.__gt__)
def __ge__(self, other): def __ge__(self, other):
# type: (Any) -> bool
return self._compare(other, operator.__ge__) return self._compare(other, operator.__ge__)
def __eq__(self, other): def __eq__(self, other):
# type: (Any) -> bool
return self._compare(other, operator.__eq__) return self._compare(other, operator.__eq__)
def _compare(self, other, method): def _compare(self, other, method):
# type: (Any, Callable[[Any, Any], bool]) -> bool
if not isinstance(other, self._defining_class): if not isinstance(other, self._defining_class):
return NotImplemented return NotImplemented

View File

@ -1,17 +1,16 @@
# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
import os import os
import shutil import shutil
import subprocess import subprocess
import sys import sys
from glob import glob from glob import glob
from typing import List
VIRTUAL_ENV = os.environ['VIRTUAL_ENV'] VIRTUAL_ENV = os.environ['VIRTUAL_ENV']
TOX_PIP_DIR = os.path.join(VIRTUAL_ENV, 'pip') TOX_PIP_DIR = os.path.join(VIRTUAL_ENV, 'pip')
def pip(args): def pip(args):
# type: (List[str]) -> None
# First things first, get a recent (stable) version of pip. # First things first, get a recent (stable) version of pip.
if not os.path.exists(TOX_PIP_DIR): if not os.path.exists(TOX_PIP_DIR):
subprocess.check_call([sys.executable, '-m', 'pip', subprocess.check_call([sys.executable, '-m', 'pip',
@ -20,8 +19,8 @@ def pip(args):
'pip']) 'pip'])
shutil.rmtree(glob(os.path.join(TOX_PIP_DIR, 'pip-*.dist-info'))[0]) shutil.rmtree(glob(os.path.join(TOX_PIP_DIR, 'pip-*.dist-info'))[0])
# And use that version. # And use that version.
pypath = os.environ.get('PYTHONPATH') pypath_env = os.environ.get('PYTHONPATH')
pypath = pypath.split(os.pathsep) if pypath is not None else [] pypath = pypath_env.split(os.pathsep) if pypath_env is not None else []
pypath.insert(0, TOX_PIP_DIR) pypath.insert(0, TOX_PIP_DIR)
os.environ['PYTHONPATH'] = os.pathsep.join(pypath) os.environ['PYTHONPATH'] = os.pathsep.join(pypath)
subprocess.check_call([sys.executable, '-m', 'pip'] + args) subprocess.check_call([sys.executable, '-m', 'pip'] + args)