From d737ca57a765ef9af275851844cd6f380d54b042 Mon Sep 17 00:00:00 2001 From: derrod Date: Fri, 3 Dec 2021 15:48:48 +0100 Subject: [PATCH] [core/egs] Use thread pool for fetching metadata --- legendary/api/egs.py | 15 +++++++++++---- legendary/core.py | 42 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/legendary/api/egs.py b/legendary/api/egs.py index 9102bf7..435be09 100644 --- a/legendary/api/egs.py +++ b/legendary/api/egs.py @@ -2,6 +2,7 @@ # coding: utf-8 import requests +import requests.adapters import logging from requests.auth import HTTPBasicAuth @@ -28,11 +29,16 @@ class EPCAPI: _store_gql_host = 'store-launcher.epicgames.com' def __init__(self, lc='en', cc='US'): - self.session = requests.session() self.log = logging.getLogger('EPCAPI') - self.unauth_session = requests.session() + + self.session = requests.session() self.session.headers['User-Agent'] = self._user_agent + # increase maximum pool size for multithreaded metadata requests + self.session.mount('https://', requests.adapters.HTTPAdapter(pool_maxsize=16)) + + self.unauth_session = requests.session() self.unauth_session.headers['User-Agent'] = self._user_agent + self._oauth_basic = HTTPBasicAuth(self._user_basic, self._pw_basic) self.access_token = None @@ -149,10 +155,11 @@ class EPCAPI: r.raise_for_status() return r.json() - def get_game_info(self, namespace, catalog_item_id): + def get_game_info(self, namespace, catalog_item_id, timeout=None): r = self.session.get(f'https://{self._catalog_host}/catalog/api/shared/namespace/{namespace}/bulk/items', params=dict(id=catalog_item_id, includeDLCDetails=True, includeMainGameDetails=True, - country=self.country_code, locale=self.language_code)) + country=self.country_code, locale=self.language_code), + timeout=timeout) r.raise_for_status() return r.json().get(catalog_item_id, None) diff --git a/legendary/core.py b/legendary/core.py index f8ae0fe..e4acabd 100644 --- a/legendary/core.py +++ b/legendary/core.py @@ -8,6 +8,7 @@ import shutil from base64 import b64decode from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor from datetime import timezone from locale import getdefaultlocale from multiprocessing import Queue @@ -387,6 +388,9 @@ class LegendaryCore: else: assets[ga.app_name][_platform] = ga + fetch_list = [] + games = {} + for app_name, app_assets in sorted(assets.items()): if skip_ue and any(v.namespace == 'ue' for v in app_assets.values()): continue @@ -395,19 +399,41 @@ class LegendaryCore: asset_updated = False if game: asset_updated = any(game.app_version(_p) != app_assets[_p].build_version for _p in app_assets.keys()) + games[app_name] = game if update_assets and (not game or force_refresh or (game and asset_updated)): - if game and asset_updated: - self.log.info(f'Updating meta for {game.app_name} due to build version mismatch') - + self.log.debug(f'Scheduling metadata update for {app_name}') # namespace/catalog item are the same for all platforms, so we can just use the first one _ga = next(iter(app_assets.values())) - eg_meta = self.egs.get_game_info(_ga.namespace, _ga.catalog_item_id) - game = Game(app_name=app_name, app_title=eg_meta['title'], metadata=eg_meta, - asset_infos=app_assets) - + fetch_list.append((app_name, _ga.namespace, _ga.catalog_item_id)) meta_updated = True - self.lgd.set_game_meta(game.app_name, game) + + def fetch_game_meta(args): + app_name, namespace, catalog_item_id = args + eg_meta = self.egs.get_game_info(namespace, catalog_item_id, timeout=10.0) + game = Game(app_name=app_name, app_title=eg_meta['title'], metadata=eg_meta, asset_infos=assets[app_name]) + self.lgd.set_game_meta(game.app_name, game) + games[app_name] = game + + # setup and teardown of thread pool takes some time, so only do it when it makes sense. + use_threads = len(fetch_list) > 5 + self.log.info(f'Fetching metadata for {len(fetch_list)} apps.') + if use_threads: + with ThreadPoolExecutor(max_workers=16) as executor: + executor.map(fetch_game_meta, fetch_list, timeout=60.0) + + for app_name, app_assets in sorted(assets.items()): + if skip_ue and any(v.namespace == 'ue' for v in app_assets.values()): + continue + + game = games.get(app_name) + # retry if metadata is still missing/threaded loading wasn't used + if not game: + if use_threads: + self.log.warning(f'Fetching metadata for {app_name} failed, retrying') + _ga = next(iter(app_assets.values())) + fetch_game_meta((app_name, _ga.namespace, _ga.catalog_item_id)) + game = games[app_name] if game.is_dlc: _dlc[game.metadata['mainGameItem']['id']].append(game)