[api/cli/core] Add extremely basic support for cloud saves

Currently only supports downloading all saves to a folder,
in the future it should support automatically extracting save
files to the proper directory (at least on Windows).
This commit is contained in:
derrod 2020-05-06 15:40:04 +02:00
parent 693ad3cefc
commit 31530692ef
3 changed files with 127 additions and 4 deletions

View file

@ -20,6 +20,7 @@ class EPCAPI:
_entitlements_host = 'entitlement-public-service-prod08.ol.epicgames.com' _entitlements_host = 'entitlement-public-service-prod08.ol.epicgames.com'
_catalog_host = 'catalog-public-service-prod06.ol.epicgames.com' _catalog_host = 'catalog-public-service-prod06.ol.epicgames.com'
_ecommerce_host = 'ecommerceintegration-public-service-ecomprod02.ol.epicgames.com' _ecommerce_host = 'ecommerceintegration-public-service-ecomprod02.ol.epicgames.com'
_datastorage_host = 'datastorage-public-service-liveegs.live.use1a.on.epicgames.com'
def __init__(self): def __init__(self):
self.session = requests.session() self.session = requests.session()
@ -117,3 +118,23 @@ class EPCAPI:
country='US', locale='en')) country='US', locale='en'))
r.raise_for_status() r.raise_for_status()
return r.json().get(catalog_item_id, None) return r.json().get(catalog_item_id, None)
def get_user_cloud_saves(self, app_name='', manifests=False, filenames=None):
if app_name and manifests:
app_name += '/manifests/'
elif app_name:
app_name += '/'
user_id = self.user.get('account_id')
if filenames:
r = self.session.post(f'https://{self._datastorage_host}/api/v1/access/egstore/savesync/'
f'{user_id}/{app_name}', json=dict(files=filenames))
else:
r = self.session.get(f'https://{self._datastorage_host}/api/v1/access/egstore/savesync/'
f'{user_id}/{app_name}')
r.raise_for_status()
return r.json()
def create_game_cloud_saves(self, app_name, filenames):
return self.get_user_cloud_saves(app_name, filenames=filenames)

View file

@ -194,6 +194,34 @@ class LegendaryCLI:
# use the log output so this isn't included when piping file list into file # use the log output so this isn't included when piping file list into file
logger.info(f'Install tags: {", ".join(sorted(install_tags))}') logger.info(f'Install tags: {", ".join(sorted(install_tags))}')
def list_saves(self, args):
if not self.core.login():
logger.error('Login failed! Cannot continue with download process.')
exit(1)
# update game metadata
logger.debug('Refreshing games list...')
_ = self.core.get_game_and_dlc_list(update_assets=True)
# then get the saves
logger.info('Getting list of saves...')
saves = self.core.get_save_games(args.app_name)
last_app = ''
print('Save games:')
for save in sorted(saves, key=lambda a: a.app_name):
if save.app_name != last_app:
game_title = self.core.get_game(save.app_name).app_title
last_app = save.app_name
print(f'- {game_title} ("{save.app_name}")')
print(' +', save.manifest_name)
def download_saves(self, args):
if not self.core.login():
logger.error('Login failed! Cannot continue with download process.')
exit(1)
# then get the saves
logger.info(f'Downloading saves to "{self.core.get_default_install_dir()}"')
# todo expand this to allow downloading single saves and extracting them to the correct directory
self.core.download_saves()
def launch_game(self, args, extra): def launch_game(self, args, extra):
app_name = args.app_name app_name = args.app_name
if not self.core.is_installed(app_name): if not self.core.is_installed(app_name):
@ -435,11 +463,16 @@ def main():
list_parser = subparsers.add_parser('list-games', help='List available (installable) games') list_parser = subparsers.add_parser('list-games', help='List available (installable) games')
list_installed_parser = subparsers.add_parser('list-installed', help='List installed games') list_installed_parser = subparsers.add_parser('list-installed', help='List installed games')
list_files_parser = subparsers.add_parser('list-files', help='List files in manifest') list_files_parser = subparsers.add_parser('list-files', help='List files in manifest')
list_saves_parser = subparsers.add_parser('list-saves', help='List available cloud saves')
download_saves_parser = subparsers.add_parser('download-saves', help='Download all cloud saves')
install_parser.add_argument('app_name', help='Name of the app', metavar='<App Name>') install_parser.add_argument('app_name', help='Name of the app', metavar='<App Name>')
uninstall_parser.add_argument('app_name', help='Name of the app', metavar='<App Name>') uninstall_parser.add_argument('app_name', help='Name of the app', metavar='<App Name>')
launch_parser.add_argument('app_name', help='Name of the app', metavar='<App Name>') launch_parser.add_argument('app_name', help='Name of the app', metavar='<App Name>')
list_files_parser.add_argument('app_name', nargs='?', help='Name of the app', metavar='<App Name>') list_files_parser.add_argument('app_name', nargs='?', metavar='<App Name>',
help='Name of the app (optional)')
list_saves_parser.add_argument('app_name', nargs='?', metavar='<App Name>', default='',
help='Name of the app (optional)')
# importing only works on Windows right now # importing only works on Windows right now
if os.name == 'nt': if os.name == 'nt':
@ -526,14 +559,15 @@ def main():
exit(0) exit(0)
if args.subparser_name not in ('auth', 'list-games', 'list-installed', 'list-files', if args.subparser_name not in ('auth', 'list-games', 'list-installed', 'list-files',
'launch', 'download', 'uninstall', 'install', 'update'): 'launch', 'download', 'uninstall', 'install', 'update',
'list-saves', 'download-saves'):
print(parser.format_help()) print(parser.format_help())
# Print the main help *and* the help for all of the subcommands. Thanks stackoverflow! # Print the main help *and* the help for all of the subcommands. Thanks stackoverflow!
print('Individual command help:') print('Individual command help:')
subparsers = next(a for a in parser._actions if isinstance(a, argparse._SubParsersAction)) subparsers = next(a for a in parser._actions if isinstance(a, argparse._SubParsersAction))
for choice, subparser in subparsers.choices.items(): for choice, subparser in subparsers.choices.items():
if choice in ('install', 'update'): if choice in ('download', 'update'):
continue continue
print(f'\nCommand: {choice}') print(f'\nCommand: {choice}')
print(subparser.format_help()) print(subparser.format_help())
@ -566,6 +600,10 @@ def main():
cli.uninstall_game(args) cli.uninstall_game(args)
elif args.subparser_name == 'list-files': elif args.subparser_name == 'list-files':
cli.list_files(args) cli.list_files(args)
elif args.subparser_name == 'list-saves':
cli.list_saves(args)
elif args.subparser_name == 'download-saves':
cli.download_saves(args)
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info('Command was aborted via KeyboardInterrupt, cleaning up...') logger.info('Command was aborted via KeyboardInterrupt, cleaning up...')

View file

@ -8,7 +8,7 @@ import shlex
import shutil import shutil
from base64 import b64decode from base64 import b64decode
from collections import defaultdict from collections import defaultdict, namedtuple
from datetime import datetime from datetime import datetime
from multiprocessing import Queue from multiprocessing import Queue
from random import choice as randchoice from random import choice as randchoice
@ -25,6 +25,7 @@ from legendary.models.exceptions import *
from legendary.models.game import * from legendary.models.game import *
from legendary.models.json_manifest import JSONManifest from legendary.models.json_manifest import JSONManifest
from legendary.models.manifest import Manifest, ManifestMeta from legendary.models.manifest import Manifest, ManifestMeta
from legendary.models.chunk import Chunk
from legendary.utils.game_workarounds import is_opt_enabled from legendary.utils.game_workarounds import is_opt_enabled
@ -272,6 +273,69 @@ class LegendaryCore:
return params, working_dir, env return params, working_dir, env
def get_save_games(self, app_name: str = ''):
# todo make this a proper class in legendary.models.egs or something
CloudSave = namedtuple('CloudSave', ['filename', 'app_name', 'manifest_name', 'iso_date'])
savegames = self.egs.get_user_cloud_saves(app_name)
_saves = []
for fname, f in savegames['files'].items():
if '.manifest' not in fname:
continue
f_parts = fname.split('/')
_saves.append(CloudSave(filename=fname, app_name=f_parts[2],
manifest_name=f_parts[4], iso_date=f['lastModified']))
return _saves
def download_saves(self):
save_path = os.path.join(self.get_default_install_dir(), '.saves')
if not os.path.exists(save_path):
os.makedirs(save_path)
savegames = self.egs.get_user_cloud_saves()
files = savegames['files']
for fname, f in files.items():
if '.manifest' not in fname:
continue
f_parts = fname.split('/')
save_dir = os.path.join(save_path, f'{f_parts[2]}/{f_parts[4].rpartition(".")[0]}')
if not os.path.exists(save_dir):
os.makedirs(save_dir)
self.log.info(f'Downloading "{fname.split("/", 2)[2]}"...')
# download manifest
r = self.egs.unauth_session.get(f['readLink'])
if r.status_code != 200:
self.log.error(f'Download failed, status code: {r.status_code}')
continue
m = self.load_manfiest(r.content)
# download chunks requierd for extraction
chunks = dict()
for chunk in m.chunk_data_list.elements:
cpath_p = fname.split('/', 3)[:3]
cpath_p.append(chunk.path)
cpath = '/'.join(cpath_p)
self.log.debug(f'Downloading chunk "{cpath}"')
r = self.egs.unauth_session.get(files[cpath]['readLink'])
if r.status_code != 200:
self.log.error(f'Download failed, status code: {r.status_code}')
break
c = Chunk.read_buffer(r.content)
chunks[c.guid_num] = c.data
for fm in m.file_manifest_list.elements:
dirs, fname = os.path.split(fm.filename)
fdir = os.path.join(save_dir, dirs)
fpath = os.path.join(fdir, fname)
if not os.path.exists(fdir):
os.makedirs(fdir)
self.log.debug(f'Writing "{fpath}"...')
with open(fpath, 'wb') as fh:
for cp in fm.chunk_parts:
fh.write(chunks[cp.guid_num][cp.offset:cp.offset+cp.size])
def is_offline_game(self, app_name: str) -> bool: def is_offline_game(self, app_name: str) -> bool:
return self.lgd.config.getboolean(app_name, 'offline', fallback=False) return self.lgd.config.getboolean(app_name, 'offline', fallback=False)