[core/cli/utils] Extremely WIP Steam Sync from 2022

Unfinished and might not even work anymore, left here as a reference for future work.
This commit is contained in:
derrod 2023-11-16 02:21:53 +01:00
parent f1d815797f
commit 64639a5520
7 changed files with 849 additions and 8 deletions

View file

@ -28,9 +28,12 @@ jobs:
pyinstaller pyinstaller
requests requests
filelock filelock
pefile
vdf
Pillow
- name: Optional dependencies (WebView) - name: Optional dependencies (WebView)
run: pip3 install --upgrade pywebview run: pip3 install --upgrade "pywebview<4.0"
if: runner.os != 'macOS' if: runner.os != 'macOS'
- name: Set strip option on non-Windows - name: Set strip option on non-Windows

View file

@ -576,6 +576,9 @@ class LegendaryCLI:
if args.origin: if args.origin:
return self._launch_origin(args) return self._launch_origin(args)
if args.steam and sys_platform == 'linux':
return self._launch_steam(app_name, args)
igame = self.core.get_installed_game(app_name) igame = self.core.get_installed_game(app_name)
if not igame: if not igame:
logger.error(f'Game {app_name} is not currently installed!') logger.error(f'Game {app_name} is not currently installed!')
@ -703,7 +706,9 @@ class LegendaryCLI:
if params.environment: if params.environment:
logger.debug('Environment overrides: {}'.format(', '.join( logger.debug('Environment overrides: {}'.format(', '.join(
f'{k}={v}' for k, v in params.environment.items()))) f'{k}={v}' for k, v in params.environment.items())))
subprocess.Popen(full_params, cwd=params.working_directory, env=full_env) p = subprocess.Popen(full_params, cwd=params.working_directory, env=full_env)
if args.wait:
p.wait()
def _launch_origin(self, args): def _launch_origin(self, args):
game = self.core.get_game(app_name=args.app_name) game = self.core.get_game(app_name=args.app_name)
@ -802,6 +807,50 @@ class LegendaryCLI:
logger.debug(f'Opening Origin URI with command: {shlex.join(command)}') logger.debug(f'Opening Origin URI with command: {shlex.join(command)}')
subprocess.Popen(command, env=full_env) subprocess.Popen(command, env=full_env)
def _launch_steam(self, app_name, args):
def exit_error(msg, errcode=1):
print('https://legendary.gl/steam_error?code=' + msg)
exit(errcode)
igame = self.core.get_installed_game(app_name)
if not igame:
exit_error(f'not_installed')
if igame.is_dlc:
exit_error(f'is_dlc')
if not os.path.exists(igame.install_path):
exit_error(f'install_dir_missing')
# override with config value
args.offline = self.core.is_offline_game(app_name) or args.offline
if not args.offline:
logger.info('Logging in...')
try:
if not self.core.login():
exit_error('login_failed')
except ValueError:
exit_error('login_failed_no_credentials')
if not args.skip_version_check and not self.core.is_noupdate_game(app_name):
logger.info('Checking for updates...')
try:
latest = self.core.get_asset(app_name, update=True, platform=igame.platform)
except ValueError:
exit_error('metadata_missing')
if latest.build_version != igame.version:
exit_error('app_outdated')
params = self.core.get_launch_parameters(app_name=app_name, offline=args.offline,
user=args.user_name_override,
language=args.language, disable_wine=True)
full_params = []
full_params.extend(params.game_parameters)
full_params.extend(params.user_parameters)
full_params.extend(params.egl_parameters)
logger.debug(f'Launch parameters: {shlex.join(full_params)}')
print(shlex.join(full_params))
def install_game(self, args): def install_game(self, args):
if not self.core.lgd.lock_installed(): if not self.core.lgd.lock_installed():
logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may ' logger.fatal('Failed to acquire installed data lock, only one instance of Legendary may '
@ -2612,6 +2661,13 @@ class LegendaryCLI:
self.core.install_game(igame) self.core.install_game(igame)
logger.info('Finished.') logger.info('Finished.')
def steam_sync(self, args):
if not self.core.login():
logger.error('Login failed!')
return
self.core.steam_sync()
def main(): def main():
# Set output encoding to UTF-8 if not outputting to a terminal # Set output encoding to UTF-8 if not outputting to a terminal
@ -2662,6 +2718,7 @@ def main():
list_saves_parser = subparsers.add_parser('list-saves', help='List available cloud saves') list_saves_parser = subparsers.add_parser('list-saves', help='List available cloud saves')
move_parser = subparsers.add_parser('move', help='Move specified app name to a new location') move_parser = subparsers.add_parser('move', help='Move specified app name to a new location')
status_parser = subparsers.add_parser('status', help='Show legendary status information') status_parser = subparsers.add_parser('status', help='Show legendary status information')
steam_parser = subparsers.add_parser('steam-sync', help='Setup/Run Steam Sync')
sync_saves_parser = subparsers.add_parser('sync-saves', help='Sync cloud saves') sync_saves_parser = subparsers.add_parser('sync-saves', help='Sync cloud saves')
uninstall_parser = subparsers.add_parser('uninstall', help='Uninstall (delete) a game') uninstall_parser = subparsers.add_parser('uninstall', help='Uninstall (delete) a game')
verify_parser = subparsers.add_parser('verify', help='Verify a game\'s local files', verify_parser = subparsers.add_parser('verify', help='Verify a game\'s local files',
@ -2814,6 +2871,10 @@ def main():
help='Launch Origin to activate or run the game.') help='Launch Origin to activate or run the game.')
launch_parser.add_argument('--json', dest='json', action='store_true', launch_parser.add_argument('--json', dest='json', action='store_true',
help='Print launch information as JSON and exit') help='Print launch information as JSON and exit')
launch_parser.add_argument('--wait', dest='wait', action='store_true',
help='Wait until child process exits')
# hidden option for Steam sync launch
launch_parser.add_argument('--steam', dest='steam', action='store_true', help=argparse.SUPPRESS)
if os.name != 'nt': if os.name != 'nt':
launch_parser.add_argument('--wine', dest='wine_bin', action='store', metavar='<wine binary>', launch_parser.add_argument('--wine', dest='wine_bin', action='store', metavar='<wine binary>',
@ -3010,8 +3071,8 @@ def main():
if args.full_help: if args.full_help:
# Commands that should not be shown in full help/list of commands (e.g. aliases) # Commands that should not be shown in full help/list of commands (e.g. aliases)
_hidden_commands = {'download', 'update', 'repair', 'get-token', _hidden_commands = {'download', 'update', 'repair', 'get-token', 'import-game',
'import-game', 'verify-game', 'list-games'} 'verify-game', 'list-games'}
# Print the help for all of the subparsers. Thanks stackoverflow! # Print the help for all of the subparsers. Thanks stackoverflow!
print('Individual command help:') print('Individual command help:')
subparsers = next(a for a in parser._actions if isinstance(a, argparse._SubParsersAction)) subparsers = next(a for a in parser._actions if isinstance(a, argparse._SubParsersAction))
@ -3099,6 +3160,8 @@ def main():
cli.crossover_setup(args) cli.crossover_setup(args)
elif args.subparser_name == 'move': elif args.subparser_name == 'move':
cli.move(args) cli.move(args)
elif args.subparser_name == 'steam-sync':
cli.steam_sync(args)
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info('Command was aborted via KeyboardInterrupt, cleaning up...') logger.info('Command was aborted via KeyboardInterrupt, cleaning up...')

View file

@ -40,6 +40,7 @@ from legendary.utils.game_workarounds import is_opt_enabled, update_workarounds,
from legendary.utils.savegame_helper import SaveGameHelper from legendary.utils.savegame_helper import SaveGameHelper
from legendary.utils.selective_dl import games as sdl_games from legendary.utils.selective_dl import games as sdl_games
from legendary.lfs.wine_helpers import read_registry, get_shell_folders, case_insensitive_path_search from legendary.lfs.wine_helpers import read_registry, get_shell_folders, case_insensitive_path_search
from legendary.utils.steam import SteamHelper
# ToDo: instead of true/false return values for success/failure actually raise an exception that the CLI/GUI # ToDo: instead of true/false return values for success/failure actually raise an exception that the CLI/GUI
@ -574,8 +575,12 @@ class LegendaryCore:
def get_installed_game(self, app_name, skip_sync=False) -> InstalledGame: def get_installed_game(self, app_name, skip_sync=False) -> InstalledGame:
igame = self._get_installed_game(app_name) igame = self._get_installed_game(app_name)
if not skip_sync and igame and self.egl_sync_enabled and igame.egl_guid and not igame.is_dlc: if not skip_sync and igame:
if self.egl_sync_enabled and igame.egl_guid and not igame.is_dlc:
self.egl_sync(app_name) self.egl_sync(app_name)
if self.steam_sync_enabled and igame.steam_appid and not igame.is_dlc:
self.steam_sync(app_name)
return self._get_installed_game(app_name) return self._get_installed_game(app_name)
else: else:
return igame return igame
@ -2110,6 +2115,188 @@ class LegendaryCore:
if os.path.exists(path): if os.path.exists(path):
delete_folder(path, recursive=True) delete_folder(path, recursive=True)
@property
def steam_sync_enabled(self):
return self.lgd.config.getboolean('Legendary', 'steam_sync', fallback=False)
def _steam_export(self, sh: SteamHelper, shortcuts: dict, igame: InstalledGame):
def shortcut_exists(app_id):
for shortcut in shortcuts['shortcuts'].values():
if (shortcut['appid'] + 2**32) == app_id:
return True
return False
if igame.steam_appid and shortcut_exists(igame.steam_appid):
return False
entry = sh.create_shortcut_entry(igame, igame.steam_appid)
idx = 0
while str(idx) in shortcuts['shortcuts']:
idx += 1
shortcuts['shortcuts'][str(idx)] = entry
# add appid to installed game
igame.steam_appid = entry['appid'] + 2**32
self._install_game(igame)
# todo only do this if no wine is configured for this app
if sys_platform == 'linux':
sh.set_compat_tool(igame.steam_appid, 'proton_experimental')
return True
def _steam_remove(self):
# todo remove icons and shit as well
pass
def steam_sync(self, app_name=None, is_install=False, steam_path=None,
legendary_bin=None, steam_user=None, refresh_artwork=False):
try:
steam_path = steam_path or self.lgd.config.get('Legendary', 'steam_path', fallback=None)
legendary_bin = legendary_bin or self.lgd.config.get('Legendary', 'legendary_binary', fallback=None)
sh = SteamHelper(steam_path, legendary_bin, self.lgd.path)
if sys_platform == 'linux':
sh.ensure_launch_script()
except RuntimeError as e:
self.log.error(f'SteamHelper failed to initialize: {e!r}')
return
except FileNotFoundError:
self.log.error('Steam installation not found, please specify the installation directory '
'via the config (steam_path) or command line (--steam-path).')
return
if sh.is_steam_running():
if not is_install:
# todo use better exception
raise RuntimeError('Steam is running, please close it before running this command.')
else:
self.log.warning('Steam is still running, please restart it to reload Legendary shortcuts.')
_ = sh.get_user_dir(steam_user or self.lgd.config.get('Legendary', 'steam_user', fallback=None))
shortcuts = sh.read_shortcuts()
if sys_platform == 'linux':
sh.read_config()
any_changes = False
if app_name:
igame = self._get_installed_game(app_name)
any_changes = self._steam_export(sh, shortcuts, igame)
else:
for igame in self._get_installed_list():
any_changes = self._steam_export(sh, shortcuts, igame) or any_changes
# todo remove uninstalled games from shortcuts
if any_changes:
sh.write_shortcuts(shortcuts)
if sys_platform == 'linux':
sh.write_config()
elif not refresh_artwork:
return
# Download cover art and stuff
self.log.info('Downloading Steam Library artwork, this may take a while...')
for igame in self._get_installed_list():
if not igame.steam_appid:
continue
sh.create_grid_json(igame.steam_appid)
game = self.get_game(igame.app_name)
# go through all available image files and download if necessary
banner = logo = tall = None
# todo SteamDB instead
# todo move this into Steam Helper
for img in game.metadata.get('keyImages', []):
img_url = img['url']
img_type = img['type']
url_fname = img_url.rpartition('/')[2]
if '.' not in url_fname:
self.log.debug(f'Image url for {igame.app_name} does not have a file extension.')
# extension doesn't really matter, Steam will determine the type when loading
ext = 'jpg' if img['type'] != 'DieselGameBoxLogo' else 'png'
else:
ext = url_fname.rpartition('.')[2]
if img_type == 'DieselGameBox' or img_type == 'DieselGameBoxWide':
# Sometimes DieselGameBox doesn't exist but DieselGameBoxWide does.
# In cases where both exist they appear to be the same image.
filename = f'{igame.steam_appid}_hero.{ext}'
elif img_type == 'DieselGameBoxLogo':
filename = f'{igame.steam_appid}_logo.{ext}'
elif img_type == 'DieselGameBoxTall':
filename = f'{igame.steam_appid}p_epic.{ext}'
elif img_type == 'Thumbnail':
# If this is square use it instead of manually extracting the icon
if img['height'] == img['width']:
filename = f'{igame.steam_appid}_icon.{ext}'
else:
self.log.debug(f'Non-square thumbnail: {img_url}')
continue
else:
self.log.debug(f'Unknown EGS image type: {img["type"]}')
continue
file_path = os.path.join(sh.grid_path, filename)
if not os.path.exists(file_path) or refresh_artwork:
self.log.debug(f'Downloading {img["url"]} to {filename}')
r = self.egs.unauth_session.get(img['url'], timeout=20.0)
if r.status_code == 200:
# save component image for big picture/box generation
if img_type == 'DieselGameBox' or img_type == 'DieselGameBoxWide':
banner = r.content
elif img_type == 'DieselGameBoxLogo':
logo = r.content
elif img_type == 'DieselGameBoxTall':
tall = r.content
with open(file_path, 'wb') as f:
f.write(r.content)
# assemble the banner (Steam calls it "header") for big picture
if banner:
# Big Picture banner image
banner_id = sh.get_header_id(igame)
banner_file = os.path.join(sh.grid_path, f'{banner_id}.jpg')
if not os.path.exists(banner_file) or refresh_artwork:
with open(banner_file, 'wb') as f:
f.write(sh.make_header_image(banner, logo))
# Deck UI banner image
banner_file = os.path.join(sh.grid_path, f'{igame.steam_appid}.png')
if not os.path.exists(banner_file) or refresh_artwork:
with open(banner_file, 'wb') as f:
f.write(sh.make_banner_image(banner, logo))
# If the logo exists as a separate file we need to manually generate the "tall" box art as well
if tall and logo:
box_file = os.path.join(sh.grid_path, f'{igame.steam_appid}p.png')
if not os.path.exists(box_file) or refresh_artwork:
with open(box_file, 'wb') as f:
f.write(sh.make_tall_box(tall, logo))
# steam can read exe icons directly, but that doesn't handle alpha correctly, so do it ourselves.
icon_fie = os.path.join(sh.grid_path, f'{igame.steam_appid}_icon.png')
if not os.path.exists(icon_fie) or refresh_artwork:
try:
icon = sh.make_icon(igame)
if icon:
with open(icon_fie, 'wb') as f:
f.write(icon)
except Exception as e:
self.log.warning(f'Getting Steam icon failed with {e!r}')
# todo figure out how to set Proton by default
self.log.info('Done, Steam may now be restarted.')
def steam_unlink(self):
pass
def exit(self): def exit(self):
""" """
Do cleanup, config saving, and exit. Do cleanup, config saving, and exit.

View file

@ -140,6 +140,7 @@ class InstalledGame:
base_urls: List[str] = field(default_factory=list) base_urls: List[str] = field(default_factory=list)
can_run_offline: bool = False can_run_offline: bool = False
egl_guid: str = '' egl_guid: str = ''
steam_appid: int = 0
executable: str = '' executable: str = ''
install_size: int = 0 install_size: int = 0
install_tags: List[str] = field(default_factory=list) install_tags: List[str] = field(default_factory=list)
@ -177,6 +178,7 @@ class InstalledGame:
tmp.platform = json.get('platform', 'Windows') tmp.platform = json.get('platform', 'Windows')
tmp.install_size = json.get('install_size', 0) tmp.install_size = json.get('install_size', 0)
tmp.egl_guid = json.get('egl_guid', '') tmp.egl_guid = json.get('egl_guid', '')
tmp.steam_appid = json.get('steam_appid', 0)
tmp.install_tags = json.get('install_tags', []) tmp.install_tags = json.get('install_tags', [])
return tmp return tmp

176
legendary/utils/pe.py Normal file
View file

@ -0,0 +1,176 @@
# -*- coding: utf-8 -*-
"""
Utilities for extracting information from PE (Portable Executable) files.
Adapted from https://github.com/robomotic/pemeta
Original credits:
__author__ = "Paolo Di Prodi"
__copyright__ = "Copyright 2017, LogstTotal Project"
__license__ = "Apache"
__version__ = "2.0"
__maintainer__ = "Paolo Di Prodi"
__email__ = "paolo@logstotal.com"
"""
import io
import logging
import struct
import pefile
from PIL import Image
class PEUtils(object):
GRPICONDIRENTRY_format = ('GRPICONDIRENTRY',
('B,Width', 'B,Height', 'B,ColorCount', 'B,Reserved',
'H,Planes', 'H,BitCount', 'I,BytesInRes', 'H,ID'))
GRPICONDIR_format = ('GRPICONDIR',
('H,Reserved', 'H,Type', 'H,Count'))
RES_ICON = 1
RES_CURSOR = 2
def __init__(self, pe_file):
self.pe = pefile.PE(pe_file, fast_load=True)
self.pe.parse_data_directories(directories=[
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_TLS'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']
])
def _find_resource_base(self, res_type):
try:
rt_base_idx = [entry.id for
entry in self.pe.DIRECTORY_ENTRY_RESOURCE.entries].index(
pefile.RESOURCE_TYPE[res_type]
)
except AttributeError:
rt_base_idx = None
except ValueError:
rt_base_idx = None
if rt_base_idx is not None:
return self.pe.DIRECTORY_ENTRY_RESOURCE.entries[rt_base_idx]
return None
def _find_resource(self, res_type, res_index):
rt_base_dir = self._find_resource_base(res_type)
if res_index < 0:
try:
idx = [entry.id for entry in rt_base_dir.directory.entries].index(-res_index)
except:
return None
else:
idx = res_index if res_index < len(rt_base_dir.directory.entries) else None
if idx is None:
return None
test_res_dir = rt_base_dir.directory.entries[idx]
res_dir = test_res_dir
if test_res_dir.struct.DataIsDirectory:
# another Directory
# probably language take the first one
res_dir = test_res_dir.directory.entries[0]
if res_dir.struct.DataIsDirectory:
# a directory there is no icon here
return None
return res_dir
def _get_group_icons(self):
rt_base_dir = self._find_resource_base('RT_GROUP_ICON')
groups = list()
if not hasattr(rt_base_dir, "directory"):
return groups
for res_index in range(0, len(rt_base_dir.directory.entries)):
grp_icon_dir_entry = self._find_resource('RT_GROUP_ICON', res_index)
if not grp_icon_dir_entry:
continue
data_rva = grp_icon_dir_entry.data.struct.OffsetToData
size = grp_icon_dir_entry.data.struct.Size
data = self.pe.get_memory_mapped_image()[data_rva:data_rva + size]
file_offset = self.pe.get_offset_from_rva(data_rva)
grp_icon_dir = pefile.Structure(self.GRPICONDIR_format, file_offset=file_offset)
grp_icon_dir.__unpack__(data)
if grp_icon_dir.Reserved != 0 or grp_icon_dir.Type != self.RES_ICON:
continue
offset = grp_icon_dir.sizeof()
entries = list()
for idx in range(0, grp_icon_dir.Count):
grp_icon = pefile.Structure(self.GRPICONDIRENTRY_format, file_offset=file_offset + offset)
grp_icon.__unpack__(data[offset:])
offset += grp_icon.sizeof()
entries.append(grp_icon)
groups.append(entries)
return groups
def _get_icon(self, index):
icon_entry = self._find_resource('RT_ICON', -index)
if not icon_entry:
return None
data_rva = icon_entry.data.struct.OffsetToData
size = icon_entry.data.struct.Size
data = self.pe.get_memory_mapped_image()[data_rva:data_rva + size]
return data
def _export_raw(self, entries=None, index=None):
if not entries:
# just get the first group
for entries in self._get_group_icons():
if entries:
break
else:
return None
if index is not None:
entries = entries[index:index + 1]
ico = struct.pack('<HHH', 0, self.RES_ICON, len(entries))
data_offset = None
data = []
info = []
for grp_icon in entries:
if data_offset is None:
data_offset = len(ico) + ((grp_icon.sizeof() + 2) * len(entries))
nfo = grp_icon.__pack__()[:-2] + struct.pack('<L', data_offset)
info.append(nfo)
raw_data = self._get_icon(grp_icon.ID)
if not raw_data:
continue
data.append(raw_data)
data_offset += len(raw_data)
return ico + b"".join(info) + b"".join(data)
def get_icon(self, out_format='PNG', **format_kwargs):
raw = self._export_raw()
if not raw:
return None
img = Image.open(io.BytesIO(raw)).convert('RGBA')
out = io.BytesIO()
img.save(out, format=out_format, **format_kwargs)
return out.getvalue()
def close(self):
self.pe.close()

407
legendary/utils/steam.py Normal file
View file

@ -0,0 +1,407 @@
import logging
import json
import os
import plistlib
from io import BytesIO
from random import randint
from sys import platform as sys_platform, argv as sys_argv
from zlib import crc32
from requests.models import CaseInsensitiveDict
from legendary.models.game import InstalledGame
if sys_platform == 'win32':
from legendary.lfs.windows_helpers import query_registry_value, HKEY_CURRENT_USER
else:
query_registry_value = HKEY_CURRENT_USER = None
try:
import vdf
except ImportError:
vdf = None
_script = '''#!/bin/bash
EPIC_PARAMS=$("{executable}" launch $1 --steam)
ret=$?
if [ $ret -ne 0 ]; then
echo "Legendary failed with $ret"
# hack to open URL in Deck UI browser (doesn't work from within legendary)
if [ -n "${EPIC_PARAMS}" ]; then
python3 -m webbrowser "${EPIC_PARAMS}"
# This is just here so the browser opens before we get thrown back to the deck UI.
read -s -t 5 -p "Waiting 5 seconds..."
fi
exit $ret
fi
eval "$2" $EPIC_PARAMS
'''
class SteamHelper:
def __init__(self, steam_path=None, legendary_binary=None, legendary_config=None):
if not vdf:
raise RuntimeError('Steam support requires the vdf module to be installed.')
if not steam_path:
if sys_platform == 'win32':
search_paths = [os.path.expandvars(f'%programfiles(x86)%\\steam'),
'C:\\Program Files (x86)\\Steam']
elif sys_platform == 'darwin':
search_paths = [os.path.expanduser('~/Library/Application Support/Steam')]
elif sys_platform == 'linux':
search_paths = [os.path.expanduser('~/.steam/Steam'),
os.path.expanduser('~/.steam/steam'),
os.path.expanduser('~/.local/share/Steam')]
else:
raise NotImplementedError('Steam support is not implemented for this platform.')
for _path in search_paths:
if os.path.exists(_path) and os.path.isdir(_path):
if os.path.isdir(os.path.join(_path, 'userdata')):
steam_path = _path
break
if not steam_path:
raise FileNotFoundError('Unable to find Steam installation.')
# Legendary will be used as the executable in the shortcuts to run the games, in order for that to work,
# it must be either run from a PyInstaller binary or the wrapper script that setuptools creates.
if not legendary_binary or not os.path.exists(legendary_binary):
legendary_binary = os.path.join(os.getcwd(), sys_argv[0])
if legendary_binary.endswith('.py'):
raise RuntimeError('Legendary must not be run from a .py file for Steam shortcuts to work.')
if not os.path.exists(legendary_binary):
# on windows, try again with '.exe':
if sys_platform == 'win32':
legendary_binary += '.exe'
if not os.path.exists(legendary_binary):
raise RuntimeError('Could not automatically find a usable legendary binary!')
else:
raise RuntimeError('Could not automatically find a usable legendary binary!')
self.log = logging.getLogger('SteamHelper')
self.steam_path = steam_path
self.lgd_binary = os.path.abspath(legendary_binary)
self.lgd_config_dir = legendary_config
self.launch_script = None
self.user_id = None
self.user_dir = None
self.grid_path = None
self.shortcuts = dict()
self.steam_config = dict()
def is_steam_running(self):
if sys_platform == 'win32':
pid = query_registry_value(HKEY_CURRENT_USER, 'Software\\Valve\\Steam\\ActiveProcess', 'pid')
self.log.debug(f'Steam PID: {pid}')
return pid is not None and pid != 0
elif sys_platform == 'darwin' or sys_platform == 'linux':
if sys_platform == 'linux':
registry_path = os.path.abspath(os.path.join(self.steam_path, '..', 'registry.vdf'))
else:
registry_path = os.path.join(self.steam_path, 'registry.vdf')
registry = vdf.load(open(registry_path), mapper=CaseInsensitiveDict)
pid = int(registry.get('Registry', {}).get('HKLM', {}).get('Software', {})
.get('Valve', {}).get('Steam', {}).get('SteamPID', '0'))
return pid is not None and pid != 0
else:
raise NotImplementedError('Steam support is not implemented for this platform.')
@staticmethod
def _userid_from_steam64(sid):
return sid & 0xFFFFFFFF
def get_user_dir(self, username=None):
if not username:
# attempt to get primary username from registry
if sys_platform == 'win32':
username = query_registry_value(HKEY_CURRENT_USER, 'Software\\Valve\\Steam', 'AutoLoginUser')
elif sys_platform == 'darwin' or sys_platform == 'linux':
if sys_platform == 'linux':
registry_path = os.path.abspath(os.path.join(self.steam_path, '..', 'registry.vdf'))
else:
registry_path = os.path.join(self.steam_path, 'registry.vdf')
registry = vdf.load(open(registry_path), mapper=CaseInsensitiveDict)
username = registry.get('Registry', {}).get('HKCU', {}).get('Software', {})\
.get('Valve', {}).get('Steam', {}).get('AutoLoginUser', '')
else:
raise NotImplementedError('Getting username from Steam registry is not implemented on Linux.')
if not username:
raise ValueError('Unable to find username.')
self.log.info(f'Using Steam username: {username}')
# read config from steam path
login_users = vdf.load(open(os.path.join(self.steam_path, 'config', 'loginusers.vdf')),
mapper=CaseInsensitiveDict)
for _steam_id, user_info in login_users.get('users', {}).items():
if user_info['AccountName'] == username:
steam_id = int(_steam_id)
break
else:
raise ValueError('Unable to find user in Steam configuration.')
self.user_id = self._userid_from_steam64(steam_id)
self.user_dir = os.path.realpath(os.path.join(self.steam_path, 'userdata', str(self.user_id)))
self.grid_path = os.path.join(self.user_dir, 'config', 'grid')
if not os.path.exists(self.grid_path):
os.makedirs(self.grid_path)
return self.user_dir
def read_shortcuts(self):
if not self.user_dir:
raise ValueError('Steam user directory not set.')
# todo figure out case-insensitive dict
shortcuts_file = os.path.join(self.user_dir, 'config', 'shortcuts.vdf')
if os.path.exists(shortcuts_file):
self.shortcuts = vdf.binary_load(open(shortcuts_file, 'rb'), mapper=CaseInsensitiveDict)
else:
self.shortcuts = dict(shortcuts=dict())
return self.shortcuts
def write_shortcuts(self, shortcuts):
if not self.user_dir:
raise ValueError('Steam user directory not set.')
vdf.binary_dump(shortcuts, open(os.path.join(self.user_dir, 'config', 'shortcuts.vdf'), 'wb'))
def read_config(self):
if not self.steam_path:
raise ValueError('Steam directory not set.')
config_file = os.path.join(self.steam_path, 'config', 'config.vdf')
if os.path.exists(config_file):
self.steam_config = vdf.load(open(config_file), mapper=CaseInsensitiveDict)
else:
self.steam_config = dict(shortcuts=dict())
return self.steam_config
def write_config(self, config=None):
if not self.steam_path:
raise ValueError('Steam directory not set.')
if not config:
config = self.steam_config
vdf.dump(config, open(os.path.join(self.steam_path, 'config', 'config.vdf'), 'w'), pretty=True)
def create_shortcut_entry(self, igame: InstalledGame, app_id: int = 0):
if not app_id:
# check against existing and keep generating until unique
existing_ids = set(entry['appid'] + 2**32 for entry in self.shortcuts.get('shortcuts', {}).values())
while not app_id or app_id in existing_ids:
app_id = randint(2 ** 31, 2 ** 32 - 1)
if sys_platform == 'linux':
launch_options = f'{self.launch_script} {igame.app_name} "%command%"'
launch_dir = f'{igame.install_path}'
exe = os.path.join(igame.install_path, igame.executable)
launch_exe = f'\'{exe}\''
else:
launch_options = f'launch {igame.app_name} --steam'
launch_dir = f'"{os.path.dirname(self.lgd_binary)}"'
launch_exe = f'"{self.lgd_binary}"'
entry = {
'AllowDesktopConfig': 1,
'AllowOverlay': 1,
'Devkit': 0,
'DevkitGameID': '',
'DevkitOverrideAppID': 0,
'IsHidden': 0,
'LastPlayTime': 0,
'LaunchOptions': launch_options,
'ShortcutPath': '',
'StartDir': launch_dir,
'appid': app_id - 2 ** 32, # appid is unsigned, but stored as signed, so hack it to be right
'appname': igame.title,
'exe': launch_exe,
'icon': os.path.realpath(os.path.join(self.grid_path, f'{app_id}_icon.png')),
'openvr': 0,
'tags': {'0': 'Installed', '1': 'Unplayed', '2': 'Legendary'},
}
return entry
def set_compat_tool(self, app_id: int, compat_tool: str):
# todo ensure this section exists
self.steam_config['InstallConfigStore']['Software']['Valve']['Steam']['CompatToolMapping'].update({
str(app_id): {
'name': compat_tool,
'config': '',
'Priority': '250'
}
})
@staticmethod
def make_header_image(banner, logo=None):
# Big Picture banner
try:
from PIL import Image
from PIL import ImageOps
except ImportError:
return banner
bfp = BytesIO(banner)
banner_img = Image.open(bfp).convert(mode='RGBA')
banner_fit = ImageOps.fit(banner_img, (460, 215), Image.BILINEAR)
if logo:
lfp = BytesIO(logo)
logo_img = Image.open(lfp).convert(mode='RGBA')
logo_width = round(banner_fit.width * 0.50)
# use sharper algorithm for upscaling
method = Image.NEAREST if logo_img.width < logo_width else Image.BILINEAR
logo_fit = ImageOps.pad(logo_img, (logo_width, banner_fit.height), method)
x_pos = round(banner_fit.width * 0.25)
banner_fit.alpha_composite(logo_fit, (x_pos, 0))
outfp = BytesIO()
banner_fit.convert(mode='RGB').save(outfp, format='JPEG', quality=95)
return outfp.getvalue()
@staticmethod
def make_banner_image(banner, logo=None):
# Steam Deck UI banners just use the game ID and PNG format and have a slightly different size
try:
from PIL import Image
from PIL import ImageOps
except ImportError:
return banner
bfp = BytesIO(banner)
banner_img = Image.open(bfp).convert(mode='RGBA')
banner_fit = ImageOps.fit(banner_img, (616, 353), Image.BILINEAR)
if logo:
lfp = BytesIO(logo)
logo_img = Image.open(lfp).convert(mode='RGBA')
logo_width = round(banner_fit.width * 0.50)
# use sharper algorithm for upscaling
method = Image.NEAREST if logo_img.width < logo_width else Image.BILINEAR
logo_fit = ImageOps.pad(logo_img, (logo_width, banner_fit.height), method)
x_pos = round(banner_fit.width * 0.25)
banner_fit.alpha_composite(logo_fit, (x_pos, 0))
outfp = BytesIO()
banner_fit.convert(mode='RGB').save(outfp, format='PNG')
return outfp.getvalue()
@staticmethod
def make_tall_box(tall, logo):
try:
from PIL import Image
from PIL import ImageOps
except ImportError:
return tall
bfp = BytesIO(tall)
banner_img = Image.open(bfp).convert(mode='RGBA')
banner_fit = ImageOps.fit(banner_img, (600, 900), Image.BILINEAR)
lfp = BytesIO(logo)
logo_img = Image.open(lfp).convert(mode='RGBA')
logo_width = round(banner_fit.width * 0.8)
# use sharper algorithm for upscaling
method = Image.NEAREST if logo_img.width < logo_width else Image.BILINEAR
logo_fit = ImageOps.pad(logo_img, (logo_width, banner_fit.height), method)
x_pos = round(banner_fit.width * 0.1)
banner_fit.alpha_composite(logo_fit, (x_pos, 0))
outfp = BytesIO()
banner_fit.convert(mode='RGB').save(outfp, format='PNG')
return outfp.getvalue()
@staticmethod
def make_icon(igame):
if igame.platform in ('Windows', 'Win32'):
try:
from legendary.utils.pe import PEUtils
except ImportError:
raise RuntimeError('Could not import PEUtils.')
game_exe = os.path.join(igame.install_path, igame.executable)
p = PEUtils(game_exe)
icon = p.get_icon()
p.close()
return icon
elif igame.platform == 'Mac':
try:
from PIL import Image
except ImportError:
raise RuntimeError('Could not import PIL.')
# Install path for app bundles points to ~/Applications (or similar), the easiest way to
# get to the info plist is to go to the executable first, and then check from there.
# If it's not an app bundle (e.g. Unreal Engine) then this will fail.
info_plist = os.path.realpath(os.path.join(igame.install_path, igame.executable, '..', 'Info.plist'))
if not os.path.exists(info_plist):
raise FileNotFoundError(f'Could not find Info.plist for {igame.title}.')
plist = plistlib.load(open(info_plist, 'rb'))
icon_file = plist.get('CFBundleIconFile', None)
if not icon_file:
return None
icon_path = os.path.join(igame.install_path, 'Contents', 'Resources', icon_file)
icon_img = Image.open(icon_path).convert(mode='RGBA')
out = BytesIO()
icon_img.save(out, format='PNG')
return out.getvalue()
else:
raise NotImplementedError(f'Icon generation for {igame.platform} not implemented.')
def get_header_id(self, igame):
# taken from chimera
if sys_platform == 'linux':
exe = os.path.join(igame.install_path, igame.executable)
else:
exe = self.lgd_binary
crc_input = ''.join([f'"{exe}"', igame.title])
high_32 = crc32(crc_input.encode('utf-8')) | 0x80000000
full_64 = (high_32 << 32) | 0x02000000
return full_64
def create_grid_json(self, app_id):
filename = os.path.join(self.grid_path, f'{app_id}.json')
if os.path.exists(filename):
return
# Always just center the logo
grid_json = {
"nVersion": 1,
"logoPosition": {
"pinnedPosition": "CenterCenter", "nWidthPct": 75, "nHeightPct": 75
}
}
json.dump(grid_json, open(filename, 'w'))
def ensure_launch_script(self):
if self.lgd_binary is None or self.lgd_config_dir is None:
raise RuntimeError('No LGD binary or config fir specified.')
self.launch_script = os.path.join(self.lgd_config_dir, 'steam_launch')
if os.path.exists(self.launch_script):
# todo make sure the launch script still points at the right binary
return
with open(self.launch_script, 'w') as fp:
fp.write(_script.replace('{executable}', self.lgd_binary))
st = os.stat(self.launch_script)
os.chmod(self.launch_script, st.st_mode | 0o0100)

View file

@ -41,8 +41,11 @@ setup(
'filelock' 'filelock'
], ],
extras_require=dict( extras_require=dict(
webview=['pywebview>=3.4'], webview=['pywebview>=3.4<4.0'],
webview_gtk=['pywebview>=3.4', 'PyGObject'] webview_gtk=['pywebview>=3.4<4.0', 'PyGObject'],
steam=['vdf', 'Pillow', 'pefile'],
full=['vdf', 'Pillow', 'pefile', 'pywebview>=3.4<4.0'],
full_gtk=['vdf', 'Pillow', 'pefile', 'pywebview>=3.4<4.0', 'PyGObject']
), ),
url='https://github.com/derrod/legendary', url='https://github.com/derrod/legendary',
description='Free and open-source replacement for the Epic Games Launcher application', description='Free and open-source replacement for the Epic Games Launcher application',