Implement file cache

Write chunks to disk when they have to be cached, instead of keeping them in memory.

It would be better to remove the blocking cache creation and move it out of the manager, but for now this is the only way to guarantee that a cache file is saved before it is needed by another file-writing task (see the sketch below).
Roman513 2024-12-22 15:06:32 +04:00
parent 3963382b3f
commit 359985e05a
3 changed files with 38 additions and 16 deletions
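
The core idea is that the manager performs the cache write synchronously: it blocks until the chunk's bytes are on disk before queueing the write task, so a later task can safely read the chunk back. A minimal sketch of that pattern, assuming a readable shared-memory buffer (`shared_buf`); the helper name `cache_chunk` is hypothetical, while the cache directory and the `{guid}.chunk` naming follow the diff below:

import os

def cache_chunk(shared_buf, cache_dir: str, chunk_guid: int,
                offset: int, size: int) -> str:
    # Blocking cache write: returns only after the bytes have been
    # written and the file closed, so the cache file exists before
    # any later write task tries to read it. 'shared_buf' stands in
    # for the shared memory buffer used by the download manager.
    cache_file = f'{chunk_guid}.chunk'
    with open(os.path.join(cache_dir, cache_file), 'wb') as f:
        f.write(shared_buf[offset:offset + size])
    return cache_file

The cost is that the results handler stalls for the duration of each cache write, which is why the commit message treats this as a stopgap rather than the final design.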


@@ -367,6 +367,7 @@ class DLManager(Process):
                     ct.cleanup = True
                 # add to cache if not already cached
                 elif cp.guid_num not in cached:
+                    ct.cache = True
                     dl_cache_guids.add(cp.guid_num)
                     cached.add(cp.guid_num)
                     current_cache_size += analysis_res.biggest_chunk
@@ -398,7 +399,7 @@ class DLManager(Process):
        last_cache_size = current_cache_size

        self.log.debug(f'Final cache size requirement: {last_cache_size / 1024 / 1024} MiB.')
-        analysis_res.min_memory = last_cache_size + (1024 * 1024 * 32)  # add some padding just to be safe
+        analysis_res.min_memory = analysis_res.biggest_chunk * 32
        # Todo implement on-disk caching to avoid this issue.
        if analysis_res.min_memory > self.max_shared_memory:
@@ -407,12 +408,7 @@ class DLManager(Process):
            suggested_mib = round(self.max_shared_memory / 1024 / 1024 +
                                  (analysis_res.min_memory - self.max_shared_memory) / 1024 / 1024 + 32)
-            if processing_optimization:
-                message = f'Try running legendary with "--enable-reordering --max-shared-memory {suggested_mib:.0f}"'
-            else:
-                message = 'Try running legendary with "--enable-reordering" to reduce memory usage, ' \
-                          f'or use "--max-shared-memory {suggested_mib:.0f}" to increase the limit.'
+            message = f'Try running legendary with "--max-shared-memory {suggested_mib:.0f}"'
            raise MemoryError(f'Current shared memory cache is smaller than required: {shared_mib} < {required_mib}. '
                              + message)
@@ -473,6 +469,7 @@ class DLManager(Process):
    def dl_results_handler(self, task_cond: Condition):
        in_buffer = dict()
+        in_cache = dict()

        task = self.tasks.popleft()
        current_file = ''
@@ -494,10 +491,27 @@ class DLManager(Process):
                    break
                continue

-            while (task.chunk_guid in in_buffer) or task.chunk_file:
+            while (task.chunk_guid in in_cache or task.chunk_guid in in_buffer or task.chunk_file):
                res_shm = None
-                if not task.chunk_file:  # not re-using from an old file
+                cache_file = None
+                flags = TaskFlags.NONE
+                if task.chunk_guid in in_cache:
+                    cache_file = in_cache[task.chunk_guid]
+                    if task.cleanup:
+                        flags = TaskFlags.CLEAR_CACHE
+                        del in_cache[task.chunk_guid]
+                elif task.chunk_guid in in_buffer:  # not re-using from an old file
                    res_shm = in_buffer[task.chunk_guid].shm
+                    flags = TaskFlags.RELEASE_MEMORY
+                    del in_buffer[task.chunk_guid]
+                    if task.cache:  # Blocking caching - not the best idea
+                        self.log.debug(f'Adding {task.chunk_guid} to cache')
+                        shm_offset = res_shm.offset + task.chunk_offset
+                        shm_end = shm_offset + task.chunk_size
+                        cache_file = f"{task.chunk_guid}.chunk"
+                        with open(os.path.join(self.cache_dir, cache_file), 'wb') as f:
+                            f.write(self.shared_memory.buf[shm_offset:shm_end])
+                        in_cache[task.chunk_guid] = cache_file

                try:
                    self.log.debug(f'Adding {task.chunk_guid} to writer queue')
@@ -505,15 +519,12 @@ class DLManager(Process):
                        filename=current_file, shared_memory=res_shm,
                        chunk_offset=task.chunk_offset, chunk_size=task.chunk_size,
                        chunk_guid=task.chunk_guid, old_file=task.chunk_file,
-                        flags=TaskFlags.RELEASE_MEMORY if task.cleanup else TaskFlags.NONE
+                        cache_file=cache_file, flags=flags,
                    ), timeout=1.0)
                except Exception as e:
                    self.log.warning(f'Adding to queue failed: {e!r}')
                    break

-                if task.cleanup and not task.chunk_file:
-                    del in_buffer[task.chunk_guid]
-
                try:
                    task = self.tasks.popleft()
                    if isinstance(task, FileTask):
@@ -578,6 +589,13 @@ class DLManager(Process):
                    with shm_cond:
                        shm_cond.notify()

+                if res.flags & TaskFlags.CLEAR_CACHE:
+                    self.log.debug(f'Removing cache {res.cache_file}')
+                    try:
+                        os.remove(os.path.join(self.cache_dir, res.cache_file))
+                    except FileNotFoundError:
+                        self.log.debug(f'Cache file {res.cache_file} not found, skipping removal.')
+
                if res.chunk_guid:
                    self.bytes_written_since_last += res.size
                    # if there's no shared memory we must have read from disk.


@@ -170,6 +170,9 @@ class FileWorker(Process):
        last_filename = ''
        current_file = None

+        if not os.path.exists(self.cache_path):
+            os.makedirs(self.cache_path)
+
        while True:
            try:
                try:
@@ -275,8 +278,6 @@ class FileWorker(Process):
                    current_file.write(self.shm.buf[shm_offset:shm_end])
                elif j.cache_file:
                    with open(os.path.join(self.cache_path, j.cache_file), 'rb') as f:
-                        if j.chunk_offset:
-                            f.seek(j.chunk_offset)
                        current_file.write(f.read(j.chunk_size))
                elif j.old_file:
                    with open(os.path.join(self.base_path, j.old_file), 'rb') as f:


@@ -48,7 +48,9 @@ class ChunkTask:
    chunk_guid: int
    chunk_offset: int = 0
    chunk_size: int = 0
-    # Whether this chunk can be removed from memory/disk after having been written
+    # Chunk should be added to the disk cache
+    cache: bool = False
+    # Whether this chunk can be removed from disk after having been written
    cleanup: bool = False
    # Path to the file the chunk is read from (if not from memory)
    chunk_file: Optional[str] = None
@@ -64,6 +66,7 @@ class TaskFlags(Flag):
    RELEASE_MEMORY = auto()
    MAKE_EXECUTABLE = auto()
    SILENT = auto()
+    CLEAR_CACHE = auto()


 @dataclass