mirror of
https://github.com/derrod/legendary.git
synced 2026-05-07 06:23:22 +00:00
feat: support encrypted manifests and chunks
fix: ensure manifest_secrets is always defined in info fix: update all usage of DLManager with manifest secrets
This commit is contained in:
parent
b23d351678
commit
9f3762e80d
|
|
@ -348,7 +348,7 @@ class LegendaryCLI:
|
|||
return
|
||||
elif args.app_name:
|
||||
args.app_name = self._resolve_aliases(args.app_name)
|
||||
|
||||
manifest_secrets = dict()
|
||||
# check if we even need to log in
|
||||
if args.override_manifest:
|
||||
logger.info(f'Loading manifest from "{args.override_manifest}"')
|
||||
|
|
@ -365,9 +365,12 @@ class LegendaryCLI:
|
|||
if not game:
|
||||
logger.fatal(f'Could not fetch metadata for "{args.app_name}" (check spelling/account ownership)')
|
||||
exit(1)
|
||||
manifest_data, _ = self.core.get_cdn_manifest(game, platform=args.platform)
|
||||
manifest_data, _, _, manifest_secrets = self.core.get_cdn_manifest(game, platform=args.platform)
|
||||
|
||||
manifest = self.core.load_manifest(manifest_data)
|
||||
if not manifest.decrypt(manifest_secrets):
|
||||
logger.warning('Manifest key wasn\'t found. File names will be obfuscated')
|
||||
|
||||
files = sorted(manifest.file_manifest_list.elements,
|
||||
key=lambda a: a.filename.lower())
|
||||
|
||||
|
|
@ -1224,6 +1227,8 @@ class LegendaryCLI:
|
|||
return
|
||||
|
||||
manifest_data, _ = self.core.get_installed_manifest(args.app_name)
|
||||
manifest_secrets = dict()
|
||||
|
||||
if manifest_data is None:
|
||||
if repair_mode:
|
||||
if not repair_online:
|
||||
|
|
@ -1232,7 +1237,7 @@ class LegendaryCLI:
|
|||
|
||||
logger.warning('No manifest could be loaded, the file may be missing. Downloading the latest manifest.')
|
||||
game = self.core.get_game(args.app_name, platform=igame.platform)
|
||||
manifest_data, _ = self.core.get_cdn_manifest(game, igame.platform)
|
||||
manifest_data, _, _, manifest_secrets = self.core.get_cdn_manifest(game, igame.platform)
|
||||
else:
|
||||
logger.critical(f'Manifest appears to be missing! To repair, run "legendary repair '
|
||||
f'{args.app_name} --repair-and-update", this will however redownload all files '
|
||||
|
|
@ -1240,6 +1245,9 @@ class LegendaryCLI:
|
|||
return
|
||||
|
||||
manifest = self.core.load_manifest(manifest_data)
|
||||
if not manifest.decrypt(manifest_secrets):
|
||||
logger.critical('Unable to decrypt the manifest. The key appears to be missing. Please report this on GitHub.')
|
||||
return
|
||||
|
||||
files = sorted(manifest.file_manifest_list.elements,
|
||||
key=lambda a: a.filename.lower())
|
||||
|
|
@ -1644,6 +1652,8 @@ class LegendaryCLI:
|
|||
|
||||
manifest_data = None
|
||||
entitlements = None
|
||||
is_preloaded = False
|
||||
manifest_secrets = dict()
|
||||
# load installed manifest or URI
|
||||
if args.offline or manifest_uri:
|
||||
if app_name and self.core.is_installed(app_name):
|
||||
|
|
@ -1663,8 +1673,7 @@ class LegendaryCLI:
|
|||
game.metadata = egl_meta
|
||||
# Get manifest if asset exists for current platform
|
||||
if args.platform in game.asset_infos:
|
||||
manifest_data, _ = self.core.get_cdn_manifest(game, args.platform)
|
||||
|
||||
manifest_data, _, is_preloaded, manifest_secrets = self.core.get_cdn_manifest(game, args.platform)
|
||||
if game:
|
||||
game_infos = info_items['game']
|
||||
game_infos.append(InfoItem('App name', 'app_name', game.app_name, game.app_name))
|
||||
|
|
@ -1788,6 +1797,8 @@ class LegendaryCLI:
|
|||
if manifest_data:
|
||||
manifest_info = info_items['manifest']
|
||||
manifest = self.core.load_manifest(manifest_data)
|
||||
manifest.decrypt(manifest_secrets)
|
||||
|
||||
manifest_size = len(manifest_data)
|
||||
manifest_size_human = f'{manifest_size / 1024:.01f} KiB'
|
||||
manifest_info.append(InfoItem('Manifest size', 'size', manifest_size_human, manifest_size))
|
||||
|
|
@ -1796,6 +1807,8 @@ class LegendaryCLI:
|
|||
manifest_info.append(InfoItem('Manifest version', 'version', manifest.version, manifest.version))
|
||||
manifest_info.append(InfoItem('Manifest feature level', 'feature_level',
|
||||
manifest.meta.feature_level, manifest.meta.feature_level))
|
||||
manifest_info.append(InfoItem('Manifest compressed', 'compressed', bool(manifest.compressed), bool(manifest.compressed)))
|
||||
manifest_info.append(InfoItem('Manifest encrypted', 'encrypted', bool(manifest.encrypted), bool(manifest.encrypted)))
|
||||
manifest_info.append(InfoItem('Manifest app name', 'app_name', manifest.meta.app_name,
|
||||
manifest.meta.app_name))
|
||||
manifest_info.append(InfoItem('Launch EXE', 'launch_exe',
|
||||
|
|
@ -1892,6 +1905,7 @@ class LegendaryCLI:
|
|||
tag_disk_size_human or 'N/A', tag_disk_size))
|
||||
manifest_info.append(InfoItem('Download size by install tag', 'tag_download_size',
|
||||
tag_download_size_human or 'N/A', tag_download_size))
|
||||
manifest_info.append(InfoItem('Is preload', 'is_preloaded', is_preloaded, is_preloaded))
|
||||
|
||||
if not args.json:
|
||||
def print_info_item(item: InfoItem):
|
||||
|
|
|
|||
|
|
@ -1253,6 +1253,8 @@ class LegendaryCore:
|
|||
raise ValueError('Manifest response has more than one element!')
|
||||
|
||||
manifest_hash = m_api_r['elements'][0]['hash']
|
||||
manifest_is_preloaded: bool = m_api_r['elements'][0].get('isPreloaded') or False
|
||||
manifest_secrets: dict = m_api_r['elements'][0].get('secrets') or dict()
|
||||
base_urls = []
|
||||
manifest_urls = []
|
||||
for manifest in m_api_r['elements'][0]['manifests']:
|
||||
|
|
@ -1266,10 +1268,10 @@ class LegendaryCore:
|
|||
else:
|
||||
manifest_urls.append(manifest['uri'])
|
||||
|
||||
return manifest_urls, base_urls, manifest_hash
|
||||
return manifest_urls, base_urls, manifest_hash, manifest_is_preloaded, manifest_secrets
|
||||
|
||||
def get_cdn_manifest(self, game, platform='Windows', disable_https=False):
|
||||
manifest_urls, base_urls, manifest_hash = self.get_cdn_urls(game, platform)
|
||||
manifest_urls, base_urls, manifest_hash, manifest_is_preloaded, manifest_secrets = self.get_cdn_urls(game, platform)
|
||||
if not manifest_urls:
|
||||
raise ValueError('No manifest URLs returned by API')
|
||||
|
||||
|
|
@ -1297,7 +1299,7 @@ class LegendaryCore:
|
|||
if sha1(manifest_bytes).hexdigest() != manifest_hash:
|
||||
raise ValueError('Manifest sha hash mismatch!')
|
||||
|
||||
return manifest_bytes, base_urls
|
||||
return manifest_bytes, base_urls, manifest_is_preloaded, manifest_secrets
|
||||
|
||||
def get_uri_manifest(self, uri):
|
||||
if uri.startswith('http'):
|
||||
|
|
@ -1359,11 +1361,14 @@ class LegendaryCore:
|
|||
if override_manifest:
|
||||
self.log.info(f'Overriding manifest with "{override_manifest}"')
|
||||
new_manifest_data, _base_urls = self.get_uri_manifest(override_manifest)
|
||||
is_preloaded = False
|
||||
# FIXME: Populate manifest secrets
|
||||
manifest_secrets = dict()
|
||||
# if override manifest has a base URL use that instead
|
||||
if _base_urls:
|
||||
base_urls = _base_urls
|
||||
else:
|
||||
new_manifest_data, base_urls = self.get_cdn_manifest(game, platform, disable_https=disable_https)
|
||||
new_manifest_data, base_urls, _, manifest_secrets = self.get_cdn_manifest(game, platform, disable_https=disable_https)
|
||||
# overwrite base urls in metadata with current ones to avoid using old/dead CDNs
|
||||
game.base_urls = base_urls
|
||||
# save base urls to game metadata
|
||||
|
|
@ -1371,9 +1376,12 @@ class LegendaryCore:
|
|||
|
||||
self.log.info('Parsing game manifest...')
|
||||
new_manifest = self.load_manifest(new_manifest_data)
|
||||
if not new_manifest.decrypt(manifest_secrets):
|
||||
raise ValueError('Decrypting manifest failed, key was missing, preloading isnt implemented yet')
|
||||
|
||||
self.log.debug(f'Base urls: {base_urls}')
|
||||
# save manifest with version name as well for testing/downgrading/etc.
|
||||
self.lgd.save_manifest(game.app_name, new_manifest_data,
|
||||
self.lgd.save_manifest(game.app_name, new_manifest,
|
||||
version=new_manifest.meta.build_version,
|
||||
platform=platform)
|
||||
|
||||
|
|
@ -1496,7 +1504,7 @@ class LegendaryCore:
|
|||
if not max_workers:
|
||||
max_workers = self.lgd.config.getint('Legendary', 'max_workers', fallback=0)
|
||||
|
||||
dlm = DLManager(install_path, base_url, resume_file=resume_file, status_q=status_q,
|
||||
dlm = DLManager(install_path, base_url, manifest_secrets, resume_file=resume_file, status_q=status_q,
|
||||
max_shared_memory=max_shm * 1024 * 1024, max_workers=max_workers,
|
||||
dl_timeout=dl_timeout, bind_ip=bind_ip)
|
||||
anlres = dlm.run_analysis(manifest=new_manifest, old_manifest=old_manifest,
|
||||
|
|
@ -1761,9 +1769,10 @@ class LegendaryCore:
|
|||
if not needs_verify:
|
||||
self.log.debug(f'No in-progress installation found, assuming complete...')
|
||||
|
||||
manifest_secrets = dict()
|
||||
if not manifest_data:
|
||||
self.log.info(f'Downloading latest manifest for "{game.app_name}"')
|
||||
manifest_data, base_urls = self.get_cdn_manifest(game)
|
||||
manifest_data, base_urls, is_preloaded, manifest_secrets = self.get_cdn_manifest(game)
|
||||
if not game.base_urls:
|
||||
game.base_urls = base_urls
|
||||
self.lgd.set_game_meta(game.app_name, game)
|
||||
|
|
@ -1773,7 +1782,8 @@ class LegendaryCore:
|
|||
|
||||
# parse and save manifest to disk for verification step of import
|
||||
new_manifest = self.load_manifest(manifest_data)
|
||||
self.lgd.save_manifest(game.app_name, manifest_data,
|
||||
new_manifest.decrypt(manifest_secrets)
|
||||
self.lgd.save_manifest(game.app_name, new_manifest,
|
||||
version=new_manifest.meta.build_version, platform=platform)
|
||||
install_size = sum(fm.file_size for fm in new_manifest.file_manifest_list.elements)
|
||||
|
||||
|
|
@ -1848,7 +1858,7 @@ class LegendaryCore:
|
|||
with open(manifest_filename, 'rb') as f:
|
||||
manifest_data = f.read()
|
||||
new_manifest = self.load_manifest(manifest_data)
|
||||
self.lgd.save_manifest(lgd_igame.app_name, manifest_data,
|
||||
self.lgd.save_manifest(lgd_igame.app_name, new_manifest,
|
||||
version=new_manifest.meta.build_version,
|
||||
platform='Windows')
|
||||
|
||||
|
|
@ -2040,7 +2050,7 @@ class LegendaryCore:
|
|||
if not self.logged_in:
|
||||
self.egs.start_session(client_credentials=True)
|
||||
|
||||
_manifest, base_urls = self.get_cdn_manifest(EOSOverlayApp)
|
||||
_manifest, base_urls, _, manifest_secrets = self.get_cdn_manifest(EOSOverlayApp)
|
||||
manifest = self.load_manifest(_manifest)
|
||||
|
||||
if igame := self.lgd.get_overlay_install_info():
|
||||
|
|
@ -2048,7 +2058,7 @@ class LegendaryCore:
|
|||
else:
|
||||
path = path or os.path.join(self.get_default_install_dir(), '.overlay')
|
||||
|
||||
dlm = DLManager(path, base_urls[0])
|
||||
dlm = DLManager(path, base_urls[0], manifest_secrets)
|
||||
analysis_result = dlm.run_analysis(manifest=manifest)
|
||||
|
||||
install_size = analysis_result.install_size
|
||||
|
|
@ -2097,7 +2107,7 @@ class LegendaryCore:
|
|||
if os.path.exists(path):
|
||||
raise FileExistsError(f'Bottle {bottle_name} already exists')
|
||||
|
||||
dlm = DLManager(path, base_url)
|
||||
dlm = DLManager(path, base_url, dict())
|
||||
analysis_result = dlm.run_analysis(manifest=manifest)
|
||||
|
||||
install_size = analysis_result.install_size
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ from legendary.models.manifest import ManifestComparison, Manifest
|
|||
|
||||
|
||||
class DLManager(Process):
|
||||
def __init__(self, download_dir, base_url, cache_dir=None, status_q=None,
|
||||
def __init__(self, download_dir, base_url, manifest_secrets: dict, cache_dir=None, status_q=None,
|
||||
max_workers=0, update_interval=1.0, dl_timeout=10, resume_file=None,
|
||||
max_shared_memory=1024 * 1024 * 1024, bind_ip=None):
|
||||
super().__init__(name='DLManager')
|
||||
|
|
@ -30,6 +30,7 @@ class DLManager(Process):
|
|||
self.base_url = base_url
|
||||
self.dl_dir = download_dir
|
||||
self.cache_dir = cache_dir or os.path.join(download_dir, '.cache')
|
||||
self.manifest_secrets = manifest_secrets
|
||||
|
||||
# All the queues!
|
||||
self.logging_queue = None
|
||||
|
|
@ -666,7 +667,7 @@ class DLManager(Process):
|
|||
|
||||
w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, self.dl_result_q,
|
||||
self.shared_memory.name, logging_queue=self.logging_queue,
|
||||
dl_timeout=self.dl_timeout, bind_addr=bind_ip)
|
||||
dl_timeout=self.dl_timeout, bind_addr=bind_ip, secrets=self.manifest_secrets)
|
||||
self.children.append(w)
|
||||
w.start()
|
||||
|
||||
|
|
|
|||
|
|
@ -35,10 +35,11 @@ class BindingHTTPAdapter(HTTPAdapter):
|
|||
|
||||
class DLWorker(Process):
|
||||
def __init__(self, name, queue, out_queue, shm, max_retries=7,
|
||||
logging_queue=None, dl_timeout=10, bind_addr=None):
|
||||
logging_queue=None, dl_timeout=10, bind_addr=None, secrets=dict()):
|
||||
super().__init__(name=name)
|
||||
self.q = queue
|
||||
self.o_q = out_queue
|
||||
self.secrets = secrets
|
||||
self.session = requests.session()
|
||||
self.session.headers.update({
|
||||
'User-Agent': 'EpicGamesLauncher/11.0.1-14907503+++Portal+Release-Live Windows/10.0.19041.1.256.64bit'
|
||||
|
|
@ -107,7 +108,7 @@ class DLWorker(Process):
|
|||
continue
|
||||
else:
|
||||
compressed = len(r.content)
|
||||
chunk = Chunk.read_buffer(r.content)
|
||||
chunk = Chunk.read_buffer(r.content, self.secrets)
|
||||
break
|
||||
else:
|
||||
raise TimeoutError('Max retries reached')
|
||||
|
|
|
|||
|
|
@ -235,9 +235,9 @@ class LGDLFS:
|
|||
except FileNotFoundError: # all other errors should propagate
|
||||
return None
|
||||
|
||||
def save_manifest(self, app_name, manifest_data, version, platform='Windows'):
|
||||
def save_manifest(self, app_name, manifest, version, platform='Windows'):
|
||||
with open(self._get_manifest_filename(app_name, version, platform), 'wb') as f:
|
||||
f.write(manifest_data)
|
||||
manifest.write(f)
|
||||
|
||||
def get_game_meta(self, app_name):
|
||||
if _meta := self._game_metadata.get(app_name, None):
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import zlib
|
|||
from hashlib import sha1
|
||||
from io import BytesIO
|
||||
from uuid import uuid4
|
||||
from Cryptodome.Cipher import AES
|
||||
|
||||
from legendary.utils.rolling_hash import get_hash
|
||||
|
||||
|
|
@ -26,6 +27,10 @@ class Chunk:
|
|||
self.sha_hash = None
|
||||
self.uncompressed_size = 1024 * 1024
|
||||
|
||||
self.secret_guid = None
|
||||
self.secret_key = None
|
||||
self.encryption_tag = None
|
||||
|
||||
self._guid_str = ''
|
||||
self._guid_num = 0
|
||||
self._bio = None
|
||||
|
|
@ -35,12 +40,15 @@ class Chunk:
|
|||
def data(self):
|
||||
if self._data:
|
||||
return self._data
|
||||
|
||||
|
||||
data = self._bio.read()
|
||||
if self.encrypted:
|
||||
cipher = AES.new(bytes.fromhex(self.secret_key), AES.MODE_GCM, nonce=self.sha_hash[:12])
|
||||
data = cipher.decrypt_and_verify(data, self.encryption_tag)
|
||||
if self.compressed:
|
||||
self._data = zlib.decompress(self._bio.read())
|
||||
else:
|
||||
self._data = self._bio.read()
|
||||
data = zlib.decompress(data)
|
||||
|
||||
self._data = data
|
||||
# close BytesIO with raw data since we no longer need it
|
||||
self._bio.close()
|
||||
self._bio = None
|
||||
|
|
@ -78,14 +86,18 @@ class Chunk:
|
|||
@property
|
||||
def compressed(self):
|
||||
return self.stored_as & 0x1
|
||||
|
||||
@property
|
||||
def encrypted(self):
|
||||
return self.stored_as & 0x2
|
||||
|
||||
@classmethod
|
||||
def read_buffer(cls, data):
|
||||
def read_buffer(cls, data, secrets):
|
||||
_sio = BytesIO(data)
|
||||
return cls.read(_sio)
|
||||
return cls.read(_sio, secrets)
|
||||
|
||||
@classmethod
|
||||
def read(cls, bio):
|
||||
def read(cls, bio, secrets=dict()):
|
||||
head_start = bio.tell()
|
||||
|
||||
if struct.unpack('<I', bio.read(4))[0] != cls.header_magic:
|
||||
|
|
@ -106,6 +118,11 @@ class Chunk:
|
|||
|
||||
if _chunk.header_version >= 3:
|
||||
_chunk.uncompressed_size = struct.unpack('<I', bio.read(4))[0]
|
||||
|
||||
if _chunk.header_version >= 4:
|
||||
_chunk.secret_guid = struct.unpack('<IIII', bio.read(16))
|
||||
_chunk.secret_key = secrets.get(''.join('{:08X}'.format(g) for g in _chunk.secret_guid))
|
||||
_chunk.encryption_tag = bio.read(16)
|
||||
|
||||
if bio.tell() - head_start != _chunk.header_size:
|
||||
raise ValueError('Did not read entire chunk header!')
|
||||
|
|
@ -122,9 +139,10 @@ class Chunk:
|
|||
self.compressed_size = len(self._data)
|
||||
|
||||
bio.write(struct.pack('<I', self.header_magic))
|
||||
# we only serialize the latest version so version/size are hardcoded to 3/66
|
||||
bio.write(struct.pack('<I', 3))
|
||||
bio.write(struct.pack('<I', 66))
|
||||
# we only serialize the latest version so version/size are hardcoded to 4/98
|
||||
header_size = 98 if self.header_version >= 4 else 66
|
||||
bio.write(struct.pack('<I', self.header_version))
|
||||
bio.write(struct.pack('<I', header_size))
|
||||
bio.write(struct.pack('<I', self.compressed_size))
|
||||
bio.write(struct.pack('<IIII', *self.guid))
|
||||
bio.write(struct.pack('<Q', self.hash))
|
||||
|
|
@ -135,7 +153,13 @@ class Chunk:
|
|||
bio.write(struct.pack('B', self.hash_type))
|
||||
|
||||
# header version 3 stuff
|
||||
bio.write(struct.pack('<I', self.uncompressed_size))
|
||||
if self.header_version >= 3:
|
||||
bio.write(struct.pack('<I', self.uncompressed_size))
|
||||
|
||||
# header version 4
|
||||
if self.header_version >= 4:
|
||||
bio.write(struct.pack('<IIII', *self.secret_guid))
|
||||
bio.write(self.encryption_tag)
|
||||
|
||||
# finally, add the data
|
||||
bio.write(self._data)
|
||||
|
|
|
|||
|
|
@ -192,6 +192,7 @@ class InstalledGame:
|
|||
uninstaller: Optional[Dict] = None
|
||||
requires_ot: bool = False
|
||||
save_path: Optional[str] = None
|
||||
is_preloaded: bool = False
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json):
|
||||
|
|
@ -218,6 +219,7 @@ class InstalledGame:
|
|||
tmp.install_size = json.get('install_size', 0)
|
||||
tmp.egl_guid = json.get('egl_guid', '')
|
||||
tmp.install_tags = json.get('install_tags', [])
|
||||
tmp.is_preloaded = json.get('is_preloaded', False)
|
||||
return tmp
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,9 @@ from __future__ import annotations
|
|||
import hashlib
|
||||
import logging
|
||||
import struct
|
||||
import base64
|
||||
import zlib
|
||||
from Cryptodome.Cipher import AES
|
||||
|
||||
from base64 import b64encode
|
||||
from io import BytesIO
|
||||
|
|
@ -52,7 +54,9 @@ def write_fstring(bio, string):
|
|||
|
||||
def get_chunk_dir(version):
|
||||
# The lowest version I've ever seen was 12 (Unreal Tournament), but for completeness sake leave all of them in
|
||||
if version >= 15:
|
||||
if version >= 22:
|
||||
return 'ChunksV5'
|
||||
elif version >= 15:
|
||||
return 'ChunksV4'
|
||||
elif version >= 6:
|
||||
return 'ChunksV3'
|
||||
|
|
@ -73,6 +77,8 @@ class Manifest:
|
|||
self.sha_hash = ''
|
||||
self.stored_as = 0
|
||||
self.version = 18
|
||||
self.secret_guid = (0,0,0,0)
|
||||
self.encryption_tag = b''
|
||||
self.data = b''
|
||||
|
||||
# remainder
|
||||
|
|
@ -80,11 +86,49 @@ class Manifest:
|
|||
self.chunk_data_list: Optional[CDL] = None
|
||||
self.file_manifest_list: Optional[FML] = None
|
||||
self.custom_fields: Optional[CustomFields] = None
|
||||
self.encrypted_data: Optional[EncryptedData] = None
|
||||
|
||||
@property
|
||||
def compressed(self):
|
||||
return self.stored_as & 0x1
|
||||
|
||||
@property
|
||||
def encrypted(self):
|
||||
return self.stored_as & 0x2
|
||||
|
||||
def decrypt(self, secrets):
|
||||
if not self.encrypted:
|
||||
return True
|
||||
secret_str = ''.join('{:08X}'.format(guid) for guid in self.secret_guid)
|
||||
secret = secrets.get(secret_str)
|
||||
if secret is None:
|
||||
return False
|
||||
cipher = AES.new(bytes.fromhex(secret), AES.MODE_GCM, nonce=self.encrypted_data.encrypted_header.iv)
|
||||
decrypted = cipher.decrypt_and_verify(self.encrypted_data.ciphertext, self.encryption_tag)
|
||||
if self.encrypted_data.encrypted_header.compressed:
|
||||
decrypted = zlib.decompress(decrypted)
|
||||
|
||||
bio = BytesIO(decrypted)
|
||||
self.meta.launch_exe = read_fstring(bio)
|
||||
self.meta.launch_command = read_fstring(bio)
|
||||
prereq_count = struct.unpack('<I', bio.read(4))[0]
|
||||
self.meta.prereq_ids = [read_fstring(bio) for _ in range(prereq_count)]
|
||||
self.meta.prereq_name = read_fstring(bio)
|
||||
self.meta.prereq_path = read_fstring(bio)
|
||||
self.meta.prereq_args = read_fstring(bio)
|
||||
self.meta.uninstall_action_path = read_fstring(bio)
|
||||
self.meta.uninstall_action_args = read_fstring(bio)
|
||||
|
||||
for file in self.file_manifest_list.elements:
|
||||
file.filename = read_fstring(bio)
|
||||
file.symlink_target = read_fstring(bio)
|
||||
|
||||
self.stored_as ^= 0x2
|
||||
self.secret_guid = (0,0,0,0)
|
||||
self.encrypted_data.reset()
|
||||
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def read_all(cls, data):
|
||||
_m = cls.read(data)
|
||||
|
|
@ -94,6 +138,7 @@ class Manifest:
|
|||
_m.chunk_data_list = CDL.read(_tmp, _m.meta.feature_level)
|
||||
_m.file_manifest_list = FML.read(_tmp)
|
||||
_m.custom_fields = CustomFields.read(_tmp)
|
||||
_m.encrypted_data = EncryptedData.read(_tmp, _m.meta.feature_level)
|
||||
|
||||
if unhandled_data := _tmp.read():
|
||||
logger.warning(f'Did not read {len(unhandled_data)} remaining bytes in manifest! '
|
||||
|
|
@ -119,6 +164,9 @@ class Manifest:
|
|||
_manifest.sha_hash = bio.read(20)
|
||||
_manifest.stored_as = struct.unpack('B', bio.read(1))[0]
|
||||
_manifest.version = struct.unpack('<I', bio.read(4))[0]
|
||||
if _manifest.version >= 22:
|
||||
_manifest.secret_guid = struct.unpack('<IIII', bio.read(16))
|
||||
_manifest.encryption_tag = bio.read(16)
|
||||
|
||||
if bio.tell() != _manifest.header_size:
|
||||
logger.warning(f'Did not read entire header {bio.tell()} != {_manifest.header_size}! '
|
||||
|
|
@ -152,10 +200,10 @@ class Manifest:
|
|||
target_version = max(18, target_version)
|
||||
|
||||
# Downgrade manifest if unknown newer version
|
||||
if target_version > 21:
|
||||
if target_version > 24:
|
||||
logger.warning(f'Trying to serialise an unknown target version: {target_version},'
|
||||
f'clamping to 21.')
|
||||
target_version = 21
|
||||
f'clamping to 24.')
|
||||
target_version = 24
|
||||
|
||||
# Ensure metadata will be correct
|
||||
self.meta.feature_level = target_version
|
||||
|
|
@ -164,6 +212,8 @@ class Manifest:
|
|||
self.chunk_data_list.write(body_bio)
|
||||
self.file_manifest_list.write(body_bio)
|
||||
self.custom_fields.write(body_bio)
|
||||
if target_version >= 22:
|
||||
self.encrypted_data.write(body_bio)
|
||||
|
||||
self.data = body_bio.getvalue()
|
||||
self.size_uncompressed = self.size_compressed = len(self.data)
|
||||
|
|
@ -183,6 +233,10 @@ class Manifest:
|
|||
bio.write(self.sha_hash)
|
||||
bio.write(struct.pack('B', self.stored_as))
|
||||
bio.write(struct.pack('<I', target_version))
|
||||
if target_version >= 22:
|
||||
bio.write(struct.pack('<IIII', *self.secret_guid))
|
||||
bio.write(self.encryption_tag)
|
||||
|
||||
bio.write(self.data)
|
||||
|
||||
return bio.tell() if fp else bio.getvalue()
|
||||
|
|
@ -433,6 +487,16 @@ class CDL:
|
|||
for chunk in _cdl.elements:
|
||||
chunk.file_size = struct.unpack('<q', bio.read(8))[0]
|
||||
|
||||
if manifest_version >= 22:
|
||||
for chunk in _cdl.elements:
|
||||
chunk.secret_guid = struct.unpack('<IIII', bio.read(16))
|
||||
|
||||
for chunk in _cdl.elements:
|
||||
chunk.window_size_compressed = struct.unpack('<I', bio.read(4))[0]
|
||||
|
||||
for chunk in _cdl.elements:
|
||||
chunk.encryption_tag = bio.read(16)
|
||||
|
||||
if (size_read := bio.tell() - cdl_start) != _cdl.size:
|
||||
logger.warning(f'Did not read entire chunk data list! Version: {_cdl.version}, '
|
||||
f'{_cdl.size - size_read} bytes missing, skipping...')
|
||||
|
|
@ -461,6 +525,16 @@ class CDL:
|
|||
for chunk in self.elements:
|
||||
bio.write(struct.pack('<q', chunk.file_size))
|
||||
|
||||
if self._manifest_version >= 22:
|
||||
for chunk in self.elements:
|
||||
bio.write(struct.pack('<IIII', *chunk.secret_guid))
|
||||
|
||||
for chunk in self.elements:
|
||||
bio.write(struct.pack('<I', chunk.window_size_compressed))
|
||||
|
||||
for chunk in self.elements:
|
||||
bio.write(chunk.encryption_tag)
|
||||
|
||||
cdl_end = bio.tell()
|
||||
bio.seek(cdl_start)
|
||||
bio.write(struct.pack('<I', cdl_end - cdl_start))
|
||||
|
|
@ -474,6 +548,9 @@ class ChunkInfo:
|
|||
self.sha_hash = b''
|
||||
self.window_size = 0
|
||||
self.file_size = 0
|
||||
self.secret_guid = None
|
||||
self.window_size_compressed = 0
|
||||
self.encryption_tag = b''
|
||||
|
||||
self._manifest_version = manifest_version
|
||||
# caches for things that are "expensive" to compute
|
||||
|
|
@ -518,6 +595,13 @@ class ChunkInfo:
|
|||
|
||||
@property
|
||||
def path(self):
|
||||
if self._manifest_version >= 22:
|
||||
secret_b64 = base64.urlsafe_b64encode(struct.pack('<IIII', *self.secret_guid)).decode().strip('=')
|
||||
hash_b64 = base64.urlsafe_b64encode(struct.pack('<Q', self.hash)).decode().strip('=')
|
||||
guid_b64 = base64.urlsafe_b64encode(struct.pack('<IIII', *self.guid)).decode().strip('=')
|
||||
return '{}/{}/{:02d}/{}_{}.chunk'.format(
|
||||
get_chunk_dir(self._manifest_version), secret_b64,
|
||||
self.group_num, hash_b64, guid_b64)
|
||||
return '{}/{:02d}/{:016X}_{}.chunk'.format(
|
||||
get_chunk_dir(self._manifest_version), self.group_num,
|
||||
self.hash, ''.join('{:08X}'.format(g) for g in self.guid))
|
||||
|
|
@ -810,6 +894,110 @@ class CustomFields:
|
|||
bio.seek(cf_end)
|
||||
|
||||
|
||||
class EncryptedData:
|
||||
def __init__(self):
|
||||
self.size = 0
|
||||
self.version = 0
|
||||
|
||||
self.encrypted_header = EncryptedDataHeader()
|
||||
self.ciphertext = b''
|
||||
|
||||
def reset(self):
|
||||
self.size = 0
|
||||
self.version = 0
|
||||
self.ciphertext = b''
|
||||
self.encrypted_header = EncryptedDataHeader()
|
||||
|
||||
@classmethod
|
||||
def read(cls, bio, feature_level):
|
||||
_ed = cls()
|
||||
if feature_level < 24:
|
||||
return _ed
|
||||
|
||||
ed_start = bio.tell()
|
||||
_ed.size = struct.unpack('<I', bio.read(4))[0]
|
||||
_ed.version = struct.unpack('B', bio.read(1))[0]
|
||||
cipher_size = struct.unpack('<I', bio.read(4))[0]
|
||||
data = BytesIO(bio.read(cipher_size))
|
||||
_ed.encrypted_header = EncryptedDataHeader.read(data)
|
||||
ciphertext_size = struct.unpack('<I', data.read(4))[0]
|
||||
_ed.ciphertext = data.read(ciphertext_size)
|
||||
|
||||
if (size_read := bio.tell() - ed_start) != _ed.size:
|
||||
logger.warning(f'Did not read entire encrypted data part! Version: {_ed.version}, '
|
||||
f'{_ed.size - size_read} bytes missing, skipping...')
|
||||
bio.seek(_ed.size - size_read, 1)
|
||||
_ed.version = 0
|
||||
return _ed
|
||||
|
||||
def write(self, bio):
|
||||
ed_start = bio.tell()
|
||||
bio.write(struct.pack('<I', 0)) # placeholder size
|
||||
bio.write(struct.pack('B', self.version))
|
||||
|
||||
# FIXME: Ensure all sizes are properly set in the header
|
||||
cipher_start = bio.tell()
|
||||
bio.write(struct.pack('<I', 0)) # cipher size
|
||||
self.encrypted_header.write(bio)
|
||||
bio.write(struct.pack('<I', len(self.ciphertext)))
|
||||
bio.write(self.ciphertext)
|
||||
|
||||
ed_end = bio.tell()
|
||||
bio.seek(cipher_start)
|
||||
bio.write(struct.pack('<I', ed_end - cipher_start))
|
||||
bio.seek(ed_start)
|
||||
bio.write(struct.pack('<I', ed_end - ed_start))
|
||||
bio.seek(ed_end)
|
||||
|
||||
class EncryptedDataHeader:
|
||||
def __init__(self):
|
||||
self.size = 0
|
||||
self.version = 0
|
||||
self.stored_as = 0
|
||||
self.data_uncompressed = 0
|
||||
self.data_compressed = 0
|
||||
self.iv = b''
|
||||
|
||||
@property
|
||||
def compressed(self):
|
||||
return self.stored_as & 0x1
|
||||
|
||||
@classmethod
|
||||
def read(cls, bio):
|
||||
_edh = cls()
|
||||
|
||||
edh_start = bio.tell()
|
||||
_edh.size = struct.unpack('<I', bio.read(4))[0]
|
||||
_edh.version = struct.unpack('<I', bio.read(4))[0]
|
||||
_edh.stored_as = struct.unpack('B', bio.read(1))[0]
|
||||
_edh.data_uncompressed = struct.unpack('<I', bio.read(4))[0]
|
||||
_edh.data_compressed = struct.unpack('<I', bio.read(4))[0]
|
||||
iv_len = struct.unpack('<I', bio.read(4))[0]
|
||||
_edh.iv = bio.read(iv_len)
|
||||
|
||||
if (size_read := bio.tell() - edh_start) != _edh.size:
|
||||
logger.warning(f'Did not read entire encrypted header data part! Version: {_edh.version}, '
|
||||
f'{_edh.size - size_read} bytes missing, skipping...')
|
||||
bio.seek(_edh.size - size_read, 1)
|
||||
_edh.version = 0
|
||||
|
||||
return _edh
|
||||
|
||||
def write(self, bio):
|
||||
edh_start = bio.tell()
|
||||
bio.write(struct.pack('<I', 0))
|
||||
bio.write(struct.pack('<I', self.version))
|
||||
bio.write(struct.pack('B', self.stored_as))
|
||||
bio.write(struct.pack('<I', self.data_uncompressed))
|
||||
bio.write(struct.pack('<I', self.data_compressed))
|
||||
bio.write(struct.pack('<I', len(self.iv)))
|
||||
bio.write(self.iv)
|
||||
|
||||
end = bio.tell()
|
||||
bio.seek(edh_start)
|
||||
bio.write(struct.pack('<I', end - edh_start))
|
||||
bio.seek(end)
|
||||
|
||||
class ManifestComparison:
|
||||
def __init__(self):
|
||||
self.added = set()
|
||||
|
|
|
|||
|
|
@ -1,2 +1,3 @@
|
|||
requests<3.0
|
||||
filelock
|
||||
pycryptodomex
|
||||
Loading…
Reference in a new issue