This commit is contained in:
Tejas Attarde 2026-03-13 17:23:03 -04:00 committed by GitHub
commit b5e0c5e602
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 159 additions and 43 deletions

12
youtube_dl/YoutubeDL.py Executable file → Normal file
View file

@@ -540,7 +540,9 @@ class YoutubeDL(object):
"""Print message to stdout if not in quiet mode."""
return self.to_stdout(message, skip_eol, check_quiet=True)
def _write_string(self, s, out=None, only_once=False, _cache=set()):
def _write_string(self, s, out=None, only_once=False, _cache=None):
if _cache is None:
_cache = set()
if only_once and s in _cache:
return
write_string(s, out=out, encoding=self.params.get('encoding'))
@@ -838,7 +840,7 @@ class YoutubeDL(object):
for key, value in extra_info.items():
info_dict.setdefault(key, value)
def extract_info(self, url, download=True, ie_key=None, extra_info={},
def extract_info(self, url, download=True, ie_key=None, extra_info=None,
process=True, force_generic_extractor=False):
"""
Return a list with a dictionary for each video extracted.
@@ -854,6 +856,8 @@ class YoutubeDL(object):
must be True for download to work.
force_generic_extractor -- force using the generic extractor
"""
if extra_info is None:
extra_info = {}
if not ie_key and force_generic_extractor:
ie_key = 'Generic'
@@ -998,7 +1002,7 @@ class YoutubeDL(object):
'extractor_key': ie.ie_key(),
})
def process_ie_result(self, ie_result, download=True, extra_info={}):
def process_ie_result(self, ie_result, download=True, extra_info=None):
"""
Take the result of the ie (may be modified) and resolve all unresolved
references (URLs, playlist items).
@@ -1006,6 +1010,8 @@ class YoutubeDL(object):
It will also download the videos if 'download'.
Returns the resolved ie_result.
"""
if extra_info is None:
extra_info = {}
result_type = ie_result.get('_type', 'video')
if result_type in ('url', 'url_transparent'):

View file

@@ -3740,8 +3740,10 @@ else:
except Exception:
pass
def __repr__(self, _repr_running={}):
def __repr__(self, _repr_running=None):
# skip recursive items ...
if _repr_running is None:
_repr_running = {}
call_key = id(self), _get_ident()
if _repr_running.get(call_key):
return '...'

View file

@@ -5,7 +5,9 @@ from ..utils import (
)
def get_suitable_downloader(info_dict, params={}):
def get_suitable_downloader(info_dict, params=None):
if params is None:
params = {}
info_dict['protocol'] = determine_protocol(info_dict)
info_copy = info_dict.copy()
return _get_suitable_downloader(info_copy, params)
@@ -39,8 +41,10 @@ PROTOCOL_MAP = {
}
def _get_suitable_downloader(info_dict, params={}):
def _get_suitable_downloader(info_dict, params=None):
"""Get the downloader class that can handle the info dict."""
if params is None:
params = {}
# if (info_dict.get('start_time') or info_dict.get('end_time')) and not info_dict.get('requested_formats') and FFmpegFD.can_download(info_dict):
# return FFmpegFD

View file

@@ -111,7 +111,9 @@ class ExternalFD(FileDownloader):
def _valueless_option(self, command_option, param, expected_value=True):
return cli_valueless_option(self.params, command_option, param, expected_value)
def _configuration_args(self, default=[]):
def _configuration_args(self, default=None):
if default is None:
default = []
return cli_configuration_args(self.params, 'external_downloader_args', default)
def _write_cookies(self):

View file

@@ -1360,7 +1360,9 @@ class AdobePassIE(InfoExtractor):
token_expires = unified_timestamp(re.sub(r'[_ ]GMT', '', xml_text(token, date_ele)))
return token_expires and token_expires <= int(time.time())
def post_form(form_page_res, note, data={}):
def post_form(form_page_res, note, data=None):
if data is None:
data = {}
form_page, urlh = form_page_res
post_url = self._html_search_regex(r'<form[^>]+action=(["\'])(?P<url>.+?)\1', form_page, 'post url', group='url')
if not re.match(r'https?://', post_url):

View file

@@ -468,7 +468,9 @@ class BrightcoveNewIE(AdobePassIE):
return entries
def _parse_brightcove_metadata(self, json_data, video_id, headers={}):
def _parse_brightcove_metadata(self, json_data, video_id, headers=None):
if headers is None:
headers = {}
title = json_data['name'].strip()
num_drm_sources = 0

View file

@@ -644,12 +644,16 @@ class InfoExtractor(object):
else:
assert False
def _request_webpage(self, url_or_request, video_id, note=None, errnote=None, fatal=True, data=None, headers={}, query={}, expected_status=None):
def _request_webpage(self, url_or_request, video_id, note=None, errnote=None, fatal=True, data=None, headers=None, query=None, expected_status=None):
"""
Return the response handle.
See _download_webpage docstring for arguments specification.
"""
if headers is None:
headers = {}
if query is None:
query = {}
if note is None:
self.report_download_webpage(video_id)
elif note is not False:
@@ -702,12 +706,16 @@ class InfoExtractor(object):
self.report_warning(errmsg)
return False
def _download_webpage_handle(self, url_or_request, video_id, note=None, errnote=None, fatal=True, encoding=None, data=None, headers={}, query={}, expected_status=None):
def _download_webpage_handle(self, url_or_request, video_id, note=None, errnote=None, fatal=True, encoding=None, data=None, headers=None, query=None, expected_status=None):
"""
Return a tuple (page content as string, URL handle).
See _download_webpage docstring for arguments specification.
"""
if headers is None:
headers = {}
if query is None:
query = {}
# Strip hashes from the URL (#1038)
if isinstance(url_or_request, (compat_str, str)):
url_or_request = url_or_request.partition('#')[0]
@@ -804,7 +812,7 @@ class InfoExtractor(object):
def _download_webpage(
self, url_or_request, video_id, note=None, errnote=None,
fatal=True, tries=1, timeout=5, encoding=None, data=None,
headers={}, query={}, expected_status=None):
headers=None, query=None, expected_status=None):
"""
Return the data of the page as a string.
@@ -838,6 +846,10 @@ class InfoExtractor(object):
Note that this argument does not affect success status codes (2xx)
which are always accepted.
"""
if headers is None:
headers = {}
if query is None:
query = {}
success = False
try_count = 0
@@ -862,13 +874,17 @@ class InfoExtractor(object):
def _download_xml_handle(
self, url_or_request, video_id, note='Downloading XML',
errnote='Unable to download XML', transform_source=None,
fatal=True, encoding=None, data=None, headers={}, query={},
fatal=True, encoding=None, data=None, headers=None, query=None,
expected_status=None):
"""
Return a tuple (xml as an compat_etree_Element, URL handle).
See _download_webpage docstring for arguments specification.
"""
if headers is None:
headers = {}
if query is None:
query = {}
res = self._download_webpage_handle(
url_or_request, video_id, note, errnote, fatal=fatal,
encoding=encoding, data=data, headers=headers, query=query,
@@ -884,12 +900,16 @@ class InfoExtractor(object):
self, url_or_request, video_id,
note='Downloading XML', errnote='Unable to download XML',
transform_source=None, fatal=True, encoding=None,
data=None, headers={}, query={}, expected_status=None):
data=None, headers=None, query=None, expected_status=None):
"""
Return the xml as an compat_etree_Element.
See _download_webpage docstring for arguments specification.
"""
if headers is None:
headers = {}
if query is None:
query = {}
res = self._download_xml_handle(
url_or_request, video_id, note=note, errnote=errnote,
transform_source=transform_source, fatal=fatal, encoding=encoding,
@@ -912,13 +932,17 @@ class InfoExtractor(object):
def _download_json_handle(
self, url_or_request, video_id, note='Downloading JSON metadata',
errnote='Unable to download JSON metadata', transform_source=None,
fatal=True, encoding=None, data=None, headers={}, query={},
fatal=True, encoding=None, data=None, headers=None, query=None,
expected_status=None):
"""
Return a tuple (JSON object, URL handle).
See _download_webpage docstring for arguments specification.
"""
if headers is None:
headers = {}
if query is None:
query = {}
res = self._download_webpage_handle(
url_or_request, video_id, note, errnote, fatal=fatal,
encoding=encoding, data=data, headers=headers, query=query,
@@ -933,13 +957,17 @@ class InfoExtractor(object):
def _download_json(
self, url_or_request, video_id, note='Downloading JSON metadata',
errnote='Unable to download JSON metadata', transform_source=None,
fatal=True, encoding=None, data=None, headers={}, query={},
fatal=True, encoding=None, data=None, headers=None, query=None,
expected_status=None):
"""
Return the JSON object as a dict.
See _download_webpage docstring for arguments specification.
"""
if headers is None:
headers = {}
if query is None:
query = {}
res = self._download_json_handle(
url_or_request, video_id, note=note, errnote=errnote,
transform_source=transform_source, fatal=fatal, encoding=encoding,
@@ -1640,7 +1668,9 @@ class InfoExtractor(object):
unique_formats.append(f)
formats[:] = unique_formats
def _is_valid_url(self, url, video_id, item='video', headers={}):
def _is_valid_url(self, url, video_id, item='video', headers=None):
if headers is None:
headers = {}
url = self._proto_relative_url(url, scheme='http:')
# For now assume non HTTP(S) URLs always valid
if not (url.startswith('http://') or url.startswith('https://')):
@@ -1680,7 +1710,11 @@ class InfoExtractor(object):
def _extract_f4m_formats(self, manifest_url, video_id, preference=None, f4m_id=None,
transform_source=lambda s: fix_xml_ampersands(s).strip(),
fatal=True, m3u8_id=None, data=None, headers={}, query={}):
fatal=True, m3u8_id=None, data=None, headers=None, query=None):
if headers is None:
headers = {}
if query is None:
query = {}
manifest = self._download_xml(
manifest_url, video_id, 'Downloading f4m manifest',
'Unable to download f4m manifest',
@@ -1817,8 +1851,12 @@ class InfoExtractor(object):
def _extract_m3u8_formats(self, m3u8_url, video_id, ext=None,
entry_protocol='m3u8', preference=None,
m3u8_id=None, note=None, errnote=None,
fatal=True, live=False, data=None, headers={},
query={}):
fatal=True, live=False, data=None, headers=None,
query=None):
if headers is None:
headers = {}
if query is None:
query = {}
res = self._download_webpage_handle(
m3u8_url, video_id,
note=note or 'Downloading m3u8 information',
@@ -2673,7 +2711,11 @@ class InfoExtractor(object):
subtitles.setdefault(lang or 'und', []).append(f)
return formats, subtitles
def _extract_ism_formats(self, ism_url, video_id, ism_id=None, note=None, errnote=None, fatal=True, data=None, headers={}, query={}):
def _extract_ism_formats(self, ism_url, video_id, ism_id=None, note=None, errnote=None, fatal=True, data=None, headers=None, query=None):
if headers is None:
headers = {}
if query is None:
query = {}
res = self._download_xml_handle(
ism_url, video_id,
note=note or 'Downloading ISM manifest',
@@ -2908,7 +2950,9 @@ class InfoExtractor(object):
entries.append(media_info)
return entries
def _extract_akamai_formats(self, manifest_url, video_id, hosts={}):
def _extract_akamai_formats(self, manifest_url, video_id, hosts=None):
if hosts is None:
hosts = {}
signed = 'hdnea=' in manifest_url
if not signed:
# https://learn.akamai.com/en-us/webhelp/media-services-on-demand/stream-packaging-user-guide/GUID-BE6C0F73-1E06-483B-B0EA-57984B91B7F9.html
@@ -2964,7 +3008,9 @@ class InfoExtractor(object):
return formats
def _extract_wowza_formats(self, url, video_id, m3u8_entry_protocol='m3u8_native', skip_protocols=[]):
def _extract_wowza_formats(self, url, video_id, m3u8_entry_protocol='m3u8_native', skip_protocols=None):
if skip_protocols is None:
skip_protocols = []
query = compat_urlparse.urlparse(url).query
url = re.sub(r'/(?:manifest|playlist|jwplayer)\.(?:m3u8|f4m|mpd|smil)', '', url)
mobj = re.search(
@@ -3204,7 +3250,9 @@ class InfoExtractor(object):
return res
def _set_cookie(self, domain, name, value, expire_time=None, port=None,
path='/', secure=False, discard=False, rest={}, **kwargs):
path='/', secure=False, discard=False, rest=None, **kwargs):
if rest is None:
rest = {}
cookie = compat_cookiejar_Cookie(
0, name, value, port, port is not None, domain, True,
domain.startswith('.'), path, True, secure, expire_time,

View file

@@ -29,7 +29,9 @@ class NaverBaseIE(InfoExtractor):
formats = []
get_list = lambda x: try_get(video_data, lambda y: y[x + 's']['list'], list) or []
def extract_formats(streams, stream_type, query={}):
def extract_formats(streams, stream_type, query=None):
if query is None:
query = {}
for stream in streams:
stream_url = stream.get('source')
if not stream_url:

View file

@@ -146,7 +146,9 @@ class NexxIE(InfoExtractor):
'%s said: %s' % (self.IE_NAME, response['metadata']['errorhint']),
expected=True)
def _call_api(self, domain_id, path, video_id, data=None, headers={}):
def _call_api(self, domain_id, path, video_id, data=None, headers=None):
if headers is None:
headers = {}
headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
result = self._download_json(
'https://api.nexx.cloud/v3/%s/%s' % (domain_id, path), video_id,

View file

@@ -10,7 +10,9 @@ from ..utils import (
class NuevoBaseIE(InfoExtractor):
def _extract_nuevo(self, config_url, video_id, headers={}):
def _extract_nuevo(self, config_url, video_id, headers=None):
if headers is None:
headers = {}
config = self._download_xml(
config_url, video_id, transform_source=lambda s: s.strip(),
headers=headers)

View file

@@ -162,7 +162,7 @@ class PhantomJSwrapper(object):
cookie['expire_time'] = cookie['expiry']
self.extractor._set_cookie(**compat_kwargs(cookie))
def get(self, url, html=None, video_id=None, note=None, note2='Executing JS on webpage', headers={}, jscode='saveAndExit();'):
def get(self, url, html=None, video_id=None, note=None, note2='Executing JS on webpage', headers=None, jscode='saveAndExit();'):
"""
Downloads webpage (if needed) and executes JS
@@ -198,6 +198,8 @@
});
check();
"""
if headers is None:
headers = {}
if 'saveAndExit();' not in jscode:
raise ExtractorError('`saveAndExit();` not found in `jscode`')
if not html:

View file

@@ -343,7 +343,9 @@ class ThePlatformFeedIE(ThePlatformBaseIE):
'only_matching': True,
}]
def _extract_feed_info(self, provider_id, feed_id, filter_query, video_id, custom_fields=None, asset_types_query={}, account_id=None):
def _extract_feed_info(self, provider_id, feed_id, filter_query, video_id, custom_fields=None, asset_types_query=None, account_id=None):
if asset_types_query is None:
asset_types_query = {}
real_url = self._URL_TEMPLATE % (self.http_scheme(), provider_id, feed_id, filter_query)
entry = self._download_json(real_url, video_id)['entries'][0]
main_smil_url = 'http://link.theplatform.com/s/%s/media/guid/%d/%s' % (provider_id, account_id, entry['guid']) if account_id else entry.get('plmedia$publicUrl')

View file

@@ -50,7 +50,11 @@ class TurnerBaseIE(AdobePassIE):
self._AKAMAI_SPE_TOKEN_CACHE[secure_path] = token
return video_url + '?hdnea=' + token
def _extract_cvp_info(self, data_src, video_id, path_data={}, ap_data={}, fatal=False):
def _extract_cvp_info(self, data_src, video_id, path_data=None, ap_data=None, fatal=False):
if path_data is None:
path_data = {}
if ap_data is None:
ap_data = {}
video_data = self._download_xml(
data_src, video_id,
transform_source=lambda s: fix_xml_ampersands(s).strip(),

View file

@@ -205,7 +205,9 @@ class TVNowNewIE(InfoExtractor):
class TVNowNewBaseIE(InfoExtractor):
def _call_api(self, path, video_id, query={}):
def _call_api(self, path, video_id, query=None):
if query is None:
query = {}
result = self._download_json(
'https://apigw.tvnow.de/module/' + path, video_id, query=query)
error = result.get('error')

View file

@@ -79,7 +79,9 @@ class TwitterBaseIE(InfoExtractor):
'height': int(m.group('height')),
})
def _call_api(self, path, video_id, query={}):
def _call_api(self, path, video_id, query=None):
if query is None:
query = {}
headers = {
'Authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAAPYXBAAAAAAACLXUNDekMxqa8h%2F40K4moUkGsoc%3DTYfbDKbT3jJPCEVnMYqilB28NHfOPqkca3qaAxGfsyKCs0wRbw',
}

View file

@@ -484,7 +484,9 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
T(compat_str)))
# @functools.cached_property
def is_authenticated(self, _cache={}):
def is_authenticated(self, _cache=None):
if _cache is None:
_cache = {}
if self not in _cache:
_cache[self] = bool(self._generate_sapisidhash_header())
return _cache[self]
@@ -633,7 +635,13 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
('owner', 'videoOwner'), 'videoOwnerRenderer', 'title',
'runs', Ellipsis]
def _extract_channel_id(self, webpage, videodetails={}, metadata={}, renderers=[]):
def _extract_channel_id(self, webpage, videodetails=None, metadata=None, renderers=None):
if videodetails is None:
videodetails = {}
if metadata is None:
metadata = {}
if renderers is None:
renderers = []
channel_id = None
if any((videodetails, metadata, renderers)):
channel_id = (
@@ -648,7 +656,13 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
'channelId', webpage, 'channel id', default=None)
def _extract_author_var(self, webpage, var_name,
videodetails={}, metadata={}, renderers=[]):
videodetails=None, metadata=None, renderers=None):
if videodetails is None:
videodetails = {}
if metadata is None:
metadata = {}
if renderers is None:
renderers = []
result = None
paths = {
# (HTML, videodetails, metadata, renderers)
@@ -1774,7 +1788,9 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
raise ExtractorError(
'Cannot identify player %r' % (player_url,), cause=e)
def _player_js_cache_key(self, player_url, extra_id=None, _cache={}):
def _player_js_cache_key(self, player_url, extra_id=None, _cache=None):
if _cache is None:
_cache = {}
if player_url not in _cache:
player_id = self._extract_player_info(player_url)
player_path = remove_start(

View file

@@ -700,7 +700,9 @@ class JSInterpreter(object):
return separated[0][1:].strip(), separated[1].strip()
@staticmethod
def _all_operators(_cached=[]):
def _all_operators(_cached=None):
if _cached is None:
_cached = []
if not _cached:
_cached.extend(itertools.chain(
# Ref: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/Operator_Precedence

View file

@@ -40,7 +40,9 @@ def _hide_login_info(opts):
def parseOpts(overrideArguments=None):
def _readOptions(filename_bytes, default=[]):
def _readOptions(filename_bytes, default=None):
if default is None:
default = []
try:
optionf = open(filename_bytes, encoding=preferredencoding())
except IOError:

View file

@@ -61,7 +61,9 @@ class PostProcessor(object):
except Exception:
self._downloader.report_warning(errnote)
def _configuration_args(self, default=[]):
def _configuration_args(self, default=None):
if default is None:
default = []
return cli_configuration_args(self._downloader.params, 'postprocessor_args', default)

View file

@@ -4006,9 +4006,11 @@ prepend_extension = functools.partial(_change_extension, True)
replace_extension = functools.partial(_change_extension, False)
def check_executable(exe, args=[]):
def check_executable(exe, args=None):
""" Checks if the given binary is installed somewhere in PATH, and returns its name.
args can be a list of arguments for a short output (like -version) """
if args is None:
args = []
try:
process_communicate_or_kill(subprocess.Popen(
[exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE))
@@ -4017,10 +4019,12 @@ def check_executable(exe, args=[]):
return exe
def get_exe_version(exe, args=['--version'],
def get_exe_version(exe, args=None,
version_re=None, unrecognized='present'):
""" Returns the version of the specified executable,
or False if the executable is not present """
if args is None:
args = []
try:
# STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
# SIGTTOU if youtube-dl is run in the background.
@@ -4322,7 +4326,11 @@ def update_url_query(url, query):
return update_url(url, query_update=query)
def update_Request(req, url=None, data=None, headers={}, query={}):
def update_Request(req, url=None, data=None, headers=None, query=None):
if headers is None:
headers = {}
if query is None:
query = {}
req_headers = req.headers.copy()
req_headers.update(headers)
req_data = data if data is not None else req.data
@@ -5153,7 +5161,9 @@ def cli_valueless_option(params, command_option, param, expected_value=True):
return [command_option] if param == expected_value else []
def cli_configuration_args(params, param, default=[]):
def cli_configuration_args(params, param, default=None):
if default is None:
default = []
ex_args = params.get(param)
if ex_args is None:
return default