[core] Process header cookies on loading

This commit is contained in:
Simon Sawicki 2023-07-04 21:41:04 +02:00 committed by dirkf
parent 3801d36416
commit 8334ec961b
4 changed files with 357 additions and 33 deletions

View file

@ -10,14 +10,30 @@ import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import copy
import json
from test.helper import FakeYDL, assertRegexpMatches
from test.helper import (
FakeYDL,
assertRegexpMatches,
try_rm,
)
from youtube_dl import YoutubeDL
from youtube_dl.compat import compat_str, compat_urllib_error
from youtube_dl.compat import (
compat_http_cookiejar_Cookie,
compat_http_cookies_SimpleCookie,
compat_kwargs,
compat_str,
compat_urllib_error,
)
from youtube_dl.extractor import YoutubeIE
from youtube_dl.extractor.common import InfoExtractor
from youtube_dl.postprocessor.common import PostProcessor
from youtube_dl.utils import ExtractorError, match_filter_func
from youtube_dl.utils import (
ExtractorError,
match_filter_func,
traverse_obj,
)
TEST_URL = 'http://localhost/sample.mp4'
@ -29,11 +45,14 @@ class YDL(FakeYDL):
self.msgs = []
def process_info(self, info_dict):
self.downloaded_info_dicts.append(info_dict)
self.downloaded_info_dicts.append(info_dict.copy())
def to_screen(self, msg):
self.msgs.append(msg)
def dl(self, *args, **kwargs):
assert False, 'Downloader must not be invoked for test_YoutubeDL'
def _make_result(formats, **kwargs):
res = {
@ -42,8 +61,9 @@ def _make_result(formats, **kwargs):
'title': 'testttitle',
'extractor': 'testex',
'extractor_key': 'TestEx',
'webpage_url': 'http://example.com/watch?v=shenanigans',
}
res.update(**kwargs)
res.update(**compat_kwargs(kwargs))
return res
@ -1011,5 +1031,160 @@ class TestYoutubeDL(unittest.TestCase):
self.assertEqual(out_info['release_date'], '20210930')
class TestYoutubeDLCookies(unittest.TestCase):
@staticmethod
def encode_cookie(cookie):
if not isinstance(cookie, dict):
cookie = vars(cookie)
for name, value in cookie.items():
yield name, compat_str(value)
@classmethod
def comparable_cookies(cls, cookies):
# Work around cookiejar cookies not being unicode strings
return sorted(map(tuple, map(sorted, map(cls.encode_cookie, cookies))))
def assertSameCookies(self, c1, c2, msg=None):
return self.assertEqual(
*map(self.comparable_cookies, (c1, c2)),
msg=msg)
def assertSameCookieStrings(self, c1, c2, msg=None):
return self.assertSameCookies(
*map(lambda c: compat_http_cookies_SimpleCookie(c).values(), (c1, c2)),
msg=msg)
def test_header_cookies(self):
ydl = FakeYDL()
ydl.report_warning = lambda *_, **__: None
def cookie(name, value, version=None, domain='', path='', secure=False, expires=None):
return compat_http_cookiejar_Cookie(
version or 0, name, value, None, False,
domain, bool(domain), bool(domain), path, bool(path),
secure, expires, False, None, None, rest={})
test_url, test_domain = (t % ('yt.dl',) for t in ('https://%s/test', '.%s'))
def test(encoded_cookies, cookies, headers=False, round_trip=None, error_re=None):
def _test():
ydl.cookiejar.clear()
ydl._load_cookies(encoded_cookies, autoscope=headers)
if headers:
ydl._apply_header_cookies(test_url)
data = {'url': test_url}
ydl._calc_headers(data)
self.assertSameCookies(
cookies, ydl.cookiejar,
'Extracted cookiejar.Cookie is not the same')
if not headers:
self.assertSameCookieStrings(
data.get('cookies'), round_trip or encoded_cookies,
msg='Cookie is not the same as round trip')
ydl.__dict__['_YoutubeDL__header_cookies'] = []
try:
_test()
except AssertionError:
raise
except Exception as e:
if not error_re:
raise
assertRegexpMatches(self, e.args[0], error_re.join(('.*',) * 2))
test('test=value; Domain=' + test_domain, [cookie('test', 'value', domain=test_domain)])
test('test=value', [cookie('test', 'value')], error_re='Unscoped cookies are not allowed')
test('cookie1=value1; Domain={0}; Path=/test; cookie2=value2; Domain={0}; Path=/'.format(test_domain), [
cookie('cookie1', 'value1', domain=test_domain, path='/test'),
cookie('cookie2', 'value2', domain=test_domain, path='/')])
cookie_kw = compat_kwargs(
{'domain': test_domain, 'path': '/test', 'secure': True, 'expires': '9999999999', })
test('test=value; Domain={domain}; Path={path}; Secure; Expires={expires}'.format(**cookie_kw), [
cookie('test', 'value', **cookie_kw)])
test('test="value; "; path=/test; domain=' + test_domain, [
cookie('test', 'value; ', domain=test_domain, path='/test')],
round_trip='test="value\\073 "; Domain={0}; Path=/test'.format(test_domain))
test('name=; Domain=' + test_domain, [cookie('name', '', domain=test_domain)],
round_trip='name=""; Domain=' + test_domain)
test('test=value', [cookie('test', 'value', domain=test_domain)], headers=True)
test('cookie1=value; Domain={0}; cookie2=value'.format(test_domain), [],
headers=True, error_re='Invalid syntax')
ydl.report_warning = ydl.report_error
test('test=value', [], headers=True, error_re='Passing cookies as a header is a potential security risk')
def test_infojson_cookies(self):
TEST_FILE = 'test_infojson_cookies.info.json'
TEST_URL = 'https://example.com/example.mp4'
COOKIES = 'a=b; Domain=.example.com; c=d; Domain=.example.com'
COOKIE_HEADER = {'Cookie': 'a=b; c=d'}
ydl = FakeYDL()
ydl.process_info = lambda x: ydl._write_info_json('test', x, TEST_FILE)
def make_info(info_header_cookies=False, fmts_header_cookies=False, cookies_field=False):
fmt = {'url': TEST_URL}
if fmts_header_cookies:
fmt['http_headers'] = COOKIE_HEADER
if cookies_field:
fmt['cookies'] = COOKIES
return _make_result([fmt], http_headers=COOKIE_HEADER if info_header_cookies else None)
def test(initial_info, note):
def failure_msg(why):
return ' when '.join((why, note))
result = {}
result['processed'] = ydl.process_ie_result(initial_info)
self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
msg=failure_msg('No cookies set in cookiejar after initial process'))
ydl.cookiejar.clear()
with open(TEST_FILE) as infojson:
result['loaded'] = ydl.sanitize_info(json.load(infojson), True)
result['final'] = ydl.process_ie_result(result['loaded'].copy(), download=False)
self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
msg=failure_msg('No cookies set in cookiejar after final process'))
ydl.cookiejar.clear()
for key in ('processed', 'loaded', 'final'):
info = result[key]
self.assertIsNone(
traverse_obj(info, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False),
msg=failure_msg('Cookie header not removed in {0} result'.format(key)))
self.assertSameCookieStrings(
traverse_obj(info, ((None, ('formats', 0)), 'cookies'), get_all=False), COOKIES,
msg=failure_msg('No cookies field found in {0} result'.format(key)))
test({'url': TEST_URL, 'http_headers': COOKIE_HEADER, 'id': '1', 'title': 'x'}, 'no formats field')
test(make_info(info_header_cookies=True), 'info_dict header cokies')
test(make_info(fmts_header_cookies=True), 'format header cookies')
test(make_info(info_header_cookies=True, fmts_header_cookies=True), 'info_dict and format header cookies')
test(make_info(info_header_cookies=True, fmts_header_cookies=True, cookies_field=True), 'all cookies fields')
test(make_info(cookies_field=True), 'cookies format field')
test({'url': TEST_URL, 'cookies': COOKIES, 'id': '1', 'title': 'x'}, 'info_dict cookies field only')
try_rm(TEST_FILE)
def test_add_headers_cookie(self):
def check_for_cookie_header(result):
return traverse_obj(result, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False)
ydl = FakeYDL({'http_headers': {'Cookie': 'a=b'}})
ydl._apply_header_cookies(_make_result([])['webpage_url']) # Scope to input webpage URL: .example.com
fmt = {'url': 'https://example.com/video.mp4'}
result = ydl.process_ie_result(_make_result([fmt]), download=False)
self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies in result info_dict')
self.assertEqual(result.get('cookies'), 'a=b; Domain=.example.com', msg='No cookies were set in cookies field')
self.assertIn('a=b', ydl.cookiejar.get_cookie_header(fmt['url']), msg='No cookies were set in cookiejar')
fmt = {'url': 'https://wrong.com/video.mp4'}
result = ydl.process_ie_result(_make_result([fmt]), download=False)
self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies for wrong domain')
self.assertFalse(result.get('cookies'), msg='Cookies set in cookies field for wrong domain')
self.assertFalse(ydl.cookiejar.get_cookie_header(fmt['url']), msg='Cookies set in cookiejar for wrong domain')
if __name__ == '__main__':
unittest.main()

View file

@ -46,6 +46,20 @@ class TestYoutubeDLCookieJar(unittest.TestCase):
# will be ignored
self.assertFalse(cookiejar._cookies)
def test_get_cookie_header(self):
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/httponly_cookies.txt')
cookiejar.load(ignore_discard=True, ignore_expires=True)
header = cookiejar.get_cookie_header('https://www.foobar.foobar')
self.assertIn('HTTPONLY_COOKIE', header)
def test_get_cookies_for_url(self):
cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/session_cookies.txt')
cookiejar.load(ignore_discard=True, ignore_expires=True)
cookies = cookiejar.get_cookies_for_url('https://www.foobar.foobar/')
self.assertEqual(len(cookies), 2)
cookies = cookiejar.get_cookies_for_url('https://foobar.foobar/')
self.assertFalse(cookies)
if __name__ == '__main__':
unittest.main()

View file

@ -5,6 +5,7 @@ from __future__ import absolute_import, unicode_literals
import collections
import contextlib
import copy
import datetime
import errno
import fileinput
@ -34,10 +35,12 @@ from string import ascii_letters
from .compat import (
compat_basestring,
compat_cookiejar,
compat_collections_chain_map as ChainMap,
compat_filter as filter,
compat_get_terminal_size,
compat_http_client,
compat_http_cookiejar_Cookie,
compat_http_cookies_SimpleCookie,
compat_integer_types,
compat_kwargs,
compat_map as map,
@ -53,6 +56,7 @@ from .compat import (
from .utils import (
age_restricted,
args_to_str,
bug_reports_message,
ContentTooShortError,
date_from_str,
DateRange,
@ -97,6 +101,7 @@ from .utils import (
std_headers,
str_or_none,
subtitles_filename,
traverse_obj,
UnavailableVideoError,
url_basename,
version_tuple,
@ -376,6 +381,9 @@ class YoutubeDL(object):
self.params.update(params)
self.cache = Cache(self)
self._header_cookies = []
self._load_cookies_from_headers(self.params.get('http_headers'))
def check_deprecated(param, option, suggestion):
if self.params.get(param) is not None:
self.report_warning(
@ -870,8 +878,83 @@ class YoutubeDL(object):
raise
return wrapper
def _remove_cookie_header(self, http_headers):
"""Filters out `Cookie` header from an `http_headers` dict
The `Cookie` header is removed to prevent leaks as a result of unscoped cookies.
See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
@param http_headers An `http_headers` dict from which any `Cookie` header
should be removed, or None
"""
return dict(filter(lambda pair: pair[0].lower() != 'cookie', (http_headers or {}).items()))
def _load_cookies(self, data, **kwargs):
"""Loads cookies from a `Cookie` header
This tries to work around the security vulnerability of passing cookies to every domain.
@param data The Cookie header as a string to load the cookies from
@param autoscope If `False`, scope cookies using Set-Cookie syntax and error for cookie without domains
If `True`, save cookies for later to be stored in the jar with a limited scope
If a URL, save cookies in the jar with the domain of the URL
"""
# autoscope=True (kw-only)
autoscope = kwargs.get('autoscope', True)
for cookie in compat_http_cookies_SimpleCookie(data).values() if data else []:
if autoscope and any(cookie.values()):
raise ValueError('Invalid syntax in Cookie Header')
domain = cookie.get('domain') or ''
expiry = cookie.get('expires')
if expiry == '': # 0 is valid so we check for `''` explicitly
expiry = None
prepared_cookie = compat_http_cookiejar_Cookie(
cookie.get('version') or 0, cookie.key, cookie.value, None, False,
domain, True, True, cookie.get('path') or '', bool(cookie.get('path')),
bool(cookie.get('secure')), expiry, False, None, None, {})
if domain:
self.cookiejar.set_cookie(prepared_cookie)
elif autoscope is True:
self.report_warning(
'Passing cookies as a header is a potential security risk; '
'they will be scoped to the domain of the downloaded urls. '
'Please consider loading cookies from a file or browser instead.',
only_once=True)
self._header_cookies.append(prepared_cookie)
elif autoscope:
self.report_warning(
'The extractor result contains an unscoped cookie as an HTTP header. '
'If you are specifying an input URL, ' + bug_reports_message(),
only_once=True)
self._apply_header_cookies(autoscope, [prepared_cookie])
else:
self.report_unscoped_cookies()
def _load_cookies_from_headers(self, headers):
self._load_cookies(traverse_obj(headers, 'cookie', casesense=False))
def _apply_header_cookies(self, url, cookies=None):
"""This method applies stray header cookies to the provided url
This loads header cookies and scopes them to the domain provided in `url`.
While this is not ideal, it helps reduce the risk of them being sent to
an unintended destination.
"""
parsed = compat_urllib_parse.urlparse(url)
if not parsed.hostname:
return
for cookie in map(copy.copy, cookies or self._header_cookies):
cookie.domain = '.' + parsed.hostname
self.cookiejar.set_cookie(cookie)
@__handle_extraction_exceptions
def __extract_info(self, url, ie, download, extra_info, process):
# Compat with passing cookies in http headers
self._apply_header_cookies(url)
ie_result = ie.extract(url)
if ie_result is None: # Finished already (backwards compatibility; listformats and friends should be moved here)
return
@ -897,7 +980,7 @@ class YoutubeDL(object):
def process_ie_result(self, ie_result, download=True, extra_info={}):
"""
Take the result of the ie(may be modified) and resolve all unresolved
Take the result of the ie (may be modified) and resolve all unresolved
references (URLs, playlist items).
It will also download the videos if 'download'.
@ -1468,23 +1551,45 @@ class YoutubeDL(object):
parsed_selector = _parse_format_selection(iter(TokenIterator(tokens)))
return _build_selector_function(parsed_selector)
def _calc_headers(self, info_dict):
res = std_headers.copy()
def _calc_headers(self, info_dict, load_cookies=False):
if load_cookies: # For --load-info-json
# load cookies from http_headers in legacy info.json
self._load_cookies(traverse_obj(info_dict, ('http_headers', 'Cookie'), casesense=False),
autoscope=info_dict['url'])
# load scoped cookies from info.json
self._load_cookies(info_dict.get('cookies'), autoscope=False)
add_headers = info_dict.get('http_headers')
if add_headers:
res.update(add_headers)
cookies = self._calc_cookies(info_dict)
cookies = self.cookiejar.get_cookies_for_url(info_dict['url'])
if cookies:
res['Cookie'] = cookies
# Make a string like name1=val1; attr1=a_val1; ...name2=val2; ...
# By convention a cookie name can't be a well-known attribute name
# so this syntax is unambiguous and can be parsed by (eg) SimpleCookie
encoder = compat_http_cookies_SimpleCookie()
values = []
attributes = (('Domain', '='), ('Path', '='), ('Secure',), ('Expires', '='), ('Version', '='))
attributes = tuple([x[0].lower()] + list(x) for x in attributes)
for cookie in cookies:
_, value = encoder.value_encode(cookie.value)
# Py 2 '' --> '', Py 3 '' --> '""'
if value == '':
value = '""'
values.append('='.join((cookie.name, value)))
for attr in attributes:
value = getattr(cookie, attr[0], None)
if value:
values.append('%s%s' % (''.join(attr[1:]), value if len(attr) == 3 else ''))
info_dict['cookies'] = '; '.join(values)
res = std_headers.copy()
res.update(info_dict.get('http_headers') or {})
res = self._remove_cookie_header(res)
if 'X-Forwarded-For' not in res:
x_forwarded_for_ip = info_dict.get('__x_forwarded_for_ip')
if x_forwarded_for_ip:
res['X-Forwarded-For'] = x_forwarded_for_ip
return res
return res or None
def _calc_cookies(self, info_dict):
pr = sanitized_Request(info_dict['url'])
@ -1663,10 +1768,13 @@ class YoutubeDL(object):
format['protocol'] = determine_protocol(format)
# Add HTTP headers, so that external programs can use them from the
# json output
full_format_info = info_dict.copy()
full_format_info.update(format)
format['http_headers'] = self._calc_headers(full_format_info)
# Remove private housekeeping stuff
format['http_headers'] = self._calc_headers(ChainMap(format, info_dict), load_cookies=True)
# Safeguard against old/insecure infojson when using --load-info-json
info_dict['http_headers'] = self._remove_cookie_header(
info_dict.get('http_headers') or {}) or None
# Remove private housekeeping stuff (copied to http_headers in _calc_headers())
if '__x_forwarded_for_ip' in info_dict:
del info_dict['__x_forwarded_for_ip']
@ -1927,17 +2035,9 @@ class YoutubeDL(object):
(sub_lang, error_to_compat_str(err)))
continue
if self.params.get('writeinfojson', False):
infofn = replace_extension(filename, 'info.json', info_dict.get('ext'))
if self.params.get('nooverwrites', False) and os.path.exists(encodeFilename(infofn)):
self.to_screen('[info] Video description metadata is already present')
else:
self.to_screen('[info] Writing video description metadata as JSON to: ' + infofn)
try:
write_json_file(self.filter_requested_info(info_dict), infofn)
except (OSError, IOError):
self.report_error('Cannot write metadata to JSON file ' + infofn)
return
self._write_info_json(
'video description', info_dict,
replace_extension(filename, 'info.json', info_dict.get('ext')))
self._write_thumbnails(info_dict, filename)
@ -1958,7 +2058,11 @@ class YoutubeDL(object):
fd.add_progress_hook(ph)
if self.params.get('verbose'):
self.to_screen('[debug] Invoking downloader on %r' % info.get('url'))
return fd.download(name, info)
new_info = dict((k, v) for k, v in info.items() if not k.startswith('__p'))
new_info['http_headers'] = self._calc_headers(new_info)
return fd.download(name, new_info)
if info_dict.get('requested_formats') is not None:
downloaded = []
@ -2484,7 +2588,7 @@ class YoutubeDL(object):
opts_proxy = self.params.get('proxy')
if opts_cookiefile is None:
self.cookiejar = compat_cookiejar.CookieJar()
self.cookiejar = YoutubeDLCookieJar()
else:
opts_cookiefile = expand_path(opts_cookiefile)
self.cookiejar = YoutubeDLCookieJar(opts_cookiefile)
@ -2545,6 +2649,28 @@ class YoutubeDL(object):
encoding = preferredencoding()
return encoding
def _write_info_json(self, label, info_dict, infofn, overwrite=None):
if not self.params.get('writeinfojson', False):
return False
def msg(fmt, lbl):
return fmt % (lbl + ' metadata',)
if overwrite is None:
overwrite = not self.params.get('nooverwrites', False)
if not overwrite and os.path.exists(encodeFilename(infofn)):
self.to_screen(msg('[info] %s is already present', label.title()))
return 'exists'
else:
self.to_screen(msg('[info] Writing %s as JSON to: ' + infofn, label))
try:
write_json_file(self.filter_requested_info(info_dict), infofn)
return True
except (OSError, IOError):
self.report_error(msg('Cannot write %s to JSON file ' + infofn, label))
return
def _write_thumbnails(self, info_dict, filename):
if self.params.get('writethumbnail', False):
thumbnails = info_dict.get('thumbnails')

View file

@ -13,7 +13,9 @@ from ..utils import (
error_to_compat_str,
format_bytes,
shell_quote,
T,
timeconvert,
traverse_obj,
)
@ -339,6 +341,10 @@ class FileDownloader(object):
def download(self, filename, info_dict):
"""Download to a filename using the info from info_dict
Return True on success and False otherwise
This method filters the `Cookie` header from the info_dict to prevent leaks.
Downloaders have their own way of handling cookies.
See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj
"""
nooverwrites_and_exists = (
@ -373,6 +379,9 @@ class FileDownloader(object):
else '%.2f' % sleep_interval))
time.sleep(sleep_interval)
info_dict['http_headers'] = dict(traverse_obj(info_dict, (
'http_headers', T(dict.items), lambda _, pair: pair[0].lower() != 'cookie'))) or None
return self.real_download(filename, info_dict)
def real_download(self, filename, info_dict):