pip/pip/basecommand.py

317 lines
11 KiB
Python
Raw Normal View History

"""Base Command class, and related routines"""
from __future__ import absolute_import
import logging
import os
import sys
import traceback
import optparse
import warnings
from pip._vendor.six import StringIO
from pip import cmdoptions
from pip.locations import running_under_virtualenv
2013-08-16 14:04:27 +02:00
from pip.download import PipSession
from pip.exceptions import (BadCommand, InstallationError, UninstallationError,
CommandError, PreviousBuildDirError)
from pip.compat import logging_dictConfig
from pip.baseparser import ConfigOptionParser, UpdatingDefaultsHelpFormatter
from pip.req import InstallRequirement, parse_requirements
from pip.status_codes import (
SUCCESS, ERROR, UNKNOWN_ERROR, VIRTUALENV_NOT_FOUND,
PREVIOUS_BUILD_DIR_ERROR,
)
2015-05-21 23:32:44 +02:00
from pip.utils import get_prog, normalize_path
from pip.utils.deprecation import RemovedInPip8Warning
from pip.utils.logging import IndentingFormatter
from pip.utils.outdated import pip_version_check
# Names exported by ``from <this module> import *``; only Command is listed.
__all__ = ['Command']
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Base command class: builds the option parser, configures logging from the
# parsed options and turns the outcome of ``run()`` into an exit status.
# ``main`` calls ``self.run(options, args)``, which is not defined in this
# class and therefore has to be supplied by subclasses.
#
# NOTE: documented with comments rather than a class docstring on purpose:
# ``__init__`` passes ``self.__doc__`` to the parser as its description, so a
# docstring here would change that runtime value.
class Command(object):
    # Command name.  ``__init__`` calls ``self.name.capitalize()``, so it has
    # to be a string by the time an instance is created.
    name = None
    # Usage text handed to the option parser.
    usage = None
    # Not read anywhere in this class.
    hidden = False
    # Stream targets for the "console" and "console_errors" logging handlers
    # (in that order), in dictConfig ``ext://`` notation.
    log_streams = ("ext://sys.stdout", "ext://sys.stderr")

    def __init__(self, isolated=False):
        """Create the option parser and the command's own option group.

        :param isolated: forwarded to ``ConfigOptionParser`` as its
            ``isolated`` keyword.
        """
        parser_kw = {
            'usage': self.usage,
            'prog': '%s %s' % (get_prog(), self.name),
            'formatter': UpdatingDefaultsHelpFormatter(),
            'add_help_option': False,
            'name': self.name,
            'description': self.__doc__,
            'isolated': isolated,
        }

        self.parser = ConfigOptionParser(**parser_kw)

        # Commands should add options to this option group.  It is only
        # created here; this method does not attach it to the parser.
        optgroup_name = '%s Options' % self.name.capitalize()
        self.cmd_opts = optparse.OptionGroup(self.parser, optgroup_name)

        # Add the general options
        gen_opts = cmdoptions.make_option_group(
            cmdoptions.general_group,
            self.parser,
        )
        self.parser.add_option_group(gen_opts)

    def _build_session(self, options, retries=None, timeout=None):
        """Return a ``PipSession`` configured from the parsed options.

        :param options: parsed options; ``cache_dir``, ``retries``,
            ``trusted_hosts``, ``cert``, ``client_cert``, ``timeout``,
            ``proxy`` and ``no_input`` are read.
        :param retries: overrides ``options.retries`` when not ``None``.
        :param timeout: overrides ``options.timeout`` when not ``None``.
        """
        session = PipSession(
            cache=(
                normalize_path(os.path.join(options.cache_dir, "http"))
                if options.cache_dir else None
            ),
            retries=retries if retries is not None else options.retries,
            insecure_hosts=options.trusted_hosts,
        )

        # Handle custom ca-bundles from the user
        if options.cert:
            session.verify = options.cert

        # Handle SSL client certificate
        if options.client_cert:
            session.cert = options.client_cert

        # Handle timeouts: only assigned when at least one of the two values
        # is truthy, otherwise the session keeps whatever timeout it has.
        if options.timeout or timeout:
            session.timeout = (
                timeout if timeout is not None else options.timeout
            )

        # Handle configured proxies
        if options.proxy:
            session.proxies = {
                "http": options.proxy,
                "https": options.proxy,
            }

        # Determine if we can prompt the user for authentication or not
        session.auth.prompting = not options.no_input

        return session

    def parse_args(self, args):
        """Return the result of the parser's ``parse_args`` for ``args``."""
        # factored out for testability
        return self.parser.parse_args(args)

    @staticmethod
    def _resolve_log_level(options):
        """Map the ``quiet`` / ``verbose`` options to a logging level name.

        ``quiet`` takes precedence over ``verbose``: 1 gives ``"WARNING"``,
        2 gives ``"ERROR"`` and any other truthy value gives ``"CRITICAL"``.
        Without ``quiet``, a truthy ``verbose`` gives ``"DEBUG"``; the
        default is ``"INFO"``.
        """
        if options.quiet:
            if options.quiet == 1:
                return "WARNING"
            # Must not be evaluated independently of the check above,
            # otherwise quiet == 1 would fall through to "CRITICAL".
            if options.quiet == 2:
                return "ERROR"
            return "CRITICAL"
        if options.verbose:
            return "DEBUG"
        return "INFO"

    def main(self, args):
        """Parse ``args``, configure logging and run the command.

        :param args: argument list handed to ``parse_args``.
        :returns: the value returned by ``self.run(options, args)`` when it
            is an ``int``, otherwise ``SUCCESS``.  Exceptions raised by
            ``run`` are logged and mapped to ``PREVIOUS_BUILD_DIR_ERROR``,
            ``ERROR`` or ``UNKNOWN_ERROR``.

        Calls ``sys.exit(VIRTUALENV_NOT_FOUND)`` when ``options.require_venv``
        is set and ``running_under_virtualenv()`` is false.
        """
        options, args = self.parse_args(args)

        level = self._resolve_log_level(options)

        logging_dictConfig({
            "version": 1,
            "disable_existing_loggers": False,
            "filters": {
                # Attached to the "console" handler only; presumably keeps
                # records at WARNING and above off that stream (see
                # pip.utils.logging.MaxLevelFilter).
                "exclude_warnings": {
                    "()": "pip.utils.logging.MaxLevelFilter",
                    "level": logging.WARNING,
                },
            },
            "formatters": {
                "indent": {
                    "()": IndentingFormatter,
                    "format": (
                        "%(message)s"
                        if not options.log_explicit_levels
                        else "[%(levelname)s] %(message)s"
                    ),
                },
            },
            "handlers": {
                "console": {
                    "level": level,
                    "class": "pip.utils.logging.ColorizedStreamHandler",
                    "stream": self.log_streams[0],
                    "filters": ["exclude_warnings"],
                    "formatter": "indent",
                },
                "console_errors": {
                    "level": "WARNING",
                    "class": "pip.utils.logging.ColorizedStreamHandler",
                    "stream": self.log_streams[1],
                    "formatter": "indent",
                },
                # Always declared, but only attached to the root logger
                # below when options.log is set.
                "user_log": {
                    "level": "DEBUG",
                    "class": "pip.utils.logging.BetterRotatingFileHandler",
                    "filename": options.log or "/dev/null",
                    "delay": True,
                    "formatter": "indent",
                },
            },
            "root": {
                "level": level,
                "handlers": list(filter(None, [
                    "console",
                    "console_errors",
                    "user_log" if options.log else None,
                ])),
            },
            # Disable any logging besides WARNING unless we have DEBUG level
            # logging enabled. These use both pip._vendor and the bare names
            # for the case where someone unbundles our libraries.
            # NOTE(review): levels "WARNING" and "CRITICAL" also select
            # "DEBUG" here; left unchanged, confirm whether that is intended.
            "loggers": dict(
                (
                    name,
                    {
                        "level": (
                            "WARNING"
                            if level in ["INFO", "ERROR"]
                            else "DEBUG"
                        ),
                    },
                )
                for name in ["pip._vendor", "distlib", "requests", "urllib3"]
            ),
        })

        if options.log_explicit_levels:
            warnings.warn(
                "--log-explicit-levels has been deprecated and will be removed"
                " in a future version.",
                RemovedInPip8Warning,
            )

        # TODO: try to get these passing down from the command?
        #       without resorting to os.environ to hold these.

        if options.no_input:
            os.environ['PIP_NO_INPUT'] = '1'

        if options.exists_action:
            os.environ['PIP_EXISTS_ACTION'] = ' '.join(options.exists_action)

        if options.require_venv:
            # If a venv is required check if it can really be found
            if not running_under_virtualenv():
                logger.critical(
                    'Could not find an activated virtualenv (required).'
                )
                sys.exit(VIRTUALENV_NOT_FOUND)

        # Check if we're using the latest version of pip available.  Uses a
        # dedicated session with no retries and a timeout capped at 5.
        if (not options.disable_pip_version_check and not
                getattr(options, "no_index", False)):
            with self._build_session(
                    options,
                    retries=0,
                    timeout=min(5, options.timeout)) as session:
                pip_version_check(session)

        try:
            status = self.run(options, args)
            # FIXME: all commands should return an exit status
            # and when it is done, isinstance is not needed anymore
            if isinstance(status, int):
                return status
        except PreviousBuildDirError as exc:
            logger.critical(str(exc))
            logger.debug('Exception information:\n%s', format_exc())

            return PREVIOUS_BUILD_DIR_ERROR
        except (InstallationError, UninstallationError, BadCommand) as exc:
            logger.critical(str(exc))
            logger.debug('Exception information:\n%s', format_exc())

            return ERROR
        except CommandError as exc:
            logger.critical('ERROR: %s', exc)
            logger.debug('Exception information:\n%s', format_exc())

            return ERROR
        except KeyboardInterrupt:
            logger.critical('Operation cancelled by user')
            logger.debug('Exception information:\n%s', format_exc())

            return ERROR
        except:
            # Catch-all boundary: anything else raised by run() is logged
            # with its traceback and reported as UNKNOWN_ERROR.
            logger.critical('Exception:\n%s', format_exc())

            return UNKNOWN_ERROR

        return SUCCESS
# Command subclass that adds a helper for marshalling command-line arguments
# into a requirement set.  (Kept as a comment: the inherited ``__init__``
# passes ``self.__doc__`` to the parser as its description.)
class RequirementCommand(Command):

    @staticmethod
    def populate_requirement_set(requirement_set, args, options, finder,
                                 session, name, wheel_cache):
        """
        Marshal cmd line args into a requirement set.

        Requirements are added to ``requirement_set`` in this order: entries
        of the constraint files, positional ``args``, editables, entries of
        the requirement files.  If ``args`` and ``options.editables`` are
        both empty and the requirement files yielded nothing, a warning
        mentioning ``name`` is logged.  Nothing is returned.
        """
        # Constraint files first.
        for constraints_path in options.constraints:
            constraint_reqs = parse_requirements(
                constraints_path,
                constraint=True,
                finder=finder,
                options=options,
                session=session,
                wheel_cache=wheel_cache,
            )
            for constraint in constraint_reqs:
                requirement_set.add_requirement(constraint)

        # Requirements named directly on the command line.
        for spec in args:
            line_req = InstallRequirement.from_line(
                spec, None,
                isolated=options.isolated_mode,
                wheel_cache=wheel_cache,
            )
            requirement_set.add_requirement(line_req)

        # Editable requirements.
        for editable in options.editables:
            editable_req = InstallRequirement.from_editable(
                editable,
                default_vcs=options.default_vcs,
                isolated=options.isolated_mode,
                wheel_cache=wheel_cache,
            )
            requirement_set.add_requirement(editable_req)

        # Requirement files last; count what they yield so that a set of
        # empty files is treated like "nothing given".
        from_files = 0
        for requirements_path in options.requirements:
            file_reqs = parse_requirements(
                requirements_path,
                finder=finder,
                options=options,
                session=session,
                wheel_cache=wheel_cache,
            )
            for file_req in file_reqs:
                from_files += 1
                requirement_set.add_requirement(file_req)

        if args or options.editables or from_files:
            return

        # Nothing to work on: warn, hinting at find-links if any were given.
        if options.find_links:
            template = ('You must give at least one requirement to '
                        '%(name)s (maybe you meant "pip %(name)s '
                        '%(links)s"?)')
            values = {'name': name, 'links': ' '.join(options.find_links)}
        else:
            template = ('You must give at least one requirement '
                        'to %(name)s (see "pip help %(name)s")')
            values = {'name': name}
        logger.warning(template % values)
def format_exc(exc_info=None):
    """Return the formatted traceback for ``exc_info`` as one string.

    :param exc_info: a ``(type, value, traceback)`` triple as returned by
        ``sys.exc_info()``; when ``None``, ``sys.exc_info()`` is used.
    """
    if exc_info is None:
        exc_info = sys.exc_info()
    # format_exception yields the same lines print_exception would write.
    return ''.join(traceback.format_exception(*exc_info))