From 359985e05a07c49bc97cd166a294babfc505045e Mon Sep 17 00:00:00 2001
From: Roman513
Date: Sun, 22 Dec 2024 15:06:32 +0400
Subject: [PATCH] Implement file cache

Write chunks that need to be cached to disk instead of keeping them in
shared memory.

It would be better to make cache creation non-blocking and move it out
of the manager, but for now this is the only way to guarantee that a
cache file has been saved before another file writing task needs it.
---
 legendary/downloader/mp/manager.py | 43 ++++++++++++++++++++++++++++++-------------
 legendary/downloader/mp/workers.py |  2 ++
 legendary/models/downloading.py    |  5 +++-
 3 files changed, 36 insertions(+), 14 deletions(-)

diff --git a/legendary/downloader/mp/manager.py b/legendary/downloader/mp/manager.py
index 90ab37a..eff9813 100644
--- a/legendary/downloader/mp/manager.py
+++ b/legendary/downloader/mp/manager.py
@@ -367,6 +367,7 @@ class DLManager(Process):
                     ct.cleanup = True
                 # add to cache if not already cached
                 elif cp.guid_num not in cached:
+                    ct.cache = True
                     dl_cache_guids.add(cp.guid_num)
                     cached.add(cp.guid_num)
                     current_cache_size += analysis_res.biggest_chunk
@@ -398,7 +399,6 @@ class DLManager(Process):
             last_cache_size = current_cache_size
 
         self.log.debug(f'Final cache size requirement: {last_cache_size / 1024 / 1024} MiB.')
-        analysis_res.min_memory = last_cache_size + (1024 * 1024 * 32)  # add some padding just to be safe
+        analysis_res.min_memory = analysis_res.biggest_chunk * 32
 
-        # Todo implement on-disk caching to avoid this issue.
         if analysis_res.min_memory > self.max_shared_memory:
@@ -407,12 +407,7 @@ class DLManager(Process):
             suggested_mib = round(self.max_shared_memory / 1024 / 1024 +
                                   (analysis_res.min_memory - self.max_shared_memory) / 1024 / 1024 + 32)
 
-            if processing_optimization:
-                message = f'Try running legendary with "--enable-reordering --max-shared-memory {suggested_mib:.0f}"'
-            else:
-                message = 'Try running legendary with "--enable-reordering" to reduce memory usage, ' \
-                          f'or use "--max-shared-memory {suggested_mib:.0f}" to increase the limit.'
-
+            message = f'Try running legendary with "--max-shared-memory {suggested_mib:.0f}"'
             raise MemoryError(f'Current shared memory cache is smaller than required: {shared_mib} < {required_mib}. '
                              + message)
 
@@ -473,6 +468,7 @@ class DLManager(Process):
 
     def dl_results_handler(self, task_cond: Condition):
         in_buffer = dict()
+        in_cache = dict()
 
         task = self.tasks.popleft()
         current_file = ''
@@ -494,10 +490,26 @@ class DLManager(Process):
                         break
                     continue
 
-            while (task.chunk_guid in in_buffer) or task.chunk_file:
+            while task.chunk_guid in in_cache or task.chunk_guid in in_buffer or task.chunk_file:
                 res_shm = None
-                if not task.chunk_file:  # not re-using from an old file
+                cache_file = None
+                flags = TaskFlags.NONE
+                if task.chunk_guid in in_cache:  # chunk is already in the on-disk cache
+                    cache_file = in_cache[task.chunk_guid]
+                    if task.cleanup:
+                        flags = TaskFlags.CLEAR_CACHE
+                        del in_cache[task.chunk_guid]
+                elif task.chunk_guid in in_buffer:  # chunk is in memory, not re-used from an old file
                     res_shm = in_buffer[task.chunk_guid].shm
+                    flags = TaskFlags.RELEASE_MEMORY
+                    del in_buffer[task.chunk_guid]
+                    if task.cache:  # blocking cache write; guarantees the file exists before another task needs it
+                        self.log.debug(f'Adding {task.chunk_guid} to cache')
+                        cache_file = f'{task.chunk_guid}.chunk'
+                        # cache the entire chunk, a later task may need a different part of it
+                        with open(os.path.join(self.cache_dir, cache_file), 'wb') as f:
+                            f.write(self.shared_memory.buf[res_shm.offset:res_shm.end])
+                        in_cache[task.chunk_guid] = cache_file
 
                 try:
                     self.log.debug(f'Adding {task.chunk_guid} to writer queue')
@@ -505,15 +517,12 @@ class DLManager(Process):
                     self.writer_queue.put(WriterTask(
                         filename=current_file, shared_memory=res_shm,
                         chunk_offset=task.chunk_offset, chunk_size=task.chunk_size,
                         chunk_guid=task.chunk_guid, old_file=task.chunk_file,
-                        flags=TaskFlags.RELEASE_MEMORY if task.cleanup else TaskFlags.NONE
+                        cache_file=cache_file, flags=flags,
                     ), timeout=1.0)
                 except Exception as e:
                     self.log.warning(f'Adding to queue failed: {e!r}')
                     break
 
-                if task.cleanup and not task.chunk_file:
-                    del in_buffer[task.chunk_guid]
-
                 try:
                     task = self.tasks.popleft()
@@ -578,6 +587,13 @@ class DLManager(Process):
                 with shm_cond:
                     shm_cond.notify()
 
+            if res.flags & TaskFlags.CLEAR_CACHE:
+                self.log.debug(f'Removing cache file {res.cache_file}')
+                try:
+                    os.remove(os.path.join(self.cache_dir, res.cache_file))
+                except FileNotFoundError:
+                    self.log.debug(f'Cache file {res.cache_file} not found, skipping removal.')
+
             if res.chunk_guid:
                 self.bytes_written_since_last += res.size
                 # if there's no shared memory we must have read from disk.
diff --git a/legendary/downloader/mp/workers.py b/legendary/downloader/mp/workers.py
index 4b63192..9b4ba28 100644
--- a/legendary/downloader/mp/workers.py
+++ b/legendary/downloader/mp/workers.py
@@ -170,6 +170,8 @@
         last_filename = ''
         current_file = None
 
+        os.makedirs(self.cache_path, exist_ok=True)
+
        while True:
             try:
                 try:
diff --git a/legendary/models/downloading.py b/legendary/models/downloading.py
index d2d945a..fe3df5d 100644
--- a/legendary/models/downloading.py
+++ b/legendary/models/downloading.py
@@ -48,7 +48,9 @@ class ChunkTask:
     chunk_guid: int
     chunk_offset: int = 0
     chunk_size: int = 0
-    # Whether this chunk can be removed from memory/disk after having been written
+    # Whether this chunk should be written to the on-disk cache
+    cache: bool = False
+    # Whether this chunk can be removed from memory/cache after having been written
     cleanup: bool = False
     # Path to the file the chunk is read from (if not from memory)
     chunk_file: Optional[str] = None
@@ -64,6 +66,7 @@ class TaskFlags(Flag):
     RELEASE_MEMORY = auto()
     MAKE_EXECUTABLE = auto()
     SILENT = auto()
+    CLEAR_CACHE = auto()
 
 
 @dataclass
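
Note, not part of the patch: the snippet below is a minimal, self-contained sketch of the disk-cache round trip the patch performs. It assumes the full decompressed chunk is persisted once, arbitrary (offset, size) parts are later read back with a seek, and the file is deleted after its last use (what the CLEAR_CACHE flag triggers). The names cache_chunk and read_chunk_part are hypothetical stand-ins, not legendary APIs.

import os
import tempfile

def cache_chunk(cache_dir: str, guid: int, data: bytes) -> str:
    # persist the full decompressed chunk as <cache_dir>/<guid>.chunk
    os.makedirs(cache_dir, exist_ok=True)
    path = os.path.join(cache_dir, f'{guid}.chunk')
    with open(path, 'wb') as f:
        f.write(data)
    return path

def read_chunk_part(path: str, offset: int, size: int) -> bytes:
    # read one chunk part, seeking first - mirrors FileWorker's cache branch
    with open(path, 'rb') as f:
        if offset:
            f.seek(offset)
        return f.read(size)

if __name__ == '__main__':
    chunk = bytes(range(256)) * 4096  # stand-in for a ~1 MiB decompressed chunk
    with tempfile.TemporaryDirectory() as cache_dir:
        path = cache_chunk(cache_dir, 0xdeadbeef, chunk)
        # two files referencing different byte ranges of the same chunk
        assert read_chunk_part(path, 0, 1024) == chunk[:1024]
        assert read_chunk_part(path, 4096, 512) == chunk[4096:4608]
        os.remove(path)  # equivalent of CLEAR_CACHE after the last use

Caching the whole chunk rather than only the first part used is what makes the seek on the read side meaningful: chunks commonly pack several small files, so two file-write tasks can reference different byte ranges of the same cached chunk.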