From 6b88d935766b1feaf68fce07f6c0fb1cc9da5978 Mon Sep 17 00:00:00 2001 From: derrod Date: Fri, 22 May 2020 14:31:49 +0200 Subject: [PATCH] [downloader] Clean up progress and other logging --- legendary/downloader/manager.py | 57 +++++++++++++++++++-------------- legendary/downloader/workers.py | 7 ++++ 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/legendary/downloader/manager.py b/legendary/downloader/manager.py index de90333..6d41c6b 100644 --- a/legendary/downloader/manager.py +++ b/legendary/downloader/manager.py @@ -424,7 +424,7 @@ class DLManager(Process): self.log.debug('Waiting for more shared memory...') shm_cond.wait(timeout=1.0) - self.log.info('Download Job Manager quitting...') + self.log.debug('Download Job Manager quitting...') def dl_results_handler(self, task_cond: Condition): in_buffer = dict() @@ -515,7 +515,7 @@ class DLManager(Process): except Exception as e: self.log.warning(f'Unhandled exception when trying to read download result queue: {e!r}') - self.log.info('Download result handler quitting...') + self.log.debug('Download result handler quitting...') def fw_results_handler(self, shm_cond: Condition): while self.running: @@ -530,7 +530,7 @@ class DLManager(Process): rf.write(f'{file_hash}:{res.filename}\n'.encode('utf-8')) if res.kill: - self.log.info('Got termination command in FW result handler') + self.log.debug('Got termination command in FW result handler') break if not res.success: @@ -552,7 +552,7 @@ class DLManager(Process): continue except Exception as e: self.log.warning(f'Exception when trying to read writer result queue: {e!r}') - self.log.info('Writer result handler quitting...') + self.log.debug('Writer result handler quitting...') def run(self): if not self.analysis: @@ -565,7 +565,7 @@ class DLManager(Process): _root.handlers = [] _root.addHandler(QueueHandler(self.logging_queue)) - self.log = logging.getLogger('DLMProc') + self.log = logging.getLogger('DLManager') self.log.info(f'Download Manager running with process-id: {os.getpid()}') try: @@ -674,7 +674,7 @@ class DLManager(Process): dl_unc_speed = self.bytes_decompressed_since_last / delta w_speed = self.bytes_written_since_last / delta r_speed = self.bytes_read_since_last / delta - c_speed = self.num_processed_since_last / delta + # c_speed = self.num_processed_since_last / delta # set temporary counters to 0 self.bytes_read_since_last = self.bytes_written_since_last = 0 @@ -683,24 +683,32 @@ class DLManager(Process): last_update = time.time() perc = (processed_chunks / num_chunk_tasks) * 100 - self.log.info(f'\n============== {time.time() - s_time:.01f} seconds since start') - self.log.info(f'Progress: {processed_chunks}/{num_chunk_tasks} ({perc:.02f}%) chunk tasks processed.') - self.log.info(f'Downloaded: {total_dl / 1024 / 1024:.02f} MiB, ' - f'Written: {total_write / 1024 / 1024:.02f} MiB') - - # speed meters - self.log.info('Speeds:') - self.log.info(f' + Download - {dl_speed / 1024 / 1024:.02f} MiB/s (raw) ' - f'/ {dl_unc_speed / 1024 / 1024:.02f} MiB/s (decompressed)') - self.log.info(f' + Write (disk) - {w_speed / 1024 / 1024:.02f} MiB/s') - self.log.info(f' + Read (disk) - {r_speed / 1024 / 1024:.02f} MiB/s') - self.log.info(f' + Tasks - {c_speed:.02f} Chunks/s') - self.log.info(f'Active download tasks: {self.active_tasks}') - - # shared memory debugging + runtime = time.time() - s_time total_avail = len(self.sms) total_used = (num_shared_memory_segments - total_avail) * (self.analysis.biggest_chunk / 1024 / 1024) - self.log.info(f'Shared memory usage: {total_used} MiB, available: {total_avail}') + + if runtime and processed_chunks: + rt_hours, runtime = int(runtime // 3600), runtime % 3600 + rt_minutes, rt_seconds = int(runtime // 60), int(runtime % 60) + + average_speed = processed_chunks / runtime + estimate = (num_chunk_tasks - processed_chunks) / average_speed + hours, estimate = int(estimate // 3600), estimate % 3600 + minutes, seconds = int(estimate // 60), int(estimate % 60) + else: + hours = minutes = seconds = 0 + rt_hours = rt_minutes = rt_seconds = 0 + + self.log.info(f'= Progress: {perc:.02f}% ({processed_chunks}/{num_chunk_tasks}), ' + f'Running for {rt_hours:02d}:{rt_minutes:02d}:{rt_seconds:02d}, ' + f'ETA: {hours:02d}:{minutes:02d}:{seconds:02d}') + self.log.info(f' - Downloaded: {total_dl / 1024 / 1024:.02f} MiB, ' + f'Written: {total_write / 1024 / 1024:.02f} MiB') + self.log.info(f' - Cache usage: {total_used} MiB, active tasks: {self.active_tasks}') + self.log.info(f' + Download\t- {dl_speed / 1024 / 1024:.02f} MiB/s (raw) ' + f'/ {dl_unc_speed / 1024 / 1024:.02f} MiB/s (decompressed)') + self.log.info(f' + Disk\t- {w_speed / 1024 / 1024:.02f} MiB/s (write) / ' + f'{r_speed / 1024 / 1024:.02f} MiB/s (read)') # send status update to back to instantiator (if queue exists) if self.status_queue: @@ -717,12 +725,12 @@ class DLManager(Process): for i in range(self.max_workers): self.dl_worker_queue.put_nowait(DownloaderTask(kill=True)) + self.log.info('Waiting for installation to finish...') self.writer_queue.put_nowait(WriterTask('', kill=True)) - self.log.info('Waiting for writer process to finish...') writer_p.join(timeout=10.0) if writer_p.exitcode is None: - self.log.warning(f'Terminating writer process {e!r}') + self.log.warning(f'Terminating writer process, no exit code!') writer_p.terminate() # forcibly kill DL workers that are not actually dead yet @@ -748,5 +756,6 @@ class DLManager(Process): self.shared_memory.unlink() self.shared_memory = None + self.log.info('All done! Download manager quitting...') # finally, exit the process. exit(0) diff --git a/legendary/downloader/workers.py b/legendary/downloader/workers.py index 5fbbb4c..d02606b 100644 --- a/legendary/downloader/workers.py +++ b/legendary/downloader/workers.py @@ -87,6 +87,9 @@ class DLWorker(Process): logger.error(f'Job for {job.guid} failed with: {e!r}, fetching next one...') # add failed job to result queue to be requeued self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url)) + except KeyboardInterrupt: + logger.warning('Immediate exit requested, quitting...') + break if not chunk: logger.warning(f'Chunk somehow None?') @@ -108,6 +111,9 @@ class DLWorker(Process): logger.warning(f'Job for {job.guid} failed with: {e!r}, fetching next one...') self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url)) continue + except KeyboardInterrupt: + logger.warning('Immediate exit requested, quitting...') + break self.shm.close() @@ -263,6 +269,7 @@ class FileWorker(Process): except Exception as e: logger.error(f'Closing file after error failed: {e!r}') except KeyboardInterrupt: + logger.warning('Immediate exit requested, quitting...') if current_file: current_file.close() return