[core/egs] Use thread pool for fetching metadata

This commit is contained in:
derrod 2021-12-03 15:48:48 +01:00
parent 5671448264
commit d737ca57a7
2 changed files with 45 additions and 12 deletions

View file

@ -2,6 +2,7 @@
# coding: utf-8 # coding: utf-8
import requests import requests
import requests.adapters
import logging import logging
from requests.auth import HTTPBasicAuth from requests.auth import HTTPBasicAuth
@ -28,11 +29,16 @@ class EPCAPI:
_store_gql_host = 'store-launcher.epicgames.com' _store_gql_host = 'store-launcher.epicgames.com'
def __init__(self, lc='en', cc='US'): def __init__(self, lc='en', cc='US'):
self.session = requests.session()
self.log = logging.getLogger('EPCAPI') self.log = logging.getLogger('EPCAPI')
self.unauth_session = requests.session()
self.session = requests.session()
self.session.headers['User-Agent'] = self._user_agent self.session.headers['User-Agent'] = self._user_agent
# increase maximum pool size for multithreaded metadata requests
self.session.mount('https://', requests.adapters.HTTPAdapter(pool_maxsize=16))
self.unauth_session = requests.session()
self.unauth_session.headers['User-Agent'] = self._user_agent self.unauth_session.headers['User-Agent'] = self._user_agent
self._oauth_basic = HTTPBasicAuth(self._user_basic, self._pw_basic) self._oauth_basic = HTTPBasicAuth(self._user_basic, self._pw_basic)
self.access_token = None self.access_token = None
@ -149,10 +155,11 @@ class EPCAPI:
r.raise_for_status() r.raise_for_status()
return r.json() return r.json()
def get_game_info(self, namespace, catalog_item_id): def get_game_info(self, namespace, catalog_item_id, timeout=None):
r = self.session.get(f'https://{self._catalog_host}/catalog/api/shared/namespace/{namespace}/bulk/items', r = self.session.get(f'https://{self._catalog_host}/catalog/api/shared/namespace/{namespace}/bulk/items',
params=dict(id=catalog_item_id, includeDLCDetails=True, includeMainGameDetails=True, params=dict(id=catalog_item_id, includeDLCDetails=True, includeMainGameDetails=True,
country=self.country_code, locale=self.language_code)) country=self.country_code, locale=self.language_code),
timeout=timeout)
r.raise_for_status() r.raise_for_status()
return r.json().get(catalog_item_id, None) return r.json().get(catalog_item_id, None)

View file

@ -8,6 +8,7 @@ import shutil
from base64 import b64decode from base64 import b64decode
from collections import defaultdict from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import timezone from datetime import timezone
from locale import getdefaultlocale from locale import getdefaultlocale
from multiprocessing import Queue from multiprocessing import Queue
@ -387,6 +388,9 @@ class LegendaryCore:
else: else:
assets[ga.app_name][_platform] = ga assets[ga.app_name][_platform] = ga
fetch_list = []
games = {}
for app_name, app_assets in sorted(assets.items()): for app_name, app_assets in sorted(assets.items()):
if skip_ue and any(v.namespace == 'ue' for v in app_assets.values()): if skip_ue and any(v.namespace == 'ue' for v in app_assets.values()):
continue continue
@ -395,19 +399,41 @@ class LegendaryCore:
asset_updated = False asset_updated = False
if game: if game:
asset_updated = any(game.app_version(_p) != app_assets[_p].build_version for _p in app_assets.keys()) asset_updated = any(game.app_version(_p) != app_assets[_p].build_version for _p in app_assets.keys())
games[app_name] = game
if update_assets and (not game or force_refresh or (game and asset_updated)): if update_assets and (not game or force_refresh or (game and asset_updated)):
if game and asset_updated: self.log.debug(f'Scheduling metadata update for {app_name}')
self.log.info(f'Updating meta for {game.app_name} due to build version mismatch')
# namespace/catalog item are the same for all platforms, so we can just use the first one # namespace/catalog item are the same for all platforms, so we can just use the first one
_ga = next(iter(app_assets.values())) _ga = next(iter(app_assets.values()))
eg_meta = self.egs.get_game_info(_ga.namespace, _ga.catalog_item_id) fetch_list.append((app_name, _ga.namespace, _ga.catalog_item_id))
game = Game(app_name=app_name, app_title=eg_meta['title'], metadata=eg_meta,
asset_infos=app_assets)
meta_updated = True meta_updated = True
self.lgd.set_game_meta(game.app_name, game)
def fetch_game_meta(args):
app_name, namespace, catalog_item_id = args
eg_meta = self.egs.get_game_info(namespace, catalog_item_id, timeout=10.0)
game = Game(app_name=app_name, app_title=eg_meta['title'], metadata=eg_meta, asset_infos=assets[app_name])
self.lgd.set_game_meta(game.app_name, game)
games[app_name] = game
# setup and teardown of thread pool takes some time, so only do it when it makes sense.
use_threads = len(fetch_list) > 5
self.log.info(f'Fetching metadata for {len(fetch_list)} apps.')
if use_threads:
with ThreadPoolExecutor(max_workers=16) as executor:
executor.map(fetch_game_meta, fetch_list, timeout=60.0)
for app_name, app_assets in sorted(assets.items()):
if skip_ue and any(v.namespace == 'ue' for v in app_assets.values()):
continue
game = games.get(app_name)
# retry if metadata is still missing/threaded loading wasn't used
if not game:
if use_threads:
self.log.warning(f'Fetching metadata for {app_name} failed, retrying')
_ga = next(iter(app_assets.values()))
fetch_game_meta((app_name, _ga.namespace, _ga.catalog_item_id))
game = games[app_name]
if game.is_dlc: if game.is_dlc:
_dlc[game.metadata['mainGameItem']['id']].append(game) _dlc[game.metadata['mainGameItem']['id']].append(game)