[cli/downloader] Migrate to Queue based logging for subprocesses

This commit is contained in:
derrod 2020-04-26 13:19:28 +02:00
parent 035e23b964
commit 0485a728e3
3 changed files with 136 additions and 64 deletions

View file

@ -3,32 +3,41 @@
import argparse
import logging
import multiprocessing
import os
import shlex
import subprocess
import time
import webbrowser
from logging.handlers import QueueHandler, QueueListener
from multiprocessing import freeze_support, Queue as MPQueue
from sys import exit
from legendary.core import LegendaryCore
from legendary.models.exceptions import InvalidCredentialsError
# todo custom formatter for cli logger (clean info, highlighted error/warning)
logging.basicConfig(
format='[%(asctime)s] [%(name)s] %(levelname)s: %(message)s',
format='[%(name)s] %(levelname)s: %(message)s',
level=logging.INFO
)
logger = logging.getLogger('cli')
# todo logger with QueueHandler/QueueListener
# todo custom formatter for cli logger (clean info, highlighted error/warning)
class LegendaryCLI:
def __init__(self):
self.core = LegendaryCore()
self.logger = logging.getLogger('cli')
self.logging_queue = None
def setup_threaded_logging(self):
self.logging_queue = MPQueue(-1)
shandler = logging.StreamHandler()
sformatter = logging.Formatter('[%(asctime)s] [%(name)s] %(levelname)s: %(message)s')
shandler.setFormatter(sformatter)
ql = QueueListener(self.logging_queue, shandler)
ql.start()
return ql
def auth(self, args):
try:
@ -223,6 +232,10 @@ class LegendaryCLI:
start_t = time.time()
try:
# set up logging stuff (should be moved somewhere else later)
dlm.logging_queue = self.logging_queue
dlm.proc_debug = args.dlm_debug
dlm.start()
dlm.join()
except Exception as e:
@ -243,7 +256,7 @@ class LegendaryCLI:
print(f' - {dlc.app_title} (App name: {dlc.app_name}, version: {dlc.app_version})')
# todo recursively call install with modified args to install DLC automatically (after confirm)
print('Installing DLCs works the same as the main game, just use the DLC app name instead.')
print('Automatic installation of DLC is currently not supported.')
print('(Automatic installation of DLC is currently not supported.)')
if postinstall:
self._handle_postinstall(postinstall, igame, yes=args.yes)
@ -354,6 +367,8 @@ def main():
help='Do not mark game as intalled and do not run prereq installers after download.')
install_parser.add_argument('--update-only', dest='update_pnly', action='store_true',
help='Abort if game is not already installed (for automation)')
install_parser.add_argument('--dlm-debug', dest='dlm_debug', action='store_true',
help='Set download manager and worker processes\' loglevel to debug')
launch_parser.add_argument('--offline', dest='offline', action='store_true',
default=False, help='Skip login and launch game without online authentication')
@ -381,6 +396,7 @@ def main():
return
cli = LegendaryCLI()
ql = cli.setup_threaded_logging()
config_ll = cli.core.lgd.config.get('Legendary', 'log_level', fallback='info')
if config_ll == 'debug' or args.debug:
@ -391,6 +407,7 @@ def main():
# technically args.func() with setdefaults could work (see docs on subparsers)
# but that would require all funcs to accept args and extra...
try:
if args.subparser_name == 'auth':
cli.auth(args)
elif args.subparser_name == 'list-games':
@ -403,11 +420,15 @@ def main():
cli.install_game(args)
elif args.subparser_name == 'uninstall':
cli.uninstall_game(args)
except KeyboardInterrupt:
logger.info('Command was aborted via KeyboardInterrupt, cleaning up...')
cli.core.exit()
ql.stop()
exit(0)
if __name__ == '__main__':
multiprocessing.freeze_support() # required for pyinstaller
# required for pyinstaller on Windows, does nothing on other platforms.
freeze_support()
main()

View file

@ -8,6 +8,7 @@ import os
import time
from collections import Counter, defaultdict, deque
from logging.handlers import QueueHandler
from multiprocessing import cpu_count, Process, Queue as MPQueue
from multiprocessing.shared_memory import SharedMemory
from queue import Empty
@ -24,14 +25,15 @@ class DLManager(Process):
max_jobs=100, max_failures=5, max_workers=0, update_interval=1.0,
max_shared_memory=1024 * 1024 * 1024, resume_file=None):
super().__init__(name='DLManager')
self.log = logging.getLogger('DLManager')
self.log_level = self.log.level
self.log = logging.getLogger('DLM')
self.proc_debug = False
self.base_url = base_url
self.dl_dir = download_dir
self.cache_dir = cache_dir if cache_dir else os.path.join(download_dir, '.cache')
# All the queues!
self.logging_queue = None
self.dl_worker_queue = None
self.writer_queue = None
self.dl_result_q = None
@ -63,6 +65,8 @@ class DLManager(Process):
self.running = True
self.active_tasks = 0
self.children = []
self.threads = []
self.conditions = []
# bytes downloaded and decompressed since last report
self.bytes_downloaded_since_last = 0
self.bytes_decompressed_since_last = 0
@ -451,19 +455,43 @@ class DLManager(Process):
if not self.analysis:
raise ValueError('Did not run analysis before trying to run download!')
# fix loglevel in subprocess
self.log.setLevel(self.log_level)
# Subprocess will use its own root logger that logs to a Queue instead
_root = logging.getLogger()
_root.setLevel(logging.DEBUG if self.proc_debug else logging.INFO)
if self.logging_queue:
_root.handlers = []
_root.addHandler(QueueHandler(self.logging_queue))
self.log = logging.getLogger('DLMProc')
self.log.info(f'Download Manager running with process-id: {os.getpid()}')
try:
self.run_real()
except KeyboardInterrupt:
self.log.warning('Immediate exit requested!')
self.running = False
for proc in self.children:
# send conditions to unlock threads if they aren't already
for cond in self.conditions:
with cond:
cond.notify()
# make sure threads are dead.
for t in self.threads:
t.join(timeout=5.0)
if t.is_alive():
self.log.warning(f'Thread did not terminate! {repr(t)}')
# clean up all the queues, otherwise this process won't terminate properly
for name, q in zip(('Download jobs', 'Writer jobs', 'Download results', 'Writer results'),
(self.dl_worker_queue, self.writer_queue, self.dl_result_q, self.writer_result_q)):
self.log.debug(f'Cleaning up queue "{name}"')
try:
proc.terminate()
except Exception as e:
print(f'Terminating process {repr(proc)} failed: {e!r}')
while True:
_ = q.get_nowait()
except Empty:
q.close()
q.join_thread()
def run_real(self):
self.shared_memory = SharedMemory(create=True, size=self.max_shared_memory)
@ -478,21 +506,22 @@ class DLManager(Process):
self.log.debug(f'Created {len(self.sms)} shared memory segments.')
# Create queues
self.dl_worker_queue = MPQueue()
self.writer_queue = MPQueue()
self.dl_result_q = MPQueue()
self.writer_result_q = MPQueue()
self.dl_worker_queue = MPQueue(-1)
self.writer_queue = MPQueue(-1)
self.dl_result_q = MPQueue(-1)
self.writer_result_q = MPQueue(-1)
self.log.info(f'Starting download workers...')
for i in range(self.max_workers):
w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue,
self.dl_result_q, self.shared_memory.name)
w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, self.dl_result_q,
self.shared_memory.name, logging_queue=self.logging_queue)
self.children.append(w)
w.start()
self.log.info('Starting file writing worker...')
writer_p = FileWorker(self.writer_queue, self.writer_result_q, self.dl_dir,
self.shared_memory.name, self.cache_dir)
self.shared_memory.name, self.cache_dir, self.logging_queue)
self.children.append(writer_p)
writer_p.start()
num_chunk_tasks = sum(isinstance(t, ChunkTask) for t in self.tasks)
@ -511,14 +540,15 @@ class DLManager(Process):
# synchronization conditions
shm_cond = Condition()
task_cond = Condition()
self.conditions = [shm_cond, task_cond]
# start threads
s_time = time.time()
dlj_e = Thread(target=self.download_job_manager, args=(task_cond, shm_cond))
dlr_e = Thread(target=self.dl_results_handler, args=(task_cond,))
fwr_e = Thread(target=self.fw_results_handler, args=(shm_cond,))
self.threads.append(Thread(target=self.download_job_manager, args=(task_cond, shm_cond)))
self.threads.append(Thread(target=self.dl_results_handler, args=(task_cond,)))
self.threads.append(Thread(target=self.fw_results_handler, args=(shm_cond,)))
for t in (dlj_e, dlr_e, fwr_e):
for t in self.threads:
t.start()
last_update = time.time()
@ -597,7 +627,7 @@ class DLManager(Process):
child.terminate()
# make sure all the threads are dead.
for t in (dlj_e, dlr_e, fwr_e):
for t in self.threads:
t.join(timeout=5.0)
if t.is_alive():
self.log.warning(f'Thread did not terminate! {repr(t)}')

View file

@ -6,6 +6,7 @@ import requests
import time
import logging
from logging.handlers import QueueHandler
from multiprocessing import Process
from multiprocessing.shared_memory import SharedMemory
from queue import Empty
@ -15,7 +16,7 @@ from legendary.models.downloading import DownloaderTaskResult, WriterTaskResult
class DLWorker(Process):
def __init__(self, name, queue, out_queue, shm, max_retries=5):
def __init__(self, name, queue, out_queue, shm, max_retries=5, logging_queue=None):
super().__init__(name=name)
self.q = queue
self.o_q = out_queue
@ -25,9 +26,19 @@ class DLWorker(Process):
})
self.max_retries = max_retries
self.shm = SharedMemory(name=shm)
self.log = logging.getLogger('DLWorker')
self.log_level = logging.getLogger().level
self.logging_queue = logging_queue
def run(self):
# we have to fix up the logger before we can start
_root = logging.getLogger()
_root.handlers = []
_root.addHandler(QueueHandler(self.logging_queue))
logger = logging.getLogger(self.name)
logger.setLevel(self.log_level)
logger.debug(f'Download worker reporting for duty!')
empty = False
while True:
try:
@ -35,12 +46,12 @@ class DLWorker(Process):
empty = False
except Empty:
if not empty:
self.log.debug(f'[{self.name}] Queue Empty, waiting for more...')
logger.debug(f'[{self.name}] Queue Empty, waiting for more...')
empty = True
continue
if job.kill: # let worker die
self.log.info(f'[{self.name}] Worker received kill signal, shutting down...')
logger.debug(f'[{self.name}] Worker received kill signal, shutting down...')
break
tries = 0
@ -51,19 +62,19 @@ class DLWorker(Process):
try:
while tries < self.max_retries:
# print('Downloading', job.url)
self.log.debug(f'[{self.name}] Downloading {job.url}')
logger.debug(f'[{self.name}] Downloading {job.url}')
dl_start = time.time()
try:
r = self.session.get(job.url, timeout=5.0)
r.raise_for_status()
except Exception as e:
self.log.warning(f'[{self.name}] Chunk download failed ({e!r}), retrying...')
logger.warning(f'[{self.name}] Chunk download failed ({e!r}), retrying...')
continue
dl_end = time.time()
if r.status_code != 200:
self.log.warning(f'[{self.name}] Chunk download failed (Status {r.status_code}), retrying...')
logger.warning(f'[{self.name}] Chunk download failed (Status {r.status_code}), retrying...')
continue
else:
compressed = len(r.content)
@ -72,12 +83,12 @@ class DLWorker(Process):
else:
raise TimeoutError('Max retries reached')
except Exception as e:
self.log.error(f'[{self.name}] Job failed with: {e!r}, fetching next one...')
logger.error(f'[{self.name}] Job failed with: {e!r}, fetching next one...')
# add failed job to result queue to be requeued
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
if not chunk:
self.log.warning(f'[{self.name}] Chunk smoehow None?')
logger.warning(f'[{self.name}] Chunk smoehow None?')
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
continue
@ -85,7 +96,7 @@ class DLWorker(Process):
try:
size = len(chunk.data)
if size > job.shm.size:
self.log.fatal(f'Downloaded chunk is longer than SharedMemorySegment!')
logger.fatal(f'Downloaded chunk is longer than SharedMemorySegment!')
self.shm.buf[job.shm.offset:job.shm.offset + size] = bytes(chunk.data)
del chunk
@ -93,7 +104,7 @@ class DLWorker(Process):
url=job.url, size=size, compressed_size=compressed,
time_delta=dl_end - dl_start))
except Exception as e:
self.log.warning(f'[{self.name}] Job failed with: {e!r}, fetching next one...')
logger.warning(f'[{self.name}] Job failed with: {e!r}, fetching next one...')
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
continue
@ -101,16 +112,26 @@ class DLWorker(Process):
class FileWorker(Process):
def __init__(self, queue, out_queue, base_path, shm, cache_path=None):
super().__init__(name='File worker')
def __init__(self, queue, out_queue, base_path, shm, cache_path=None, logging_queue=None):
super().__init__(name='FileWorker')
self.q = queue
self.o_q = out_queue
self.base_path = base_path
self.cache_path = cache_path if cache_path else os.path.join(base_path, '.cache')
self.shm = SharedMemory(name=shm)
self.log = logging.getLogger('DLWorker')
self.log_level = logging.getLogger().level
self.logging_queue = logging_queue
def run(self):
# we have to fix up the logger before we can start
_root = logging.getLogger()
_root.handlers = []
_root.addHandler(QueueHandler(self.logging_queue))
logger = logging.getLogger(self.name)
logger.setLevel(self.log_level)
logger.debug(f'Download worker reporting for duty!')
last_filename = ''
current_file = None
@ -119,7 +140,7 @@ class FileWorker(Process):
try:
j = self.q.get(timeout=10.0)
except Empty:
self.log.warning('Writer queue empty!')
logger.warning('Writer queue empty!')
continue
if j.kill:
@ -141,7 +162,7 @@ class FileWorker(Process):
continue
elif j.open:
if current_file:
self.log.warning(f'Opening new file {j.filename} without closing previous! {last_filename}')
logger.warning(f'Opening new file {j.filename} without closing previous! {last_filename}')
current_file.close()
current_file = open(full_path, 'wb')
@ -154,27 +175,27 @@ class FileWorker(Process):
current_file.close()
current_file = None
else:
self.log.warning(f'Asking to close file that is not open: {j.filename}')
logger.warning(f'Asking to close file that is not open: {j.filename}')
self.o_q.put(WriterTaskResult(success=True, filename=j.filename, closed=True))
continue
elif j.rename:
if current_file:
self.log.warning('Trying to rename file without closing first!')
logger.warning('Trying to rename file without closing first!')
current_file.close()
current_file = None
if j.delete:
try:
os.remove(full_path)
except OSError as e:
self.log.error(f'Removing file failed: {e!r}')
logger.error(f'Removing file failed: {e!r}')
self.o_q.put(WriterTaskResult(success=False, filename=j.filename))
continue
try:
os.rename(os.path.join(self.base_path, j.old_filename), full_path)
except OSError as e:
self.log.error(f'Renaming file failed: {e!r}')
logger.error(f'Renaming file failed: {e!r}')
self.o_q.put(WriterTaskResult(success=False, filename=j.filename))
continue
@ -182,14 +203,14 @@ class FileWorker(Process):
continue
elif j.delete:
if current_file:
self.log.warning('Trying to delete file without closing first!')
logger.warning('Trying to delete file without closing first!')
current_file.close()
current_file = None
try:
os.remove(full_path)
except OSError as e:
self.log.error(f'Removing file failed: {e!r}')
logger.error(f'Removing file failed: {e!r}')
self.o_q.put(WriterTaskResult(success=True, filename=j.filename))
continue
@ -218,7 +239,7 @@ class FileWorker(Process):
current_file.write(f.read(j.chunk_size))
post_write = time.time()
except Exception as e:
self.log.warning(f'Something in writing a file failed: {e!r}')
logger.warning(f'Something in writing a file failed: {e!r}')
self.o_q.put(WriterTaskResult(success=False, filename=j.filename,
chunk_guid=j.chunk_guid,
release_memory=j.release_memory,
@ -231,7 +252,7 @@ class FileWorker(Process):
shm=j.shm, size=j.chunk_size,
time_delta=post_write-pre_write))
except Exception as e:
self.log.warning(f'[{self.name}] Job {j.filename} failed with: {e!r}, fetching next one...')
logger.warning(f'[{self.name}] Job {j.filename} failed with: {e!r}, fetching next one...')
self.o_q.put(WriterTaskResult(success=False, filename=j.filename, chunk_guid=j.chunk_guid))
try:
@ -239,7 +260,7 @@ class FileWorker(Process):
current_file.close()
current_file = None
except Exception as e:
self.log.error(f'[{self.name}] Closing file after error failed: {e!r}')
logger.error(f'[{self.name}] Closing file after error failed: {e!r}')
except KeyboardInterrupt:
if current_file:
current_file.close()