mirror of
https://github.com/derrod/legendary.git
synced 2025-01-10 14:25:28 +00:00
[downloader] Update logging (more debug/cleanup)
This commit is contained in:
parent
f3afb5b393
commit
6f8da36947
|
@ -156,6 +156,7 @@ class DLManager(Process):
|
||||||
res_shm = in_buffer[task.chunk_guid].shm
|
res_shm = in_buffer[task.chunk_guid].shm
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
self.log.debug(f'Adding {task.chunk_guid} to writer queue')
|
||||||
self.writer_queue.put(WriterTask(
|
self.writer_queue.put(WriterTask(
|
||||||
filename=current_file, shared_memory=res_shm,
|
filename=current_file, shared_memory=res_shm,
|
||||||
chunk_offset=task.chunk_offset, chunk_size=task.chunk_size,
|
chunk_offset=task.chunk_offset, chunk_size=task.chunk_size,
|
||||||
|
@ -184,6 +185,7 @@ class DLManager(Process):
|
||||||
task_cond.notify()
|
task_cond.notify()
|
||||||
|
|
||||||
if res.success:
|
if res.success:
|
||||||
|
self.log.debug(f'Download for {res.guid} succeeded, adding to in_buffer...')
|
||||||
in_buffer[res.guid] = res
|
in_buffer[res.guid] = res
|
||||||
self.bytes_downloaded_since_last += res.compressed_size
|
self.bytes_downloaded_since_last += res.compressed_size
|
||||||
self.bytes_decompressed_since_last += res.size
|
self.bytes_decompressed_since_last += res.size
|
||||||
|
|
|
@ -48,12 +48,12 @@ class DLWorker(Process):
|
||||||
empty = False
|
empty = False
|
||||||
except Empty:
|
except Empty:
|
||||||
if not empty:
|
if not empty:
|
||||||
logger.debug(f'[{self.name}] Queue Empty, waiting for more...')
|
logger.debug(f'Queue Empty, waiting for more...')
|
||||||
empty = True
|
empty = True
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if job.kill: # let worker die
|
if job.kill: # let worker die
|
||||||
logger.debug(f'[{self.name}] Worker received kill signal, shutting down...')
|
logger.debug(f'Worker received kill signal, shutting down...')
|
||||||
break
|
break
|
||||||
|
|
||||||
tries = 0
|
tries = 0
|
||||||
|
@ -64,19 +64,19 @@ class DLWorker(Process):
|
||||||
try:
|
try:
|
||||||
while tries < self.max_retries:
|
while tries < self.max_retries:
|
||||||
# print('Downloading', job.url)
|
# print('Downloading', job.url)
|
||||||
logger.debug(f'[{self.name}] Downloading {job.url}')
|
logger.debug(f'Downloading {job.url}')
|
||||||
dl_start = time.time()
|
dl_start = time.time()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
r = self.session.get(job.url, timeout=self.dl_timeout)
|
r = self.session.get(job.url, timeout=self.dl_timeout)
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f'[{self.name}] Chunk download failed ({e!r}), retrying...')
|
logger.warning(f'Chunk download for {job.guid} failed: ({e!r}), retrying...')
|
||||||
continue
|
continue
|
||||||
|
|
||||||
dl_end = time.time()
|
dl_end = time.time()
|
||||||
if r.status_code != 200:
|
if r.status_code != 200:
|
||||||
logger.warning(f'[{self.name}] Chunk download failed (Status {r.status_code}), retrying...')
|
logger.warning(f'Chunk download for {job.guid} failed: status {r.status_code}, retrying...')
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
compressed = len(r.content)
|
compressed = len(r.content)
|
||||||
|
@ -85,12 +85,12 @@ class DLWorker(Process):
|
||||||
else:
|
else:
|
||||||
raise TimeoutError('Max retries reached')
|
raise TimeoutError('Max retries reached')
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f'[{self.name}] Job failed with: {e!r}, fetching next one...')
|
logger.error(f'Job for {job.guid} failed with: {e!r}, fetching next one...')
|
||||||
# add failed job to result queue to be requeued
|
# add failed job to result queue to be requeued
|
||||||
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
|
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
|
||||||
|
|
||||||
if not chunk:
|
if not chunk:
|
||||||
logger.warning(f'[{self.name}] Chunk smoehow None?')
|
logger.warning(f'Chunk somehow None?')
|
||||||
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
|
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -106,7 +106,7 @@ class DLWorker(Process):
|
||||||
url=job.url, size=size, compressed_size=compressed,
|
url=job.url, size=size, compressed_size=compressed,
|
||||||
time_delta=dl_end - dl_start))
|
time_delta=dl_end - dl_start))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f'[{self.name}] Job failed with: {e!r}, fetching next one...')
|
logger.warning(f'Job for {job.guid} failed with: {e!r}, fetching next one...')
|
||||||
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
|
self.o_q.put(DownloaderTaskResult(success=False, chunk_guid=job.guid, shm=job.shm, url=job.url))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -254,7 +254,7 @@ class FileWorker(Process):
|
||||||
shm=j.shm, size=j.chunk_size,
|
shm=j.shm, size=j.chunk_size,
|
||||||
time_delta=post_write-pre_write))
|
time_delta=post_write-pre_write))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f'[{self.name}] Job {j.filename} failed with: {e!r}, fetching next one...')
|
logger.warning(f'Job {j.filename} failed with: {e!r}, fetching next one...')
|
||||||
self.o_q.put(WriterTaskResult(success=False, filename=j.filename, chunk_guid=j.chunk_guid))
|
self.o_q.put(WriterTaskResult(success=False, filename=j.filename, chunk_guid=j.chunk_guid))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -262,7 +262,7 @@ class FileWorker(Process):
|
||||||
current_file.close()
|
current_file.close()
|
||||||
current_file = None
|
current_file = None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f'[{self.name}] Closing file after error failed: {e!r}')
|
logger.error(f'Closing file after error failed: {e!r}')
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
if current_file:
|
if current_file:
|
||||||
current_file.close()
|
current_file.close()
|
||||||
|
|
Loading…
Reference in a new issue