mirror of https://github.com/derrod/legendary.git

commit dc381cacb0
parent 782a18f8f1

[downloader] Adjust for changes in new dataclass attribute names
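In short, the commit updates call sites for renamed dataclass fields: fopen becomes open on file/writer tasks, guid becomes chunk_guid on downloader tasks and results, and shm becomes shared_memory on the result objects the manager and file writer consume. As a rough illustration of the pattern, here is a minimal sketch of a task dataclass after the rename; the field list is inferred from the call sites in the diff below, and the types, defaults, and any omitted fields are assumptions, not legendary's actual definition:

# Minimal sketch only — NOT the actual definition in legendary's
# downloader models. Field names are taken from call sites in this diff;
# types and defaults are assumed.
from dataclasses import dataclass
from typing import Optional


@dataclass
class FileTask:
    filename: str
    open: bool = False       # renamed from: fopen
    close: bool = False
    delete: bool = False
    rename: bool = False
    silent: bool = False
    temporary_filename: Optional[str] = None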
@@ -341,14 +341,14 @@ class DLManager(Process):
         if reused:
             self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}')
             # open temporary file that will contain download + old file contents
-            self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True))
+            self.tasks.append(FileTask(current_file.filename + u'.tmp', open=True))
             self.tasks.extend(chunk_tasks)
             self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True))
             # delete old file and rename temporary
             self.tasks.append(FileTask(current_file.filename, delete=True, rename=True,
                                        temporary_filename=current_file.filename + u'.tmp'))
         else:
-            self.tasks.append(FileTask(current_file.filename, fopen=True))
+            self.tasks.append(FileTask(current_file.filename, open=True))
             self.tasks.extend(chunk_tasks)
             self.tasks.append(FileTask(current_file.filename, close=True))

@@ -450,7 +450,7 @@ class DLManager(Process):
         elif task.delete:
             self.writer_queue.put(WriterTask(task.filename, delete=True, silent=task.silent), timeout=1.0)
         elif task.open:
-            self.writer_queue.put(WriterTask(task.filename, fopen=True), timeout=1.0)
+            self.writer_queue.put(WriterTask(task.filename, open=True), timeout=1.0)
             current_file = task.filename
         elif task.close:
             self.writer_queue.put(WriterTask(task.filename, close=True), timeout=1.0)
@@ -500,15 +500,15 @@ class DLManager(Process):
             task_cond.notify()

         if res.success:
-            self.log.debug(f'Download for {res.guid} succeeded, adding to in_buffer...')
-            in_buffer[res.guid] = res
+            self.log.debug(f'Download for {res.chunk_guid} succeeded, adding to in_buffer...')
+            in_buffer[res.chunk_guid] = res
             self.bytes_downloaded_since_last += res.compressed_size
             self.bytes_decompressed_since_last += res.size
         else:
-            self.log.error(f'Download for {res.guid} failed, retrying...')
+            self.log.error(f'Download for {res.chunk_guid} failed, retrying...')
             try:
                 self.dl_worker_queue.put(DownloaderTask(
-                    url=res.url, chunk_guid=res.guid, shm=res.shm
+                    url=res.url, chunk_guid=res.chunk_guid, shm=res.shared_memory
                 ), timeout=1.0)
                 self.active_tasks += 1
             except Exception as e:
@@ -545,14 +545,14 @@ class DLManager(Process):
             # todo make this kill the installation process or at least skip the file and mark it as failed
             self.log.fatal(f'Writing for {res.filename} failed!')
         if res.release_memory:
-            self.sms.appendleft(res.shm)
+            self.sms.appendleft(res.shared_memory)
             with shm_cond:
                 shm_cond.notify()

         if res.chunk_guid:
             self.bytes_written_since_last += res.size
             # if there's no shared memory we must have read from disk.
-            if not res.shm:
+            if not res.shared_memory:
                 self.bytes_read_since_last += res.size
             self.num_processed_since_last += 1

@@ -70,12 +70,12 @@ class DLWorker(Process):
                 r = self.session.get(job.url, timeout=self.dl_timeout)
                 r.raise_for_status()
             except Exception as e:
-                logger.warning(f'Chunk download for {job.guid} failed: ({e!r}), retrying...')
+                logger.warning(f'Chunk download for {job.chunk_guid} failed: ({e!r}), retrying...')
                 continue

             dl_end = time.time()
             if r.status_code != 200:
-                logger.warning(f'Chunk download for {job.guid} failed: status {r.status_code}, retrying...')
+                logger.warning(f'Chunk download for {job.chunk_guid} failed: status {r.status_code}, retrying...')
                 continue
             else:
                 compressed = len(r.content)
@@ -84,16 +84,16 @@ class DLWorker(Process):
                 else:
                     raise TimeoutError('Max retries reached')
             except Exception as e:
-                logger.error(f'Job for {job.guid} failed with: {e!r}, fetching next one...')
+                logger.error(f'Job for {job.chunk_guid} failed with: {e!r}, fetching next one...')
                 # add failed job to result queue to be requeued
-                self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
+                self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.chunk_guid, shm=job.shm, url=job.url))
             except KeyboardInterrupt:
                 logger.warning('Immediate exit requested, quitting...')
                 break

             if not chunk:
                 logger.warning(f'Chunk somehow None?')
-                self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
+                self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.chunk_guid, shm=job.shm, url=job.url))
                 continue

             # decompress stuff
@@ -104,12 +104,12 @@ class DLWorker(Process):

                 self.shm.buf[job.shm.offset:job.shm.offset + size] = bytes(chunk.data)
                 del chunk
-                self.o_q.put(DownloaderTaskResult(success=True, chunk_guid=job.guid, shm=job.shm,
+                self.o_q.put(DownloaderTaskResult(success=True, chunk_guid=job.chunk_guid, shm=job.shm,
                                                   url=job.url, size=size, compressed_size=compressed,
                                                   time_delta=dl_end - dl_start))
             except Exception as e:
-                logger.warning(f'Job for {job.guid} failed with: {e!r}, fetching next one...')
-                self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
+                logger.warning(f'Job for {job.chunk_guid} failed with: {e!r}, fetching next one...')
+                self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.chunk_guid, shm=job.shm, url=job.url))
                 continue
             except KeyboardInterrupt:
                 logger.warning('Immediate exit requested, quitting...')
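Note that in the worker hunks above only the guid field is renamed on the queue payloads; the DownloaderTaskResult calls still pass shm=job.shm even after this commit. A minimal sketch of the task payload under that reading — hypothetical, not the actual model definition:

# Hypothetical sketch of the downloader task payload after the rename.
# Only chunk_guid is renamed at these call sites; shm is unchanged here.
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class DownloaderTask:
    url: Optional[str] = None
    chunk_guid: Any = None   # renamed from: guid
    shm: Any = None          # still passed as shm in this diff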
@@ -226,9 +226,9 @@ class FileWorker(Process):
             pre_write = post_write = 0

             try:
-                if j.shm:
+                if j.shared_memory:
                     pre_write = time.time()
-                    shm_offset = j.shm.offset + j.chunk_offset
+                    shm_offset = j.shared_memory.offset + j.chunk_offset
                     shm_end = shm_offset + j.chunk_size
                     current_file.write(self.shm.buf[shm_offset:shm_end].tobytes())
                     post_write = time.time()
@@ -251,13 +251,13 @@ class FileWorker(Process):
                 self.o_q.put(WriterTaskResult(success=False, filename=j.filename,
                                               chunk_guid=j.chunk_guid,
                                               release_memory=j.release_memory,
-                                              shm=j.shm, size=j.chunk_size,
+                                              shared_memory=j.shared_memory, size=j.chunk_size,
                                               time_delta=post_write-pre_write))
             else:
                 self.o_q.put(WriterTaskResult(success=True, filename=j.filename,
                                               chunk_guid=j.chunk_guid,
                                               release_memory=j.release_memory,
-                                              shm=j.shm, size=j.chunk_size,
+                                              shared_memory=j.shared_memory, size=j.chunk_size,
                                               time_delta=post_write-pre_write))
         except Exception as e:
             logger.warning(f'Job {j.filename} failed with: {e!r}, fetching next one...')
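On the writer side, WriterTaskResult is now constructed with shared_memory= instead of shm=. A minimal sketch matching only the keyword arguments visible in the two calls above; everything not shown in the diff (types, defaults, extra fields) is assumed:

# Hypothetical reconstruction matching the call sites above only —
# the real WriterTaskResult in legendary's models may carry more fields.
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class WriterTaskResult:
    success: bool
    filename: Optional[str] = None
    chunk_guid: Any = None
    release_memory: bool = False
    shared_memory: Any = None   # renamed from: shm
    size: int = 0
    time_delta: float = 0.0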