From bc5d12c91445a09015440f0c80f88f657928d8a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lidwin?= Date: Wed, 8 Apr 2026 18:41:44 +0200 Subject: [PATCH] feat: support encrypted manifests and chunks (#738) * feat: support encrypted manifests and chunks fix: ensure manifest_secrets is always defined in info fix: update all usage of DLManager with manifest secrets * chore: remove unused is_preloaded var --- legendary/cli.py | 24 +++- legendary/core.py | 33 +++-- legendary/downloader/mp/manager.py | 5 +- legendary/downloader/mp/workers.py | 5 +- legendary/lfs/lgndry.py | 4 +- legendary/models/chunk.py | 46 +++++-- legendary/models/game.py | 2 + legendary/models/manifest.py | 196 ++++++++++++++++++++++++++++- requirements.txt | 1 + setup.py | 3 +- 10 files changed, 280 insertions(+), 39 deletions(-) diff --git a/legendary/cli.py b/legendary/cli.py index 9d8430d..90a83c5 100644 --- a/legendary/cli.py +++ b/legendary/cli.py @@ -348,7 +348,7 @@ class LegendaryCLI: return elif args.app_name: args.app_name = self._resolve_aliases(args.app_name) - + manifest_secrets = dict() # check if we even need to log in if args.override_manifest: logger.info(f'Loading manifest from "{args.override_manifest}"') @@ -365,9 +365,12 @@ class LegendaryCLI: if not game: logger.fatal(f'Could not fetch metadata for "{args.app_name}" (check spelling/account ownership)') exit(1) - manifest_data, _ = self.core.get_cdn_manifest(game, platform=args.platform) + manifest_data, _, _, manifest_secrets = self.core.get_cdn_manifest(game, platform=args.platform) manifest = self.core.load_manifest(manifest_data) + if not manifest.decrypt(manifest_secrets): + logger.warning('Manifest key wasn\'t found. File names will be obfuscated') + files = sorted(manifest.file_manifest_list.elements, key=lambda a: a.filename.lower()) @@ -1224,6 +1227,8 @@ class LegendaryCLI: return manifest_data, _ = self.core.get_installed_manifest(args.app_name) + manifest_secrets = dict() + if manifest_data is None: if repair_mode: if not repair_online: @@ -1232,7 +1237,7 @@ class LegendaryCLI: logger.warning('No manifest could be loaded, the file may be missing. Downloading the latest manifest.') game = self.core.get_game(args.app_name, platform=igame.platform) - manifest_data, _ = self.core.get_cdn_manifest(game, igame.platform) + manifest_data, _, _, manifest_secrets = self.core.get_cdn_manifest(game, igame.platform) else: logger.critical(f'Manifest appears to be missing! To repair, run "legendary repair ' f'{args.app_name} --repair-and-update", this will however redownload all files ' @@ -1240,6 +1245,9 @@ class LegendaryCLI: return manifest = self.core.load_manifest(manifest_data) + if not manifest.decrypt(manifest_secrets): + logger.critical('Unable to decrypt the manifest. The key appears to be missing. Please report this on GitHub.') + return files = sorted(manifest.file_manifest_list.elements, key=lambda a: a.filename.lower()) @@ -1644,6 +1652,8 @@ class LegendaryCLI: manifest_data = None entitlements = None + is_preloaded = False + manifest_secrets = dict() # load installed manifest or URI if args.offline or manifest_uri: if app_name and self.core.is_installed(app_name): @@ -1663,8 +1673,7 @@ class LegendaryCLI: game.metadata = egl_meta # Get manifest if asset exists for current platform if args.platform in game.asset_infos: - manifest_data, _ = self.core.get_cdn_manifest(game, args.platform) - + manifest_data, _, is_preloaded, manifest_secrets = self.core.get_cdn_manifest(game, args.platform) if game: game_infos = info_items['game'] game_infos.append(InfoItem('App name', 'app_name', game.app_name, game.app_name)) @@ -1788,6 +1797,8 @@ class LegendaryCLI: if manifest_data: manifest_info = info_items['manifest'] manifest = self.core.load_manifest(manifest_data) + manifest.decrypt(manifest_secrets) + manifest_size = len(manifest_data) manifest_size_human = f'{manifest_size / 1024:.01f} KiB' manifest_info.append(InfoItem('Manifest size', 'size', manifest_size_human, manifest_size)) @@ -1796,6 +1807,8 @@ class LegendaryCLI: manifest_info.append(InfoItem('Manifest version', 'version', manifest.version, manifest.version)) manifest_info.append(InfoItem('Manifest feature level', 'feature_level', manifest.meta.feature_level, manifest.meta.feature_level)) + manifest_info.append(InfoItem('Manifest compressed', 'compressed', bool(manifest.compressed), bool(manifest.compressed))) + manifest_info.append(InfoItem('Manifest encrypted', 'encrypted', bool(manifest.encrypted), bool(manifest.encrypted))) manifest_info.append(InfoItem('Manifest app name', 'app_name', manifest.meta.app_name, manifest.meta.app_name)) manifest_info.append(InfoItem('Launch EXE', 'launch_exe', @@ -1892,6 +1905,7 @@ class LegendaryCLI: tag_disk_size_human or 'N/A', tag_disk_size)) manifest_info.append(InfoItem('Download size by install tag', 'tag_download_size', tag_download_size_human or 'N/A', tag_download_size)) + manifest_info.append(InfoItem('Is preload', 'is_preloaded', is_preloaded, is_preloaded)) if not args.json: def print_info_item(item: InfoItem): diff --git a/legendary/core.py b/legendary/core.py index 04344e3..f46375c 100644 --- a/legendary/core.py +++ b/legendary/core.py @@ -1253,6 +1253,8 @@ class LegendaryCore: raise ValueError('Manifest response has more than one element!') manifest_hash = m_api_r['elements'][0]['hash'] + manifest_is_preloaded: bool = m_api_r['elements'][0].get('isPreloaded') or False + manifest_secrets: dict = m_api_r['elements'][0].get('secrets') or dict() base_urls = [] manifest_urls = [] for manifest in m_api_r['elements'][0]['manifests']: @@ -1266,10 +1268,10 @@ class LegendaryCore: else: manifest_urls.append(manifest['uri']) - return manifest_urls, base_urls, manifest_hash + return manifest_urls, base_urls, manifest_hash, manifest_is_preloaded, manifest_secrets def get_cdn_manifest(self, game, platform='Windows', disable_https=False): - manifest_urls, base_urls, manifest_hash = self.get_cdn_urls(game, platform) + manifest_urls, base_urls, manifest_hash, manifest_is_preloaded, manifest_secrets = self.get_cdn_urls(game, platform) if not manifest_urls: raise ValueError('No manifest URLs returned by API') @@ -1297,7 +1299,7 @@ class LegendaryCore: if sha1(manifest_bytes).hexdigest() != manifest_hash: raise ValueError('Manifest sha hash mismatch!') - return manifest_bytes, base_urls + return manifest_bytes, base_urls, manifest_is_preloaded, manifest_secrets def get_uri_manifest(self, uri): if uri.startswith('http'): @@ -1359,11 +1361,13 @@ class LegendaryCore: if override_manifest: self.log.info(f'Overriding manifest with "{override_manifest}"') new_manifest_data, _base_urls = self.get_uri_manifest(override_manifest) + # FIXME: Populate manifest secrets + manifest_secrets = dict() # if override manifest has a base URL use that instead if _base_urls: base_urls = _base_urls else: - new_manifest_data, base_urls = self.get_cdn_manifest(game, platform, disable_https=disable_https) + new_manifest_data, base_urls, _, manifest_secrets = self.get_cdn_manifest(game, platform, disable_https=disable_https) # overwrite base urls in metadata with current ones to avoid using old/dead CDNs game.base_urls = base_urls # save base urls to game metadata @@ -1371,9 +1375,12 @@ class LegendaryCore: self.log.info('Parsing game manifest...') new_manifest = self.load_manifest(new_manifest_data) + if not new_manifest.decrypt(manifest_secrets): + raise ValueError('Decrypting manifest failed, key was missing, preloading isnt implemented yet') + self.log.debug(f'Base urls: {base_urls}') # save manifest with version name as well for testing/downgrading/etc. - self.lgd.save_manifest(game.app_name, new_manifest_data, + self.lgd.save_manifest(game.app_name, new_manifest, version=new_manifest.meta.build_version, platform=platform) @@ -1496,7 +1503,7 @@ class LegendaryCore: if not max_workers: max_workers = self.lgd.config.getint('Legendary', 'max_workers', fallback=0) - dlm = DLManager(install_path, base_url, resume_file=resume_file, status_q=status_q, + dlm = DLManager(install_path, base_url, manifest_secrets, resume_file=resume_file, status_q=status_q, max_shared_memory=max_shm * 1024 * 1024, max_workers=max_workers, dl_timeout=dl_timeout, bind_ip=bind_ip) anlres = dlm.run_analysis(manifest=new_manifest, old_manifest=old_manifest, @@ -1761,9 +1768,10 @@ class LegendaryCore: if not needs_verify: self.log.debug(f'No in-progress installation found, assuming complete...') + manifest_secrets = dict() if not manifest_data: self.log.info(f'Downloading latest manifest for "{game.app_name}"') - manifest_data, base_urls = self.get_cdn_manifest(game) + manifest_data, base_urls, _, manifest_secrets = self.get_cdn_manifest(game) if not game.base_urls: game.base_urls = base_urls self.lgd.set_game_meta(game.app_name, game) @@ -1773,7 +1781,8 @@ class LegendaryCore: # parse and save manifest to disk for verification step of import new_manifest = self.load_manifest(manifest_data) - self.lgd.save_manifest(game.app_name, manifest_data, + new_manifest.decrypt(manifest_secrets) + self.lgd.save_manifest(game.app_name, new_manifest, version=new_manifest.meta.build_version, platform=platform) install_size = sum(fm.file_size for fm in new_manifest.file_manifest_list.elements) @@ -1848,7 +1857,7 @@ class LegendaryCore: with open(manifest_filename, 'rb') as f: manifest_data = f.read() new_manifest = self.load_manifest(manifest_data) - self.lgd.save_manifest(lgd_igame.app_name, manifest_data, + self.lgd.save_manifest(lgd_igame.app_name, new_manifest, version=new_manifest.meta.build_version, platform='Windows') @@ -2040,7 +2049,7 @@ class LegendaryCore: if not self.logged_in: self.egs.start_session(client_credentials=True) - _manifest, base_urls = self.get_cdn_manifest(EOSOverlayApp) + _manifest, base_urls, _, manifest_secrets = self.get_cdn_manifest(EOSOverlayApp) manifest = self.load_manifest(_manifest) if igame := self.lgd.get_overlay_install_info(): @@ -2048,7 +2057,7 @@ class LegendaryCore: else: path = path or os.path.join(self.get_default_install_dir(), '.overlay') - dlm = DLManager(path, base_urls[0]) + dlm = DLManager(path, base_urls[0], manifest_secrets) analysis_result = dlm.run_analysis(manifest=manifest) install_size = analysis_result.install_size @@ -2097,7 +2106,7 @@ class LegendaryCore: if os.path.exists(path): raise FileExistsError(f'Bottle {bottle_name} already exists') - dlm = DLManager(path, base_url) + dlm = DLManager(path, base_url, dict()) analysis_result = dlm.run_analysis(manifest=manifest) install_size = analysis_result.install_size diff --git a/legendary/downloader/mp/manager.py b/legendary/downloader/mp/manager.py index 90ab37a..348ccda 100644 --- a/legendary/downloader/mp/manager.py +++ b/legendary/downloader/mp/manager.py @@ -20,7 +20,7 @@ from legendary.models.manifest import ManifestComparison, Manifest class DLManager(Process): - def __init__(self, download_dir, base_url, cache_dir=None, status_q=None, + def __init__(self, download_dir, base_url, manifest_secrets: dict, cache_dir=None, status_q=None, max_workers=0, update_interval=1.0, dl_timeout=10, resume_file=None, max_shared_memory=1024 * 1024 * 1024, bind_ip=None): super().__init__(name='DLManager') @@ -30,6 +30,7 @@ class DLManager(Process): self.base_url = base_url self.dl_dir = download_dir self.cache_dir = cache_dir or os.path.join(download_dir, '.cache') + self.manifest_secrets = manifest_secrets # All the queues! self.logging_queue = None @@ -666,7 +667,7 @@ class DLManager(Process): w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, self.dl_result_q, self.shared_memory.name, logging_queue=self.logging_queue, - dl_timeout=self.dl_timeout, bind_addr=bind_ip) + dl_timeout=self.dl_timeout, bind_addr=bind_ip, secrets=self.manifest_secrets) self.children.append(w) w.start() diff --git a/legendary/downloader/mp/workers.py b/legendary/downloader/mp/workers.py index 4b63192..7e5e3e2 100644 --- a/legendary/downloader/mp/workers.py +++ b/legendary/downloader/mp/workers.py @@ -35,10 +35,11 @@ class BindingHTTPAdapter(HTTPAdapter): class DLWorker(Process): def __init__(self, name, queue, out_queue, shm, max_retries=7, - logging_queue=None, dl_timeout=10, bind_addr=None): + logging_queue=None, dl_timeout=10, bind_addr=None, secrets=dict()): super().__init__(name=name) self.q = queue self.o_q = out_queue + self.secrets = secrets self.session = requests.session() self.session.headers.update({ 'User-Agent': 'EpicGamesLauncher/11.0.1-14907503+++Portal+Release-Live Windows/10.0.19041.1.256.64bit' @@ -107,7 +108,7 @@ class DLWorker(Process): continue else: compressed = len(r.content) - chunk = Chunk.read_buffer(r.content) + chunk = Chunk.read_buffer(r.content, self.secrets) break else: raise TimeoutError('Max retries reached') diff --git a/legendary/lfs/lgndry.py b/legendary/lfs/lgndry.py index 2e9b0bf..dfab258 100644 --- a/legendary/lfs/lgndry.py +++ b/legendary/lfs/lgndry.py @@ -235,9 +235,9 @@ class LGDLFS: except FileNotFoundError: # all other errors should propagate return None - def save_manifest(self, app_name, manifest_data, version, platform='Windows'): + def save_manifest(self, app_name, manifest, version, platform='Windows'): with open(self._get_manifest_filename(app_name, version, platform), 'wb') as f: - f.write(manifest_data) + manifest.write(f) def get_game_meta(self, app_name): if _meta := self._game_metadata.get(app_name, None): diff --git a/legendary/models/chunk.py b/legendary/models/chunk.py index 6510912..701e0a3 100644 --- a/legendary/models/chunk.py +++ b/legendary/models/chunk.py @@ -6,6 +6,7 @@ import zlib from hashlib import sha1 from io import BytesIO from uuid import uuid4 +from Cryptodome.Cipher import AES from legendary.utils.rolling_hash import get_hash @@ -26,6 +27,10 @@ class Chunk: self.sha_hash = None self.uncompressed_size = 1024 * 1024 + self.secret_guid = None + self.secret_key = None + self.encryption_tag = None + self._guid_str = '' self._guid_num = 0 self._bio = None @@ -35,12 +40,15 @@ class Chunk: def data(self): if self._data: return self._data - + + data = self._bio.read() + if self.encrypted: + cipher = AES.new(bytes.fromhex(self.secret_key), AES.MODE_GCM, nonce=self.sha_hash[:12]) + data = cipher.decrypt_and_verify(data, self.encryption_tag) if self.compressed: - self._data = zlib.decompress(self._bio.read()) - else: - self._data = self._bio.read() + data = zlib.decompress(data) + self._data = data # close BytesIO with raw data since we no longer need it self._bio.close() self._bio = None @@ -78,14 +86,18 @@ class Chunk: @property def compressed(self): return self.stored_as & 0x1 + + @property + def encrypted(self): + return self.stored_as & 0x2 @classmethod - def read_buffer(cls, data): + def read_buffer(cls, data, secrets): _sio = BytesIO(data) - return cls.read(_sio) + return cls.read(_sio, secrets) @classmethod - def read(cls, bio): + def read(cls, bio, secrets=dict()): head_start = bio.tell() if struct.unpack('= 3: _chunk.uncompressed_size = struct.unpack('= 4: + _chunk.secret_guid = struct.unpack('= 4 else 66 + bio.write(struct.pack('= 3: + bio.write(struct.pack('= 4: + bio.write(struct.pack('= 15: + if version >= 22: + return 'ChunksV5' + elif version >= 15: return 'ChunksV4' elif version >= 6: return 'ChunksV3' @@ -73,6 +77,8 @@ class Manifest: self.sha_hash = '' self.stored_as = 0 self.version = 18 + self.secret_guid = (0,0,0,0) + self.encryption_tag = b'' self.data = b'' # remainder @@ -80,11 +86,49 @@ class Manifest: self.chunk_data_list: Optional[CDL] = None self.file_manifest_list: Optional[FML] = None self.custom_fields: Optional[CustomFields] = None + self.encrypted_data: Optional[EncryptedData] = None @property def compressed(self): return self.stored_as & 0x1 + + @property + def encrypted(self): + return self.stored_as & 0x2 + + def decrypt(self, secrets): + if not self.encrypted: + return True + secret_str = ''.join('{:08X}'.format(guid) for guid in self.secret_guid) + secret = secrets.get(secret_str) + if secret is None: + return False + cipher = AES.new(bytes.fromhex(secret), AES.MODE_GCM, nonce=self.encrypted_data.encrypted_header.iv) + decrypted = cipher.decrypt_and_verify(self.encrypted_data.ciphertext, self.encryption_tag) + if self.encrypted_data.encrypted_header.compressed: + decrypted = zlib.decompress(decrypted) + bio = BytesIO(decrypted) + self.meta.launch_exe = read_fstring(bio) + self.meta.launch_command = read_fstring(bio) + prereq_count = struct.unpack('= 22: + _manifest.secret_guid = struct.unpack(' 21: + if target_version > 24: logger.warning(f'Trying to serialise an unknown target version: {target_version},' - f'clamping to 21.') - target_version = 21 + f'clamping to 24.') + target_version = 24 # Ensure metadata will be correct self.meta.feature_level = target_version @@ -164,6 +212,8 @@ class Manifest: self.chunk_data_list.write(body_bio) self.file_manifest_list.write(body_bio) self.custom_fields.write(body_bio) + if target_version >= 22: + self.encrypted_data.write(body_bio) self.data = body_bio.getvalue() self.size_uncompressed = self.size_compressed = len(self.data) @@ -183,6 +233,10 @@ class Manifest: bio.write(self.sha_hash) bio.write(struct.pack('B', self.stored_as)) bio.write(struct.pack('= 22: + bio.write(struct.pack('= 22: + for chunk in _cdl.elements: + chunk.secret_guid = struct.unpack('= 22: + for chunk in self.elements: + bio.write(struct.pack('= 22: + secret_b64 = base64.urlsafe_b64encode(struct.pack('=3.4'],