sphinx.util.logging 源代码

"""
    sphinx.util.logging
    ~~~~~~~~~~~~~~~~~~~

    Logging utility functions for Sphinx.

    :copyright: Copyright 2007-2021 by the Sphinx team, see AUTHORS.
    :license: BSD, see LICENSE for details.
"""

import logging
import logging.handlers
from collections import defaultdict
from contextlib import contextmanager
from typing import IO, TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple, Type, Union

from docutils import nodes
from docutils.nodes import Node
from docutils.utils import get_source_line

from sphinx.errors import SphinxWarning
from sphinx.util.console import colorize

if TYPE_CHECKING:
    from sphinx.application import Sphinx


NAMESPACE = 'sphinx'
VERBOSE = 15

LEVEL_NAMES: Dict[str, int] = defaultdict(lambda: logging.WARNING)
LEVEL_NAMES.update({
    'CRITICAL': logging.CRITICAL,
    'SEVERE': logging.CRITICAL,
    'ERROR': logging.ERROR,
    'WARNING': logging.WARNING,
    'INFO': logging.INFO,
    'VERBOSE': VERBOSE,
    'DEBUG': logging.DEBUG,
})

VERBOSITY_MAP: Dict[int, int] = defaultdict(lambda: 0)
VERBOSITY_MAP.update({
    0: logging.INFO,
    1: VERBOSE,
    2: logging.DEBUG,
})

COLOR_MAP = defaultdict(lambda: 'blue',
                        {
                            logging.ERROR: 'darkred',
                            logging.WARNING: 'red',
                            logging.DEBUG: 'darkgray'
                        })


[文档]def getLogger(name: str) -> "SphinxLoggerAdapter": """Get logger wrapped by :class:`sphinx.util.logging.SphinxLoggerAdapter`. Sphinx logger always uses ``sphinx.*`` namespace to be independent from settings of root logger. It ensures logging is consistent even if a third-party extension or imported application resets logger settings. Example usage:: >>> from sphinx.util import logging >>> logger = logging.getLogger(__name__) >>> logger.info('Hello, this is an extension!') Hello, this is an extension! """ # add sphinx prefix to name forcely logger = logging.getLogger(NAMESPACE + '.' + name) # Forcely enable logger logger.disabled = False # wrap logger by SphinxLoggerAdapter return SphinxLoggerAdapter(logger, {})
def convert_serializable(records: List[logging.LogRecord]) -> None: """Convert LogRecord serializable.""" for r in records: # extract arguments to a message and clear them r.msg = r.getMessage() r.args = () location = getattr(r, 'location', None) if isinstance(location, nodes.Node): r.location = get_node_location(location) # type: ignore class SphinxLogRecord(logging.LogRecord): """Log record class supporting location""" prefix = '' location: Any = None def getMessage(self) -> str: message = super().getMessage() location = getattr(self, 'location', None) if location: message = '%s: %s%s' % (location, self.prefix, message) elif self.prefix not in message: message = self.prefix + message return message class SphinxInfoLogRecord(SphinxLogRecord): """Info log record class supporting location""" prefix = '' # do not show any prefix for INFO messages class SphinxWarningLogRecord(SphinxLogRecord): """Warning log record class supporting location""" prefix = 'WARNING: '
[文档]class SphinxLoggerAdapter(logging.LoggerAdapter): """LoggerAdapter allowing ``type`` and ``subtype`` keywords.""" KEYWORDS = ['type', 'subtype', 'location', 'nonl', 'color', 'once']
[文档] def log(self, level: Union[int, str], msg: str, *args: Any, **kwargs: Any) -> None: if isinstance(level, int): super().log(level, msg, *args, **kwargs) else: levelno = LEVEL_NAMES[level] super().log(levelno, msg, *args, **kwargs)
[文档] def verbose(self, msg: str, *args: Any, **kwargs: Any) -> None: self.log(VERBOSE, msg, *args, **kwargs)
def process(self, msg: str, kwargs: Dict) -> Tuple[str, Dict]: # type: ignore extra = kwargs.setdefault('extra', {}) for keyword in self.KEYWORDS: if keyword in kwargs: extra[keyword] = kwargs.pop(keyword) return msg, kwargs def handle(self, record: logging.LogRecord) -> None: self.logger.handle(record)
class WarningStreamHandler(logging.StreamHandler): """StreamHandler for warnings.""" pass class NewLineStreamHandler(logging.StreamHandler): """StreamHandler which switches line terminator by record.nonl flag.""" def emit(self, record: logging.LogRecord) -> None: try: self.acquire() if getattr(record, 'nonl', False): # skip appending terminator when nonl=True self.terminator = '' super().emit(record) finally: self.terminator = '\n' self.release() class MemoryHandler(logging.handlers.BufferingHandler): """Handler buffering all logs.""" buffer: List[logging.LogRecord] def __init__(self) -> None: super().__init__(-1) def shouldFlush(self, record: logging.LogRecord) -> bool: return False # never flush def flushTo(self, logger: logging.Logger) -> None: self.acquire() try: for record in self.buffer: logger.handle(record) self.buffer = [] finally: self.release() def clear(self) -> List[logging.LogRecord]: buffer, self.buffer = self.buffer, [] return buffer
[文档]@contextmanager def pending_warnings() -> Generator[logging.Handler, None, None]: """Context manager to postpone logging warnings temporarily. Similar to :func:`pending_logging`. """ logger = logging.getLogger(NAMESPACE) memhandler = MemoryHandler() memhandler.setLevel(logging.WARNING) try: handlers = [] for handler in logger.handlers[:]: if isinstance(handler, WarningStreamHandler): logger.removeHandler(handler) handlers.append(handler) logger.addHandler(memhandler) yield memhandler finally: logger.removeHandler(memhandler) for handler in handlers: logger.addHandler(handler) memhandler.flushTo(logger)
@contextmanager def suppress_logging() -> Generator[MemoryHandler, None, None]: """Context manager to suppress logging all logs temporarily. For example:: >>> with suppress_logging(): >>> logger.warning('Warning message!') # suppressed >>> some_long_process() >>> """ logger = logging.getLogger(NAMESPACE) memhandler = MemoryHandler() try: handlers = [] for handler in logger.handlers[:]: logger.removeHandler(handler) handlers.append(handler) logger.addHandler(memhandler) yield memhandler finally: logger.removeHandler(memhandler) for handler in handlers: logger.addHandler(handler)
[文档]@contextmanager def pending_logging() -> Generator[MemoryHandler, None, None]: """Context manager to postpone logging all logs temporarily. For example:: >>> with pending_logging(): >>> logger.warning('Warning message!') # not flushed yet >>> some_long_process() >>> Warning message! # the warning is flushed here """ logger = logging.getLogger(NAMESPACE) try: with suppress_logging() as memhandler: yield memhandler finally: memhandler.flushTo(logger)
@contextmanager def skip_warningiserror(skip: bool = True) -> Generator[None, None, None]: """Context manager to skip WarningIsErrorFilter temporarily.""" logger = logging.getLogger(NAMESPACE) if skip is False: yield else: try: disabler = DisableWarningIsErrorFilter() for handler in logger.handlers: # use internal method; filters.insert() directly to install disabler # before WarningIsErrorFilter handler.filters.insert(0, disabler) yield finally: for handler in logger.handlers: handler.removeFilter(disabler)
[文档]@contextmanager def prefixed_warnings(prefix: str) -> Generator[None, None, None]: """Context manager to prepend prefix to all warning log records temporarily. For example:: >>> with prefixed_warnings("prefix:"): >>> logger.warning('Warning message!') # => prefix: Warning message! .. versionadded:: 2.0 """ logger = logging.getLogger(NAMESPACE) warning_handler = None for handler in logger.handlers: if isinstance(handler, WarningStreamHandler): warning_handler = handler break else: # warning stream not found yield return prefix_filter = None for _filter in warning_handler.filters: if isinstance(_filter, MessagePrefixFilter): prefix_filter = _filter break if prefix_filter: # already prefixed try: previous = prefix_filter.prefix prefix_filter.prefix = prefix yield finally: prefix_filter.prefix = previous else: # not prefixed yet prefix_filter = MessagePrefixFilter(prefix) try: warning_handler.addFilter(prefix_filter) yield finally: warning_handler.removeFilter(prefix_filter)
class LogCollector: def __init__(self) -> None: self.logs: List[logging.LogRecord] = [] @contextmanager def collect(self) -> Generator[None, None, None]: with pending_logging() as memhandler: yield self.logs = memhandler.clear() class InfoFilter(logging.Filter): """Filter error and warning messages.""" def filter(self, record: logging.LogRecord) -> bool: if record.levelno < logging.WARNING: return True else: return False def is_suppressed_warning(type: str, subtype: str, suppress_warnings: List[str]) -> bool: """Check whether the warning is suppressed or not.""" if type is None: return False subtarget: Optional[str] for warning_type in suppress_warnings: if '.' in warning_type: target, subtarget = warning_type.split('.', 1) else: target, subtarget = warning_type, None if target == type: if (subtype is None or subtarget is None or subtarget == subtype or subtarget == '*'): return True return False class WarningSuppressor(logging.Filter): """Filter logs by `suppress_warnings`.""" def __init__(self, app: "Sphinx") -> None: self.app = app super().__init__() def filter(self, record: logging.LogRecord) -> bool: type = getattr(record, 'type', None) subtype = getattr(record, 'subtype', None) try: suppress_warnings = self.app.config.suppress_warnings except AttributeError: # config is not initialized yet (ex. in conf.py) suppress_warnings = [] if is_suppressed_warning(type, subtype, suppress_warnings): return False else: self.app._warncount += 1 return True class WarningIsErrorFilter(logging.Filter): """Raise exception if warning emitted.""" def __init__(self, app: "Sphinx") -> None: self.app = app super().__init__() def filter(self, record: logging.LogRecord) -> bool: if getattr(record, 'skip_warningsiserror', False): # disabled by DisableWarningIsErrorFilter return True elif self.app.warningiserror: location = getattr(record, 'location', '') try: message = record.msg % record.args except (TypeError, ValueError): message = record.msg # use record.msg itself if location: exc = SphinxWarning(location + ":" + str(message)) else: exc = SphinxWarning(message) if record.exc_info is not None: raise exc from record.exc_info[1] else: raise exc else: return True class DisableWarningIsErrorFilter(logging.Filter): """Disable WarningIsErrorFilter if this filter installed.""" def filter(self, record: logging.LogRecord) -> bool: record.skip_warningsiserror = True # type: ignore return True class MessagePrefixFilter(logging.Filter): """Prepend prefix to all log records.""" def __init__(self, prefix: str) -> None: self.prefix = prefix super().__init__() def filter(self, record: logging.LogRecord) -> bool: if self.prefix: record.msg = self.prefix + ' ' + record.msg return True class OnceFilter(logging.Filter): """Show the message only once.""" def __init__(self, name: str = '') -> None: super().__init__(name) self.messages: Dict[str, List] = {} def filter(self, record: logging.LogRecord) -> bool: once = getattr(record, 'once', '') if not once: return True else: params = self.messages.setdefault(record.msg, []) if record.args in params: return False params.append(record.args) return True class SphinxLogRecordTranslator(logging.Filter): """Converts a log record to one Sphinx expects * Make a instance of SphinxLogRecord * docname to path if location given """ LogRecordClass: Type[logging.LogRecord] def __init__(self, app: "Sphinx") -> None: self.app = app super().__init__() def filter(self, record: SphinxWarningLogRecord) -> bool: # type: ignore if isinstance(record, logging.LogRecord): # force subclassing to handle location record.__class__ = self.LogRecordClass # type: ignore location = getattr(record, 'location', None) if isinstance(location, tuple): docname, lineno = location if docname and lineno: record.location = '%s:%s' % (self.app.env.doc2path(docname), lineno) elif docname: record.location = '%s' % self.app.env.doc2path(docname) else: record.location = None elif isinstance(location, nodes.Node): record.location = get_node_location(location) elif location and ':' not in location: record.location = '%s' % self.app.env.doc2path(location) return True class InfoLogRecordTranslator(SphinxLogRecordTranslator): """LogRecordTranslator for INFO level log records.""" LogRecordClass = SphinxInfoLogRecord class WarningLogRecordTranslator(SphinxLogRecordTranslator): """LogRecordTranslator for WARNING level log records.""" LogRecordClass = SphinxWarningLogRecord def get_node_location(node: Node) -> Optional[str]: (source, line) = get_source_line(node) if source and line: return "%s:%s" % (source, line) elif source: return "%s:" % source elif line: return "<unknown>:%s" % line else: return None class ColorizeFormatter(logging.Formatter): def format(self, record: logging.LogRecord) -> str: message = super().format(record) color = getattr(record, 'color', None) if color is None: color = COLOR_MAP.get(record.levelno) if color: return colorize(color, message) else: return message class SafeEncodingWriter: """Stream writer which ignores UnicodeEncodeError silently""" def __init__(self, stream: IO) -> None: self.stream = stream self.encoding = getattr(stream, 'encoding', 'ascii') or 'ascii' def write(self, data: str) -> None: try: self.stream.write(data) except UnicodeEncodeError: # stream accept only str, not bytes. So, we encode and replace # non-encodable characters, then decode them. self.stream.write(data.encode(self.encoding, 'replace').decode(self.encoding)) def flush(self) -> None: if hasattr(self.stream, 'flush'): self.stream.flush() class LastMessagesWriter: """Stream writer storing last 10 messages in memory to save trackback""" def __init__(self, app: "Sphinx", stream: IO) -> None: self.app = app def write(self, data: str) -> None: self.app.messagelog.append(data) def setup(app: "Sphinx", status: IO, warning: IO) -> None: """Setup root logger for Sphinx""" logger = logging.getLogger(NAMESPACE) logger.setLevel(logging.DEBUG) logger.propagate = False # clear all handlers for handler in logger.handlers[:]: logger.removeHandler(handler) info_handler = NewLineStreamHandler(SafeEncodingWriter(status)) info_handler.addFilter(InfoFilter()) info_handler.addFilter(InfoLogRecordTranslator(app)) info_handler.setLevel(VERBOSITY_MAP[app.verbosity]) info_handler.setFormatter(ColorizeFormatter()) warning_handler = WarningStreamHandler(SafeEncodingWriter(warning)) warning_handler.addFilter(WarningSuppressor(app)) warning_handler.addFilter(WarningLogRecordTranslator(app)) warning_handler.addFilter(WarningIsErrorFilter(app)) warning_handler.addFilter(OnceFilter()) warning_handler.setLevel(logging.WARNING) warning_handler.setFormatter(ColorizeFormatter()) messagelog_handler = logging.StreamHandler(LastMessagesWriter(app, status)) messagelog_handler.addFilter(InfoFilter()) messagelog_handler.setLevel(VERBOSITY_MAP[app.verbosity]) messagelog_handler.setFormatter(ColorizeFormatter()) logger.addHandler(info_handler) logger.addHandler(warning_handler) logger.addHandler(messagelog_handler)