Test asynchronous api requests

This commit is contained in:
derrod 2021-12-03 14:55:30 +01:00
parent 5671448264
commit a72b9c9fa6
2 changed files with 53 additions and 9 deletions

View file

@ -1,6 +1,7 @@
# !/usr/bin/env python
# coding: utf-8
import aiohttp
import requests
import logging
@ -40,6 +41,13 @@ class EPCAPI:
self.language_code = lc
self.country_code = cc
self.aio_session = None
def init_aio_session(self):
self.aio_session = aiohttp.ClientSession(headers=self.session.headers)
async def close_aio_session(self):
return await self.aio_session.close()
def update_egs_params(self, egs_params):
# update user-agent
@ -156,6 +164,17 @@ class EPCAPI:
r.raise_for_status()
return r.json().get(catalog_item_id, None)
async def aio_get_game_info(self, namespace, catalog_item_id):
if not self.aio_session:
raise RuntimeError('No asiohttp session!')
async with self.aio_session.get(f'https://{self._catalog_host}/catalog/api/shared/namespace/'
f'{namespace}/bulk/items', raise_for_status=True,
params=dict(id=catalog_item_id, includeDLCDetails='true',
includeMainGameDetails='true', country=self.country_code,
locale=self.language_code)) as r:
return (await r.json()).get(catalog_item_id, None)
def get_library_items(self, include_metadata=True):
records = []
r = self.session.get(f'https://{self._library_host}/library/api/public/items',

View file

@ -1,5 +1,6 @@
# coding: utf-8
import asyncio
import json
import logging
import os
@ -368,6 +369,12 @@ class LegendaryCore:
def get_game_list(self, update_assets=True, platform='Windows') -> List[Game]:
return self.get_game_and_dlc_list(update_assets=update_assets, platform=platform)[0]
async def _fetch_game_info(self, app_name: str, ga: GameAsset, app_assets: Dict[str, GameAsset] = None) -> Game:
eg_meta = await self.egs.aio_get_game_info(ga.namespace, ga.catalog_item_id)
game = Game(app_name=app_name, app_title=eg_meta['title'], metadata=eg_meta, asset_infos=app_assets)
self.lgd.set_game_meta(game.app_name, game)
return game
def get_game_and_dlc_list(self, update_assets=True, platform='Windows',
force_refresh=False, skip_ue=True) -> (List[Game], Dict[str, List[Game]]):
_ret = []
@ -387,6 +394,7 @@ class LegendaryCore:
else:
assets[ga.app_name][_platform] = ga
to_fetch = []
for app_name, app_assets in sorted(assets.items()):
if skip_ue and any(v.namespace == 'ue' for v in app_assets.values()):
continue
@ -402,17 +410,34 @@ class LegendaryCore:
# namespace/catalog item are the same for all platforms, so we can just use the first one
_ga = next(iter(app_assets.values()))
eg_meta = self.egs.get_game_info(_ga.namespace, _ga.catalog_item_id)
game = Game(app_name=app_name, app_title=eg_meta['title'], metadata=eg_meta,
asset_infos=app_assets)
to_fetch.append((app_name, _ga, app_assets))
meta_updated = True
self.lgd.set_game_meta(game.app_name, game)
if game.is_dlc:
_dlc[game.metadata['mainGameItem']['id']].append(game)
elif not any(i['path'] == 'mods' for i in game.metadata.get('categories', [])) and platform in app_assets:
_ret.append(game)
if game:
if game.is_dlc:
_dlc[game.metadata['mainGameItem']['id']].append(game)
elif not any(i['path'] == 'mods' for i in game.metadata.get('categories', [])) and platform in app_assets:
_ret.append(game)
async def fetch_remaining_infos(fetch_list):
self.egs.init_aio_session()
coros = [self._fetch_game_info(a_n, ga, a_s) for a_n, ga, a_s in fetch_list]
res = await asyncio.gather(*coros, return_exceptions=True)
await self.egs.close_aio_session()
return res
loop = asyncio.get_event_loop()
results = loop.run_until_complete(fetch_remaining_infos(to_fetch))
for res in results:
if isinstance(res, Exception):
self.log.error(f'Failed to fetch game info: {res!r}')
continue
if res.is_dlc:
_dlc[res.metadata['mainGameItem']['id']].append(res)
elif not any(i['path'] == 'mods' for i in res.metadata.get('categories', [])) and platform in res.asset_infos:
_ret.append(res)
self.update_aliases(force=meta_updated)
if meta_updated: