[downloader] Reformat manager

This commit is contained in:
derrod 2020-05-05 13:21:55 +02:00
parent 6f8da36947
commit 0e86791237

View file

@ -78,171 +78,6 @@ class DLManager(Process):
self.num_processed_since_last = 0 self.num_processed_since_last = 0
self.num_tasks_processed_since_last = 0 self.num_tasks_processed_since_last = 0
def download_job_manager(self, task_cond: Condition, shm_cond: Condition):
while self.chunks_to_dl and self.running:
while self.active_tasks < self.max_workers * 2 and self.chunks_to_dl:
try:
sms = self.sms.popleft()
no_shm = False
except IndexError: # no free cache
no_shm = True
break
c_guid = self.chunks_to_dl.popleft()
chunk = self.chunk_data_list.get_chunk_by_guid(c_guid)
self.log.debug(f'Adding {chunk.guid_num} (active: {self.active_tasks})')
try:
self.dl_worker_queue.put(DownloaderTask(url=self.base_url + '/' + chunk.path,
chunk_guid=c_guid, shm=sms),
timeout=1.0)
except Exception as e:
self.log.warning(f'Failed to add to download queue: {e!r}')
self.chunks_to_dl.appendleft(c_guid)
break
self.active_tasks += 1
else:
# active tasks limit hit, wait for tasks to finish
with task_cond:
self.log.debug('Waiting for download tasks to complete..')
task_cond.wait(timeout=1.0)
continue
if no_shm:
# if we break we ran out of shared memory, so wait for that.
with shm_cond:
self.log.debug('Waiting for more shared memory...')
shm_cond.wait(timeout=1.0)
self.log.info('Download Job Manager quitting...')
def dl_results_handler(self, task_cond: Condition):
in_buffer = dict()
task = self.tasks.popleft()
current_file = ''
while task and self.running:
if isinstance(task, FileTask): # this wasn't necessarily a good idea...
try:
if task.empty:
self.writer_queue.put(WriterTask(task.filename, empty=True), timeout=1.0)
elif task.rename:
self.writer_queue.put(WriterTask(task.filename, rename=True,
delete=task.delete,
old_filename=task.temporary_filename),
timeout=1.0)
elif task.delete:
self.writer_queue.put(WriterTask(task.filename, delete=True), timeout=1.0)
elif task.open:
self.writer_queue.put(WriterTask(task.filename, fopen=True), timeout=1.0)
current_file = task.filename
elif task.close:
self.writer_queue.put(WriterTask(task.filename, close=True), timeout=1.0)
except Exception as e:
self.tasks.appendleft(task)
self.log.warning(f'Adding to queue failed: {e!r}')
continue
try:
task = self.tasks.popleft()
except IndexError: # finished
break
continue
while (task.chunk_guid in in_buffer) or task.chunk_file:
res_shm = None
if not task.chunk_file: # not re-using from an old file
res_shm = in_buffer[task.chunk_guid].shm
try:
self.log.debug(f'Adding {task.chunk_guid} to writer queue')
self.writer_queue.put(WriterTask(
filename=current_file, shared_memory=res_shm,
chunk_offset=task.chunk_offset, chunk_size=task.chunk_size,
chunk_guid=task.chunk_guid, release_memory=task.cleanup,
old_file=task.chunk_file # todo on-disk cache
), timeout=1.0)
except Exception as e:
self.log.warning(f'Adding to queue failed: {e!r}')
break
if task.cleanup and not task.chunk_file:
del in_buffer[task.chunk_guid]
try:
task = self.tasks.popleft()
if isinstance(task, FileTask):
break
except IndexError: # finished
task = None
break
else: # only enter blocking code if the loop did not break
try:
res = self.dl_result_q.get(timeout=1)
self.active_tasks -= 1
with task_cond:
task_cond.notify()
if res.success:
self.log.debug(f'Download for {res.guid} succeeded, adding to in_buffer...')
in_buffer[res.guid] = res
self.bytes_downloaded_since_last += res.compressed_size
self.bytes_decompressed_since_last += res.size
else:
self.log.error(f'Download for {res.guid} failed, retrying...')
try:
self.dl_worker_queue.put(DownloaderTask(
url=res.url, chunk_guid=res.guid, shm=res.shm
), timeout=1.0)
self.active_tasks += 1
except Exception as e:
self.log.warning(f'Failed adding retry task to queue! {e!r}')
# If this failed for whatever reason, put the chunk at the front of the DL list
self.chunks_to_dl.appendleft(res.chunk_guid)
except Empty:
pass
except Exception as e:
self.log.warning(f'Unhandled exception when trying to read download result queue: {e!r}')
self.log.info('Download result handler quitting...')
def fw_results_handler(self, shm_cond: Condition):
while self.running:
try:
res = self.writer_result_q.get(timeout=1.0)
self.num_tasks_processed_since_last += 1
if res.closed and self.resume_file:
# write last completed file to super simple resume file
with open(self.resume_file, 'ab') as rf:
rf.write(f'{res.filename}\n'.encode('utf-8'))
if res.kill:
self.log.info('Got termination command in FW result handler')
break
if not res.success:
# todo make this kill the installation process or at least skip the file and mark it as failed
self.log.fatal(f'Writing for {res.filename} failed!')
if res.release_memory:
self.sms.appendleft(res.shm)
with shm_cond:
shm_cond.notify()
if res.chunk_guid:
self.bytes_written_since_last += res.size
# if there's no shared memory we must have read from disk.
if not res.shm:
self.bytes_read_since_last += res.size
self.num_processed_since_last += 1
except Empty:
continue
except Exception as e:
self.log.warning(f'Exception when trying to read writer result queue: {e!r}')
self.log.info('Writer result handler quitting...')
def run_analysis(self, manifest: Manifest, old_manifest: Manifest = None, def run_analysis(self, manifest: Manifest, old_manifest: Manifest = None,
patch=True, resume=True, file_prefix_filter=None, patch=True, resume=True, file_prefix_filter=None,
file_exclude_filter=None, file_install_tag=None, file_exclude_filter=None, file_install_tag=None,
@ -514,6 +349,171 @@ class DLManager(Process):
return analysis_res return analysis_res
def download_job_manager(self, task_cond: Condition, shm_cond: Condition):
while self.chunks_to_dl and self.running:
while self.active_tasks < self.max_workers * 2 and self.chunks_to_dl:
try:
sms = self.sms.popleft()
no_shm = False
except IndexError: # no free cache
no_shm = True
break
c_guid = self.chunks_to_dl.popleft()
chunk = self.chunk_data_list.get_chunk_by_guid(c_guid)
self.log.debug(f'Adding {chunk.guid_num} (active: {self.active_tasks})')
try:
self.dl_worker_queue.put(DownloaderTask(url=self.base_url + '/' + chunk.path,
chunk_guid=c_guid, shm=sms),
timeout=1.0)
except Exception as e:
self.log.warning(f'Failed to add to download queue: {e!r}')
self.chunks_to_dl.appendleft(c_guid)
break
self.active_tasks += 1
else:
# active tasks limit hit, wait for tasks to finish
with task_cond:
self.log.debug('Waiting for download tasks to complete..')
task_cond.wait(timeout=1.0)
continue
if no_shm:
# if we break we ran out of shared memory, so wait for that.
with shm_cond:
self.log.debug('Waiting for more shared memory...')
shm_cond.wait(timeout=1.0)
self.log.info('Download Job Manager quitting...')
def dl_results_handler(self, task_cond: Condition):
in_buffer = dict()
task = self.tasks.popleft()
current_file = ''
while task and self.running:
if isinstance(task, FileTask): # this wasn't necessarily a good idea...
try:
if task.empty:
self.writer_queue.put(WriterTask(task.filename, empty=True), timeout=1.0)
elif task.rename:
self.writer_queue.put(WriterTask(task.filename, rename=True,
delete=task.delete,
old_filename=task.temporary_filename),
timeout=1.0)
elif task.delete:
self.writer_queue.put(WriterTask(task.filename, delete=True), timeout=1.0)
elif task.open:
self.writer_queue.put(WriterTask(task.filename, fopen=True), timeout=1.0)
current_file = task.filename
elif task.close:
self.writer_queue.put(WriterTask(task.filename, close=True), timeout=1.0)
except Exception as e:
self.tasks.appendleft(task)
self.log.warning(f'Adding to queue failed: {e!r}')
continue
try:
task = self.tasks.popleft()
except IndexError: # finished
break
continue
while (task.chunk_guid in in_buffer) or task.chunk_file:
res_shm = None
if not task.chunk_file: # not re-using from an old file
res_shm = in_buffer[task.chunk_guid].shm
try:
self.log.debug(f'Adding {task.chunk_guid} to writer queue')
self.writer_queue.put(WriterTask(
filename=current_file, shared_memory=res_shm,
chunk_offset=task.chunk_offset, chunk_size=task.chunk_size,
chunk_guid=task.chunk_guid, release_memory=task.cleanup,
old_file=task.chunk_file # todo on-disk cache
), timeout=1.0)
except Exception as e:
self.log.warning(f'Adding to queue failed: {e!r}')
break
if task.cleanup and not task.chunk_file:
del in_buffer[task.chunk_guid]
try:
task = self.tasks.popleft()
if isinstance(task, FileTask):
break
except IndexError: # finished
task = None
break
else: # only enter blocking code if the loop did not break
try:
res = self.dl_result_q.get(timeout=1)
self.active_tasks -= 1
with task_cond:
task_cond.notify()
if res.success:
self.log.debug(f'Download for {res.guid} succeeded, adding to in_buffer...')
in_buffer[res.guid] = res
self.bytes_downloaded_since_last += res.compressed_size
self.bytes_decompressed_since_last += res.size
else:
self.log.error(f'Download for {res.guid} failed, retrying...')
try:
self.dl_worker_queue.put(DownloaderTask(
url=res.url, chunk_guid=res.guid, shm=res.shm
), timeout=1.0)
self.active_tasks += 1
except Exception as e:
self.log.warning(f'Failed adding retry task to queue! {e!r}')
# If this failed for whatever reason, put the chunk at the front of the DL list
self.chunks_to_dl.appendleft(res.chunk_guid)
except Empty:
pass
except Exception as e:
self.log.warning(f'Unhandled exception when trying to read download result queue: {e!r}')
self.log.info('Download result handler quitting...')
def fw_results_handler(self, shm_cond: Condition):
while self.running:
try:
res = self.writer_result_q.get(timeout=1.0)
self.num_tasks_processed_since_last += 1
if res.closed and self.resume_file:
# write last completed file to super simple resume file
with open(self.resume_file, 'ab') as rf:
rf.write(f'{res.filename}\n'.encode('utf-8'))
if res.kill:
self.log.info('Got termination command in FW result handler')
break
if not res.success:
# todo make this kill the installation process or at least skip the file and mark it as failed
self.log.fatal(f'Writing for {res.filename} failed!')
if res.release_memory:
self.sms.appendleft(res.shm)
with shm_cond:
shm_cond.notify()
if res.chunk_guid:
self.bytes_written_since_last += res.size
# if there's no shared memory we must have read from disk.
if not res.shm:
self.bytes_read_since_last += res.size
self.num_processed_since_last += 1
except Empty:
continue
except Exception as e:
self.log.warning(f'Exception when trying to read writer result queue: {e!r}')
self.log.info('Writer result handler quitting...')
def run(self): def run(self):
if not self.analysis: if not self.analysis:
raise ValueError('Did not run analysis before trying to run download!') raise ValueError('Did not run analysis before trying to run download!')