diff --git a/legendary/gui/gui.py b/legendary/gui/gui.py
index 0324455..94597ed 100755
--- a/legendary/gui/gui.py
+++ b/legendary/gui/gui.py
@@ -7,6 +7,33 @@ from gi.repository import Gtk
 import legendary.core
 core = legendary.core.LegendaryCore()
 
+class args_obj:
+    base_path = ''
+    game_folder = ''
+    shared_memory = ''
+    max_workers = ''
+    override_manifest = ''
+    override_old_manifest = ''
+    override_delta_manifest = ''
+    override_base_url = ''
+    force = ''
+    disable_patching = ''
+    no_install = ''
+    update_only = ''
+    dlm_debug = ''
+    platform_override = ''
+    file_prefix = ''
+    file_exclude_prefix = ''
+    install_tag = ''
+    order_opt = ''
+    dl_timeout = ''
+    save_path = ''
+    repair_mode = ''
+    repair_and_update = ''
+    ignore_space = ''
+    disable_delta = ''
+    reset_sdl = ''
+    yes = ''
+
 def log_gtk(msg):
     dialog = Gtk.Dialog(title="Legendary Log")
     dialog.log = Gtk.Label(label=msg)
@@ -297,17 +324,17 @@ def install_gtk(app_name, app_title, parent):
     advanced_options.add(update_only_check_button)
 
     # --dlm-debug
-    glm_debug = False
-    glm_debug_check_button = Gtk.CheckButton(label="Downloader debug messages")
-    glm_debug_check_button.set_tooltip_text("Set download manager and worker processes' loglevel to debug")
-    #def glm_debug_button_toggled(button, name):
+    dlm_debug = False
+    dlm_debug_check_button = Gtk.CheckButton(label="Downloader debug messages")
+    dlm_debug_check_button.set_tooltip_text("Set download manager and worker processes' loglevel to debug")
+    #def dlm_debug_button_toggled(button, name):
     #    if button.get_active():
-    #        glm_debug = True
+    #        dlm_debug = True
     #    else:
-    #        glm_debug = False
-    #    print(name, "is now", glm_debug)
-    #glm_debug_check_button.connect("toggled", glm_debug_button_toggled, "glm_debug")
-    advanced_options.add(glm_debug_check_button)
+    #        dlm_debug = False
+    #    print(name, "is now", dlm_debug)
+    #dlm_debug_check_button.connect("toggled", dlm_debug_button_toggled, "dlm_debug")
+    advanced_options.add(dlm_debug_check_button)
 
     # --platform
     # use drop-down menu
     platform_override_box = Gtk.HBox()
@@ -439,9 +466,9 @@ def install_gtk(app_name, app_title, parent):
     advanced_options.add(ignore_space_req_check_button)
 
     # --disable-delta-manifests
-    override_delta_manifest = False
-    override_delta_manifest_check_button = Gtk.CheckButton(label="Disable delta manifests")
-    override_delta_manifest_check_button.set_tooltip_text("Do not use delta manifests when updating (may increase download size)")
+    disable_delta_manifest = False
+    disable_delta_manifest_check_button = Gtk.CheckButton(label="Disable delta manifests")
+    disable_delta_manifest_check_button.set_tooltip_text("Do not use delta manifests when updating (may increase download size)")
     #def override_delta_manifest_button_toggled(button, name):
     #    if button.get_active():
     #        override_delta_manifest = True
@@ -449,7 +476,7 @@ def install_gtk(app_name, app_title, parent):
     #        override_delta_manifest = False
     #        print(name, "is now", override_delta_manifest)
     #override_delta_manifest_check_button.connect("toggled", override_delta_manifest_button_toggled, "override_delta_manifest")
-    advanced_options.add(override_delta_manifest_check_button)
+    advanced_options.add(disable_delta_manifest_check_button)
 
     # --reset-sdl
     reset_sdl = False
@@ -483,63 +510,64 @@ def install_gtk(app_name, app_title, parent):
     install_dialog.show_all()
     install_dialog_response = install_dialog.run()
 
+    args = args_obj()
     # entries
-    base_path = base_path_entry.get_text()
-    game_folder = game_folder_entry.get_text()
-    max_shm = max_shm_entry.get_text()
-    max_workers = max_workers_entry.get_text()
-    override_manifest = override_manifest_entry.get_text()
-    override_old_manifest = override_old_manifest_entry.get_text()
-    override_delta_manifest = override_delta_manifest_entry.get_text()
-    override_base_url = override_base_url_entry.get_text()
-    platform_override = platform_override_entry.get_text()
-    file_prefix_filter = file_prefix_filter_entry.get_text()
-    file_exclude_filter = file_exclude_filter_entry.get_text()
-    file_install_tag = file_install_tag_entry.get_text()
-    dl_timeout = dl_timeout_entry.get_text()
-    save_path = save_path_entry.get_text()
+    args.base_path = base_path_entry.get_text()
+    args.game_folder = game_folder_entry.get_text()
+    args.shared_memory = max_shm_entry.get_text()
+    args.max_workers = max_workers_entry.get_text()
+    args.override_manifest = override_manifest_entry.get_text()
+    args.override_old_manifest = override_old_manifest_entry.get_text()
+    args.override_delta_manifest = override_delta_manifest_entry.get_text()
+    args.override_base_url = override_base_url_entry.get_text()
+    args.platform_override = platform_override_entry.get_text()
+    args.file_prefix = file_prefix_filter_entry.get_text()
+    args.file_exclude_prefix = file_exclude_filter_entry.get_text()
+    args.install_tag = file_install_tag_entry.get_text()
+    args.dl_timeout = dl_timeout_entry.get_text()
+    args.save_path = save_path_entry.get_text()
 
     # check boxes
-    force = force_check_button.get_active()
-    disable_patching = disable_patching_check_button.get_active()
-    download_only = download_only_check_button.get_active()
-    update_only = update_only_check_button.get_active()
-    glm_debug = glm_debug_check_button.get_active()
-    enable_reordering = enable_reordering_check_button.get_active()
-    repair = repair_check_button.get_active()
-    repair_and_update = repair_and_update_check_button.get_active()
-    ignore_space_req = ignore_space_req_check_button.get_active()
-    reset_sdl = reset_sdl_check_button.get_active()
+    args.force = force_check_button.get_active()
+    args.disable_patching = disable_patching_check_button.get_active()
+    args.no_install = download_only_check_button.get_active()
+    args.update_only = update_only_check_button.get_active()
+    args.dlm_debug = dlm_debug_check_button.get_active()
+    args.order_opt = enable_reordering_check_button.get_active()
+    args.repair_mode = repair_check_button.get_active()
+    args.repair_and_update = repair_and_update_check_button.get_active()
+    args.ignore_space = ignore_space_req_check_button.get_active()
+    args.disable_delta = disable_delta_manifest_check_button.get_active()
+    args.reset_sdl = reset_sdl_check_button.get_active()
 
     install_dialog.destroy()
 
-    print( f"base_path:\t\t {base_path}",
-           f"game_folder:\t\t {game_folder}",
-           f"max_shm:\t\t {max_shm}",
-           f"max_workers:\t\t {max_workers}",
-           f"override_manifest:\t {override_manifest}",
-           f"override_old_manifest:\t {override_old_manifest}",
-           f"override_delta_manifest: {override_delta_manifest}",
-           f"override_base_url:\t {override_base_url}",
-           f"platform_override:\t {platform_override}",
-           f"file_prefix_filter:\t {file_prefix_filter}",
-           f"file_exclude_filter:\t {file_exclude_filter}",
-           f"file_install_tag:\t {file_install_tag}",
-           f"dl_timeout:\t\t {dl_timeout}",
-           f"save_path:\t\t {save_path}",
-           f"force:\t\t\t {force}",
-           f"disable_patching:\t {disable_patching}",
-           f"download_only:\t\t {download_only}",
-           f"update_only:\t\t {update_only}",
-           f"glm_debug:\t\t {glm_debug}",
-           f"enable_reordering:\t {enable_reordering}",
-           f"repair:\t\t\t {repair}",
-           f"repair_and_update:\t {repair_and_update}",
-           f"ignore_space_req:\t {ignore_space_req}",
-           f"reset_sdl:\t\t {reset_sdl}",
+    print( f"base_path:\t\t {args.base_path}",
+           f"game_folder:\t\t {args.game_folder}",
+           f"max_shm:\t\t {args.shared_memory}",
+           f"max_workers:\t\t {args.max_workers}",
+           f"override_manifest:\t {args.override_manifest}",
+           f"override_old_manifest:\t {args.override_old_manifest}",
+           f"override_delta_manifest: {args.override_delta_manifest}",
+           f"override_base_url:\t {args.override_base_url}",
+           f"platform_override:\t {args.platform_override}",
+           f"file_prefix_filter:\t {args.file_prefix}",
+           f"file_exclude_filter:\t {args.file_exclude_prefix}",
+           f"file_install_tag:\t {args.install_tag}",
+           f"dl_timeout:\t\t {args.dl_timeout}",
+           f"save_path:\t\t {args.save_path}",
+           f"force:\t\t\t {args.force}",
+           f"disable_patching:\t {args.disable_patching}",
+           f"download_only:\t\t {args.no_install}",
+           f"update_only:\t\t {args.update_only}",
+           f"dlm_debug:\t\t {args.dlm_debug}",
+           f"enable_reordering:\t {args.order_opt}",
+           f"repair:\t\t\t {args.repair_mode}",
+           f"repair_and_update:\t {args.repair_and_update}",
+           f"ignore_space_req:\t {args.ignore_space}",
+           f"reset_sdl:\t\t {args.reset_sdl}",
           sep='\n'
     )
     return 1
-    # TODO:
     if install_dialog_response != Gtk.ResponseType.OK:
         return 1
@@ -551,74 +579,74 @@ def install_gtk(app_name, app_title, parent):
     repair_file = None
     if repair_mode:
         args.no_install = args.repair_and_update is False
-        repair_file = os.path.join(self.core.lgd.get_tmp_path(), f'{args.app_name}.repair')
+        repair_file = os.path.join(core.lgd.get_tmp_path(), f'{app_name}.repair')
 
-    if not self.core.login():
-        logger.error('Login failed! Cannot continue with download process.')
+    if not core.login():
+        log_gtk('Login failed! Cannot continue with download process.')
         exit(1)
 
     if args.file_prefix or args.file_exclude_prefix or args.install_tag:
         args.no_install = True
 
     if args.update_only:
-        if not self.core.is_installed(args.app_name):
-            logger.error(f'Update requested for "{args.app_name}", but app not installed!')
+        if not core.is_installed(app_name):
+            log_gtk(f'Update requested for "{app_name}", but app not installed!')
             exit(1)
 
     if args.platform_override:
         args.no_install = True
 
-    game = self.core.get_game(args.app_name, update_meta=True)
+    game = core.get_game(app_name, update_meta=True)
     if not game:
-        logger.error(f'Could not find "{args.app_name}" in list of available games,'
+        log_gtk(f'Could not find "{app_name}" in list of available games,'
                 f'did you type the name correctly?')
         exit(1)
 
     if game.is_dlc:
-        logger.info('Install candidate is DLC')
+        log_gtk('Install candidate is DLC')
         app_name = game.metadata['mainGameItem']['releaseInfo'][0]['appId']
-        base_game = self.core.get_game(app_name)
+        base_game = core.get_game(app_name)
         # check if base_game is actually installed
-        if not self.core.is_installed(app_name):
+        if not core.is_installed(app_name):
             # download mode doesn't care about whether or not something's installed
             if not args.no_install:
-                logger.fatal(f'Base game "{app_name}" is not installed!')
+                log_gtk(f'Base game "{app_name}" is not installed!')
                 exit(1)
     else:
        base_game = None
 
-    if args.repair_mode:
-        if not self.core.is_installed(game.app_name):
-            logger.error(f'Game "{game.app_title}" ({game.app_name}) is not installed!')
-            exit(0)
+    #if args.repair_mode:
+    #    if not core.is_installed(game.app_name):
+    #        log_gtk(f'Game "{game.app_title}" ({game.app_name}) is not installed!')
+    #        exit(0)
 
-        if not os.path.exists(repair_file):
-            logger.info('Game has not been verified yet.')
-            if not args.yes:
-                if not get_boolean_choice(f'Verify "{game.app_name}" now ("no" will abort repair)?'):
-                    print('Aborting...')
-                    exit(0)
+    #    if not os.path.exists(repair_file):
+    #        log_gtk('Game has not been verified yet.')
+    #        if not args.yes:
+    #            if not get_boolean_choice(f'Verify "{game.app_name}" now ("no" will abort repair)?'):
+    #                print('Aborting...')
+    #                exit(0)
 
-            self.verify_game(args, print_command=False)
-        else:
-            logger.info(f'Using existing repair file: {repair_file}')
+    #        self.verify_game(args, print_command=False)
+    #    else:
+    #        log_gtk(f'Using existing repair file: {repair_file}')
 
     # Workaround for Cyberpunk 2077 preload
-    if not args.install_tag and not game.is_dlc and ((sdl_name := get_sdl_appname(game.app_name)) is not None):
-        config_tags = self.core.lgd.config.get(game.app_name, 'install_tags', fallback=None)
-        if not self.core.is_installed(game.app_name) or config_tags is None or args.reset_sdl:
-            args.install_tag = sdl_prompt(sdl_name, game.app_title)
-            if game.app_name not in self.core.lgd.config:
-                self.core.lgd.config[game.app_name] = dict()
-            self.core.lgd.config.set(game.app_name, 'install_tags', ','.join(args.install_tag))
-        else:
-            args.install_tag = config_tags.split(',')
+    #if not args.install_tag and not game.is_dlc and ((sdl_name := get_sdl_appname(game.app_name)) is not None):
+    #    config_tags = core.lgd.config.get(game.app_name, 'install_tags', fallback=None)
+    #    if not core.is_installed(game.app_name) or config_tags is None or args.reset_sdl:
+    #        args.install_tag = sdl_prompt(sdl_name, game.app_title)
+    #        if game.app_name not in self.core.lgd.config:
+    #            core.lgd.config[game.app_name] = dict()
+    #        core.lgd.config.set(game.app_name, 'install_tags', ','.join(args.install_tag))
+    #    else:
+    #        args.install_tag = config_tags.split(',')
 
-    logger.info('Preparing download...')
+    log_gtk('Preparing download...')
     # todo use status queue to print progress from CLI
     # This has become a little ridiculous hasn't it?
-    dlm, analysis, igame = self.core.prepare_download(game=game, base_game=base_game, base_path=args.base_path,
+    dlm, analysis, igame = core.prepare_download(game=game, base_game=base_game, base_path=args.base_path,
                                                  force=args.force, max_shm=args.shared_memory,
                                                  max_workers=args.max_workers, game_folder=args.game_folder,
                                                  disable_patching=args.disable_patching,
@@ -638,57 +666,52 @@ def install_gtk(app_name, app_title, parent):
 
     # game is either up to date or hasn't changed, so we have nothing to do
     if not analysis.dl_size:
-        old_igame = self.core.get_installed_game(game.app_name)
-        logger.info('Download size is 0, the game is either already up to date or has not changed. Exiting...')
+        old_igame = core.get_installed_game(game.app_name)
+        log_gtk('Download size is 0, the game is either already up to date or has not changed. Exiting...')
 
         if old_igame and args.repair_mode and os.path.exists(repair_file):
            if old_igame.needs_verification:
                 old_igame.needs_verification = False
-                self.core.install_game(old_igame)
+                core.install_game(old_igame)
 
-            logger.debug('Removing repair file.')
+            log_gtk('Removing repair file.')
            os.remove(repair_file)
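+            # The repair file removed here is assumed to use the same plain-text format
+            # as the downloader's resume file: one "sha1_hex:filename" line per verified
+            # file, e.g.
+            #
+            #   da39a3ee5e6b4b0d3255bfef95601890afd80709:Game/Binaries/Win64/Game.exe
+            #
+            # manager_gui.py below parses exactly this with:
+            #   file_hash, _, filename = line.strip().partition(':')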
 
        # check if install tags have changed, if they did; try deleting files that are no longer required.
        if old_igame and old_igame.install_tags != igame.install_tags:
            old_igame.install_tags = igame.install_tags
-            self.logger.info('Deleting now untagged files.')
-            self.core.uninstall_tag(old_igame)
-            self.core.install_game(old_igame)
+            log_gtk('Deleting now untagged files.')
+            core.uninstall_tag(old_igame)
+            core.install_game(old_igame)
 
        exit(0)
 
-    logger.info(f'Install size: {analysis.install_size / 1024 / 1024:.02f} MiB')
+    log_gtk(f'Install size: {analysis.install_size / 1024 / 1024:.02f} MiB')
     compression = (1 - (analysis.dl_size / analysis.uncompressed_dl_size)) * 100
-    logger.info(f'Download size: {analysis.dl_size / 1024 / 1024:.02f} MiB '
+    log_gtk(f'Download size: {analysis.dl_size / 1024 / 1024:.02f} MiB '
             f'(Compression savings: {compression:.01f}%)')
-    logger.info(f'Reusable size: {analysis.reuse_size / 1024 / 1024:.02f} MiB (chunks) / '
+    log_gtk(f'Reusable size: {analysis.reuse_size / 1024 / 1024:.02f} MiB (chunks) / '
             f'{analysis.unchanged / 1024 / 1024:.02f} MiB (unchanged / skipped)')
 
-    res = self.core.check_installation_conditions(analysis=analysis, install=igame, game=game,
-                                                  updating=self.core.is_installed(args.app_name),
-                                                  ignore_space_req=args.ignore_space)
+    res = core.check_installation_conditions(analysis=analysis, install=igame, game=game,
+                                             updating=core.is_installed(app_name),
+                                             ignore_space_req=args.ignore_space)
 
     if res.warnings or res.failures:
-        logger.info('Installation requirements check returned the following results:')
+        log_gtk('Installation requirements check returned the following results:')
         if res.warnings:
             for warn in sorted(res.warnings):
-                logger.warning(warn)
+                log_gtk(warn)
 
         if res.failures:
             for msg in sorted(res.failures):
-                logger.fatal(msg)
-            logger.error('Installation cannot proceed, exiting.')
+                log_gtk(msg)
+            log_gtk('Installation cannot proceed, exiting.')
             exit(1)
 
-    logger.info('Downloads are resumable, you can interrupt the download with '
+    log_gtk('Downloads are resumable, you can interrupt the download with '
             'CTRL-C and resume it using the same command later on.')
 
-    if not args.yes:
-        if not get_boolean_choice(f'Do you wish to install "{igame.title}"?'):
-            print('Aborting...')
-            exit(0)
-
     start_t = time.time()
 
     try:
@@ -700,8 +723,8 @@ def install_gtk(app_name, app_title, parent):
         dlm.join()
     except Exception as e:
         end_t = time.time()
-        logger.info(f'Installation failed after {end_t - start_t:.02f} seconds.')
-        logger.warning(f'The following exception occurred while waiting for the downloader to finish: {e!r}. '
+        log_gtk(f'Installation failed after {end_t - start_t:.02f} seconds. '
+                f'The following exception occurred while waiting for the downloader to finish: {e!r}. '
                 f'Try restarting the process, the resume file will be used to start where it failed. '
                 f'If it continues to fail please open an issue on GitHub.')
     else:
@@ -728,19 +751,18 @@ def install_gtk(app_name, app_title, parent):
         install_dlcs = False
 
         if install_dlcs:
-            _yes, _app_name = args.yes, args.app_name
+            _yes, _app_name = args.yes, app_name
             args.yes = True
             for dlc in dlcs:
-                args.app_name = dlc.app_name
+                app_name = dlc.app_name
                 self.install_game(args)
-            args.yes, args.app_name = _yes, _app_name
+            args.yes, app_name = _yes, _app_name
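+            # Hypothetical sketch of a per-DLC pass: install_gtk() takes the app name
+            # as a parameter (there is no self.install_game in this module), so
+            # recursing per DLC would presumably look like:
+            #
+            #   for dlc in dlcs:
+            #       install_gtk(dlc.app_name, dlc.app_title, parent)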
 
         if game.supports_cloud_saves and not game.is_dlc:
             # todo option to automatically download saves after the installation
             # args does not have the required attributes for sync_saves in here,
             # not sure how to solve that elegantly.
-            logger.info('This game supports cloud saves, syncing is handled by the "sync-saves" command.')
-            logger.info(f'To download saves for this game run "legendary sync-saves {args.app_name}"')
+            log_gtk(f'This game supports cloud saves, syncing is handled by the "sync-saves" command. To download saves for this game run "legendary sync-saves {app_name}"')
 
         old_igame = self.core.get_installed_game(game.app_name)
         if old_igame and args.repair_mode and os.path.exists(repair_file):
@@ -748,17 +770,17 @@ def install_gtk(app_name, app_title, parent):
                 old_igame.needs_verification = False
                 self.core.install_game(old_igame)
 
-            logger.debug('Removing repair file.')
+            log_gtk('Removing repair file.')
             os.remove(repair_file)
 
         # check if install tags have changed, if they did; try deleting files that are no longer required.
         if old_igame and old_igame.install_tags != igame.install_tags:
             old_igame.install_tags = igame.install_tags
-            self.logger.info('Deleting now untagged files.')
-            self.core.uninstall_tag(old_igame)
-            self.core.install_game(old_igame)
+            log_gtk('Deleting now untagged files.')
+            core.uninstall_tag(old_igame)
+            core.install_game(old_igame)
 
-        logger.info(f'Finished installation process in {end_t - start_t:.02f} seconds.')
+        log_gtk(f'Finished installation process in {end_t - start_t:.02f} seconds.')
 
 
 class main_window(Gtk.Window):
     def __init__(self):
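+        # A hedged sketch (not wired up yet) of how this window could consume the
+        # UIUpdate objects that manager_gui.DLManager pushes to its status_queue.
+        # The queue, progress bar and poll interval are assumptions, not existing
+        # attributes of this class:
+        #
+        #   def poll_status(self, status_queue, progress_bar):
+        #       try:
+        #           update = status_queue.get_nowait()
+        #           progress_bar.set_fraction(update.progress / 100)
+        #       except queue.Empty:
+        #           pass
+        #       return True  # keep the GLib timeout running
+        #
+        #   GLib.timeout_add(500, self.poll_status, status_queue, progress_bar)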
diff --git a/legendary/gui/manager_gui.py b/legendary/gui/manager_gui.py
new file mode 100644
index 0000000..0b84d2a
--- /dev/null
+++ b/legendary/gui/manager_gui.py
@@ -0,0 +1,769 @@
+# coding: utf-8
+
+# please don't look at this code too hard, it's a mess.
+
+import logging
+import os
+import time
+
+from collections import Counter, defaultdict, deque
+from logging.handlers import QueueHandler
+from multiprocessing import cpu_count, Process, Queue as MPQueue
+from multiprocessing.shared_memory import SharedMemory
+from queue import Empty
+from sys import exit
+from threading import Condition, Thread
+
+from legendary.downloader.workers import DLWorker, FileWorker
+from legendary.models.downloading import *
+from legendary.models.manifest import ManifestComparison, Manifest
+
+
+class DLManager(Process):
+    def __init__(self, download_dir, base_url, cache_dir=None, status_q=None,
+                 max_workers=0, update_interval=1.0, dl_timeout=10, resume_file=None,
+                 max_shared_memory=1024 * 1024 * 1024):
+        super().__init__(name='DLManager')
+        self.log = logging.getLogger('DLM')
+        self.proc_debug = False
+
+        self.base_url = base_url
+        self.dl_dir = download_dir
+        self.cache_dir = cache_dir if cache_dir else os.path.join(download_dir, '.cache')
+
+        # All the queues!
+        self.logging_queue = None
+        self.dl_worker_queue = None
+        self.writer_queue = None
+        self.dl_result_q = None
+        self.writer_result_q = None
+        self.max_workers = max_workers if max_workers else min(cpu_count() * 2, 16)
+        self.dl_timeout = dl_timeout
+
+        # Analysis stuff
+        self.analysis = None
+        self.tasks = deque()
+        self.chunks_to_dl = deque()
+        self.chunk_data_list = None
+
+        # shared memory stuff
+        self.max_shared_memory = max_shared_memory  # 1 GiB by default
+        self.sms = deque()
+        self.shared_memory = None
+
+        # Interval for log updates and pushing updates to the queue
+        self.update_interval = update_interval
+        self.status_queue = status_q  # queue used to relay status info back to GUI/CLI
+
+        # Resume file stuff
+        self.resume_file = resume_file
+        self.hash_map = dict()
+
+        # cross-thread runtime information
+        self.running = True
+        self.active_tasks = 0
+        self.children = []
+        self.threads = []
+        self.conditions = []
+        # bytes downloaded and decompressed since last report
+        self.bytes_downloaded_since_last = 0
+        self.bytes_decompressed_since_last = 0
+        # bytes written since last report
+        self.bytes_written_since_last = 0
+        # bytes read since last report
+        self.bytes_read_since_last = 0
+        # chunks written since last report
+        self.num_processed_since_last = 0
+        self.num_tasks_processed_since_last = 0
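+
+    # A minimal construction sketch (values are illustrative assumptions, not
+    # defaults used anywhere in the GUI; the base URL normally comes from the
+    # game's manifest CDN):
+    #
+    #   status_q = MPQueue()
+    #   dlm = DLManager('/path/to/install/dir', 'https://example.invalid/chunks',
+    #                   status_q=status_q, max_workers=8,
+    #                   resume_file='/tmp/AppName.resume')
+    #
+    # run_analysis() must be called before start(), see run() below.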
+
+    def run_analysis(self, manifest: Manifest, old_manifest: Manifest = None,
+                     patch=True, resume=True, file_prefix_filter=None,
+                     file_exclude_filter=None, file_install_tag=None,
+                     processing_optimization=False) -> AnalysisResult:
+        """
+        Run analysis on manifest and old manifest (if not None) and return a result
+        with a summary of resources required in order to install the provided manifest.
+
+        :param manifest: Manifest to install
+        :param old_manifest: Old manifest to patch from (if applicable)
+        :param patch: Patch instead of redownloading the entire file
+        :param resume: Continue based on resume file if it exists
+        :param file_prefix_filter: Only download files that start with this prefix
+        :param file_exclude_filter: Exclude files with this prefix from download
+        :param file_install_tag: Only install files with the specified tag
+        :param processing_optimization: Attempt to optimize processing order and RAM usage
+        :return: AnalysisResult
+        """
+
+        analysis_res = AnalysisResult()
+        analysis_res.install_size = sum(fm.file_size for fm in manifest.file_manifest_list.elements)
+        analysis_res.biggest_chunk = max(c.window_size for c in manifest.chunk_data_list.elements)
+        analysis_res.biggest_file_size = max(f.file_size for f in manifest.file_manifest_list.elements)
+        is_1mib = analysis_res.biggest_chunk == 1024 * 1024
+        self.log.debug(f'Biggest chunk size: {analysis_res.biggest_chunk} bytes (== 1 MiB? {is_1mib})')
+
+        self.log.debug(f'Creating manifest comparison...')
+        mc = ManifestComparison.create(manifest, old_manifest)
+        analysis_res.manifest_comparison = mc
+
+        if resume and self.resume_file and os.path.exists(self.resume_file):
+            self.log.info('Found previously interrupted download. Download will be resumed if possible.')
+            try:
+                missing = 0
+                mismatch = 0
+                completed_files = set()
+
+                for line in open(self.resume_file).readlines():
+                    file_hash, _, filename = line.strip().partition(':')
+                    _p = os.path.join(self.dl_dir, filename)
+                    if not os.path.exists(_p):
+                        self.log.debug(f'File does not exist but is in resume file: "{_p}"')
+                        missing += 1
+                    elif file_hash != manifest.file_manifest_list.get_file_by_path(filename).sha_hash.hex():
+                        mismatch += 1
+                    else:
+                        completed_files.add(filename)
+
+                if missing:
+                    self.log.warning(f'{missing} previously completed file(s) are missing, they will be redownloaded.')
+                if mismatch:
+                    self.log.warning(f'{mismatch} existing file(s) have been changed and will be redownloaded.')
+
+                # remove completed files from changed/added and move them to unchanged for the analysis.
+                mc.added -= completed_files
+                mc.changed -= completed_files
+                mc.unchanged |= completed_files
+                self.log.info(f'Skipping {len(completed_files)} files based on resume data.')
+            except Exception as e:
+                self.log.warning(f'Reading resume file failed: {e!r}, continuing as normal...')
+
+        # Install tags are used for selective downloading, e.g. for language packs
+        additional_deletion_tasks = []
+        if file_install_tag is not None:
+            if isinstance(file_install_tag, str):
+                file_install_tag = [file_install_tag]
+
+            files_to_skip = set(i.filename for i in manifest.file_manifest_list.elements
+                                if not any((fit in i.install_tags) or (not fit and not i.install_tags)
+                                           for fit in file_install_tag))
+            self.log.info(f'Found {len(files_to_skip)} files to skip based on install tag.')
+            mc.added -= files_to_skip
+            mc.changed -= files_to_skip
+            mc.unchanged |= files_to_skip
+            for fname in sorted(files_to_skip):
+                additional_deletion_tasks.append(FileTask(fname, delete=True, silent=True))
+
+        # if include/exclude prefix has been set: mark all files that are not to be downloaded as unchanged
+        if file_exclude_filter:
+            if isinstance(file_exclude_filter, str):
+                file_exclude_filter = [file_exclude_filter]
+
+            file_exclude_filter = [f.lower() for f in file_exclude_filter]
+            files_to_skip = set(i.filename for i in manifest.file_manifest_list.elements if
+                                any(i.filename.lower().startswith(pfx) for pfx in file_exclude_filter))
+            self.log.info(f'Found {len(files_to_skip)} files to skip based on exclude prefix.')
+            mc.added -= files_to_skip
+            mc.changed -= files_to_skip
+            mc.unchanged |= files_to_skip
+
+        if file_prefix_filter:
+            if isinstance(file_prefix_filter, str):
+                file_prefix_filter = [file_prefix_filter]
+
+            file_prefix_filter = [f.lower() for f in file_prefix_filter]
+            files_to_skip = set(i.filename for i in manifest.file_manifest_list.elements if not
+                                any(i.filename.lower().startswith(pfx) for pfx in file_prefix_filter))
+            self.log.info(f'Found {len(files_to_skip)} files to skip based on include prefix(es)')
+            mc.added -= files_to_skip
+            mc.changed -= files_to_skip
+            mc.unchanged |= files_to_skip
+
+        if file_prefix_filter or file_exclude_filter or file_install_tag:
+            self.log.info(f'Remaining files after filtering: {len(mc.added) + len(mc.changed)}')
+            # correct install size after filtering
+            analysis_res.install_size = sum(fm.file_size for fm in manifest.file_manifest_list.elements
+                                            if fm.filename in mc.added)
+
+        if mc.removed:
+            analysis_res.removed = len(mc.removed)
+            self.log.debug(f'{analysis_res.removed} removed files')
+        if mc.added:
+            analysis_res.added = len(mc.added)
+            self.log.debug(f'{analysis_res.added} added files')
+        if mc.changed:
+            analysis_res.changed = len(mc.changed)
+            self.log.debug(f'{analysis_res.changed} changed files')
+        if mc.unchanged:
+            analysis_res.unchanged = len(mc.unchanged)
+            self.log.debug(f'{analysis_res.unchanged} unchanged files')
+
+        if processing_optimization and len(manifest.file_manifest_list.elements) > 100_000:
+            self.log.warning('Manifest contains too many files, processing optimizations will be disabled.')
+            processing_optimization = False
+        elif processing_optimization:
+            self.log.info('Processing order optimization is enabled, analysis may take a few seconds longer...')
+
+        # count references to chunks for determining runtime cache size later
+        references = Counter()
+        fmlist = sorted(manifest.file_manifest_list.elements,
+                        key=lambda a: a.filename.lower())
+
+        for fm in fmlist:
+            self.hash_map[fm.filename] = fm.sha_hash.hex()
+
+            # chunks of unchanged files are not downloaded so we can skip them
+            if fm.filename in mc.unchanged:
+                analysis_res.unchanged += fm.file_size
+                continue
+
+            for cp in fm.chunk_parts:
+                references[cp.guid_num] += 1
+
+        if processing_optimization:
+            s_time = time.time()
+            # reorder the file manifest list to group files that share many chunks
+            # 4 is mostly arbitrary but has shown in testing to be a good choice
+            min_overlap = 4
+            # ignore files with less than N chunk parts, this speeds things up dramatically
+            cp_threshold = 5
+
+            remaining_files = {fm.filename: {cp.guid_num for cp in fm.chunk_parts}
+                               for fm in fmlist if fm.filename not in mc.unchanged}
+            _fmlist = []
+
+            # iterate over all files that will be downloaded and pair up those that share the most chunks
+            for fm in fmlist:
+                if fm.filename not in remaining_files:
+                    continue
+
+                _fmlist.append(fm)
+                f_chunks = remaining_files.pop(fm.filename)
+                if len(f_chunks) < cp_threshold:
+                    continue
+
+                best_overlap, match = 0, None
+                for fname, chunks in remaining_files.items():
+                    if len(chunks) < cp_threshold:
+                        continue
+                    overlap = len(f_chunks & chunks)
+                    if overlap > min_overlap and overlap > best_overlap:
+                        best_overlap, match = overlap, fname
+
+                if match:
+                    _fmlist.append(manifest.file_manifest_list.get_file_by_path(match))
+                    remaining_files.pop(match)
+
+            fmlist = _fmlist
+            opt_delta = time.time() - s_time
+            self.log.debug(f'Processing optimizations took {opt_delta:.01f} seconds.')
+
+        # determine reusable chunks and prepare lookup table for reusable ones
+        re_usable = defaultdict(dict)
+        if old_manifest and mc.changed and patch:
+            self.log.debug('Analyzing manifests for re-usable chunks...')
+            for changed in mc.changed:
+                old_file = old_manifest.file_manifest_list.get_file_by_path(changed)
+                new_file = manifest.file_manifest_list.get_file_by_path(changed)
+
+                existing_chunks = defaultdict(list)
+                off = 0
+                for cp in old_file.chunk_parts:
+                    existing_chunks[cp.guid_num].append((off, cp.offset, cp.offset + cp.size))
+                    off += cp.size
+
+                for cp in new_file.chunk_parts:
+                    key = (cp.guid_num, cp.offset, cp.size)
+                    for file_o, cp_o, cp_end_o in existing_chunks[cp.guid_num]:
+                        # check if new chunk part is wholly contained in the old chunk part
+                        if cp_o <= cp.offset and (cp.offset + cp.size) <= cp_end_o:
+                            references[cp.guid_num] -= 1
+                            re_usable[changed][key] = file_o + (cp.offset - cp_o)
+                            analysis_res.reuse_size += cp.size
+                            break
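+
+        # Worked example of the containment check above: if the old file mapped chunk
+        # X at file offset 4096 covering chunk-local range [1024, 9216), then a new
+        # chunk part (X, offset=2048, size=4096) lies wholly inside that range and can
+        # be read back from the old file at 4096 + (2048 - 1024) = 5120 instead of
+        # being downloaded again (this is the offset re_usable[changed][key] stores).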
+
+        last_cache_size = current_cache_size = 0
+        # set to determine whether a file is currently cached or not
+        cached = set()
+        # Using this secondary set is orders of magnitude faster than checking the deque.
+        chunks_in_dl_list = set()
+        # This is just used to count all unique guids that have been cached
+        dl_cache_guids = set()
+
+        # run through the list of files and create the download jobs and also determine minimum
+        # runtime cache requirement by simulating adding/removing from cache during download.
+        self.log.debug('Creating filetasks and chunktasks...')
+        for current_file in fmlist:
+            # skip unchanged and empty files
+            if current_file.filename in mc.unchanged:
+                continue
+            elif not current_file.chunk_parts:
+                self.tasks.append(FileTask(current_file.filename, empty=True))
+                continue
+
+            existing_chunks = re_usable.get(current_file.filename, None)
+            chunk_tasks = []
+            reused = 0
+
+            for cp in current_file.chunk_parts:
+                ct = ChunkTask(cp.guid_num, cp.offset, cp.size)
+
+                # re-use the chunk from the existing file if we can
+                if existing_chunks and (cp.guid_num, cp.offset, cp.size) in existing_chunks:
+                    reused += 1
+                    ct.chunk_file = current_file.filename
+                    ct.chunk_offset = existing_chunks[(cp.guid_num, cp.offset, cp.size)]
+                else:
+                    # add to DL list if not already in it
+                    if cp.guid_num not in chunks_in_dl_list:
+                        self.chunks_to_dl.append(cp.guid_num)
+                        chunks_in_dl_list.add(cp.guid_num)
+
+                    # if chunk has more than one use or is already in cache,
+                    # check if we need to add or remove it again.
+                    if references[cp.guid_num] > 1 or cp.guid_num in cached:
+                        references[cp.guid_num] -= 1
+
+                        # delete from cache if no references left
+                        if references[cp.guid_num] < 1:
+                            current_cache_size -= analysis_res.biggest_chunk
+                            cached.remove(cp.guid_num)
+                            ct.cleanup = True
+                        # add to cache if not already cached
+                        elif cp.guid_num not in cached:
+                            dl_cache_guids.add(cp.guid_num)
+                            cached.add(cp.guid_num)
+                            current_cache_size += analysis_res.biggest_chunk
+                    else:
+                        ct.cleanup = True
+
+                chunk_tasks.append(ct)
+
+            if reused:
+                self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}')
+                # open temporary file that will contain download + old file contents
+                self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True))
+                self.tasks.extend(chunk_tasks)
+                self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True))
+                # delete old file and rename temporary
+                self.tasks.append(FileTask(current_file.filename, delete=True, rename=True,
+                                           temporary_filename=current_file.filename + u'.tmp'))
+            else:
+                self.tasks.append(FileTask(current_file.filename, fopen=True))
+                self.tasks.extend(chunk_tasks)
+                self.tasks.append(FileTask(current_file.filename, close=True))
+
+            # check if runtime cache size has changed
+            if current_cache_size > last_cache_size:
+                self.log.debug(f' * New maximum cache size: {current_cache_size / 1024 / 1024:.02f} MiB')
+                last_cache_size = current_cache_size
+
+        self.log.debug(f'Final cache size requirement: {last_cache_size / 1024 / 1024} MiB.')
+        analysis_res.min_memory = last_cache_size + (1024 * 1024 * 32)  # add some padding just to be safe
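+
+        # The simulation above counts in units of biggest_chunk: e.g. with 1 MiB
+        # chunks, a chunk shared by several files is added to the cache on first use
+        # (current_cache_size += 1 MiB) and only freed once its reference count drops
+        # below 1, so min_memory works out to
+        #   (peak number of simultaneously cached chunks) * biggest_chunk + 32 MiB.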
+
+        # Todo implement on-disk caching to avoid this issue.
+        if analysis_res.min_memory > self.max_shared_memory:
+            shared_mib = f'{self.max_shared_memory / 1024 / 1024:.01f} MiB'
+            required_mib = f'{analysis_res.min_memory / 1024 / 1024:.01f} MiB'
+            suggested_mib = round(self.max_shared_memory / 1024 / 1024 +
+                                  (analysis_res.min_memory - self.max_shared_memory) / 1024 / 1024 + 32)
+
+            if processing_optimization:
+                message = f'Try running legendary with "--enable-reordering --max-shared-memory {suggested_mib:.0f}"'
+            else:
+                message = 'Try running legendary with "--enable-reordering" to reduce memory usage, ' \
+                          f'or use "--max-shared-memory {suggested_mib:.0f}" to increase the limit.'
+
+            raise MemoryError(f'Current shared memory cache is smaller than required: {shared_mib} < {required_mib}. '
+                              + message)
+
+        # calculate actual dl and patch write size.
+        analysis_res.dl_size = \
+            sum(c.file_size for c in manifest.chunk_data_list.elements if c.guid_num in chunks_in_dl_list)
+        analysis_res.uncompressed_dl_size = \
+            sum(c.window_size for c in manifest.chunk_data_list.elements if c.guid_num in chunks_in_dl_list)
+
+        # add jobs to remove files
+        for fname in mc.removed:
+            self.tasks.append(FileTask(fname, delete=True))
+        self.tasks.extend(additional_deletion_tasks)
+
+        analysis_res.num_chunks_cache = len(dl_cache_guids)
+        self.chunk_data_list = manifest.chunk_data_list
+        self.analysis = analysis_res
+
+        return analysis_res
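+
+    # Rough sanity-check sketch of the two sizes computed above (a chunk's file_size
+    # is its compressed size on the CDN, window_size its decompressed size):
+    #
+    #   analysis = dlm.run_analysis(manifest=manifest)
+    #   ratio = 1 - analysis.dl_size / analysis.uncompressed_dl_size
+    #   print(f'compression savings: {ratio * 100:.01f}%')
+    #
+    # This is the same figure gui.py logs as "Compression savings".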
+
+    def download_job_manager(self, task_cond: Condition, shm_cond: Condition):
+        while self.chunks_to_dl and self.running:
+            while self.active_tasks < self.max_workers * 2 and self.chunks_to_dl:
+                try:
+                    sms = self.sms.popleft()
+                    no_shm = False
+                except IndexError:  # no free cache
+                    no_shm = True
+                    break
+
+                c_guid = self.chunks_to_dl.popleft()
+                chunk = self.chunk_data_list.get_chunk_by_guid(c_guid)
+                self.log.debug(f'Adding {chunk.guid_num} (active: {self.active_tasks})')
+                try:
+                    self.dl_worker_queue.put(DownloaderTask(url=self.base_url + '/' + chunk.path,
+                                                            chunk_guid=c_guid, shm=sms),
+                                             timeout=1.0)
+                except Exception as e:
+                    self.log.warning(f'Failed to add to download queue: {e!r}')
+                    self.chunks_to_dl.appendleft(c_guid)
+                    break
+
+                self.active_tasks += 1
+            else:  # active tasks limit hit, wait for tasks to finish
+                with task_cond:
+                    self.log.debug('Waiting for download tasks to complete..')
+                    task_cond.wait(timeout=1.0)
+                continue
+
+            if no_shm:
+                # if we break we ran out of shared memory, so wait for that.
+                with shm_cond:
+                    self.log.debug('Waiting for more shared memory...')
+                    shm_cond.wait(timeout=1.0)
+
+        self.log.debug('Download Job Manager quitting...')
+
+    def dl_results_handler(self, task_cond: Condition):
+        in_buffer = dict()
+
+        task = self.tasks.popleft()
+        current_file = ''
+
+        while task and self.running:
+            if isinstance(task, FileTask):  # this wasn't necessarily a good idea...
+                try:
+                    if task.empty:
+                        self.writer_queue.put(WriterTask(task.filename, empty=True), timeout=1.0)
+                    elif task.rename:
+                        self.writer_queue.put(WriterTask(task.filename, rename=True,
+                                                         delete=task.delete,
+                                                         old_filename=task.temporary_filename),
+                                              timeout=1.0)
+                    elif task.delete:
+                        self.writer_queue.put(WriterTask(task.filename, delete=True, silent=task.silent), timeout=1.0)
+                    elif task.open:
+                        self.writer_queue.put(WriterTask(task.filename, fopen=True), timeout=1.0)
+                        current_file = task.filename
+                    elif task.close:
+                        self.writer_queue.put(WriterTask(task.filename, close=True), timeout=1.0)
+                except Exception as e:
+                    self.tasks.appendleft(task)
+                    self.log.warning(f'Adding to queue failed: {e!r}')
+                    continue
+
+                try:
+                    task = self.tasks.popleft()
+                except IndexError:  # finished
+                    break
+                continue
+
+            while (task.chunk_guid in in_buffer) or task.chunk_file:
+                res_shm = None
+                if not task.chunk_file:  # not re-using from an old file
+                    res_shm = in_buffer[task.chunk_guid].shm
+
+                try:
+                    self.log.debug(f'Adding {task.chunk_guid} to writer queue')
+                    self.writer_queue.put(WriterTask(
+                        filename=current_file, shared_memory=res_shm,
+                        chunk_offset=task.chunk_offset, chunk_size=task.chunk_size,
+                        chunk_guid=task.chunk_guid, release_memory=task.cleanup,
+                        old_file=task.chunk_file  # todo on-disk cache
+                    ), timeout=1.0)
+                except Exception as e:
+                    self.log.warning(f'Adding to queue failed: {e!r}')
+                    break
+
+                if task.cleanup and not task.chunk_file:
+                    del in_buffer[task.chunk_guid]
+
+                try:
+                    task = self.tasks.popleft()
+                    if isinstance(task, FileTask):
+                        break
+                except IndexError:  # finished
+                    task = None
+                    break
+            else:  # only enter blocking code if the loop did not break
+                try:
+                    res = self.dl_result_q.get(timeout=1)
+                    self.active_tasks -= 1
+                    with task_cond:
+                        task_cond.notify()
+
+                    if res.success:
+                        self.log.debug(f'Download for {res.guid} succeeded, adding to in_buffer...')
+                        in_buffer[res.guid] = res
+                        self.bytes_downloaded_since_last += res.compressed_size
+                        self.bytes_decompressed_since_last += res.size
+                    else:
+                        self.log.error(f'Download for {res.guid} failed, retrying...')
+                        try:
+                            self.dl_worker_queue.put(DownloaderTask(
+                                url=res.url, chunk_guid=res.guid, shm=res.shm
+                            ), timeout=1.0)
+                            self.active_tasks += 1
+                        except Exception as e:
+                            self.log.warning(f'Failed adding retry task to queue! {e!r}')
+                            # If this failed for whatever reason, put the chunk at the front of the DL list
+                            self.chunks_to_dl.appendleft(res.guid)
+                except Empty:
+                    pass
+                except Exception as e:
+                    self.log.warning(f'Unhandled exception when trying to read download result queue: {e!r}')
+
+        self.log.debug('Download result handler quitting...')
+
+    def fw_results_handler(self, shm_cond: Condition):
+        while self.running:
+            try:
+                res = self.writer_result_q.get(timeout=1.0)
+                self.num_tasks_processed_since_last += 1
+
+                if res.closed and self.resume_file and res.success:
+                    if res.filename.endswith('.tmp'):
+                        res.filename = res.filename[:-4]
+
+                    file_hash = self.hash_map[res.filename]
+                    # write last completed file to super simple resume file
+                    with open(self.resume_file, 'ab') as rf:
+                        rf.write(f'{file_hash}:{res.filename}\n'.encode('utf-8'))
+
+                if res.kill:
+                    self.log.debug('Got termination command in FW result handler')
+                    break
+
+                if not res.success:
+                    # todo make this kill the installation process or at least skip the file and mark it as failed
+                    self.log.fatal(f'Writing for {res.filename} failed!')
+                if res.release_memory:
+                    self.sms.appendleft(res.shm)
+                    with shm_cond:
+                        shm_cond.notify()
+
+                if res.chunk_guid:
+                    self.bytes_written_since_last += res.size
+                    # if there's no shared memory we must have read from disk.
+                    if not res.shm:
+                        self.bytes_read_since_last += res.size
+                    self.num_processed_since_last += 1
+
+            except Empty:
+                continue
+            except Exception as e:
+                self.log.warning(f'Exception when trying to read writer result queue: {e!r}')
+        self.log.debug('Writer result handler quitting...')
+
+    def run(self):
+        if not self.analysis:
+            raise ValueError('Did not run analysis before trying to run download!')
+
+        # Subprocess will use its own root logger that logs to a Queue instead
+        _root = logging.getLogger()
+        _root.setLevel(logging.DEBUG if self.proc_debug else logging.INFO)
+        if self.logging_queue:
+            _root.handlers = []
+            _root.addHandler(QueueHandler(self.logging_queue))
+
+        self.log = logging.getLogger('DLManager')
+        self.log.info(f'Download Manager running with process-id: {os.getpid()}')
+
+        try:
+            self.run_real()
+        except KeyboardInterrupt:
+            self.log.warning('Immediate exit requested!')
+            self.running = False
+
+            # send conditions to unlock threads if they aren't already
+            for cond in self.conditions:
+                with cond:
+                    cond.notify()
+
+            # make sure threads are dead.
+            for t in self.threads:
+                t.join(timeout=5.0)
+                if t.is_alive():
+                    self.log.warning(f'Thread did not terminate! {repr(t)}')
+
+            # clean up all the queues, otherwise this process won't terminate properly
+            for name, q in zip(('Download jobs', 'Writer jobs', 'Download results', 'Writer results'),
+                               (self.dl_worker_queue, self.writer_queue, self.dl_result_q, self.writer_result_q)):
+                self.log.debug(f'Cleaning up queue "{name}"')
+                try:
+                    while True:
+                        _ = q.get_nowait()
+                except Empty:
+                    q.close()
+                    q.join_thread()
+
+    def run_real(self):
+        self.shared_memory = SharedMemory(create=True, size=self.max_shared_memory)
+        self.log.debug(f'Created shared memory of size: {self.shared_memory.size / 1024 / 1024:.02f} MiB')
+
+        # create the shared memory segments and add them to their respective pools
+        for i in range(int(self.shared_memory.size / self.analysis.biggest_chunk)):
+            _sms = SharedMemorySegment(offset=i * self.analysis.biggest_chunk,
+                                       end=i * self.analysis.biggest_chunk + self.analysis.biggest_chunk)
+            self.sms.append(_sms)
+
+        self.log.debug(f'Created {len(self.sms)} shared memory segments.')
+
+        # Create queues
+        self.dl_worker_queue = MPQueue(-1)
+        self.writer_queue = MPQueue(-1)
+        self.dl_result_q = MPQueue(-1)
+        self.writer_result_q = MPQueue(-1)
+
+        self.log.info(f'Starting download workers...')
+        for i in range(self.max_workers):
+            w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, self.dl_result_q,
+                         self.shared_memory.name, logging_queue=self.logging_queue,
+                         dl_timeout=self.dl_timeout)
+            self.children.append(w)
+            w.start()
+
+        self.log.info('Starting file writing worker...')
+        writer_p = FileWorker(self.writer_queue, self.writer_result_q, self.dl_dir,
+                              self.shared_memory.name, self.cache_dir, self.logging_queue)
+        self.children.append(writer_p)
+        writer_p.start()
+
+        num_chunk_tasks = sum(isinstance(t, ChunkTask) for t in self.tasks)
+        num_dl_tasks = len(self.chunks_to_dl)
+        num_tasks = len(self.tasks)
+        num_shared_memory_segments = len(self.sms)
+        self.log.debug(f'Chunks to download: {num_dl_tasks}, File tasks: {num_tasks}, Chunk tasks: {num_chunk_tasks}')
+
+        # active downloader tasks
+        self.active_tasks = 0
+        processed_chunks = 0
+        processed_tasks = 0
+        total_dl = 0
+        total_write = 0
+
+        # synchronization conditions
+        shm_cond = Condition()
+        task_cond = Condition()
+        self.conditions = [shm_cond, task_cond]
+
+        # start threads
+        s_time = time.time()
+        self.threads.append(Thread(target=self.download_job_manager, args=(task_cond, shm_cond)))
+        self.threads.append(Thread(target=self.dl_results_handler, args=(task_cond,)))
+        self.threads.append(Thread(target=self.fw_results_handler, args=(shm_cond,)))
+
+        for t in self.threads:
+            t.start()
+
+        last_update = time.time()
+
+        while processed_tasks < num_tasks:
+            delta = time.time() - last_update
+            if not delta:
+                time.sleep(self.update_interval)
+                continue
+
+            # update all the things
+            processed_chunks += self.num_processed_since_last
+            processed_tasks += self.num_tasks_processed_since_last
+
+            total_dl += self.bytes_downloaded_since_last
+            total_write += self.bytes_written_since_last
+
+            dl_speed = self.bytes_downloaded_since_last / delta
+            dl_unc_speed = self.bytes_decompressed_since_last / delta
+            w_speed = self.bytes_written_since_last / delta
+            r_speed = self.bytes_read_since_last / delta
+            # c_speed = self.num_processed_since_last / delta
+
+            # set temporary counters to 0
+            self.bytes_read_since_last = self.bytes_written_since_last = 0
+            self.bytes_downloaded_since_last = self.num_processed_since_last = 0
+            self.bytes_decompressed_since_last = self.num_tasks_processed_since_last = 0
+            last_update = time.time()
+
+            perc = (processed_chunks / num_chunk_tasks) * 100
+            runtime = time.time() - s_time
+            total_avail = len(self.sms)
+            total_used = (num_shared_memory_segments - total_avail) * (self.analysis.biggest_chunk / 1024 / 1024)
+
+            if runtime and processed_chunks:
+                rt_hours, runtime = int(runtime // 3600), runtime % 3600
+                rt_minutes, rt_seconds = int(runtime // 60), int(runtime % 60)
+
+                average_speed = processed_chunks / runtime
+                estimate = (num_chunk_tasks - processed_chunks) / average_speed
+                hours, estimate = int(estimate // 3600), estimate % 3600
+                minutes, seconds = int(estimate // 60), int(estimate % 60)
+            else:
+                hours = minutes = seconds = 0
+                rt_hours = rt_minutes = rt_seconds = 0
+
+            self.log.info(f'= Progress: {perc:.02f}% ({processed_chunks}/{num_chunk_tasks}), '
+                          f'Running for {rt_hours:02d}:{rt_minutes:02d}:{rt_seconds:02d}, '
+                          f'ETA: {hours:02d}:{minutes:02d}:{seconds:02d}')
+            self.log.info(f' - Downloaded: {total_dl / 1024 / 1024:.02f} MiB, '
+                          f'Written: {total_write / 1024 / 1024:.02f} MiB')
+            self.log.info(f' - Cache usage: {total_used} MiB, active tasks: {self.active_tasks}')
+            self.log.info(f' + Download\t- {dl_speed / 1024 / 1024:.02f} MiB/s (raw) '
+                          f'/ {dl_unc_speed / 1024 / 1024:.02f} MiB/s (decompressed)')
+            self.log.info(f' + Disk\t- {w_speed / 1024 / 1024:.02f} MiB/s (write) / '
+                          f'{r_speed / 1024 / 1024:.02f} MiB/s (read)')
+
+            # send status update back to the instantiator (if queue exists)
+            if self.status_queue:
+                try:
+                    self.status_queue.put(UIUpdate(
+                        progress=perc, download_speed=dl_unc_speed, write_speed=w_speed, read_speed=r_speed,
+                        memory_usage=total_used * 1024 * 1024
+                    ), timeout=1.0)
+                except Exception as e:
+                    self.log.warning(f'Failed to send status update to queue: {e!r}')
+
+            time.sleep(self.update_interval)
+
+        for i in range(self.max_workers):
+            self.dl_worker_queue.put_nowait(DownloaderTask(kill=True))
+
+        self.log.info('Waiting for installation to finish...')
+        self.writer_queue.put_nowait(WriterTask('', kill=True))
+
+        writer_p.join(timeout=10.0)
+        if writer_p.exitcode is None:
+            self.log.warning(f'Terminating writer process, no exit code!')
+            writer_p.terminate()
+
+        # forcibly kill DL workers that are not actually dead yet
+        for child in self.children:
+            if child.exitcode is None:
+                child.terminate()
+
+        # make sure all the threads are dead.
+        for t in self.threads:
+            t.join(timeout=5.0)
+            if t.is_alive():
+                self.log.warning(f'Thread did not terminate! {repr(t)}')
+
+        # clean up resume file
+        if self.resume_file:
+            try:
+                os.remove(self.resume_file)
+            except OSError as e:
+                self.log.warning(f'Failed to remove resume file: {e!r}')
+
+        # close up shared memory
+        self.shared_memory.close()
+        self.shared_memory.unlink()
+        self.shared_memory = None
+
+        self.log.info('All done! Download manager quitting...')
+        # finally, exit the process.
+        exit(0)
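+
+# A hedged end-to-end sketch of how a caller is expected to drive DLManager
+# (mirrors what gui.py obtains from core.prepare_download(); names are assumptions):
+#
+#   dlm = DLManager(install_path, base_url, status_q=status_q, resume_file=resume_file)
+#   analysis = dlm.run_analysis(manifest=new_manifest, old_manifest=old_manifest,
+#                               patch=True, resume=True)
+#   dlm.start()  # Process.start() -> run() -> run_real() in the child process
+#   dlm.join()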