[downloader] Greatly simplify download task creation

This is a change to something that was so massively stupid and
overcomplicated that I feel like I need to explain and justify myself:

After figuring out the manifest format and spending countless hours
staring at IDA/Ghidra I was kinda sick of it, so I decided to work out
what to do with the manifest myself by just playing around with it,
which was also a lot more fun than digging through disassembly.

When looking at the chunks and files it quickly became obvious that
they are created by concatenating all files into 1 MiB chunks that can
be downloaded and reassembled (mostly) sequentially. What I did not
know was how the order of files in this "stream" was determined.
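
For illustration, here is a rough sketch of how a single file can be
put back together from downloaded chunks. The chunk-part fields mirror
the ones used in the downloader below (guid_num, offset, size); the
get_chunk() helper and everything else is made up for the example.

```python
# Hypothetical sketch: rebuild one file from already-downloaded chunks.
# get_chunk() stands in for whatever returns the raw ~1 MiB chunk blob
# for a given GUID; fm is a file manifest entry with .chunk_parts.
def assemble_file(fm, get_chunk, out_path):
    with open(out_path, 'wb') as out:
        for cp in fm.chunk_parts:
            chunk_data = get_chunk(cp.guid_num)
            # each chunk part is just a slice of one chunk
            out.write(chunk_data[cp.offset:cp.offset + cp.size])
```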

Playing around with it, I came up with the old method: essentially
forming a chain of files, since each file's end generally pointed to
the start of the next file. And it worked great! At least until now...
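
Roughly, the old chaining idea looked like the toy sketch below. The
data and names are made up for illustration; the real logic is in the
code being removed further down.

```python
# Toy illustration of the old "chunkstream" chaining (hypothetical data).
from collections import defaultdict, deque, namedtuple

ChunkPart = namedtuple('ChunkPart', 'guid_num offset size')
File = namedtuple('File', 'filename chunk_parts')

files = [
    File('a.pak', [ChunkPart(1, 0, 700), ChunkPart(2, 0, 300)]),
    File('b.pak', [ChunkPart(2, 300, 500)]),
    File('c.pak', [ChunkPart(2, 800, 200)]),
]

# map each chunk GUID to the files whose first chunk part starts in it
chunk_to_file_map = defaultdict(deque)
for f in sorted(files, key=lambda f: f.chunk_parts[0].offset):
    chunk_to_file_map[f.chunk_parts[0].guid_num].append(f)

order = []
next_chunk = files[0].chunk_parts[0]  # a chunkstream start (offset 0)
while file_deque := chunk_to_file_map.get(next_chunk.guid_num):
    current_file = file_deque.popleft()
    if not file_deque:
        del chunk_to_file_map[next_chunk.guid_num]
    order.append(current_file.filename)
    next_chunk = current_file.chunk_parts[-1]  # follow the chain

print(order)  # ['a.pak', 'b.pak', 'c.pak']
```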

Yesterday somebody alerted me to a game where this failed, and it took
me a bit to figure out why. Essentially the chaining broke because
multiple files started at the same offset, but some of them continued
along another chain that never went back to the chunk they started in,
effectively skipping those files. This was rather annoying to deal
with; I came up with a workaround, but it wasn't pretty. So I decided
to jump back into IDA/Ghidra and find out how Epic does it for real.

Well it took me a while, but thanks to symbols (yay macOS!) and a decent
decompiler in Ghidra even a noob like me was able to find it eventually.
The answer is as simple as it can be: the files are sorted alphabetically
(case-insensitive).

So really all I ever had to do was to sort files alphabetically and then
run through them to create the list of tasks.
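
In code, the whole "ordering algorithm" is just a case-insensitive
sort, as in this trivial standalone example (file names made up); the
actual loop in the diff below does the same over
manifest.file_manifest_list.elements with key=lambda a: a.filename.lower().

```python
# The new processing order: sort file names case-insensitively, nothing more.
filenames = ['Engine/config.ini', 'Diabotical.exe', 'assets/model_001.bin']
for name in sorted(filenames, key=str.lower):
    print(name)
# assets/model_001.bin
# Diabotical.exe
# Engine/config.ini
```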

I feel so stupid.

P.S.: I tested a few games and for the most part the resulting file
processing order is identical between the old and the new method. The
cases where it differs are those with heavy de-duplication (e.g.
Diabotical's small model files), but the runtime cache size remains the
same, so both methods are equally efficient; the old one just can't
handle certain cases.
derrod 2020-04-30 09:43:17 +02:00
parent ef0ea26372
commit 8f7db143a6

@@ -44,7 +44,6 @@ class DLManager(Process):
         # Analysis stuff
         self.analysis = None
         self.tasks = deque()
-        self.dl_cache_guids = set()  # guids that should be cached
         self.chunks_to_dl = deque()
         self.chunk_data_list = None
@@ -301,35 +300,16 @@ class DLManager(Process):
         analysis_res.unchanged = len(mc.unchanged)
         self.log.debug(f'{analysis_res.unchanged} unchanged files')
 
         # count references to chunks for determining runtime cache size later
         references = Counter()
-        chunkstream_starts = list()
-        # Chunks can have multiple entire files in them, the deque for a guid contains all files that start
-        # in that chunk (sorted by offset) so we can quickly and easily find the next link in the chunkstream.
-        # A nice side effect is that we can use this to check whether or not we missed something in the process.
-        chunk_to_file_map = defaultdict(deque)
-
-        # Find chunkstream starts and also count references to each chunk
-        # Note that this has to be sorted to ensure the file map will be in the correct order
-        self.log.debug('Looking for chunkstreams and counting references...')
-        for fm in sorted(manifest.file_manifest_list.elements,
-                         key=lambda x: x.chunk_parts[0].offset if x.chunk_parts else 0):
-            if not fm.chunk_parts:
-                self.tasks.append(FileTask(fm.filename, empty=True))
-                continue
+        for fm in manifest.file_manifest_list.elements:
+            # chunks of unchanged files are not downloaded so we can skip them
+            if fm.filename in mc.unchanged:
+                analysis_res.unchanged += fm.file_size
+                continue
 
-            for index, cp in enumerate(fm.chunk_parts):
-                if index == 0:
-                    chunk_to_file_map[cp.guid_num].append(fm)
-                    if cp.offset == 0:
-                        self.log.debug(f'Found chunk stream start: {fm.filename}, {fm.chunk_parts[0]}')
-                        chunkstream_starts.append(fm.chunk_parts[0])
-
-                # do not add references in case the file is unchanged and we do not need to download it anyway
-                if fm.filename not in mc.unchanged:
-                    references[cp.guid_num] += 1
+            for cp in fm.chunk_parts:
+                references[cp.guid_num] += 1
 
         # determine reusable chunks and prepare lookup table for reusable ones
         re_usable = defaultdict(dict)
@@ -353,90 +333,81 @@ class DLManager(Process):
                         analysis_res.reuse_size += cp.size
 
         last_cache_size = current_cache_size = 0
         # set to determine whether a file is currently cached or not
         cached = set()
         # Using this secondary set is orders of magnitude faster than checking the deque.
         chunks_in_dl_list = set()
+        # This is just used to count all unique guids that have been cached
+        dl_cache_guids = set()
 
-        # run through the chunkstreams and create the download jobs,
-        # also determine minimum runtime cache requirement.
-        # Yeah this is a bit of a mess but still runs extremely
-        # quickly even with tens of thousands of files/chunks
+        # run through the list of files and create the download jobs and also determine minimum
+        # runtime cache requirement by simulating adding/removing from cache during download.
         self.log.debug('Creating filetasks and chunktasks...')
-        for next_chunk in chunkstream_starts:
-            self.log.debug(f'- Chunkstream start: {next_chunk!r}')
-
-            while file_deque := chunk_to_file_map.get(next_chunk.guid_num):
-                current_file = file_deque.popleft()
-
-                if len(file_deque) == 0:
-                    del chunk_to_file_map[next_chunk.guid_num]
-
-                # skip unchanged files
-                if current_file.filename in mc.unchanged:
-                    # self.log.debug(f' + Skipping unchanged file: {current_file.filename}')
-                    next_chunk = current_file.chunk_parts[-1]
-                    continue
-
-                existing_chunks = re_usable.get(current_file.filename, None)
-                chunk_tasks = []
-                reused = 0
-
-                for cp in current_file.chunk_parts:
-                    ct = ChunkTask(cp.guid_num, cp.offset, cp.size)
-
-                    # re-use the chunk from the existing file if we can
-                    if existing_chunks and (cp.guid_num, cp.offset, cp.size) in existing_chunks:
-                        reused += 1
-                        ct.chunk_file = current_file.filename
-                        ct.chunk_offset = existing_chunks[(cp.guid_num, cp.offset, cp.size)]
-                    else:
-                        # add to DL list if not already in it
-                        if cp.guid_num not in chunks_in_dl_list:
-                            self.chunks_to_dl.append(cp.guid_num)
-                            chunks_in_dl_list.add(cp.guid_num)
-
-                        # if chunk has more than one use or is already in cache,
-                        # check if we need to add or remove it again.
-                        if references[cp.guid_num] > 1 or cp.guid_num in cached:
-                            references[cp.guid_num] -= 1
-
-                            if references[cp.guid_num] < 1:  # delete from cache again
-                                current_cache_size -= analysis_res.biggest_chunk
-                                cached.remove(cp.guid_num)
-                                ct.cleanup = True
-                            elif cp.guid_num not in cached:  # add to cache
-                                self.dl_cache_guids.add(cp.guid_num)
-                                cached.add(cp.guid_num)
-                                current_cache_size += analysis_res.biggest_chunk
-                        else:
-                            ct.cleanup = True
-
-                    chunk_tasks.append(ct)
-
-                if reused:
-                    self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}')
-                    self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True))
-                else:
-                    self.tasks.append(FileTask(current_file.filename, fopen=True))
-
-                self.tasks.extend(chunk_tasks)
-
-                if reused:
-                    self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True))
-                    self.tasks.append(FileTask(current_file.filename, delete=True, rename=True,
-                                               temporary_filename=current_file.filename + u'.tmp'))
-                else:
-                    self.tasks.append(FileTask(current_file.filename, close=True))
-
-                if current_cache_size > last_cache_size:
-                    self.log.debug(f' * New maximum cache size: {current_cache_size / 1024 / 1024:.02f} MiB')
-                    last_cache_size = current_cache_size
-
-                next_chunk = current_file.chunk_parts[-1]
-
-        # If this is not empty something went horribly wrong.
-        if chunk_to_file_map:
-            raise ValueError('Some files were not processed:', chunk_to_file_map)
+        for current_file in sorted(manifest.file_manifest_list.elements,
+                                   key=lambda a: a.filename.lower()):
+            # skip unchanged and empty files
+            if current_file.filename in mc.unchanged:
+                continue
+            elif not current_file.chunk_parts:
+                self.tasks.append(FileTask(current_file.filename, empty=True))
+                continue
+
+            existing_chunks = re_usable.get(current_file.filename, None)
+            chunk_tasks = []
+            reused = 0
+
+            for cp in current_file.chunk_parts:
+                ct = ChunkTask(cp.guid_num, cp.offset, cp.size)
+
+                # re-use the chunk from the existing file if we can
+                if existing_chunks and (cp.guid_num, cp.offset, cp.size) in existing_chunks:
+                    reused += 1
+                    ct.chunk_file = current_file.filename
+                    ct.chunk_offset = existing_chunks[(cp.guid_num, cp.offset, cp.size)]
+                else:
+                    # add to DL list if not already in it
+                    if cp.guid_num not in chunks_in_dl_list:
+                        self.chunks_to_dl.append(cp.guid_num)
+                        chunks_in_dl_list.add(cp.guid_num)
+
+                    # if chunk has more than one use or is already in cache,
+                    # check if we need to add or remove it again.
+                    if references[cp.guid_num] > 1 or cp.guid_num in cached:
+                        references[cp.guid_num] -= 1
+
+                        # delete from cache if no references left
+                        if references[cp.guid_num] < 1:
+                            current_cache_size -= analysis_res.biggest_chunk
+                            cached.remove(cp.guid_num)
+                            ct.cleanup = True
+                        # add to cache if not already cached
+                        elif cp.guid_num not in cached:
+                            dl_cache_guids.add(cp.guid_num)
+                            cached.add(cp.guid_num)
+                            current_cache_size += analysis_res.biggest_chunk
+                    else:
+                        ct.cleanup = True
+
+                chunk_tasks.append(ct)
+
+            if reused:
+                self.log.debug(f' + Reusing {reused} chunks from: {current_file.filename}')
+                # open temporary file that will contain download + old file contents
+                self.tasks.append(FileTask(current_file.filename + u'.tmp', fopen=True))
+                self.tasks.extend(chunk_tasks)
+                self.tasks.append(FileTask(current_file.filename + u'.tmp', close=True))
+                # delete old file and rename temproary
+                self.tasks.append(FileTask(current_file.filename, delete=True, rename=True,
+                                           temporary_filename=current_file.filename + u'.tmp'))
+            else:
+                self.tasks.append(FileTask(current_file.filename, fopen=True))
+                self.tasks.extend(chunk_tasks)
+                self.tasks.append(FileTask(current_file.filename, close=True))
+
+            # check if runtime cache size has changed
+            if current_cache_size > last_cache_size:
+                self.log.debug(f' * New maximum cache size: {current_cache_size / 1024 / 1024:.02f} MiB')
+                last_cache_size = current_cache_size
 
         self.log.debug(f'Final cache size requirement: {last_cache_size / 1024 / 1024} MiB.')
         analysis_res.min_memory = last_cache_size + (1024 * 1024 * 32)  # add some padding just to be safe
@@ -457,7 +428,7 @@ class DLManager(Process):
             for fname in mc.removed:
                 self.tasks.append(FileTask(fname, delete=True))
 
-        analysis_res.num_chunks_cache = len(self.dl_cache_guids)
+        analysis_res.num_chunks_cache = len(dl_cache_guids)
 
         self.chunk_data_list = manifest.chunk_data_list
         self.analysis = analysis_res