added args_obj class - 2021-02-04_15-02-42

This commit is contained in:
koraynilay 2021-02-04 15:02:42 +01:00
parent 243ea1f429
commit c04fc01db1
2 changed files with 922 additions and 131 deletions

View file

@ -7,6 +7,33 @@ from gi.repository import Gtk
import legendary.core
core = legendary.core.LegendaryCore()
class args_obj:
    """Plain attribute bag carrying the install options chosen in the GTK dialog.

    ``install_gtk`` creates one instance and overwrites the attributes with
    the values read from the dialog widgets.  The empty-string defaults below
    are only placeholders for attributes that have not been filled in yet.
    """

    # Values read from text entries (``Gtk.Entry.get_text()``).
    base_path = ''
    game_folder = ''
    shared_memory = ''
    max_workers = ''
    override_manifest = ''
    override_old_manifest = ''
    override_delta_manifest = ''
    override_base_url = ''

    # Values read from check buttons (``get_active()``) unless noted otherwise.
    force = ''
    disable_patching = ''
    no_install = ''
    update_only = ''
    dlm_debug = ''
    platform_override = ''  # text entry
    file_prefix = ''  # text entry
    file_exclude_prefix = ''  # text entry
    install_tag = ''  # text entry
    order_opt = ''
    dl_timeout = ''  # text entry
    save_path = ''  # text entry
    repair_mode = ''
    repair_and_update = ''
    ignore_space = ''
    disable_delta = ''
    reset_sdl = ''

    # The install flow saves and restores ``args.yes`` around DLC installs;
    # without a default that read raises AttributeError because the dialog
    # never sets it.
    yes = False
def log_gtk(msg):
dialog = Gtk.Dialog(title="Legendary Log")
dialog.log = Gtk.Label(label=msg)
@ -297,17 +324,17 @@ def install_gtk(app_name, app_title, parent):
advanced_options.add(update_only_check_button)
# --dlm-debug
glm_debug = False
glm_debug_check_button = Gtk.CheckButton(label="Downloader debug messages")
glm_debug_check_button.set_tooltip_text("Set download manager and worker processes' loglevel to debug")
#def glm_debug_button_toggled(button, name):
dlm_debug = False
dlm_debug_check_button = Gtk.CheckButton(label="Downloader debug messages")
dlm_debug_check_button.set_tooltip_text("Set download manager and worker processes' loglevel to debug")
#def dlm_debug_button_toggled(button, name):
# if button.get_active():
# glm_debug = True
# dlm_debug = True
# else:
# glm_debug = False
# print(name, "is now", glm_debug)
#glm_debug_check_button.connect("toggled", glm_debug_button_toggled, "glm_debug")
advanced_options.add(glm_debug_check_button)
# dlm_debug = False
# print(name, "is now", dlm_debug)
#dlm_debug_check_button.connect("toggled", dlm_debug_button_toggled, "dlm_debug")
advanced_options.add(dlm_debug_check_button)
# --platform <Platform> # use drop-down menu
platform_override_box = Gtk.HBox()
@ -439,9 +466,9 @@ def install_gtk(app_name, app_title, parent):
advanced_options.add(ignore_space_req_check_button)
# --disable-delta-manifests
override_delta_manifest = False
override_delta_manifest_check_button = Gtk.CheckButton(label="Disable delta manifests")
override_delta_manifest_check_button.set_tooltip_text("Do not use delta manifests when updating (may increase download size)")
disable_delta_manifest = False
disable_delta_manifest_check_button = Gtk.CheckButton(label="Disable delta manifests")
disable_delta_manifest_check_button.set_tooltip_text("Do not use delta manifests when updating (may increase download size)")
#def override_delta_manifest_button_toggled(button, name):
# if button.get_active():
# override_delta_manifest = True
@ -449,7 +476,7 @@ def install_gtk(app_name, app_title, parent):
# override_delta_manifest = False
# print(name, "is now", override_delta_manifest)
#override_delta_manifest_check_button.connect("toggled", override_delta_manifest_button_toggled, "override_delta_manifest")
advanced_options.add(override_delta_manifest_check_button)
advanced_options.add(disable_delta_manifest_check_button)
# --reset-sdl
reset_sdl = False
@ -483,63 +510,64 @@ def install_gtk(app_name, app_title, parent):
install_dialog.show_all()
install_dialog_response = install_dialog.run()
args = args_obj()
# entries
base_path = base_path_entry.get_text()
game_folder = game_folder_entry.get_text()
max_shm = max_shm_entry.get_text()
max_workers = max_workers_entry.get_text()
override_manifest = override_manifest_entry.get_text()
override_old_manifest = override_old_manifest_entry.get_text()
override_delta_manifest = override_delta_manifest_entry.get_text()
override_base_url = override_base_url_entry.get_text()
platform_override = platform_override_entry.get_text()
file_prefix_filter = file_prefix_filter_entry.get_text()
file_exclude_filter = file_exclude_filter_entry.get_text()
file_install_tag = file_install_tag_entry.get_text()
dl_timeout = dl_timeout_entry.get_text()
save_path = save_path_entry.get_text()
args.base_path = base_path_entry.get_text()
args.game_folder = game_folder_entry.get_text()
args.shared_memory = max_shm_entry.get_text()
args.max_workers = max_workers_entry.get_text()
args.override_manifest = override_manifest_entry.get_text()
args.override_old_manifest = override_old_manifest_entry.get_text()
args.override_delta_manifest = override_delta_manifest_entry.get_text()
args.override_base_url = override_base_url_entry.get_text()
args.platform_override = platform_override_entry.get_text()
args.file_prefix = file_prefix_filter_entry.get_text()
args.file_exclude_prefix = file_exclude_filter_entry.get_text()
args.install_tag = file_install_tag_entry.get_text()
args.dl_timeout = dl_timeout_entry.get_text()
args.save_path = save_path_entry.get_text()
# check boxes
force = force_check_button.get_active()
disable_patching = disable_patching_check_button.get_active()
download_only = download_only_check_button.get_active()
update_only = update_only_check_button.get_active()
glm_debug = glm_debug_check_button.get_active()
enable_reordering = enable_reordering_check_button.get_active()
repair = repair_check_button.get_active()
repair_and_update = repair_and_update_check_button.get_active()
ignore_space_req = ignore_space_req_check_button.get_active()
reset_sdl = reset_sdl_check_button.get_active()
args.force = force_check_button.get_active()
args.disable_patching = disable_patching_check_button.get_active()
args.no_install = download_only_check_button.get_active()
args.update_only = update_only_check_button.get_active()
args.dlm_debug = dlm_debug_check_button.get_active()
args.order_opt = enable_reordering_check_button.get_active()
args.repair_mode = repair_check_button.get_active()
args.repair_and_update = repair_and_update_check_button.get_active()
args.ignore_space = ignore_space_req_check_button.get_active()
args.disable_delta = disable_delta_manifest_check_button.get_active()
args.reset_sdl = reset_sdl_check_button.get_active()
install_dialog.destroy()
print( f"base_path:\t\t {base_path}",
f"game_folder:\t\t {game_folder}",
f"max_shm:\t\t {max_shm}",
f"max_workers:\t\t {max_workers}",
f"override_manifest:\t {override_manifest}",
f"override_old_manifest:\t {override_old_manifest}",
f"override_delta_manifest: {override_delta_manifest}",
f"override_base_url:\t {override_base_url}",
f"platform_override:\t {platform_override}",
f"file_prefix_filter:\t {file_prefix_filter}",
f"file_exclude_filter:\t {file_exclude_filter}",
f"file_install_tag:\t {file_install_tag}",
f"dl_timeout:\t\t {dl_timeout}",
f"save_path:\t\t {save_path}",
f"force:\t\t\t {force}",
f"disable_patching:\t {disable_patching}",
f"download_only:\t\t {download_only}",
f"update_only:\t\t {update_only}",
f"glm_debug:\t\t {glm_debug}",
f"enable_reordering:\t {enable_reordering}",
f"repair:\t\t\t {repair}",
f"repair_and_update:\t {repair_and_update}",
f"ignore_space_req:\t {ignore_space_req}",
f"reset_sdl:\t\t {reset_sdl}",
print( f"base_path:\t\t {args.base_path}",
f"game_folder:\t\t {args.game_folder}",
f"max_shm:\t\t {args.shared_memory}",
f"max_workers:\t\t {args.max_workers}",
f"override_manifest:\t {args.override_manifest}",
f"override_old_manifest:\t {args.override_old_manifest}",
f"override_delta_manifest: {args.override_delta_manifest}",
f"override_base_url:\t {args.override_base_url}",
f"platform_override:\t {args.platform_override}",
f"file_prefix_filter:\t {args.file_prefix}",
f"file_exclude_filter:\t {args.file_exclude_prefix}",
f"file_install_tag:\t {args.install_tag}",
f"dl_timeout:\t\t {args.dl_timeout}",
f"save_path:\t\t {args.save_path}",
f"force:\t\t\t {args.force}",
f"disable_patching:\t {args.disable_patching}",
f"download_only:\t\t {args.no_install}",
f"update_only:\t\t {args.update_only}",
f"dlm_debug:\t\t {args.dlm_debug}",
f"enable_reordering:\t {args.order_opt}",
f"repair:\t\t\t {args.repair_mode}",
f"repair_and_update:\t {args.repair_and_update}",
f"ignore_space_req:\t {args.ignore_space}",
f"reset_sdl:\t\t {args.reset_sdl}",
sep='\n'
)
return 1
# TODO:
if install_dialog_response != Gtk.ResponseType.OK:
return 1
@ -551,74 +579,74 @@ def install_gtk(app_name, app_title, parent):
repair_file = None
if repair_mode:
args.no_install = args.repair_and_update is False
repair_file = os.path.join(self.core.lgd.get_tmp_path(), f'{args.app_name}.repair')
repair_file = os.path.join(core.lgd.get_tmp_path(), f'{app_name}.repair')
if not self.core.login():
logger.error('Login failed! Cannot continue with download process.')
if not core.login():
log_gtk('Login failed! Cannot continue with download process.')
exit(1)
if args.file_prefix or args.file_exclude_prefix or args.install_tag:
args.no_install = True
if args.update_only:
if not self.core.is_installed(args.app_name):
logger.error(f'Update requested for "{args.app_name}", but app not installed!')
if not core.is_installed(app_name):
log_gtk(f'Update requested for "{app_name}", but app not installed!')
exit(1)
if args.platform_override:
args.no_install = True
game = self.core.get_game(args.app_name, update_meta=True)
game = core.get_game(app_name, update_meta=True)
if not game:
logger.error(f'Could not find "{args.app_name}" in list of available games,'
log_gtk(f'Could not find "{app_name}" in list of available games,'
f'did you type the name correctly?')
exit(1)
if game.is_dlc:
logger.info('Install candidate is DLC')
log_gtk('Install candidate is DLC')
app_name = game.metadata['mainGameItem']['releaseInfo'][0]['appId']
base_game = self.core.get_game(app_name)
base_game = core.get_game(app_name)
# check if base_game is actually installed
if not self.core.is_installed(app_name):
if not core.is_installed(app_name):
# download mode doesn't care about whether or not something's installed
if not args.no_install:
logger.fatal(f'Base game "{app_name}" is not installed!')
log_gtk(f'Base game "{app_name}" is not installed!')
exit(1)
else:
base_game = None
if args.repair_mode:
if not self.core.is_installed(game.app_name):
logger.error(f'Game "{game.app_title}" ({game.app_name}) is not installed!')
exit(0)
#if args.repair_mode:
# if not core.is_installed(game.app_name):
# log_gtk(f'Game "{game.app_title}" ({game.app_name}) is not installed!')
# exit(0)
if not os.path.exists(repair_file):
logger.info('Game has not been verified yet.')
if not args.yes:
if not get_boolean_choice(f'Verify "{game.app_name}" now ("no" will abort repair)?'):
print('Aborting...')
exit(0)
# if not os.path.exists(repair_file):
# log_gtk('Game has not been verified yet.')
# if not args.yes:
# if not get_boolean_choice(f'Verify "{game.app_name}" now ("no" will abort repair)?'):
# print('Aborting...')
# exit(0)
self.verify_game(args, print_command=False)
else:
logger.info(f'Using existing repair file: {repair_file}')
# self.verify_game(args, print_command=False)
# else:
# log_gtk(f'Using existing repair file: {repair_file}')
# Workaround for Cyberpunk 2077 preload
if not args.install_tag and not game.is_dlc and ((sdl_name := get_sdl_appname(game.app_name)) is not None):
config_tags = self.core.lgd.config.get(game.app_name, 'install_tags', fallback=None)
if not self.core.is_installed(game.app_name) or config_tags is None or args.reset_sdl:
args.install_tag = sdl_prompt(sdl_name, game.app_title)
if game.app_name not in self.core.lgd.config:
self.core.lgd.config[game.app_name] = dict()
self.core.lgd.config.set(game.app_name, 'install_tags', ','.join(args.install_tag))
else:
args.install_tag = config_tags.split(',')
#if not args.install_tag and not game.is_dlc and ((sdl_name := get_sdl_appname(game.app_name)) is not None):
# config_tags = core.lgd.config.get(game.app_name, 'install_tags', fallback=None)
# if not core.is_installed(game.app_name) or config_tags is None or args.reset_sdl:
# args.install_tag = sdl_prompt(sdl_name, game.app_title)
# if game.app_name not in self.core.lgd.config:
# core.lgd.config[game.app_name] = dict()
# core.lgd.config.set(game.app_name, 'install_tags', ','.join(args.install_tag))
# else:
# args.install_tag = config_tags.split(',')
logger.info('Preparing download...')
log_gtk('Preparing download...')
# todo use status queue to print progress from CLI
# This has become a little ridiculous hasn't it?
dlm, analysis, igame = self.core.prepare_download(game=game, base_game=base_game, base_path=args.base_path,
dlm, analysis, igame = core.prepare_download(game=game, base_game=base_game, base_path=args.base_path,
force=args.force, max_shm=args.shared_memory,
max_workers=args.max_workers, game_folder=args.game_folder,
disable_patching=args.disable_patching,
@ -638,57 +666,52 @@ def install_gtk(app_name, app_title, parent):
# game is either up to date or hasn't changed, so we have nothing to do
if not analysis.dl_size:
old_igame = self.core.get_installed_game(game.app_name)
logger.info('Download size is 0, the game is either already up to date or has not changed. Exiting...')
old_igame = core.get_installed_game(game.app_name)
log_gtk('Download size is 0, the game is either already up to date or has not changed. Exiting...')
if old_igame and args.repair_mode and os.path.exists(repair_file):
if old_igame.needs_verification:
old_igame.needs_verification = False
self.core.install_game(old_igame)
core.install_game(old_igame)
logger.debug('Removing repair file.')
log_gtk('Removing repair file.')
os.remove(repair_file)
# check if install tags have changed, if they did; try deleting files that are no longer required.
if old_igame and old_igame.install_tags != igame.install_tags:
old_igame.install_tags = igame.install_tags
self.logger.info('Deleting now untagged files.')
self.core.uninstall_tag(old_igame)
self.core.install_game(old_igame)
self.log_gtk('Deleting now untagged files.')
core.uninstall_tag(old_igame)
core.install_game(old_igame)
exit(0)
logger.info(f'Install size: {analysis.install_size / 1024 / 1024:.02f} MiB')
log_gtk(f'Install size: {analysis.install_size / 1024 / 1024:.02f} MiB')
compression = (1 - (analysis.dl_size / analysis.uncompressed_dl_size)) * 100
logger.info(f'Download size: {analysis.dl_size / 1024 / 1024:.02f} MiB '
log_gtk(f'Download size: {analysis.dl_size / 1024 / 1024:.02f} MiB '
f'(Compression savings: {compression:.01f}%)')
logger.info(f'Reusable size: {analysis.reuse_size / 1024 / 1024:.02f} MiB (chunks) / '
log_gtk(f'Reusable size: {analysis.reuse_size / 1024 / 1024:.02f} MiB (chunks) / '
f'{analysis.unchanged / 1024 / 1024:.02f} MiB (unchanged / skipped)')
res = self.core.check_installation_conditions(analysis=analysis, install=igame, game=game,
updating=self.core.is_installed(args.app_name),
res = core.check_installation_conditions(analysis=analysis, install=igame, game=game,
updating=self.core.is_installed(app_name),
ignore_space_req=args.ignore_space)
if res.warnings or res.failures:
logger.info('Installation requirements check returned the following results:')
log_gtk('Installation requirements check returned the following results:')
if res.warnings:
for warn in sorted(res.warnings):
logger.warning(warn)
log_gtk(warn)
if res.failures:
for msg in sorted(res.failures):
logger.fatal(msg)
logger.error('Installation cannot proceed, exiting.')
log_gtk(msg)
log_gtk('Installation cannot proceed, exiting.')
exit(1)
logger.info('Downloads are resumable, you can interrupt the download with '
log_gtk('Downloads are resumable, you can interrupt the download with '
'CTRL-C and resume it using the same command later on.')
if not args.yes:
if not get_boolean_choice(f'Do you wish to install "{igame.title}"?'):
print('Aborting...')
exit(0)
start_t = time.time()
try:
@ -700,8 +723,8 @@ def install_gtk(app_name, app_title, parent):
dlm.join()
except Exception as e:
end_t = time.time()
logger.info(f'Installation failed after {end_t - start_t:.02f} seconds.')
logger.warning(f'The following exception occurred while waiting for the downloader to finish: {e!r}. '
log_gtk(f'Installation failed after {end_t - start_t:.02f} seconds.'
f'The following exception occurred while waiting for the downloader to finish: {e!r}. '
f'Try restarting the process, the resume file will be used to start where it failed. '
f'If it continues to fail please open an issue on GitHub.')
else:
@ -728,19 +751,18 @@ def install_gtk(app_name, app_title, parent):
install_dlcs = False
if install_dlcs:
_yes, _app_name = args.yes, args.app_name
_yes, _app_name = args.yes, app_name
args.yes = True
for dlc in dlcs:
args.app_name = dlc.app_name
app_name = dlc.app_name
self.install_game(args)
args.yes, args.app_name = _yes, _app_name
args.yes, app_name = _yes, _app_name
if game.supports_cloud_saves and not game.is_dlc:
# todo option to automatically download saves after the installation
# args does not have the required attributes for sync_saves in here,
# not sure how to solve that elegantly.
logger.info('This game supports cloud saves, syncing is handled by the "sync-saves" command.')
logger.info(f'To download saves for this game run "legendary sync-saves {args.app_name}"')
log_gtk(f'This game supports cloud saves, syncing is handled by the "sync-saves" command.To download saves for this game run "legendary sync-saves {app_name}"')
old_igame = self.core.get_installed_game(game.app_name)
if old_igame and args.repair_mode and os.path.exists(repair_file):
@ -748,17 +770,17 @@ def install_gtk(app_name, app_title, parent):
old_igame.needs_verification = False
self.core.install_game(old_igame)
logger.debug('Removing repair file.')
log_gtk('Removing repair file.')
os.remove(repair_file)
# check if install tags have changed, if they did; try deleting files that are no longer required.
if old_igame and old_igame.install_tags != igame.install_tags:
old_igame.install_tags = igame.install_tags
self.logger.info('Deleting now untagged files.')
self.core.uninstall_tag(old_igame)
self.core.install_game(old_igame)
log_gtk('Deleting now untagged files.')
core.uninstall_tag(old_igame)
core.install_game(old_igame)
logger.info(f'Finished installation process in {end_t - start_t:.02f} seconds.')
log_gtk(f'Finished installation process in {end_t - start_t:.02f} seconds.')
class main_window(Gtk.Window):
def __init__(self):

View file

@ -0,0 +1,769 @@
# coding: utf-8
# please don't look at this code too hard, it's a mess.
import logging
import os
import time
from collections import Counter, defaultdict, deque
from logging.handlers import QueueHandler
from multiprocessing import cpu_count, Process, Queue as MPQueue
from multiprocessing.shared_memory import SharedMemory
from queue import Empty
from sys import exit
from threading import Condition, Thread
from legendary.downloader.workers import DLWorker, FileWorker
from legendary.models.downloading import *
from legendary.models.manifest import ManifestComparison, Manifest
class DLManager(Process):
def __init__(self, download_dir, base_url, cache_dir=None, status_q=None,
max_workers=0, update_interval=1.0, dl_timeout=10, resume_file=None,
max_shared_memory=1024 * 1024 * 1024):
super().__init__(name='DLManager')
self.log = logging.getLogger('DLM')
self.proc_debug = False
self.base_url = base_url
self.dl_dir = download_dir
self.cache_dir = cache_dir if cache_dir else os.path.join(download_dir, '.cache')
# All the queues!
self.logging_queue = None
self.dl_worker_queue = None
self.writer_queue = None
self.dl_result_q = None
self.writer_result_q = None
self.max_workers = max_workers if max_workers else min(cpu_count() * 2, 16)
self.dl_timeout = dl_timeout
# Analysis stuff
self.analysis = None
self.tasks = deque()
self.chunks_to_dl = deque()
self.chunk_data_list = None
# shared memory stuff
self.max_shared_memory = max_shared_memory # 1 GiB by default
self.sms = deque()
self.shared_memory = None
# Interval for log updates and pushing updates to the queue
self.update_interval = update_interval
self.status_queue = status_q # queue used to relay status info back to GUI/CLI
# Resume file stuff
self.resume_file = resume_file
self.hash_map = dict()
# cross-thread runtime information
self.running = True
self.active_tasks = 0
self.children = []
self.threads = []
self.conditions = []
# bytes downloaded and decompressed since last report
self.bytes_downloaded_since_last = 0
self.bytes_decompressed_since_last = 0
# bytes written since last report
self.bytes_written_since_last = 0
# bytes read since last report
self.bytes_read_since_last = 0
# chunks written since last report
self.num_processed_since_last = 0
self.num_tasks_processed_since_last = 0
def run_analysis(self, manifest: Manifest, old_manifest: Manifest = None,
                 patch=True, resume=True, file_prefix_filter=None,
                 file_exclude_filter=None, file_install_tag=None,
                 processing_optimization=False) -> AnalysisResult:
    """
    Run analysis on manifest and old manifest (if not None) and return a result
    with a summary of the resources required in order to install the provided manifest.

    Besides returning the result, this fills ``self.tasks`` and
    ``self.chunks_to_dl``, records file hashes in ``self.hash_map`` and stores
    ``self.chunk_data_list`` and ``self.analysis``.

    :param manifest: Manifest to install
    :param old_manifest: Old manifest to patch from (if applicable)
    :param patch: Patch instead of redownloading the entire file
    :param resume: Continue based on resume file if it exists
    :param file_prefix_filter: Only download files that start with this prefix
    :param file_exclude_filter: Exclude files with this prefix from download
    :param file_install_tag: Only install files with the specified tag
    :param processing_optimization: Attempt to optimize processing order and RAM usage
    :return: AnalysisResult
    :raises MemoryError: if the computed minimum cache size exceeds
        ``self.max_shared_memory``
    """

    analysis_res = AnalysisResult()
    analysis_res.install_size = sum(fm.file_size for fm in manifest.file_manifest_list.elements)
    analysis_res.biggest_chunk = max(c.window_size for c in manifest.chunk_data_list.elements)
    analysis_res.biggest_file_size = max(f.file_size for f in manifest.file_manifest_list.elements)
    is_1mib = analysis_res.biggest_chunk == 1024 * 1024
    self.log.debug(f'Biggest chunk size: {analysis_res.biggest_chunk} bytes (== 1 MiB? {is_1mib})')

    self.log.debug('Creating manifest comparison...')
    mc = ManifestComparison.create(manifest, old_manifest)
    analysis_res.manifest_comparison = mc

    if resume and self.resume_file and os.path.exists(self.resume_file):
        self.log.info('Found previously interrupted download. Download will be resumed if possible.')
        try:
            missing = 0
            mismatch = 0
            completed_files = set()

            # Each resume line has the form "<sha hash hex>:<filename>".
            # The file is read inside a context manager so the handle is
            # closed even if a line fails to parse.
            with open(self.resume_file) as resume_fp:
                for line in resume_fp:
                    file_hash, _, filename = line.strip().partition(':')
                    _p = os.path.join(self.dl_dir, filename)
                    if not os.path.exists(_p):
                        self.log.debug(f'File does not exist but is in resume file: "{_p}"')
                        missing += 1
                    elif file_hash != manifest.file_manifest_list.get_file_by_path(filename).sha_hash.hex():
                        mismatch += 1
                    else:
                        completed_files.add(filename)

            if missing:
                self.log.warning(f'{missing} previously completed file(s) are missing, they will be redownloaded.')
            if mismatch:
                self.log.warning(f'{mismatch} existing file(s) have been changed and will be redownloaded.')

            # remove completed files from changed/added and move them to unchanged for the analysis.
            mc.added -= completed_files
            mc.changed -= completed_files
            mc.unchanged |= completed_files
            self.log.info(f'Skipping {len(completed_files)} files based on resume data.')
        except Exception as e:
            # Resume data is best-effort; any failure falls back to a normal run.
            self.log.warning(f'Reading resume file failed: {e!r}, continuing as normal...')

    # Install tags are used for selective downloading, e.g. for language packs
    additional_deletion_tasks = []
    if file_install_tag is not None:
        if isinstance(file_install_tag, str):
            file_install_tag = [file_install_tag]

        # A file is kept if any requested tag is in its install tags, or if an
        # empty tag was requested and the file has no install tags at all.
        files_to_skip = set(i.filename for i in manifest.file_manifest_list.elements
                            if not any((fit in i.install_tags) or (not fit and not i.install_tags)
                                       for fit in file_install_tag))
        self.log.info(f'Found {len(files_to_skip)} files to skip based on install tag.')
        mc.added -= files_to_skip
        mc.changed -= files_to_skip
        mc.unchanged |= files_to_skip
        for fname in sorted(files_to_skip):
            additional_deletion_tasks.append(FileTask(fname, delete=True, silent=True))

    # if include/exclude prefix has been set: mark all files that are not to be downloaded as unchanged
    if file_exclude_filter:
        if isinstance(file_exclude_filter, str):
            file_exclude_filter = [file_exclude_filter]

        file_exclude_filter = [f.lower() for f in file_exclude_filter]
        files_to_skip = set(i.filename for i in manifest.file_manifest_list.elements if
                            any(i.filename.lower().startswith(pfx) for pfx in file_exclude_filter))
        self.log.info(f'Found {len(files_to_skip)} files to skip based on exclude prefix.')
        mc.added -= files_to_skip
        mc.changed -= files_to_skip
        mc.unchanged |= files_to_skip

    if file_prefix_filter:
        if isinstance(file_prefix_filter, str):
            file_prefix_filter = [file_prefix_filter]

        file_prefix_filter = [f.lower() for f in file_prefix_filter]
        files_to_skip = set(i.filename for i in manifest.file_manifest_list.elements if not
                            any(i.filename.lower().startswith(pfx) for pfx in file_prefix_filter))
        self.log.info(f'Found {len(files_to_skip)} files to skip based on include prefix(es)')
        mc.added -= files_to_skip
        mc.changed -= files_to_skip
        mc.unchanged |= files_to_skip

    if file_prefix_filter or file_exclude_filter or file_install_tag:
        self.log.info(f'Remaining files after filtering: {len(mc.added) + len(mc.changed)}')
        # correct install size after filtering
        analysis_res.install_size = sum(fm.file_size for fm in manifest.file_manifest_list.elements
                                        if fm.filename in mc.added)

    if mc.removed:
        analysis_res.removed = len(mc.removed)
        self.log.debug(f'{analysis_res.removed} removed files')
    if mc.added:
        analysis_res.added = len(mc.added)
        self.log.debug(f'{analysis_res.added} added files')
    if mc.changed:
        analysis_res.changed = len(mc.changed)
        self.log.debug(f'{analysis_res.changed} changed files')
    if mc.unchanged:
        # NOTE(review): this stores a file count, while the loop further down
        # adds file sizes in bytes to the same field - confirm this is intended.
        analysis_res.unchanged = len(mc.unchanged)
        self.log.debug(f'{analysis_res.unchanged} unchanged files')

    if processing_optimization and len(manifest.file_manifest_list.elements) > 100_000:
        self.log.warning('Manifest contains too many files, processing optimizations will be disabled.')
        processing_optimization = False
    elif processing_optimization:
        self.log.info('Processing order optimization is enabled, analysis may take a few seconds longer...')

    # count references to chunks for determining runtime cache size later
    references = Counter()
    fmlist = sorted(manifest.file_manifest_list.elements,
                    key=lambda a: a.filename.lower())

    for fm in fmlist:
        self.hash_map[fm.filename] = fm.sha_hash.hex()

        # chunks of unchanged files are not downloaded so we can skip them
        if fm.filename in mc.unchanged:
            analysis_res.unchanged += fm.file_size
            continue

        for cp in fm.chunk_parts:
            references[cp.guid_num] += 1

    if processing_optimization:
        s_time = time.time()
        # reorder the file manifest list to group files that share many chunks
        # 4 is mostly arbitrary but has shown in testing to be a good choice
        min_overlap = 4
        # ignore files with less than N chunk parts, this speeds things up dramatically
        cp_threshold = 5

        remaining_files = {fm.filename: {cp.guid_num for cp in fm.chunk_parts}
                           for fm in fmlist if fm.filename not in mc.unchanged}
        _fmlist = []

        # iterate over all files that will be downloaded and pair up those that share the most chunks
        for fm in fmlist:
            if fm.filename not in remaining_files:
                continue

            _fmlist.append(fm)
            f_chunks = remaining_files.pop(fm.filename)
            if len(f_chunks) < cp_threshold:
                continue

            best_overlap, match = 0, None
            for fname, chunks in remaining_files.items():
                if len(chunks) < cp_threshold:
                    continue
                overlap = len(f_chunks & chunks)
                if overlap > min_overlap and overlap > best_overlap:
                    best_overlap, match = overlap, fname

            if match:
                _fmlist.append(manifest.file_manifest_list.get_file_by_path(match))
                remaining_files.pop(match)

        fmlist = _fmlist
        opt_delta = time.time() - s_time
        self.log.debug(f'Processing optimizations took {opt_delta:.01f} seconds.')

    # determine reusable chunks and prepare lookup table for reusable ones
    re_usable = defaultdict(dict)
    if old_manifest and mc.changed and patch:
        self.log.debug('Analyzing manifests for re-usable chunks...')
        for changed in mc.changed:
            old_file = old_manifest.file_manifest_list.get_file_by_path(changed)
            new_file = manifest.file_manifest_list.get_file_by_path(changed)

            # guid -> list of (offset in old file, chunk part start, chunk part end)
            existing_chunks = defaultdict(list)
            off = 0
            for cp in old_file.chunk_parts:
                existing_chunks[cp.guid_num].append((off, cp.offset, cp.offset + cp.size))
                off += cp.size

            for cp in new_file.chunk_parts:
                key = (cp.guid_num, cp.offset, cp.size)
                for file_o, cp_o, cp_end_o in existing_chunks[cp.guid_num]:
                    # check if new chunk part is wholly contained in the old chunk part
                    if cp_o <= cp.offset and (cp.offset + cp.size) <= cp_end_o:
                        references[cp.guid_num] -= 1
                        re_usable[changed][key] = file_o + (cp.offset - cp_o)
                        analysis_res.reuse_size += cp.size
                        break

    last_cache_size = current_cache_size = 0
    # set to determine whether a file is currently cached or not
    cached = set()
    # Using this secondary set is orders of magnitude faster than checking the deque.
    chunks_in_dl_list = set()
    # This is just used to count all unique guids that have been cached
    dl_cache_guids = set()

    # run through the list of files and create the download jobs and also determine minimum
    # runtime cache requirement by simulating adding/removing from cache during download.
    self.log.debug('Creating filetasks and chunktasks...')
    for current_file in fmlist:
        # skip unchanged and empty files
        if current_file.filename in mc.unchanged:
            continue
        elif not current_file.chunk_parts:
            self.tasks.append(FileTask(current_file.filename, empty=True))
            continue

        existing_chunks = re_usable.get(current_file.filename, None)
        chunk_tasks = []
        reused = 0

        for cp in current_file.chunk_parts:
            ct = ChunkTask(cp.guid_num, cp.offset, cp.size)

            # re-use the chunk from the existing file if we can
            if existing_chunks and (cp.guid_num, cp.offset, cp.size) in existing_chunks:
                reused += 1
                ct.chunk_file = current_file.filename
                ct.chunk_offset = existing_chunks[(cp.guid_num, cp.offset, cp.size)]
            else:
                # add to DL list if not already in it
                if cp.guid_num not in chunks_in_dl_list:
                    self.chunks_to_dl.append(cp.guid_num)
                    chunks_in_dl_list.add(cp.guid_num)

                # if chunk has more than one use or is already in cache,
                # check if we need to add or remove it again.
                if references[cp.guid_num] > 1 or cp.guid_num in cached:
                    references[cp.guid_num] -= 1

                    # delete from cache if no references left
                    if references[cp.guid_num] < 1:
                        current_cache_size -= analysis_res.biggest_chunk
                        cached.remove(cp.guid_num)
                        ct.cleanup = True
                    # add to cache if not already cached
                    elif cp.guid_num not in cached:
                        dl_cache_guids.add(cp.guid_num)
                        cached.add(cp.guid_num)
                        current_cache_size += analysis_res.biggest_chunk
                else:
                    ct.cleanup = True

            chunk_tasks.append(ct)

        if reused:
            self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}')
            # open temporary file that will contain download + old file contents
            self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True))
            self.tasks.extend(chunk_tasks)
            self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True))
            # delete old file and rename temporary
            self.tasks.append(FileTask(current_file.filename, delete=True, rename=True,
                                       temporary_filename=current_file.filename + u'.tmp'))
        else:
            self.tasks.append(FileTask(current_file.filename, fopen=True))
            self.tasks.extend(chunk_tasks)
            self.tasks.append(FileTask(current_file.filename, close=True))

        # check if runtime cache size has changed
        if current_cache_size > last_cache_size:
            self.log.debug(f' * New maximum cache size: {current_cache_size / 1024 / 1024:.02f} MiB')
            last_cache_size = current_cache_size

    self.log.debug(f'Final cache size requirement: {last_cache_size / 1024 / 1024} MiB.')
    analysis_res.min_memory = last_cache_size + (1024 * 1024 * 32)  # add some padding just to be safe

    # Todo implement on-disk caching to avoid this issue.
    if analysis_res.min_memory > self.max_shared_memory:
        shared_mib = f'{self.max_shared_memory / 1024 / 1024:.01f} MiB'
        required_mib = f'{analysis_res.min_memory / 1024 / 1024:.01f} MiB'
        suggested_mib = round(self.max_shared_memory / 1024 / 1024 +
                              (analysis_res.min_memory - self.max_shared_memory) / 1024 / 1024 + 32)

        if processing_optimization:
            message = f'Try running legendary with "--enable-reordering --max-shared-memory {suggested_mib:.0f}"'
        else:
            message = 'Try running legendary with "--enable-reordering" to reduce memory usage, ' \
                      f'or use "--max-shared-memory {suggested_mib:.0f}" to increase the limit.'

        raise MemoryError(f'Current shared memory cache is smaller than required: {shared_mib} < {required_mib}. '
                          + message)

    # calculate actual dl and patch write size.
    analysis_res.dl_size = \
        sum(c.file_size for c in manifest.chunk_data_list.elements if c.guid_num in chunks_in_dl_list)
    analysis_res.uncompressed_dl_size = \
        sum(c.window_size for c in manifest.chunk_data_list.elements if c.guid_num in chunks_in_dl_list)

    # add jobs to remove files
    for fname in mc.removed:
        self.tasks.append(FileTask(fname, delete=True))
    self.tasks.extend(additional_deletion_tasks)

    analysis_res.num_chunks_cache = len(dl_cache_guids)
    self.chunk_data_list = manifest.chunk_data_list
    self.analysis = analysis_res

    return analysis_res
def download_job_manager(self, task_cond: Condition, shm_cond: Condition):
    """Feed chunk download jobs to the download worker queue.

    Runs while there are chunks left in ``self.chunks_to_dl`` and
    ``self.running`` is set. At most ``self.max_workers * 2`` download tasks
    are kept in flight, and every job needs one free shared memory segment
    taken from ``self.sms``.

    :param task_cond: condition waited on (1 second timeout) when the active
        task limit has been reached.
    :param shm_cond: condition waited on (1 second timeout) when no shared
        memory segment is available.
    """
    while self.chunks_to_dl and self.running:
        while self.active_tasks < self.max_workers * 2 and self.chunks_to_dl:
            try:
                sms = self.sms.popleft()
                no_shm = False
            except IndexError:  # no free cache
                no_shm = True
                break

            c_guid = self.chunks_to_dl.popleft()
            chunk = self.chunk_data_list.get_chunk_by_guid(c_guid)
            self.log.debug(f'Adding {chunk.guid_num} (active: {self.active_tasks})')
            try:
                self.dl_worker_queue.put(DownloaderTask(url=self.base_url + '/' + chunk.path,
                                                        chunk_guid=c_guid, shm=sms),
                                         timeout=1.0)
            except Exception as e:
                self.log.warning(f'Failed to add to download queue: {e!r}')
                # Undo both reservations: the chunk goes back to the front of
                # the download list and the segment back into the pool,
                # otherwise every failed enqueue would leak one segment.
                self.chunks_to_dl.appendleft(c_guid)
                self.sms.appendleft(sms)
                break

            self.active_tasks += 1
        else:
            # active tasks limit hit, wait for tasks to finish
            with task_cond:
                self.log.debug('Waiting for download tasks to complete..')
                task_cond.wait(timeout=1.0)
                continue

        if no_shm:
            # the inner loop broke because we ran out of shared memory, so wait for that.
            with shm_cond:
                self.log.debug('Waiting for more shared memory...')
                shm_cond.wait(timeout=1.0)

    self.log.debug('Download Job Manager quitting...')
def dl_results_handler(self, task_cond: Condition):
    """Work through ``self.tasks`` in order and hand jobs to the file writer.

    FileTasks (empty/rename/delete/open/close) are forwarded to
    ``self.writer_queue`` right away. Chunk tasks are forwarded once their
    chunk is present in the local ``in_buffer`` (filled from
    ``self.dl_result_q``) or when they re-use data from an old file
    (``task.chunk_file``). Failed downloads are put back on the download
    worker queue.

    :param task_cond: condition notified each time a download result has
        been taken off the result queue.
    """
    in_buffer = dict()

    task = self.tasks.popleft()
    current_file = ''

    while task and self.running:
        if isinstance(task, FileTask):  # this wasn't necessarily a good idea...
            try:
                if task.empty:
                    self.writer_queue.put(WriterTask(task.filename, empty=True), timeout=1.0)
                elif task.rename:
                    self.writer_queue.put(WriterTask(task.filename, rename=True,
                                                     delete=task.delete,
                                                     old_filename=task.temporary_filename),
                                          timeout=1.0)
                elif task.delete:
                    self.writer_queue.put(WriterTask(task.filename, delete=True, silent=task.silent), timeout=1.0)
                elif task.open:
                    self.writer_queue.put(WriterTask(task.filename, fopen=True), timeout=1.0)
                    current_file = task.filename
                elif task.close:
                    self.writer_queue.put(WriterTask(task.filename, close=True), timeout=1.0)
            except Exception as e:
                # `task` is still held here, so simply retry it on the next
                # iteration. Pushing it back onto self.tasks as well would
                # make it run a second time after the retry succeeds.
                self.log.warning(f'Adding to queue failed: {e!r}')
                continue

            try:
                task = self.tasks.popleft()
            except IndexError:  # finished
                break
            continue

        while (task.chunk_guid in in_buffer) or task.chunk_file:
            res_shm = None
            if not task.chunk_file:  # not re-using from an old file
                res_shm = in_buffer[task.chunk_guid].shm

            try:
                self.log.debug(f'Adding {task.chunk_guid} to writer queue')
                self.writer_queue.put(WriterTask(
                    filename=current_file, shared_memory=res_shm,
                    chunk_offset=task.chunk_offset, chunk_size=task.chunk_size,
                    chunk_guid=task.chunk_guid, release_memory=task.cleanup,
                    old_file=task.chunk_file  # todo on-disk cache
                ), timeout=1.0)
            except Exception as e:
                self.log.warning(f'Adding to queue failed: {e!r}')
                break

            # the last task using a downloaded chunk drops it from the buffer
            if task.cleanup and not task.chunk_file:
                del in_buffer[task.chunk_guid]

            try:
                task = self.tasks.popleft()
                if isinstance(task, FileTask):
                    break
            except IndexError:  # finished
                task = None
                break
        else:  # only enter blocking code if the loop did not break
            try:
                res = self.dl_result_q.get(timeout=1)
                self.active_tasks -= 1
                with task_cond:
                    task_cond.notify()

                if res.success:
                    self.log.debug(f'Download for {res.guid} succeeded, adding to in_buffer...')
                    in_buffer[res.guid] = res
                    self.bytes_downloaded_since_last += res.compressed_size
                    self.bytes_decompressed_since_last += res.size
                else:
                    self.log.error(f'Download for {res.guid} failed, retrying...')
                    try:
                        self.dl_worker_queue.put(DownloaderTask(
                            url=res.url, chunk_guid=res.guid, shm=res.shm
                        ), timeout=1.0)
                        self.active_tasks += 1
                    except Exception as e:
                        self.log.warning(f'Failed adding retry task to queue! {e!r}')
                        # If this failed for whatever reason, put the chunk at the front of the DL list.
                        # The result exposes its id as `guid` (as used everywhere above).
                        self.chunks_to_dl.appendleft(res.guid)
                        # The job manager takes a fresh segment when it re-queues the
                        # chunk, so hand this one back instead of leaking it.
                        self.sms.appendleft(res.shm)
            except Empty:
                pass
            except Exception as e:
                self.log.warning(f'Unhandled exception when trying to read download result queue: {e!r}')

    self.log.debug('Download result handler quitting...')
def fw_results_handler(self, shm_cond: Condition):
    """Consume file writer results and keep the bookkeeping up to date.

    For every result taken from ``self.writer_result_q`` this updates the
    "since last" counters, appends successfully closed files to the resume
    file (when one is configured), returns released shared memory segments
    to ``self.sms`` and stops once a kill result is received.

    :param shm_cond: condition notified whenever a shared memory segment
        has been put back into the pool.
    """
    tmp_suffix = '.tmp'

    while self.running:
        try:
            outcome = self.writer_result_q.get(timeout=1.0)
            self.num_tasks_processed_since_last += 1

            if outcome.closed and self.resume_file and outcome.success:
                # temporary files are recorded under their final name
                if outcome.filename.endswith(tmp_suffix):
                    outcome.filename = outcome.filename[:-len(tmp_suffix)]

                file_hash = self.hash_map[outcome.filename]
                # append the finished file to the (very simple) resume file
                with open(self.resume_file, 'ab') as resume_fh:
                    resume_fh.write(f'{file_hash}:{outcome.filename}\n'.encode('utf-8'))

            if outcome.kill:
                self.log.debug('Got termination command in FW result handler')
                break

            if not outcome.success:
                # todo make this kill the installation process or at least skip the file and mark it as failed
                self.log.fatal(f'Writing for {outcome.filename} failed!')

            if outcome.release_memory:
                self.sms.appendleft(outcome.shm)
                with shm_cond:
                    shm_cond.notify()

            if outcome.chunk_guid:
                written = outcome.size
                self.bytes_written_since_last += written
                # without shared memory the data must have been read from disk
                if not outcome.shm:
                    self.bytes_read_since_last += written
                self.num_processed_since_last += 1
        except Empty:
            continue
        except Exception as e:
            self.log.warning(f'Exception when trying to read writer result queue: {e!r}')

    self.log.debug('Writer result handler quitting...')
def run(self):
    """Entry point of the download manager: set up logging and start the job.

    Configures the root logger (routing records to ``self.logging_queue``
    when one is set), then calls :meth:`run_real`. A ``KeyboardInterrupt``
    is turned into an orderly shutdown: worker threads are woken up and
    joined, and all queues are drained and closed.

    :raises ValueError: if no analysis result is available yet.
    """
    if not self.analysis:
        raise ValueError('Did not run analysis before trying to run download!')

    # Route this process' root logger into the logging queue (if any)
    root_logger = logging.getLogger()
    if self.proc_debug:
        root_logger.setLevel(logging.DEBUG)
    else:
        root_logger.setLevel(logging.INFO)
    if self.logging_queue:
        root_logger.handlers = []
        root_logger.addHandler(QueueHandler(self.logging_queue))

    self.log = logging.getLogger('DLManager')
    self.log.info(f'Download Manager running with process-id: {os.getpid()}')

    try:
        self.run_real()
    except KeyboardInterrupt:
        self.log.warning('Immediate exit requested!')
        self.running = False

        # wake up any thread that is still waiting on a condition
        for condition in self.conditions:
            with condition:
                condition.notify()

        # give every thread a chance to finish
        for thread in self.threads:
            thread.join(timeout=5.0)
            if thread.is_alive():
                self.log.warning(f'Thread did not terminate! {repr(thread)}')

        # drain and close all queues, otherwise this process won't terminate properly
        labelled_queues = [
            ('Download jobs', self.dl_worker_queue),
            ('Writer jobs', self.writer_queue),
            ('Download results', self.dl_result_q),
            ('Writer results', self.writer_result_q),
        ]
        for name, pending in labelled_queues:
            self.log.debug(f'Cleaning up queue "{name}"')
            while True:
                try:
                    pending.get_nowait()
                except Empty:
                    break
            pending.close()
            pending.join_thread()
def run_real(self):
    """Run the actual download: start workers, report progress, shut down.

    Creates the shared memory block and splits it into segments of
    ``self.analysis.biggest_chunk`` bytes, creates the four queues, starts
    the download workers and the file writer process plus the three manager
    threads, then loops (sleeping ``self.update_interval`` between rounds)
    logging progress and optionally pushing a ``UIUpdate`` to
    ``self.status_queue`` until all tasks have been processed. Afterwards
    workers are told to quit, stragglers are terminated, the resume file is
    removed and the shared memory is released.

    This method does not return; it ends by calling ``exit(0)``.
    """

    def _split_hms(total_seconds):
        # Break a duration in seconds into whole (hours, minutes, seconds).
        whole_hours, remainder = int(total_seconds // 3600), total_seconds % 3600
        return whole_hours, int(remainder // 60), int(remainder % 60)

    self.shared_memory = SharedMemory(create=True, size=self.max_shared_memory)
    self.log.debug(f'Created shared memory of size: {self.shared_memory.size / 1024 / 1024:.02f} MiB')

    # create the shared memory segments and add them to their respective pools
    for i in range(int(self.shared_memory.size / self.analysis.biggest_chunk)):
        _sms = SharedMemorySegment(offset=i * self.analysis.biggest_chunk,
                                   end=i * self.analysis.biggest_chunk + self.analysis.biggest_chunk)
        self.sms.append(_sms)

    self.log.debug(f'Created {len(self.sms)} shared memory segments.')

    # Create queues
    self.dl_worker_queue = MPQueue(-1)
    self.writer_queue = MPQueue(-1)
    self.dl_result_q = MPQueue(-1)
    self.writer_result_q = MPQueue(-1)

    self.log.info('Starting download workers...')
    for i in range(self.max_workers):
        w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, self.dl_result_q,
                     self.shared_memory.name, logging_queue=self.logging_queue,
                     dl_timeout=self.dl_timeout)
        self.children.append(w)
        w.start()

    self.log.info('Starting file writing worker...')
    writer_p = FileWorker(self.writer_queue, self.writer_result_q, self.dl_dir,
                          self.shared_memory.name, self.cache_dir, self.logging_queue)
    self.children.append(writer_p)
    writer_p.start()

    num_chunk_tasks = sum(isinstance(t, ChunkTask) for t in self.tasks)
    num_dl_tasks = len(self.chunks_to_dl)
    num_tasks = len(self.tasks)
    num_shared_memory_segments = len(self.sms)
    self.log.debug(f'Chunks to download: {num_dl_tasks}, File tasks: {num_tasks}, Chunk tasks: {num_chunk_tasks}')

    # active downloader tasks
    self.active_tasks = 0
    processed_chunks = 0
    processed_tasks = 0
    total_dl = 0
    total_write = 0

    # synchronization conditions
    shm_cond = Condition()
    task_cond = Condition()
    self.conditions = [shm_cond, task_cond]

    # start threads
    s_time = time.time()
    self.threads.append(Thread(target=self.download_job_manager, args=(task_cond, shm_cond)))
    self.threads.append(Thread(target=self.dl_results_handler, args=(task_cond,)))
    self.threads.append(Thread(target=self.fw_results_handler, args=(shm_cond,)))

    for t in self.threads:
        t.start()

    last_update = time.time()

    while processed_tasks < num_tasks:
        delta = time.time() - last_update
        if not delta:
            # no time has passed, speeds below would divide by zero
            time.sleep(self.update_interval)
            continue

        # update all the things
        processed_chunks += self.num_processed_since_last
        processed_tasks += self.num_tasks_processed_since_last

        total_dl += self.bytes_downloaded_since_last
        total_write += self.bytes_written_since_last

        dl_speed = self.bytes_downloaded_since_last / delta
        dl_unc_speed = self.bytes_decompressed_since_last / delta
        w_speed = self.bytes_written_since_last / delta
        r_speed = self.bytes_read_since_last / delta
        # c_speed = self.num_processed_since_last / delta

        # set temporary counters to 0
        self.bytes_read_since_last = self.bytes_written_since_last = 0
        self.bytes_downloaded_since_last = self.num_processed_since_last = 0
        self.bytes_decompressed_since_last = self.num_tasks_processed_since_last = 0
        last_update = time.time()

        if num_chunk_tasks:
            perc = (processed_chunks / num_chunk_tasks) * 100
        else:
            # Job without any chunk tasks (file tasks only): report progress
            # by processed tasks instead of dividing by zero. num_tasks is
            # non-zero here because of the loop condition.
            perc = (processed_tasks / num_tasks) * 100
        runtime = time.time() - s_time
        total_avail = len(self.sms)
        total_used = (num_shared_memory_segments - total_avail) * (self.analysis.biggest_chunk / 1024 / 1024)

        if runtime and processed_chunks:
            # The average speed must be based on the full runtime, so compute
            # it before the runtime is broken down into h/m/s for display.
            average_speed = processed_chunks / runtime
            estimate = (num_chunk_tasks - processed_chunks) / average_speed
            rt_hours, rt_minutes, rt_seconds = _split_hms(runtime)
            hours, minutes, seconds = _split_hms(estimate)
        else:
            hours = minutes = seconds = 0
            rt_hours = rt_minutes = rt_seconds = 0

        self.log.info(f'= Progress: {perc:.02f}% ({processed_chunks}/{num_chunk_tasks}), '
                      f'Running for {rt_hours:02d}:{rt_minutes:02d}:{rt_seconds:02d}, '
                      f'ETA: {hours:02d}:{minutes:02d}:{seconds:02d}')
        self.log.info(f' - Downloaded: {total_dl / 1024 / 1024:.02f} MiB, '
                      f'Written: {total_write / 1024 / 1024:.02f} MiB')
        self.log.info(f' - Cache usage: {total_used} MiB, active tasks: {self.active_tasks}')
        self.log.info(f' + Download\t- {dl_speed / 1024 / 1024:.02f} MiB/s (raw) '
                      f'/ {dl_unc_speed / 1024 / 1024:.02f} MiB/s (decompressed)')
        self.log.info(f' + Disk\t- {w_speed / 1024 / 1024:.02f} MiB/s (write) / '
                      f'{r_speed / 1024 / 1024:.02f} MiB/s (read)')

        # send status update back to the instantiator (if queue exists)
        if self.status_queue:
            try:
                self.status_queue.put(UIUpdate(
                    progress=perc, download_speed=dl_unc_speed, write_speed=w_speed, read_speed=r_speed,
                    memory_usage=total_used * 1024 * 1024
                ), timeout=1.0)
            except Exception as e:
                self.log.warning(f'Failed to send status update to queue: {e!r}')

        time.sleep(self.update_interval)

    # one kill task per download worker
    for _ in range(self.max_workers):
        self.dl_worker_queue.put_nowait(DownloaderTask(kill=True))

    self.log.info('Waiting for installation to finish...')
    self.writer_queue.put_nowait(WriterTask('', kill=True))

    writer_p.join(timeout=10.0)
    if writer_p.exitcode is None:
        self.log.warning('Terminating writer process, no exit code!')
        writer_p.terminate()

    # forcibly kill DL workers that are not actually dead yet
    for child in self.children:
        if child.exitcode is None:
            child.terminate()

    # make sure all the threads are dead.
    for t in self.threads:
        t.join(timeout=5.0)
        if t.is_alive():
            self.log.warning(f'Thread did not terminate! {repr(t)}')

    # clean up resume file
    if self.resume_file:
        try:
            os.remove(self.resume_file)
        except OSError as e:
            self.log.warning(f'Failed to remove resume file: {e!r}')

    # close up shared memory
    self.shared_memory.close()
    self.shared_memory.unlink()
    self.shared_memory = None

    self.log.info('All done! Download manager quitting...')
    # finally, exit the process.
    exit(0)