pip/pip/basecommand.py

240 lines
7.6 KiB
Python

"""Base Command class, and related routines"""
from __future__ import absolute_import
import logging
import os
import sys
import traceback
import optparse
import warnings
from pip import appdirs, cmdoptions
from pip.locations import running_under_virtualenv
from pip.download import PipSession
from pip.exceptions import (BadCommand, InstallationError, UninstallationError,
CommandError, PreviousBuildDirError)
from pip.compat import StringIO, logging_dictConfig
from pip.baseparser import ConfigOptionParser, UpdatingDefaultsHelpFormatter
from pip.status_codes import (
SUCCESS, ERROR, UNKNOWN_ERROR, VIRTUALENV_NOT_FOUND,
PREVIOUS_BUILD_DIR_ERROR,
)
from pip.utils import get_prog, normalize_path
from pip.utils.deprecation import RemovedInPip18Warning
from pip.utils.logging import IndentingFormatter
__all__ = ['Command']
logger = logging.getLogger(__name__)
class Command(object):
name = None
usage = None
hidden = False
log_stream = "ext://sys.stdout"
def __init__(self):
parser_kw = {
'usage': self.usage,
'prog': '%s %s' % (get_prog(), self.name),
'formatter': UpdatingDefaultsHelpFormatter(),
'add_help_option': False,
'name': self.name,
'description': self.__doc__,
}
self.parser = ConfigOptionParser(**parser_kw)
# Commands should add options to this option group
optgroup_name = '%s Options' % self.name.capitalize()
self.cmd_opts = optparse.OptionGroup(self.parser, optgroup_name)
# Add the general options
gen_opts = cmdoptions.make_option_group(
cmdoptions.general_group,
self.parser,
)
self.parser.add_option_group(gen_opts)
def _build_session(self, options):
session = PipSession(
cache=(
normalize_path(os.path.join(options.cache_dir, "http"))
if options.cache_dir else None
),
retries=options.retries,
)
# Handle custom ca-bundles from the user
if options.cert:
session.verify = options.cert
elif options.no_check_certificate:
session.verify = False
# Handle SSL client certificate
if options.client_cert:
session.cert = options.client_cert
# Handle timeouts
if options.timeout:
session.timeout = options.timeout
# Handle configured proxies
if options.proxy:
session.proxies = {
"http": options.proxy,
"https": options.proxy,
}
# Determine if we can prompt the user for authentication or not
session.auth.prompting = not options.no_input
return session
def parse_args(self, args):
# factored out for testability
return self.parser.parse_args(args)
def main(self, args):
options, args = self.parse_args(args)
if options.quiet:
level = "WARNING"
elif options.verbose:
level = "DEBUG"
else:
level = "INFO"
logging_dictConfig({
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"indent": {
"()": IndentingFormatter,
"format": (
"%(message)s"
if not options.log_explicit_levels
else "[%(levelname)s] %(message)s"
),
},
},
"handlers": {
"console": {
"level": level,
"class": "pip.utils.logging.ColorizedStreamHandler",
"stream": self.log_stream,
"formatter": "indent",
},
"debug_log": {
"level": "DEBUG",
"class": "pip.utils.logging.BetterRotatingFileHandler",
"filename": os.path.join(
appdirs.user_log_dir("pip"), "debug.log",
),
"maxBytes": 10 * 1000 * 1000, # 10 MB
"backupCount": 1,
"delay": True,
"formatter": "indent",
},
"user_log": {
"level": "DEBUG",
"class": "pip.utils.logging.BetterRotatingFileHandler",
"filename": options.log or "/dev/null",
"delay": True,
"formatter": "indent",
},
},
"root": {
"level": level,
"handlers": list(filter(None, [
"console",
"debug_log",
"user_log" if options.log else None,
])),
},
# Disable any logging besides WARNING unless we have DEBUG level
# logging enabled. These use both pip._vendor and the bare names
# for the case where someone unbundles our libraries.
"loggers": dict(
(
name,
{
"level": (
"WARNING"
if level in ["INFO", "ERROR"]
else "DEBUG"
),
},
)
for name in ["pip._vendor", "distlib", "requests", "urllib3"]
),
})
if options.log_explicit_levels:
warnings.warn(
"--log-explicit-levels has been deprecated and will be removed"
" in a future version.",
RemovedInPip18Warning,
)
# TODO: try to get these passing down from the command?
# without resorting to os.environ to hold these.
if options.no_input:
os.environ['PIP_NO_INPUT'] = '1'
if options.exists_action:
os.environ['PIP_EXISTS_ACTION'] = ' '.join(options.exists_action)
if options.require_venv:
# If a venv is required check if it can really be found
if not running_under_virtualenv():
logger.critical(
'Could not find an activated virtualenv (required).'
)
sys.exit(VIRTUALENV_NOT_FOUND)
try:
status = self.run(options, args)
# FIXME: all commands should return an exit status
# and when it is done, isinstance is not needed anymore
if isinstance(status, int):
return status
except PreviousBuildDirError as exc:
logger.critical(str(exc))
logger.debug('Exception information:\n%s', format_exc())
return PREVIOUS_BUILD_DIR_ERROR
except (InstallationError, UninstallationError, BadCommand) as exc:
logger.critical(str(exc))
logger.debug('Exception information:\n%s', format_exc())
return ERROR
except CommandError as exc:
logger.critical('ERROR: %s', exc)
logger.debug('Exception information:\n%s', format_exc())
return ERROR
except KeyboardInterrupt:
logger.critical('Operation cancelled by user')
logger.debug('Exception information:\n%s', format_exc())
return ERROR
except:
logger.critical('Exception:\n%s', format_exc())
return UNKNOWN_ERROR
return SUCCESS
def format_exc(exc_info=None):
if exc_info is None:
exc_info = sys.exc_info()
out = StringIO()
traceback.print_exception(*exc_info, **dict(file=out))
return out.getvalue()