diff --git a/legendary/downloader/mp/manager.py b/legendary/downloader/mp/manager.py
index 90ab37a..eff9813 100644
--- a/legendary/downloader/mp/manager.py
+++ b/legendary/downloader/mp/manager.py
@@ -367,6 +367,7 @@ class DLManager(Process):
                         ct.cleanup = True
                 # add to cache if not already cached
                 elif cp.guid_num not in cached:
+                    ct.cache = True
                     dl_cache_guids.add(cp.guid_num)
                     cached.add(cp.guid_num)
                     current_cache_size += analysis_res.biggest_chunk
@@ -398,7 +399,6 @@ class DLManager(Process):
                 last_cache_size = current_cache_size
 
         self.log.debug(f'Final cache size requirement: {last_cache_size / 1024 / 1024} MiB.')
-        analysis_res.min_memory = last_cache_size + (1024 * 1024 * 32)  # add some padding just to be safe
+        analysis_res.min_memory = analysis_res.biggest_chunk * 32  # only in-flight chunks stay in memory now
 
-        # Todo implement on-disk caching to avoid this issue.
         if analysis_res.min_memory > self.max_shared_memory:
@@ -407,12 +407,7 @@ class DLManager(Process):
             suggested_mib = round(self.max_shared_memory / 1024 / 1024 +
                                   (analysis_res.min_memory - self.max_shared_memory) / 1024 / 1024 + 32)
 
-            if processing_optimization:
-                message = f'Try running legendary with "--enable-reordering --max-shared-memory {suggested_mib:.0f}"'
-            else:
-                message = 'Try running legendary with "--enable-reordering" to reduce memory usage, ' \
-                          f'or use "--max-shared-memory {suggested_mib:.0f}" to increase the limit.'
-
+            message = f'Try running legendary with "--max-shared-memory {suggested_mib:.0f}" to increase the limit.'
             raise MemoryError(f'Current shared memory cache is smaller than required: {shared_mib} < {required_mib}. '
                               + message)
 
@@ -473,6 +468,7 @@ class DLManager(Process):
 
     def dl_results_handler(self, task_cond: Condition):
         in_buffer = dict()
+        in_cache = dict()
 
         task = self.tasks.popleft()
         current_file = ''
@@ -494,10 +490,27 @@ class DLManager(Process):
                     break
                 continue
 
-            while (task.chunk_guid in in_buffer) or task.chunk_file:
+            while task.chunk_guid in in_cache or task.chunk_guid in in_buffer or task.chunk_file:
                 res_shm = None
-                if not task.chunk_file:  # not re-using from an old file
+                cache_file = None
+                flags = TaskFlags.NONE
+                if task.chunk_guid in in_cache:
+                    cache_file = in_cache[task.chunk_guid]
+                    if task.cleanup:
+                        flags = TaskFlags.CLEAR_CACHE
+                        del in_cache[task.chunk_guid]
+                elif task.chunk_guid in in_buffer:  # chunk is still in the shared-memory buffer
                     res_shm = in_buffer[task.chunk_guid].shm
+                    flags = TaskFlags.RELEASE_MEMORY
+                    del in_buffer[task.chunk_guid]
+                    if task.cache:  # blocking write; caching in a worker would avoid stalling this handler
+                        self.log.debug(f'Adding {task.chunk_guid} to cache')
+                        shm_offset = res_shm.offset + task.chunk_offset  # cache only the slice this part uses
+                        shm_end = shm_offset + task.chunk_size
+                        cache_file = f'{task.chunk_guid}.chunk'
+                        with open(os.path.join(self.cache_dir, cache_file), 'wb') as f:
+                            f.write(self.shared_memory.buf[shm_offset:shm_end])
+                        in_cache[task.chunk_guid] = cache_file
 
                 try:
                     self.log.debug(f'Adding {task.chunk_guid} to writer queue')
@@ -505,15 +518,12 @@ class DLManager(Process):
                         filename=current_file, shared_memory=res_shm,
                         chunk_offset=task.chunk_offset, chunk_size=task.chunk_size,
                         chunk_guid=task.chunk_guid, old_file=task.chunk_file,
-                        flags=TaskFlags.RELEASE_MEMORY if task.cleanup else TaskFlags.NONE
+                        cache_file=cache_file, flags=flags,
                     ), timeout=1.0)
                 except Exception as e:
                     self.log.warning(f'Adding to queue failed: {e!r}')
                     break
 
-                if task.cleanup and not task.chunk_file:
-                    del in_buffer[task.chunk_guid]
-
                 try:
                     task = self.tasks.popleft()
                     if isinstance(task, FileTask):
@@ -578,6 +588,13 @@ class DLManager(Process):
                     with shm_cond:
                         shm_cond.notify()
 
+                if res.flags & TaskFlags.CLEAR_CACHE:
+                    self.log.debug(f'Removing cache file {res.cache_file}')
+                    try:
+                        os.remove(os.path.join(self.cache_dir, res.cache_file))
+                    except FileNotFoundError:
+                        self.log.debug(f'Cache file {res.cache_file} not found, skipping removal.')
+
                 if res.chunk_guid:
                     self.bytes_written_since_last += res.size
                     # if there's no shared memory we must have read from disk.
diff --git a/legendary/downloader/mp/workers.py b/legendary/downloader/mp/workers.py
index 4b63192..9b4ba28 100644
--- a/legendary/downloader/mp/workers.py
+++ b/legendary/downloader/mp/workers.py
@@ -170,6 +170,8 @@ class FileWorker(Process):
         last_filename = ''
         current_file = None
 
+        os.makedirs(self.cache_path, exist_ok=True)
+
         while True:
             try:
                 try:
@@ -275,8 +277,7 @@ class FileWorker(Process):
                     current_file.write(self.shm.buf[shm_offset:shm_end])
                 elif j.cache_file:
                     with open(os.path.join(self.cache_path, j.cache_file), 'rb') as f:
-                        if j.chunk_offset:
-                            f.seek(j.chunk_offset)
+                        # the cache file holds only the slice this part needs, so no seek is required
                         current_file.write(f.read(j.chunk_size))
                 elif j.old_file:
                     with open(os.path.join(self.base_path, j.old_file), 'rb') as f:
diff --git a/legendary/models/downloading.py b/legendary/models/downloading.py
index d2d945a..fe3df5d 100644
--- a/legendary/models/downloading.py
+++ b/legendary/models/downloading.py
@@ -48,7 +48,9 @@ class ChunkTask:
     chunk_guid: int
     chunk_offset: int = 0
     chunk_size: int = 0
-    # Whether this chunk can be removed from memory/disk after having been written
+    # Whether this chunk should be added to the on-disk cache
+    cache: bool = False
+    # Whether this chunk can be removed from the cache after having been written
     cleanup: bool = False
     # Path to the file the chunk is read from (if not from memory)
     chunk_file: Optional[str] = None
@@ -64,6 +66,7 @@ class TaskFlags(Flag):
     RELEASE_MEMORY = auto()
     MAKE_EXECUTABLE = auto()
     SILENT = auto()
+    CLEAR_CACHE = auto()
 
 
 @dataclass
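
---

Note for reviewers: with a typical 1 MiB biggest_chunk, this drops the shared
memory requirement from "full cache size plus 32 MiB" to a flat 32 MiB, since
the chunk cache now lives on disk. Below is a minimal, self-contained sketch of
the disk-cache round trip the patch implements. It is not part of the patch;
the buffer, GUID, directory, and sizes are made up, and only the
write-a-slice/read-without-seek behaviour mirrors the diff.

    import os
    import tempfile

    # Stand-ins: a bytearray plays the shared-memory buffer, a dict plays
    # in_cache, and a temporary directory plays self.cache_dir.
    shm_buf = bytearray(b'\x00' * 64 + b'CHUNKDATA' + b'\x00' * 64)
    cache_dir = tempfile.mkdtemp()
    in_cache = {}

    chunk_guid = 1234
    shm_offset = 64 + 0  # segment offset + task.chunk_offset
    chunk_size = 9       # task.chunk_size

    # First use (chunk still in shared memory): write the used slice to disk.
    cache_file = f'{chunk_guid}.chunk'
    with open(os.path.join(cache_dir, cache_file), 'wb') as f:
        f.write(shm_buf[shm_offset:shm_offset + chunk_size])
    in_cache[chunk_guid] = cache_file

    # Later uses (chunk found in in_cache): the file already starts at the
    # part's offset, which is why FileWorker no longer seeks before reading.
    with open(os.path.join(cache_dir, in_cache[chunk_guid]), 'rb') as f:
        assert f.read(chunk_size) == b'CHUNKDATA'

    # The final use sets cleanup, so CLEAR_CACHE removes the file afterwards.
    os.remove(os.path.join(cache_dir, in_cache.pop(chunk_guid)))

One consequence of caching only the used slice: if a later task referenced the
same chunk with a different offset or size, the cached file would not contain
that data. The patch appears to assume a chunk is always re-used with an
identical part.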