diff --git a/legendary/api/egs.py b/legendary/api/egs.py index f88707d..63af89f 100644 --- a/legendary/api/egs.py +++ b/legendary/api/egs.py @@ -1,16 +1,18 @@ # !/usr/bin/env python -# coding: utf-8 +import logging import urllib.parse import requests import requests.adapters -import logging - from requests.auth import HTTPBasicAuth from legendary.models.exceptions import InvalidCredentialsError -from legendary.models.gql import * +from legendary.models.gql import ( + uplay_claim_query, + uplay_codes_query, + uplay_redeem_query, +) class EPCAPI: diff --git a/legendary/api/lgd.py b/legendary/api/lgd.py index ebc235f..8371f28 100644 --- a/legendary/api/lgd.py +++ b/legendary/api/lgd.py @@ -1,10 +1,10 @@ # !/usr/bin/env python -# coding: utf-8 import logging +from platform import system + import requests -from platform import system from legendary import __version__ diff --git a/legendary/cli.py b/legendary/cli.py index 0a5df55..0290167 100644 --- a/legendary/cli.py +++ b/legendary/cli.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -# coding: utf-8 import argparse import csv @@ -10,25 +9,46 @@ import shlex import subprocess import time import webbrowser - from collections import defaultdict, namedtuple from logging.handlers import QueueListener -from multiprocessing import freeze_support, Queue as MPQueue +from multiprocessing import Queue as MPQueue +from multiprocessing import freeze_support from platform import platform -from sys import exit, stdout, platform as sys_platform +from sys import exit, stdout +from sys import platform as sys_platform -from legendary import __version__, __codename__ +from legendary import __codename__, __version__ from legendary.core import LegendaryCore +from legendary.lfs.crossover import ( + mac_find_crossover_apps, + mac_get_bottle_path, + mac_get_crossover_bottles, + mac_get_crossover_version, + mac_is_crossover_running, + mac_is_valid_bottle, +) +from legendary.lfs.eos import ( + add_registry_entries, + query_registry_entries, + remove_registry_entries, +) +from legendary.lfs.utils import clean_filename, validate_files +from legendary.lfs.wine_helpers import ( + case_insensitive_file_search, + get_shell_folders, + read_registry, +) from legendary.models.exceptions import InvalidCredentialsError -from legendary.models.game import SaveGameStatus, VerifyResult, Game -from legendary.utils.cli import get_boolean_choice, get_int_choice, sdl_prompt, strtobool -from legendary.lfs.crossover import * +from legendary.models.game import Game, SaveGameStatus, VerifyResult +from legendary.utils.cli import ( + get_boolean_choice, + get_int_choice, + sdl_prompt, + strtobool, +) from legendary.utils.custom_parser import HiddenAliasSubparsersAction from legendary.utils.env import is_windows_mac_or_pyi -from legendary.lfs.eos import add_registry_entries, query_registry_entries, remove_registry_entries -from legendary.lfs.utils import validate_files, clean_filename from legendary.utils.selective_dl import get_sdl_appname -from legendary.lfs.wine_helpers import read_registry, get_shell_folders, case_insensitive_file_search # todo custom formatter for cli logger (clean info, highlighted error/warning) logging.basicConfig( @@ -146,7 +166,10 @@ class LegendaryCLI: auth_code = '' if not args.auth_code and not args.session_id and not args.ex_token: # only import here since pywebview import is slow - from legendary.utils.webview_login import webview_available, do_webview_login + from legendary.utils.webview_login import ( + do_webview_login, + webview_available, + ) if not webview_available or args.no_webview or self.core.webview_killswitch: # unfortunately the captcha stuff makes a complete CLI login flow kinda impossible right now... @@ -179,9 +202,7 @@ class LegendaryCLI: logger.fatal('No exchange token/authorization code, cannot login.') return - if exchange_token and self.core.auth_ex_token(exchange_token): - logger.info(f'Successfully logged in as "{self.core.lgd.userdata["displayName"]}"') - elif auth_code and self.core.auth_code(auth_code): + if exchange_token and self.core.auth_ex_token(exchange_token) or auth_code and self.core.auth_code(auth_code): logger.info(f'Successfully logged in as "{self.core.lgd.userdata["displayName"]}"') else: logger.error('Login attempt failed, please see log for details.') @@ -211,7 +232,7 @@ class LegendaryCLI: # sort games and dlc by name games = sorted(games, key=lambda x: x.app_title.lower()) - for citem_id in dlc_list.keys(): + for citem_id in dlc_list: if citem_id in na_dlcs: dlc_list[citem_id].extend(na_dlcs[citem_id]) dlc_list[citem_id] = sorted(dlc_list[citem_id], key=lambda d: d.app_title.lower()) @@ -543,7 +564,7 @@ class LegendaryCLI: continue if not args.yes and not args.force_download: - if not get_boolean_choice(f'Download cloud save?'): + if not get_boolean_choice('Download cloud save?'): logger.info('Not downloading...') continue @@ -564,7 +585,7 @@ class LegendaryCLI: continue if not args.yes and not args.force_upload: - if not get_boolean_choice(f'Upload local save?'): + if not get_boolean_choice('Upload local save?'): logger.info('Not uploading...') continue logger.info('Uploading local savegame...') @@ -727,7 +748,7 @@ class LegendaryCLI: return if not game.is_origin_game: - logger.error(f'The specified game is not an Origin title.') + logger.error('The specified game is not an Origin title.') return # login is not required to launch the game, but linking does require it. @@ -791,8 +812,8 @@ class LegendaryCLI: logger.info(f'Using CrossOver Bottle "{bottle_name}"') if not command: - logger.error(f'In order to launch Origin correctly you must specify a prefix and wine binary or ' - f'wrapper in the configuration file or command line. See the README for details.') + logger.error('In order to launch Origin correctly you must specify a prefix and wine binary or ' + 'wrapper in the configuration file or command line. See the README for details.') return # You cannot launch a URI without start.exe @@ -1087,7 +1108,7 @@ class LegendaryCLI: install_dlcs = not args.skip_dlcs if not args.yes and not args.with_dlcs and not args.skip_dlcs: - if not get_boolean_choice(f'Do you wish to automatically install DLCs?'): + if not get_boolean_choice('Do you wish to automatically install DLCs?'): install_dlcs = False if install_dlcs: @@ -1358,7 +1379,7 @@ class LegendaryCLI: f'(App name: "{main_game_appname}") is not installed!') return else: - logger.fatal(f'Unable to get base game information for DLC, cannot continue.') + logger.fatal('Unable to get base game information for DLC, cannot continue.') return # get everything needed for import from core, then run additional checks. @@ -1410,7 +1431,7 @@ class LegendaryCLI: logger.info(f'Found {len(dlcs)} items of DLC that could be imported.') import_dlc = True if not args.yes and not args.with_dlcs: - if not get_boolean_choice(f'Do you wish to automatically attempt to import all DLCs?'): + if not get_boolean_choice('Do you wish to automatically attempt to import all DLCs?'): import_dlc = False if import_dlc: @@ -1788,9 +1809,7 @@ class LegendaryCLI: if igame := self.core.get_installed_game(app_name): installed_dlc_json.append(dict(app_name=igame.app_name, title=igame.title, install_size=igame.install_size)) - installed_dlc_human.append('App name: {}, Title: "{}", Size: {:.02f} GiB'.format( - igame.app_name, igame.title, igame.install_size / 1024 / 1024 / 1024 - )) + installed_dlc_human.append(f'App name: {igame.app_name}, Title: "{igame.title}", Size: {igame.install_size / 1024 / 1024 / 1024:.02f} GiB') installation_info.append(InfoItem('Installed DLC', 'installed_dlc', installed_dlc_human or None, installed_dlc_json)) @@ -1863,11 +1882,11 @@ class LegendaryCLI: manifest.chunk_data_list.count)) # total file size total_size = sum(fm.file_size for fm in manifest.file_manifest_list.elements) - file_size = '{:.02f} GiB'.format(total_size / 1024 / 1024 / 1024) + file_size = f'{total_size / 1024 / 1024 / 1024:.02f} GiB' manifest_info.append(InfoItem('Disk size (uncompressed)', 'disk_size', file_size, total_size)) # total chunk size total_size = sum(c.file_size for c in manifest.chunk_data_list.elements) - chunk_size = '{:.02f} GiB'.format(total_size / 1024 / 1024 / 1024) + chunk_size = f'{total_size / 1024 / 1024 / 1024:.02f} GiB' manifest_info.append(InfoItem('Download size (compressed)', 'download_size', chunk_size, total_size)) @@ -1885,7 +1904,7 @@ class LegendaryCLI: (tag in fm.install_tags) or (not tag and not fm.install_tags)] tag_file_size = sum(fm.file_size for fm in tag_files) tag_disk_size.append(dict(tag=tag, size=tag_file_size, count=len(tag_files))) - tag_file_size_human = '{:.02f} GiB'.format(tag_file_size / 1024 / 1024 / 1024) + tag_file_size_human = f'{tag_file_size / 1024 / 1024 / 1024:.02f} GiB' tag_disk_size_human.append(f'{human_tag.ljust(longest_tag)} - {tag_file_size_human} ' f'(Files: {len(tag_files)})') # tag_disk_size_human.append(f'Size: {tag_file_size_human}, Files: {len(tag_files)}, Tag: "{tag}"') @@ -1898,7 +1917,7 @@ class LegendaryCLI: tag_chunk_size = sum(c.file_size for c in manifest.chunk_data_list.elements if c.guid_num in tag_chunk_guids) tag_download_size.append(dict(tag=tag, size=tag_chunk_size, count=len(tag_chunk_guids))) - tag_chunk_size_human = '{:.02f} GiB'.format(tag_chunk_size / 1024 / 1024 / 1024) + tag_chunk_size_human = f'{tag_chunk_size / 1024 / 1024 / 1024:.02f} GiB' tag_download_size_human.append(f'{human_tag.ljust(longest_tag)} - {tag_chunk_size_human} ' f'(Chunks: {len(tag_chunk_guids)})') @@ -2299,7 +2318,7 @@ class LegendaryCLI: reg_paths = query_registry_entries(prefix) if old_path := reg_paths["overlay_path"]: if os.path.normpath(old_path) == args.path: - logger.info(f'Overlay already enabled, nothing to do.') + logger.info('Overlay already enabled, nothing to do.') return else: logger.info(f'Updating overlay registry entries from "{old_path}" to "{args.path}"') @@ -2314,7 +2333,7 @@ class LegendaryCLI: remove_registry_entries(prefix) # if the install is not managed by legendary, specify the command including the path if self.core.is_overlay_installed(): - logger.info(f'To re-enable the overlay, run: legendary eos-overlay enable') + logger.info('To re-enable the overlay, run: legendary eos-overlay enable') else: logger.info(f'To re-enable the overlay, run: legendary eos-overlay enable --path "{old_path}"') @@ -2333,7 +2352,7 @@ class LegendaryCLI: if os.name != 'nt' and not prefix: logger.info('Registry entries in prefixes (if any) have not been removed. ' - f'This shouldn\'t cause any issues as the overlay will simply fail to load.') + 'This shouldn\'t cause any issues as the overlay will simply fail to load.') else: logger.info('Removing registry entries...') remove_registry_entries(prefix) @@ -2345,7 +2364,7 @@ class LegendaryCLI: elif args.action in {'install', 'update'}: if args.action == 'update' and not self.core.is_overlay_installed(): - logger.error(f'Overlay not installed, nothing to update.') + logger.error('Overlay not installed, nothing to update.') return logger.info('Preparing to start overlay install...') dlm, ares, igame = self.core.prepare_overlay_install(args.path) @@ -2385,7 +2404,7 @@ class LegendaryCLI: logger.info(f'Updating overlay registry entries from "{old_path}" to "{install_path}"') remove_registry_entries(prefix) else: - logger.info(f'Registry entries already exist. Done.') + logger.info('Registry entries already exist. Done.') return add_registry_entries(install_path, prefix) logger.info('Done.') @@ -2420,7 +2439,7 @@ class LegendaryCLI: if args.crossover_app: cx_version = mac_get_crossover_version(args.crossover_app) if not cx_version: - logger.error(f'No valid CrossOver install specified!') + logger.error('No valid CrossOver install specified!') return logger.info(f'Using CrossOver {cx_version} at {args.crossover_app}') else: @@ -2431,9 +2450,9 @@ class LegendaryCLI: for i, (ver, path) in enumerate(apps, start=1): print(f' {i:2d}. {ver} ({path})') print('') - choice = get_int_choice(f'Select a CrossOver install', 1, 1, len(apps)) + choice = get_int_choice('Select a CrossOver install', 1, 1, len(apps)) if choice is None: - logger.error(f'No valid choice made, aborting.') + logger.error('No valid choice made, aborting.') exit(1) # empty line just to make the output look a little less crammed print('') @@ -2442,8 +2461,8 @@ class LegendaryCLI: cx_version, args.crossover_app = apps[0] logger.info(f'Found CrossOver {cx_version} at {args.crossover_app}') else: - logger.error(f'No CrossOver installs found, see https://legendary.gl/crossover-setup ' - f'for setup instructions') + logger.error('No CrossOver installs found, see https://legendary.gl/crossover-setup ' + 'for setup instructions') return forced_selection = None @@ -2451,7 +2470,7 @@ class LegendaryCLI: if args.crossover_bottle: if args.crossover_bottle not in bottles: - logger.error(f'No valid CrossOver bottle specified!') + logger.error('No valid CrossOver bottle specified!') return logger.info(f'Using Bottle "{args.crossover_bottle}"') forced_selection = args.crossover_bottle @@ -2476,8 +2495,8 @@ class LegendaryCLI: f'(Total: {len(available_bottles)})') if not usable_bottles: - logger.info(f'No usable bottles found, see https://legendary.gl/crossover-setup for ' - f'manual setup instructions.') + logger.info('No usable bottles found, see https://legendary.gl/crossover-setup for ' + 'manual setup instructions.') install_candidate = None else: print('\nFound available bottle(s), please select one:') @@ -2502,11 +2521,11 @@ class LegendaryCLI: print(f' {i:2d}. {bottle["name"]} ({bottle["description"]})') print('') - choice = get_int_choice(f'Select a bottle (CTRL+C to abort)', + choice = get_int_choice('Select a bottle (CTRL+C to abort)', default_choice, 1, len(usable_bottles), return_on_invalid=True) if choice is None: - logger.error(f'No valid choice made, aborting.') + logger.error('No valid choice made, aborting.') return # empty line just to make the output look a little less crammed print('') @@ -2517,7 +2536,7 @@ class LegendaryCLI: logger.info(f'Preparing to download "{bottle_name}" ({install_candidate["description"]})...') if bottle_name in bottles: - logger.warning(f'Bottle with the same name already exists!') + logger.warning('Bottle with the same name already exists!') new_name = input('Please provide a new name for the bottle [CTRL-C or empty to abort]: ') if not new_name: logger.error('No new name provided, aborting.') @@ -2569,9 +2588,9 @@ class LegendaryCLI: print(f' {i:2d}. {bottle}') print('') - choice = get_int_choice(f'Select a bottle', default_choice, 1, len(bottles)) + choice = get_int_choice('Select a bottle', default_choice, 1, len(bottles)) if choice is None: - logger.error(f'No valid choice made, aborting.') + logger.error('No valid choice made, aborting.') exit(1) # empty line just to make the output look a little less crammed print('') @@ -2635,7 +2654,7 @@ class LegendaryCLI: f'"legendary move {app_name} "{args.new_path}" --skip-move"') return else: - logger.info(f'Not moving, just rewriting legendary metadata...') + logger.info('Not moving, just rewriting legendary metadata...') igame.install_path = new_path self.core.install_game(igame) @@ -3143,7 +3162,7 @@ def main(): # show note if update is available if not disable_update_message and cli.core.update_available and cli.core.update_notice_enabled(): if update_info := cli.core.get_update_info(): - print(f'\nLegendary update available!') + print('\nLegendary update available!') print(f'- New version: {update_info["version"]} - "{update_info["name"]}"') print(f'- Release summary:\n{update_info["summary"]}\n- Release URL: {update_info["gh_url"]}') if update_info['critical']: diff --git a/legendary/core.py b/legendary/core.py index d2d8214..92eaeeb 100644 --- a/legendary/core.py +++ b/legendary/core.py @@ -1,46 +1,73 @@ -# coding: utf-8 +import contextlib import json +import logging +import os import shlex import shutil - from base64 import b64decode from collections import defaultdict from concurrent.futures import ThreadPoolExecutor -from datetime import timezone +from datetime import datetime, timezone from hashlib import sha1 from locale import getdefaultlocale from multiprocessing import Queue from platform import system -from requests import session -from requests.exceptions import HTTPError, ConnectionError from sys import platform as sys_platform +from urllib.parse import parse_qsl, urlencode, urlparse from uuid import uuid4 -from urllib.parse import urlencode, parse_qsl, urlparse + +from requests import session +from requests.exceptions import ConnectionError, HTTPError from legendary import __version__ from legendary.api.egs import EPCAPI from legendary.api.lgd import LGDAPI from legendary.downloader.mp.manager import DLManager +from legendary.lfs.crossover import ( + mac_find_crossover_apps, + mac_get_bottle_path, + mac_get_crossover_version, + mac_is_valid_bottle, +) from legendary.lfs.egl import EPCLFS +from legendary.lfs.eos import EOSOverlayApp, query_registry_entries from legendary.lfs.lgndry import LGDLFS -from legendary.lfs.utils import clean_filename, delete_folder, delete_filelist, get_dir_size +from legendary.lfs.utils import ( + clean_filename, + delete_filelist, + delete_folder, + get_dir_size, +) +from legendary.lfs.wine_helpers import ( + case_insensitive_path_search, + get_shell_folders, + read_registry, +) +from legendary.models.chunk import Chunk from legendary.models.downloading import AnalysisResult, ConditionCheckResult from legendary.models.egl import EGLManifest -from legendary.models.exceptions import * -from legendary.models.game import * +from legendary.models.exceptions import InvalidCredentialsError +from legendary.models.game import ( + Game, + GameAsset, + InstalledGame, + LaunchParameters, + SaveGameFile, + SaveGameStatus, + Sidecar, +) from legendary.models.json_manifest import JSONManifest from legendary.models.manifest import Manifest, ManifestMeta -from legendary.models.chunk import Chunk -from legendary.lfs.crossover import * from legendary.utils.egl_crypt import decrypt_epic_data from legendary.utils.env import is_windows_mac_or_pyi -from legendary.lfs.eos import EOSOverlayApp, query_registry_entries -from legendary.utils.game_workarounds import is_opt_enabled, update_workarounds, get_exe_override +from legendary.utils.game_workarounds import ( + get_exe_override, + is_opt_enabled, + update_workarounds, +) from legendary.utils.savegame_helper import SaveGameHelper from legendary.utils.selective_dl import games as sdl_games -from legendary.lfs.wine_helpers import read_registry, get_shell_folders, case_insensitive_path_search - # ToDo: instead of true/false return values for success/failure actually raise an exception that the CLI/GUI # can handle to give the user more details. (Not required yet since there's no GUI so log output is fine) @@ -295,7 +322,7 @@ class LegendaryCore: update_workarounds(game_overrides) if sdl_config := game_overrides.get('sdl_config'): # add placeholder for games to fetch from API that aren't hardcoded - for app_name in sdl_config.keys(): + for app_name in sdl_config: if app_name not in sdl_games: sdl_games[app_name] = None if lgd_config := version_info.get('legendary_config'): @@ -344,7 +371,7 @@ class LegendaryCore: if _aliases_enabled and (force or not self.lgd.aliases): self.lgd.generate_aliases() - def get_assets(self, update_assets=False, platform='Windows') -> List[GameAsset]: + def get_assets(self, update_assets=False, platform='Windows') -> list[GameAsset]: # do not save and always fetch list when platform is overridden if not self.lgd.assets or update_assets or platform not in self.lgd.assets: # if not logged in, return empty list @@ -372,8 +399,8 @@ class LegendaryCore: try: return next(i for i in self.lgd.assets[platform] if i.app_name == app_name) - except StopIteration: - raise ValueError + except StopIteration as e: + raise ValueError from e def asset_valid(self, app_name) -> bool: # EGL sync is only supported for Windows titles so this is fine @@ -395,11 +422,11 @@ class LegendaryCore: self.get_game_list(True, platform=platform) return self.lgd.get_game_meta(app_name) - def get_game_list(self, update_assets=True, platform='Windows') -> List[Game]: + def get_game_list(self, update_assets=True, platform='Windows') -> list[Game]: return self.get_game_and_dlc_list(update_assets=update_assets, platform=platform)[0] def get_game_and_dlc_list(self, update_assets=True, platform='Windows', - force_refresh=False, skip_ue=True) -> (List[Game], Dict[str, List[Game]]): + force_refresh=False, skip_ue=True) -> tuple[list[Game], dict[str, list[Game]]]: _ret = [] _dlc = defaultdict(list) meta_updated = False @@ -433,7 +460,7 @@ class LegendaryCore: game = self.lgd.get_game_meta(app_name) asset_updated = sidecar_updated = False if game: - asset_updated = any(game.app_version(_p) != app_assets[_p].build_version for _p in app_assets.keys()) + asset_updated = any(game.app_version(_p) != app_assets[_p].build_version for _p in app_assets) # assuming sidecar data is the same for all platforms, just check the baseline (Windows) for updates. sidecar_updated = (app_assets['Windows'].sidecar_rev > 0 and (not game.sidecar or game.sidecar.rev != app_assets['Windows'].sidecar_rev)) @@ -467,10 +494,8 @@ class LegendaryCore: sidecar=sidecar) self.lgd.set_game_meta(game.app_name, game) games[app_name] = game - try: + with contextlib.suppress(KeyError): still_needs_update.remove(app_name) - except KeyError: - pass # setup and teardown of thread pool takes some time, so only do it when it makes sense. still_needs_update = {e[0] for e in fetch_list} @@ -525,7 +550,7 @@ class LegendaryCore: self.lgd.delete_game_meta(app_name) def get_non_asset_library_items(self, force_refresh=False, - skip_ue=True) -> (List[Game], Dict[str, List[Game]]): + skip_ue=True) -> tuple[list[Game], dict[str, list[Game]]]: """ Gets a list of Games without assets for installation, for instance Games delivered via third-party stores that do not have assets for installation @@ -581,20 +606,20 @@ class LegendaryCore: def get_installed_platforms(self): return {i.platform for i in self._get_installed_list(False)} - def get_installed_list(self, include_dlc=False) -> List[InstalledGame]: + def get_installed_list(self, include_dlc=False) -> list[InstalledGame]: if self.egl_sync_enabled: self.log.debug('Running EGL sync...') self.egl_sync() return self._get_installed_list(include_dlc) - def _get_installed_list(self, include_dlc=False) -> List[InstalledGame]: + def _get_installed_list(self, include_dlc=False) -> list[InstalledGame]: if include_dlc: return self.lgd.get_installed_list() else: return [g for g in self.lgd.get_installed_list() if not g.is_dlc] - def get_installed_dlc_list(self) -> List[InstalledGame]: + def get_installed_dlc_list(self) -> list[InstalledGame]: return [g for g in self.lgd.get_installed_list() if g.is_dlc] def get_installed_game(self, app_name, skip_sync=False) -> InstalledGame: @@ -831,7 +856,7 @@ class LegendaryCore: return f'link2ea://launchgame/{app_name}?{urlencode(parameters)}' def get_save_games(self, app_name: str = ''): - savegames = self.egs.get_user_cloud_saves(app_name, manifests=not not app_name) + savegames = self.egs.get_user_cloud_saves(app_name, manifests=bool(app_name)) _saves = [] for fname, f in savegames['files'].items(): if '.manifest' not in fname: @@ -966,7 +991,7 @@ class LegendaryCore: return absolute_path - def check_savegame_state(self, path: str, save: SaveGameFile) -> (SaveGameStatus, (datetime, datetime)): + def check_savegame_state(self, path: str, save: SaveGameFile) -> tuple[SaveGameStatus, tuple[datetime | None, datetime | None]]: latest = 0 for _dir, _, _files in os.walk(path): for _file in _files: @@ -1107,8 +1132,8 @@ class LegendaryCore: f'"legendary clean-saves {app_name}" and try again.') return else: - self.log.error(f'No chunks were available, skipping. You can run "legendary clean-saves" ' - f'to remove this broken save from your account.') + self.log.error('No chunks were available, skipping. You can run "legendary clean-saves" ' + 'to remove this broken save from your account.') continue for fm in m.file_manifest_list.elements: @@ -1183,14 +1208,14 @@ class LegendaryCore: deletion_list.append(fname) continue elif 0 < missing_chunks < total_chunks: - self.log.error(f'Some chunk(s) missing, optionally run "legendary download-saves" to obtain a backup ' - f'of the corrupted save, then re-run this command with "--delete-incomplete" to remove ' - f'it from the cloud save service.') + self.log.error('Some chunk(s) missing, optionally run "legendary download-saves" to obtain a backup ' + 'of the corrupted save, then re-run this command with "--delete-incomplete" to remove ' + 'it from the cloud save service.') used_chunks |= chunk_fnames # check for orphaned chunks (not used in any manifests) - for fname, f in files.items(): + for fname in files: if fname in used_chunks or '.manifest' in fname: continue # skip chunks where orphan status could not be reliably determined @@ -1353,7 +1378,7 @@ class LegendaryCore: game.base_urls = _base_urls if not old_bytes: - self.log.error(f'Could not load old manifest, patching will not work!') + self.log.error('Could not load old manifest, patching will not work!') else: old_manifest = self.load_manifest(old_bytes) @@ -1408,7 +1433,7 @@ class LegendaryCore: f'"{new_manifest.meta.build_id}"...') new_manifest.apply_delta_manifest(delta_manifest) else: - self.log.debug(f'No Delta manifest received from CDN.') + self.log.debug('No Delta manifest received from CDN.') # reuse existing installation's directory if igame := self.get_installed_game(base_game.app_name if base_game else game.app_name): @@ -1734,7 +1759,7 @@ class LegendaryCore: installed_game.install_path, filelist, case_insensitive=installed_game.platform.startswith('Win') ): - self.log.warning(f'Deleting some deselected files failed, please check/remove manually.') + self.log.warning('Deleting some deselected files failed, please check/remove manually.') def prereq_installed(self, app_name): igame = self.lgd.get_installed_game(app_name) @@ -1775,7 +1800,7 @@ class LegendaryCore: needs_verify = True if not needs_verify: - self.log.debug(f'No in-progress installation found, assuming complete...') + self.log.debug('No in-progress installation found, assuming complete...') manifest_secrets = dict() if not manifest_data: @@ -1922,8 +1947,8 @@ class LegendaryCore: except ValueError as e: self.log.warning(f'Deleting EGL manifest failed: {e!r}') except FileNotFoundError: - self.log.warning(f'EGL manifest was already deleted, in case you uninstalled the Epic Games Launcher' - f' please disable and unlink EGL Sync.') + self.log.warning('EGL manifest was already deleted, in case you uninstalled the Epic Games Launcher' + ' please disable and unlink EGL Sync.') if delete_files: delete_folder(os.path.join(igame.install_path, '.egstore')) diff --git a/legendary/downloader/mp/manager.py b/legendary/downloader/mp/manager.py index 3ee6ce7..c44fb79 100644 --- a/legendary/downloader/mp/manager.py +++ b/legendary/downloader/mp/manager.py @@ -1,22 +1,31 @@ -# coding: utf-8 # please don't look at this code too hard, it's a mess. import logging import os import time - from collections import Counter, defaultdict, deque from logging.handlers import QueueHandler -from multiprocessing import cpu_count, Process, Queue as MPQueue +from multiprocessing import Process, cpu_count +from multiprocessing import Queue as MPQueue from multiprocessing.shared_memory import SharedMemory from queue import Empty from sys import exit from threading import Condition, Thread from legendary.downloader.mp.workers import DLWorker, FileWorker -from legendary.models.downloading import * -from legendary.models.manifest import ManifestComparison, Manifest +from legendary.models.downloading import ( + AnalysisResult, + ChunkTask, + DownloaderTask, + FileTask, + SharedMemorySegment, + TaskFlags, + TerminateWorkerTask, + UIUpdate, + WriterTask, +) +from legendary.models.manifest import Manifest, ManifestComparison class DLManager(Process): @@ -107,7 +116,7 @@ class DLManager(Process): is_1mib = analysis_res.biggest_chunk == 1024 * 1024 self.log.debug(f'Biggest chunk size: {analysis_res.biggest_chunk} bytes (== 1 MiB? {is_1mib})') - self.log.debug(f'Creating manifest comparison...') + self.log.debug('Creating manifest comparison...') mc = ManifestComparison.create(manifest, old_manifest) analysis_res.manifest_comparison = mc @@ -380,11 +389,11 @@ class DLManager(Process): if reused: self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}') # open temporary file that will contain download + old file contents - self.tasks.append(FileTask(current_file.filename + u'.tmp', flags=TaskFlags.OPEN_FILE)) + self.tasks.append(FileTask(current_file.filename + '.tmp', flags=TaskFlags.OPEN_FILE)) self.tasks.extend(chunk_tasks) - self.tasks.append(FileTask(current_file.filename + u'.tmp', flags=TaskFlags.CLOSE_FILE)) + self.tasks.append(FileTask(current_file.filename + '.tmp', flags=TaskFlags.CLOSE_FILE)) # delete old file and rename temporary - self.tasks.append(FileTask(current_file.filename, old_file=current_file.filename + u'.tmp', + self.tasks.append(FileTask(current_file.filename, old_file=current_file.filename + '.tmp', flags=TaskFlags.RENAME_FILE | TaskFlags.DELETE_FILE)) else: self.tasks.append(FileTask(current_file.filename, flags=TaskFlags.OPEN_FILE)) @@ -659,7 +668,7 @@ class DLManager(Process): self.dl_result_q = MPQueue(-1) self.writer_result_q = MPQueue(-1) - self.log.info(f'Starting download workers...') + self.log.info('Starting download workers...') bind_ip = None for i in range(self.max_workers): @@ -773,7 +782,7 @@ class DLManager(Process): time.sleep(self.update_interval) - for i in range(self.max_workers): + for _ in range(self.max_workers): self.dl_worker_queue.put_nowait(TerminateWorkerTask()) self.log.info('Waiting for installation to finish...') @@ -781,7 +790,7 @@ class DLManager(Process): writer_p.join(timeout=10.0) if writer_p.exitcode is None: - self.log.warning(f'Terminating writer process, no exit code!') + self.log.warning('Terminating writer process, no exit code!') writer_p.terminate() # forcibly kill DL workers that are not actually dead yet diff --git a/legendary/downloader/mp/workers.py b/legendary/downloader/mp/workers.py index 94381df..1ffb7e4 100644 --- a/legendary/downloader/mp/workers.py +++ b/legendary/downloader/mp/workers.py @@ -1,23 +1,24 @@ -# coding: utf-8 +import logging import os import time -import logging - from logging.handlers import QueueHandler from multiprocessing import Process from multiprocessing.shared_memory import SharedMemory from queue import Empty import requests -from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK +from requests.adapters import DEFAULT_POOLBLOCK, HTTPAdapter from legendary.lfs.wine_helpers import case_insensitive_file_search from legendary.models.chunk import Chunk from legendary.models.downloading import ( - DownloaderTask, DownloaderTaskResult, - WriterTask, WriterTaskResult, - TerminateWorkerTask, TaskFlags + DownloaderTask, + DownloaderTaskResult, + TaskFlags, + TerminateWorkerTask, + WriterTask, + WriterTaskResult, ) @@ -36,8 +37,10 @@ class BindingHTTPAdapter(HTTPAdapter): class DLWorker(Process): def __init__(self, name, queue, out_queue, shm, max_retries=7, - logging_queue=None, dl_timeout=10, bind_addr=None, secrets=dict()): + logging_queue=None, dl_timeout=10, bind_addr=None, secrets=None): super().__init__(name=name) + if secrets is None: + secrets = dict() self.q = queue self.o_q = out_queue self.secrets = secrets @@ -65,7 +68,7 @@ class DLWorker(Process): logger = logging.getLogger(self.name) logger.setLevel(self.log_level) - logger.debug(f'Download worker reporting for duty!') + logger.debug('Download worker reporting for duty!') empty = False while True: diff --git a/legendary/lfs/crossover.py b/legendary/lfs/crossover.py index 04abb93..464b619 100644 --- a/legendary/lfs/crossover.py +++ b/legendary/lfs/crossover.py @@ -1,6 +1,6 @@ import logging -import plistlib import os +import plistlib import subprocess _logger = logging.getLogger('CXHelpers') diff --git a/legendary/lfs/egl.py b/legendary/lfs/egl.py index 5311e8a..d331626 100644 --- a/legendary/lfs/egl.py +++ b/legendary/lfs/egl.py @@ -1,11 +1,8 @@ -# coding: utf-8 import configparser import json import os -from typing import List - from legendary.models.egl import EGLManifest @@ -62,7 +59,7 @@ class EPCLFS: data = json.load(open(os.path.join(self.programdata_path, f), encoding='utf-8')) self.manifests[data['AppName']] = data - def get_manifests(self) -> List[EGLManifest]: + def get_manifests(self) -> list[EGLManifest]: if not self.manifests: self.read_manifests() diff --git a/legendary/lfs/eos.py b/legendary/lfs/eos.py index fb8fc0f..3ecea41 100644 --- a/legendary/lfs/eos.py +++ b/legendary/lfs/eos.py @@ -1,10 +1,19 @@ -import os import logging +import os from legendary.models.game import Game if os.name == 'nt': - from legendary.lfs.windows_helpers import * + from legendary.lfs.windows_helpers import ( + HKEY_CURRENT_USER, + HKEY_LOCAL_MACHINE, + TYPE_DWORD, + TYPE_STRING, + list_registry_values, + query_registry_value, + remove_registry_value, + set_registry_value, + ) logger = logging.getLogger('EOSUtils') # Dummy Game objects to use with Core methods that expect them @@ -98,7 +107,6 @@ def add_registry_entries(overlay_path, prefix=None): overlay_path = f'Z:{overlay_path}' overlay_line = f'"{EOS_OVERLAY_VALUE}"="{overlay_path}"\n' - overlay_idx = None section_idx = None for idx, line in enumerate(reg_lines): diff --git a/legendary/lfs/lgndry.py b/legendary/lfs/lgndry.py index dfab258..916c871 100644 --- a/legendary/lfs/lgndry.py +++ b/legendary/lfs/lgndry.py @@ -1,23 +1,20 @@ -# coding: utf-8 import json -import os import logging - -from contextlib import contextmanager +import os from collections import defaultdict +from contextlib import contextmanager from pathlib import Path from time import time from filelock import FileLock -from .utils import clean_filename, LockedJSONData - -from legendary.models.game import * -from legendary.utils.aliasing import generate_aliases from legendary.models.config import LGDConf +from legendary.models.game import Game, GameAsset, InstalledGame +from legendary.utils.aliasing import generate_aliases from legendary.utils.env import is_windows_mac_or_pyi +from .utils import LockedJSONData, clean_filename FILELOCK_DEBUG = False @@ -229,7 +226,7 @@ class LGDLFS: try: return open(self._get_manifest_filename(app_name, version, platform), 'rb').read() except FileNotFoundError: # all other errors should propagate - self.log.debug(f'Loading manifest failed, retrying without platform in filename...') + self.log.debug('Loading manifest failed, retrying without platform in filename...') try: return open(self._get_manifest_filename(app_name, version), 'rb').read() except FileNotFoundError: # all other errors should propagate @@ -464,7 +461,7 @@ class LGDLFS: collisions = set() alias_map = defaultdict(set) - for app_name in self._game_metadata.keys(): + for app_name in self._game_metadata: game = self.get_game_meta(app_name) if game.is_dlc: continue @@ -478,7 +475,7 @@ class LGDLFS: collisions.add(alias) # remove colliding aliases from map and add aliases to lookup table - for app_name, aliases in alias_map.items(): + for app_name in alias_map: alias_map[app_name] -= collisions for alias in alias_map[app_name]: self.aliases[alias] = app_name diff --git a/legendary/lfs/utils.py b/legendary/lfs/utils.py index 85c2793..7de9e21 100644 --- a/legendary/lfs/utils.py +++ b/legendary/lfs/utils.py @@ -1,15 +1,13 @@ -# coding: utf-8 -import os -import shutil import hashlib import json import logging - +import os +import shutil +from collections.abc import Iterator from pathlib import Path from sys import stdout from time import perf_counter -from typing import List, Iterator from filelock import FileLock @@ -33,7 +31,7 @@ def delete_folder(path: str, recursive=True) -> bool: return True -def delete_filelist(path: str, filenames: List[str], +def delete_filelist(path: str, filenames: list[str], delete_root_directory: bool = False, silent: bool = False, case_insensitive: bool = True) -> bool: dirs = set() @@ -87,7 +85,7 @@ def delete_filelist(path: str, filenames: List[str], return no_error -def validate_files(base_path: str, filelist: List[tuple], hash_type='sha1', +def validate_files(base_path: str, filelist: list[tuple], hash_type='sha1', large_file_threshold=1024 * 1024 * 512, case_insensitive: bool = True) -> Iterator[tuple]: """ Validates the files in filelist in path against the provided hashes diff --git a/legendary/lfs/windows_helpers.py b/legendary/lfs/windows_helpers.py index ec20098..f39ca14 100644 --- a/legendary/lfs/windows_helpers.py +++ b/legendary/lfs/windows_helpers.py @@ -1,6 +1,6 @@ +import ctypes import logging import winreg -import ctypes _logger = logging.getLogger('WindowsHelpers') diff --git a/legendary/models/chunk.py b/legendary/models/chunk.py index 701e0a3..adcd8db 100644 --- a/legendary/models/chunk.py +++ b/legendary/models/chunk.py @@ -1,11 +1,10 @@ -# coding: utf-8 import struct import zlib - from hashlib import sha1 from io import BytesIO from uuid import uuid4 + from Cryptodome.Cipher import AES from legendary.utils.rolling_hash import get_hash @@ -74,7 +73,7 @@ class Chunk: @property def guid_str(self): if not self._guid_str: - self._guid_str = '-'.join('{:08x}'.format(g) for g in self.guid) + self._guid_str = '-'.join(f'{g:08x}' for g in self.guid) return self._guid_str @property @@ -97,7 +96,9 @@ class Chunk: return cls.read(_sio, secrets) @classmethod - def read(cls, bio, secrets=dict()): + def read(cls, bio, secrets=None): + if secrets is None: + secrets = dict() head_start = bio.tell() if struct.unpack('= 4: _chunk.secret_guid = struct.unpack(''.format( - self.guid_str, self.hash, self.sha_hash.hex(), self.group_num, self.window_size, self.file_size - ) + return f'' @property def guid_str(self): if not self._guid_str: - self._guid_str = '-'.join('{:08x}'.format(g) for g in self.guid) + self._guid_str = '-'.join(f'{g:08x}' for g in self.guid) return self._guid_str @@ -599,12 +594,10 @@ class ChunkInfo: secret_b64 = base64.urlsafe_b64encode(struct.pack(''.format( - guid_readable, self.offset, self.size, self.file_offset) + guid_readable = '-'.join(f'{g:08x}' for g in self.guid) + return f'' class CustomFields: diff --git a/legendary/utils/aliasing.py b/legendary/utils/aliasing.py index dfc596c..27aa475 100644 --- a/legendary/utils/aliasing.py +++ b/legendary/utils/aliasing.py @@ -40,7 +40,7 @@ roman = { def _filter(input): - return ''.join(l for l in input if l in allowed_characters) + return ''.join(char for char in input if char in allowed_characters) def generate_aliases(game_name, game_folder=None, split_words=True, app_name=None): diff --git a/legendary/utils/cli.py b/legendary/utils/cli.py index ff8eb6e..869fdec 100644 --- a/legendary/utils/cli.py +++ b/legendary/utils/cli.py @@ -4,17 +4,11 @@ def get_boolean_choice(prompt, default=True): choice = input(f'{prompt} [{yn}]: ') if not choice: return default - elif choice[0].lower() == 'y': - return True - else: - return False + return choice[0].lower() == 'y' def get_int_choice(prompt, default=None, min_choice=None, max_choice=None, return_on_invalid=False): - if default is not None: - prompt = f'{prompt} [{default}]: ' - else: - prompt = f'{prompt}: ' + prompt = f'{prompt} [{default}]: ' if default is not None else f'{prompt}: ' while True: try: @@ -55,7 +49,7 @@ def sdl_prompt(sdl_data, title): continue print(' *', tag, '-', info['name']) - examples = ', '.join([g for g in sdl_data.keys() if g != '__required'][:2]) + examples = ', '.join([g for g in sdl_data if g != '__required'][:2]) print(f'Please enter tags of pack(s) to install (space/comma-separated, e.g. "{examples}")') print('Leave blank to use defaults (only required data will be downloaded).') choices = input('Additional packs [Enter to confirm]: ') @@ -87,5 +81,5 @@ def strtobool(val): elif val in ('n', 'no', 'f', 'false', 'off', '0', ''): return 0 else: - raise ValueError("invalid truth value %r" % (val,)) + raise ValueError(f"invalid truth value {val!r}") diff --git a/legendary/utils/egl_crypt.py b/legendary/utils/egl_crypt.py index 43d7724..f3f88dd 100644 --- a/legendary/utils/egl_crypt.py +++ b/legendary/utils/egl_crypt.py @@ -78,7 +78,8 @@ def add_round_key(s, k): # learned from http://cs.ucsb.edu/~koc/cs178/projects/JT/aes.c -xtime = lambda a: (((a << 1) ^ 0x1B) & 0xFF) if (a & 0x80) else (a << 1) +def xtime(a): + return (((a << 1) ^ 0x1B) & 0xFF) if (a & 0x80) else (a << 1) def mix_single_column(a): @@ -243,6 +244,6 @@ def decrypt_epic_data(key, encrypted): for encoding in (locale.getpreferredencoding(), 'cp1252', 'cp932', 'ascii', 'utf-8'): try: return decrypted.decode(encoding) - except: # ignore exception, just try the next encoding + except Exception: # ignore exception, just try the next encoding continue raise ValueError('Failed to decode decrypted data') diff --git a/legendary/utils/game_workarounds.py b/legendary/utils/game_workarounds.py index 69aeb76..aa5a628 100644 --- a/legendary/utils/game_workarounds.py +++ b/legendary/utils/game_workarounds.py @@ -1,4 +1,3 @@ -# coding: utf-8 from sys import platform diff --git a/legendary/utils/savegame_helper.py b/legendary/utils/savegame_helper.py index 04c7e69..42bdad1 100644 --- a/legendary/utils/savegame_helper.py +++ b/legendary/utils/savegame_helper.py @@ -1,6 +1,5 @@ import logging import os - from datetime import datetime from fnmatch import fnmatch from hashlib import sha1 @@ -8,8 +7,16 @@ from io import BytesIO from tempfile import TemporaryFile from legendary.models.chunk import Chunk -from legendary.models.manifest import \ - Manifest, ManifestMeta, CDL, FML, CustomFields, FileManifest, ChunkPart, ChunkInfo +from legendary.models.manifest import ( + CDL, + FML, + ChunkInfo, + ChunkPart, + CustomFields, + FileManifest, + Manifest, + ManifestMeta, +) def _filename_matches(filename, patterns): diff --git a/legendary/utils/selective_dl.py b/legendary/utils/selective_dl.py index 6fa37df..a17f04f 100644 --- a/legendary/utils/selective_dl.py +++ b/legendary/utils/selective_dl.py @@ -1,5 +1,4 @@ # This file contains definitions for selective downloading for supported games -# coding: utf-8 _cyberpunk_sdl = { 'de': {'tags': ['voice_de_de'], 'name': 'Deutsch'}, @@ -32,7 +31,7 @@ games = { def get_sdl_appname(app_name): - for k in games.keys(): + for k in games: if k.endswith('_Mac'): continue diff --git a/legendary/utils/webview_login.py b/legendary/utils/webview_login.py index 6168842..f125d1a 100644 --- a/legendary/utils/webview_login.py +++ b/legendary/utils/webview_login.py @@ -1,5 +1,5 @@ -import logging import json +import logging import os import webbrowser @@ -113,7 +113,7 @@ class MockLauncher: try: j = json.loads(sid_json) sid = j['sid'] - logger.debug(f'Got SID (stage 2)! Executing sid login callback...') + logger.debug('Got SID (stage 2)! Executing sid login callback...') exchange_code = self.callback_sid(sid) if exchange_code: self.callback_result = self.callback_code(exchange_code)