mirror of
https://github.com/pypa/pip
synced 2023-12-13 21:30:23 +01:00
187 lines
6.2 KiB
Python
187 lines
6.2 KiB
Python
"""Logging
|
|
"""
|
|
|
|
import sys
|
|
import logging
|
|
|
|
from pip import backwardcompat
|
|
|
|
|
|
class Logger(object):
    """
    Logging object for use in command-line script.  Allows ranges of
    levels, to avoid some redundancy of displayed information.

    Messages are dispatched to ``consumers``, a list of
    ``(level, consumer)`` pairs.  A consumer is either an object with a
    ``write`` attribute (the message plus a newline is handed to
    ``backwardcompat.fwrite``) or a plain callable, which receives the
    rendered message without a trailing newline.
    """

    VERBOSE_DEBUG = logging.DEBUG - 1
    DEBUG = logging.DEBUG
    INFO = logging.INFO
    # Floor division keeps NOTIFY an integer on Python 3 as well, so it
    # behaves like every other level constant.
    NOTIFY = (logging.INFO + logging.WARN) // 2
    WARN = WARNING = logging.WARN
    ERROR = logging.ERROR
    FATAL = logging.FATAL

    # All levels in ascending numeric order; indexed by level_for_integer().
    LEVELS = [VERBOSE_DEBUG, DEBUG, INFO, NOTIFY, WARN, ERROR, FATAL]

    def __init__(self):
        # (level, consumer) pairs that log() dispatches to.
        self.consumers = []
        # Number of spaces prepended to every rendered message.
        self.indent = 0
        # When true, log() prefixes each message with its numeric level.
        self.explicit_levels = False
        # Message given to start_progress() while a progress scope is open.
        self.in_progress = None
        # True while the progress line on stdout has not been terminated
        # by a newline.
        self.in_progress_hanging = False
        # Last message written by show_progress(); used to blank leftovers.
        self.last_message = None

    def debug(self, msg, *args, **kw):
        """Log ``msg`` at DEBUG level."""
        self.log(self.DEBUG, msg, *args, **kw)

    def info(self, msg, *args, **kw):
        """Log ``msg`` at INFO level."""
        self.log(self.INFO, msg, *args, **kw)

    def notify(self, msg, *args, **kw):
        """Log ``msg`` at NOTIFY level."""
        self.log(self.NOTIFY, msg, *args, **kw)

    def warn(self, msg, *args, **kw):
        """Log ``msg`` at WARN level."""
        self.log(self.WARN, msg, *args, **kw)

    def error(self, msg, *args, **kw):
        """Log ``msg`` at ERROR level."""
        self.log(self.ERROR, msg, *args, **kw)

    def fatal(self, msg, *args, **kw):
        """Log ``msg`` at FATAL level."""
        self.log(self.FATAL, msg, *args, **kw)

    def log(self, level, msg, *args, **kw):
        """Render ``msg`` and send it to every consumer matching ``level``.

        ``msg`` is %-formatted with either the positional or the keyword
        arguments (when any are given).  ``level`` may be a number or a
        slice, see level_matches().

        Raises TypeError if both positional and keyword arguments are
        supplied.
        """
        if args and kw:
            raise TypeError(
                "You may give positional or keyword arguments, not both")
        args = args or kw
        rendered = None
        for consumer_level, consumer in self.consumers:
            if not self.level_matches(level, consumer_level):
                continue
            if (self.in_progress_hanging
                    and consumer in (sys.stdout, sys.stderr)):
                # A progress line is still open on the terminal; finish it
                # so the message starts on a line of its own.
                self.in_progress_hanging = False
                sys.stdout.write('\n')
                sys.stdout.flush()
            if rendered is None:
                # Render lazily, once, for the first matching consumer.
                if args:
                    rendered = msg % args
                else:
                    rendered = msg
                rendered = ' ' * self.indent + rendered
                if self.explicit_levels:
                    ## FIXME: should this be a name, not a level number?
                    rendered = '%02i %s' % (level, rendered)
            if hasattr(consumer, 'write'):
                # Add the newline only for this write; ``rendered`` is
                # shared with the remaining consumers and must not grow.
                backwardcompat.fwrite(consumer, rendered + '\n')
            else:
                consumer(rendered)

    def _show_progress(self):
        """Should we display download progress?"""
        return (self.stdout_level_matches(self.NOTIFY) and sys.stdout.isatty())

    def start_progress(self, msg):
        """Open a progress scope labelled ``msg``.

        The label is written to stdout (without a newline) only when
        _show_progress() is true.
        """
        assert not self.in_progress, (
            "Tried to start_progress(%r) while in_progress %r"
            % (msg, self.in_progress))
        if self._show_progress():
            sys.stdout.write(' ' * self.indent + msg)
            sys.stdout.flush()
            self.in_progress_hanging = True
        else:
            self.in_progress_hanging = False
        self.in_progress = msg
        self.last_message = None

    def end_progress(self, msg='done.'):
        """Close the progress scope opened by start_progress()."""
        assert self.in_progress, (
            "Tried to end_progress without start_progress")
        if self._show_progress():
            if not self.in_progress_hanging:
                # Some message has been printed out since start_progress
                sys.stdout.write('...' + self.in_progress + msg + '\n')
                sys.stdout.flush()
            else:
                # These erase any messages shown with show_progress
                # (besides .'s): the first call overwrites the previous
                # message with spaces, the second rewrites the line so the
                # cursor ends right after the progress label.
                self.show_progress('')
                self.show_progress('')
                sys.stdout.write(msg + '\n')
                sys.stdout.flush()
        self.in_progress = None
        self.in_progress_hanging = False

    def show_progress(self, message=None):
        """If we are in a progress scope, and no log messages have been
        shown, write out another '.'

        When ``message`` is given, the progress line is rewritten in place
        with ``message`` after the label instead.
        """
        if not self.in_progress_hanging:
            return
        if message is None:
            sys.stdout.write('.')
            sys.stdout.flush()
            return
        if self.last_message:
            # Pad with spaces so a shorter message fully covers the
            # previous one.
            padding = ' ' * max(0, len(self.last_message) - len(message))
        else:
            padding = ''
        sys.stdout.write('\r%s%s%s%s' %
                         (' ' * self.indent, self.in_progress, message, padding))
        sys.stdout.flush()
        self.last_message = message

    def stdout_level_matches(self, level):
        """Returns true if a message at this level will go to stdout"""
        return self.level_matches(level, self._stdout_level())

    def _stdout_level(self):
        """Returns the level that stdout runs at"""
        for level, consumer in self.consumers:
            if consumer is sys.stdout:
                return level
        # No stdout consumer registered.
        return self.FATAL

    def level_matches(self, level, consumer_level):
        """
        Return whether a message at ``level`` should reach a consumer
        registered at ``consumer_level``.

        A numeric ``level`` matches when it is at least ``consumer_level``.
        A slice matches when ``consumer_level`` lies in the half-open range
        ``[start, stop)``; a missing bound is unbounded.

        >>> l = Logger()
        >>> l.level_matches(3, 4)
        False
        >>> l.level_matches(3, 2)
        True
        >>> l.level_matches(slice(None, 3), 3)
        False
        >>> l.level_matches(slice(None, 3), 2)
        True
        >>> l.level_matches(slice(1, 3), 1)
        True
        >>> l.level_matches(slice(2, 3), 1)
        False
        """
        if isinstance(level, slice):
            start, stop = level.start, level.stop
            if start is not None and start > consumer_level:
                return False
            # Must be ``and``: only a present upper bound can exclude.
            if stop is not None and stop <= consumer_level:
                return False
            return True
        else:
            return level >= consumer_level

    @classmethod
    def level_for_integer(cls, level):
        """Return the entry of LEVELS at index ``level``, clamping
        out-of-range indexes to the first or last entry."""
        levels = cls.LEVELS
        if level < 0:
            return levels[0]
        if level >= len(levels):
            return levels[-1]
        return levels[level]

    def move_stdout_to_stderr(self):
        """Replace every stdout consumer with a stderr consumer at the same
        level.  The replacements are appended after the other consumers."""
        kept = []
        moved = []
        for consumer_level, consumer in self.consumers:
            if consumer == sys.stdout:
                moved.append((consumer_level, sys.stderr))
            else:
                kept.append((consumer_level, consumer))
        # Update in place so existing references to the list stay valid.
        self.consumers[:] = kept + moved
|
|
|
|
# Module-level Logger instance, created when this module is imported.
logger = Logger()
|