Use python logging

This commit is contained in:
Connor Mason 2025-02-09 10:43:23 -08:00
parent 711e72c292
commit 593b352fdb

View file

@ -12,6 +12,7 @@ import io
import itertools import itertools
import json import json
import locale import locale
import logging
import operator import operator
import os import os
import platform import platform
@ -24,6 +25,8 @@ import time
import tokenize import tokenize
import traceback import traceback
import random import random
from typing import Any
from typing import cast
try: try:
from ssl import OPENSSL_VERSION from ssl import OPENSSL_VERSION
@ -130,6 +133,10 @@ from .version import __version__
if compat_os_name == 'nt': if compat_os_name == 'nt':
import ctypes import ctypes
logger = logging.getLogger('soundcloudutil.downloader')
TAGGED_LOG_MSG_REGEX = re.compile(r'^\[(?P<tag>\w+)(:(?P<subtag>\w+))?\]\s*(?P<msg>.+)$')
def _catch_unsafe_file_extension(func): def _catch_unsafe_file_extension(func):
@functools.wraps(func) @functools.wraps(func)
@ -536,33 +543,53 @@ class YoutubeDL(object):
for _ in range(line_count)) for _ in range(line_count))
return res[:-len('\n')] return res[:-len('\n')]
def to_screen(self, message, skip_eol=False): def to_screen(self, message, skip_eol: bool = False):
"""Print message to stdout if not in quiet mode.""" """Print message to stdout if not in quiet mode."""
return self.to_stdout(message, skip_eol, check_quiet=True) return self.to_stdout(message, skip_eol, check_quiet=True)
def _write_string(self, s, out=None): @property
def user_logger(self) -> logging.Logger | None:
return cast(logging.Logger | None, self.params.get('logger'))
def _write_string(self, s: str, out: io.TextIOWrapper | None = None) -> None:
write_string(s, out=out, encoding=self.params.get('encoding')) write_string(s, out=out, encoding=self.params.get('encoding'))
def to_stdout(self, message, skip_eol=False, check_quiet=False): def to_stdout(self, message, skip_eol: bool = False, check_quiet: bool = False):
"""Print message to stdout if not in quiet mode.""" """Print message to stdout if not in quiet mode."""
if self.params.get('logger'): quiet = check_quiet and self.params.get('quiet', False)
self.params['logger'].debug(message)
elif not check_quiet or not self.params.get('quiet', False):
message = self._bidi_workaround(message)
terminator = ['\n', ''][skip_eol]
output = message + terminator
self._write_string(output, self._screen_file) debug: bool
if message.startswith(f'[debug]'):
def to_stderr(self, message): debug = True
"""Print message to stderr.""" message = message.removeprefix('[debug]').lstrip()
assert isinstance(message, compat_str) elif message.startswith('[info]'):
if self.params.get('logger'): debug = False
self.params['logger'].error(message) message = message.removeprefix('[info]').lstrip()
elif quiet:
debug = True
else: else:
message = self._bidi_workaround(message) debug = False
output = message + '\n'
self._write_string(output, self._err_file) _logger = logger
if m := TAGGED_LOG_MSG_REGEX.match(message):
tag = m.group('tag')
subtag = m.group('subtag')
_logger_name = f'youtube_dl.{tag}'
if m.group('subtag'):
_logger_name += f'.{subtag}'
_logger = logging.getLogger(_logger_name)
message = m.group('msg')
if debug:
_logger.debug(message)
else:
_logger.info(message)
def to_stderr(self, message: str) -> None:
if self.user_logger is not None:
self.user_logger.error(message)
else:
logger.error(message)
def to_console_title(self, message): def to_console_title(self, message):
if not self.params.get('consoletitle', False): if not self.params.get('consoletitle', False):
@ -641,11 +668,8 @@ class YoutubeDL(object):
raise DownloadError(message, exc_info) raise DownloadError(message, exc_info)
self._download_retcode = 1 self._download_retcode = 1
def report_warning(self, message, only_once=False, _cache={}): def report_warning(self, message: str, only_once: bool = False, _cache: dict[int, int] | None = None) -> None:
''' _cache = _cache or {}
Print the message to stderr, it will be prefixed with 'WARNING:'
If stderr is a tty file the 'WARNING:' will be colored
'''
if only_once: if only_once:
m_hash = hash((self, message)) m_hash = hash((self, message))
m_cnt = _cache.setdefault(m_hash, 0) m_cnt = _cache.setdefault(m_hash, 0)
@ -653,28 +677,17 @@ class YoutubeDL(object):
if m_cnt > 0: if m_cnt > 0:
return return
if self.params.get('logger') is not None: if self.user_logger is not None:
self.params['logger'].warning(message) self.user_logger.warning(message)
else: else:
if self.params.get('no_warnings'): if self.params.get('no_warnings'):
return return
if not self.params.get('no_color') and self._err_file.isatty() and compat_os_name != 'nt': logger.warning(message)
_msg_header = '\033[0;33mWARNING:\033[0m'
else:
_msg_header = 'WARNING:'
warning_message = '%s %s' % (_msg_header, message)
self.to_stderr(warning_message)
def report_error(self, message, *args, **kwargs): # TODO: re-implement :meth:`trouble` to output tracebacks with RichHandler
''' def report_error(self, message: str, *args: Any, **kwargs: Any) -> None:
Do the same as trouble, but prefixes the message with 'ERROR:', colored logger.error(message)
in red if stderr is a tty file. kwargs['message'] = f'ERROR: {message}'
'''
if not self.params.get('no_color') and self._err_file.isatty() and compat_os_name != 'nt':
_msg_header = '\033[0;31mERROR:\033[0m'
else:
_msg_header = 'ERROR:'
kwargs['message'] = '%s %s' % (_msg_header, message)
self.trouble(*args, **kwargs) self.trouble(*args, **kwargs)
def report_unscoped_cookies(self, *args, **kwargs): def report_unscoped_cookies(self, *args, **kwargs):
@ -2656,7 +2669,13 @@ class YoutubeDL(object):
encoding = preferredencoding() encoding = preferredencoding()
return encoding return encoding
def _write_info_json(self, label, info_dict, infofn, overwrite=None): def _write_info_json(
self,
label: str,
info_dict: dict[str, Any],
infofn: str,
overwrite: bool | None = None,
) -> bool | str | None:
if not self.params.get('writeinfojson', False): if not self.params.get('writeinfojson', False):
return False return False
@ -2676,7 +2695,7 @@ class YoutubeDL(object):
return True return True
except (OSError, IOError): except (OSError, IOError):
self.report_error(msg('Cannot write %s to JSON file ', label) + infofn) self.report_error(msg('Cannot write %s to JSON file ', label) + infofn)
return return None
def _write_thumbnails(self, info_dict, filename): def _write_thumbnails(self, info_dict, filename):
if self.params.get('writethumbnail', False): if self.params.get('writethumbnail', False):