feat: support encrypted manifests and chunks (#738)

* feat: support encrypted manifests and chunks

fix: ensure manifest_secrets is always defined in info

fix: update all usage of DLManager with manifest secrets

* chore: remove unused is_preloaded var
This commit is contained in:
Paweł Lidwin 2026-04-08 18:41:44 +02:00 committed by GitHub
parent cdf6645d5c
commit bc5d12c914
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 280 additions and 39 deletions

View file

@ -348,7 +348,7 @@ class LegendaryCLI:
return
elif args.app_name:
args.app_name = self._resolve_aliases(args.app_name)
manifest_secrets = dict()
# check if we even need to log in
if args.override_manifest:
logger.info(f'Loading manifest from "{args.override_manifest}"')
@ -365,9 +365,12 @@ class LegendaryCLI:
if not game:
logger.fatal(f'Could not fetch metadata for "{args.app_name}" (check spelling/account ownership)')
exit(1)
manifest_data, _ = self.core.get_cdn_manifest(game, platform=args.platform)
manifest_data, _, _, manifest_secrets = self.core.get_cdn_manifest(game, platform=args.platform)
manifest = self.core.load_manifest(manifest_data)
if not manifest.decrypt(manifest_secrets):
logger.warning('Manifest key wasn\'t found. File names will be obfuscated')
files = sorted(manifest.file_manifest_list.elements,
key=lambda a: a.filename.lower())
@ -1224,6 +1227,8 @@ class LegendaryCLI:
return
manifest_data, _ = self.core.get_installed_manifest(args.app_name)
manifest_secrets = dict()
if manifest_data is None:
if repair_mode:
if not repair_online:
@ -1232,7 +1237,7 @@ class LegendaryCLI:
logger.warning('No manifest could be loaded, the file may be missing. Downloading the latest manifest.')
game = self.core.get_game(args.app_name, platform=igame.platform)
manifest_data, _ = self.core.get_cdn_manifest(game, igame.platform)
manifest_data, _, _, manifest_secrets = self.core.get_cdn_manifest(game, igame.platform)
else:
logger.critical(f'Manifest appears to be missing! To repair, run "legendary repair '
f'{args.app_name} --repair-and-update", this will however redownload all files '
@ -1240,6 +1245,9 @@ class LegendaryCLI:
return
manifest = self.core.load_manifest(manifest_data)
if not manifest.decrypt(manifest_secrets):
logger.critical('Unable to decrypt the manifest. The key appears to be missing. Please report this on GitHub.')
return
files = sorted(manifest.file_manifest_list.elements,
key=lambda a: a.filename.lower())
@ -1644,6 +1652,8 @@ class LegendaryCLI:
manifest_data = None
entitlements = None
is_preloaded = False
manifest_secrets = dict()
# load installed manifest or URI
if args.offline or manifest_uri:
if app_name and self.core.is_installed(app_name):
@ -1663,8 +1673,7 @@ class LegendaryCLI:
game.metadata = egl_meta
# Get manifest if asset exists for current platform
if args.platform in game.asset_infos:
manifest_data, _ = self.core.get_cdn_manifest(game, args.platform)
manifest_data, _, is_preloaded, manifest_secrets = self.core.get_cdn_manifest(game, args.platform)
if game:
game_infos = info_items['game']
game_infos.append(InfoItem('App name', 'app_name', game.app_name, game.app_name))
@ -1788,6 +1797,8 @@ class LegendaryCLI:
if manifest_data:
manifest_info = info_items['manifest']
manifest = self.core.load_manifest(manifest_data)
manifest.decrypt(manifest_secrets)
manifest_size = len(manifest_data)
manifest_size_human = f'{manifest_size / 1024:.01f} KiB'
manifest_info.append(InfoItem('Manifest size', 'size', manifest_size_human, manifest_size))
@ -1796,6 +1807,8 @@ class LegendaryCLI:
manifest_info.append(InfoItem('Manifest version', 'version', manifest.version, manifest.version))
manifest_info.append(InfoItem('Manifest feature level', 'feature_level',
manifest.meta.feature_level, manifest.meta.feature_level))
manifest_info.append(InfoItem('Manifest compressed', 'compressed', bool(manifest.compressed), bool(manifest.compressed)))
manifest_info.append(InfoItem('Manifest encrypted', 'encrypted', bool(manifest.encrypted), bool(manifest.encrypted)))
manifest_info.append(InfoItem('Manifest app name', 'app_name', manifest.meta.app_name,
manifest.meta.app_name))
manifest_info.append(InfoItem('Launch EXE', 'launch_exe',
@ -1892,6 +1905,7 @@ class LegendaryCLI:
tag_disk_size_human or 'N/A', tag_disk_size))
manifest_info.append(InfoItem('Download size by install tag', 'tag_download_size',
tag_download_size_human or 'N/A', tag_download_size))
manifest_info.append(InfoItem('Is preload', 'is_preloaded', is_preloaded, is_preloaded))
if not args.json:
def print_info_item(item: InfoItem):

View file

@ -1253,6 +1253,8 @@ class LegendaryCore:
raise ValueError('Manifest response has more than one element!')
manifest_hash = m_api_r['elements'][0]['hash']
manifest_is_preloaded: bool = m_api_r['elements'][0].get('isPreloaded') or False
manifest_secrets: dict = m_api_r['elements'][0].get('secrets') or dict()
base_urls = []
manifest_urls = []
for manifest in m_api_r['elements'][0]['manifests']:
@ -1266,10 +1268,10 @@ class LegendaryCore:
else:
manifest_urls.append(manifest['uri'])
return manifest_urls, base_urls, manifest_hash
return manifest_urls, base_urls, manifest_hash, manifest_is_preloaded, manifest_secrets
def get_cdn_manifest(self, game, platform='Windows', disable_https=False):
manifest_urls, base_urls, manifest_hash = self.get_cdn_urls(game, platform)
manifest_urls, base_urls, manifest_hash, manifest_is_preloaded, manifest_secrets = self.get_cdn_urls(game, platform)
if not manifest_urls:
raise ValueError('No manifest URLs returned by API')
@ -1297,7 +1299,7 @@ class LegendaryCore:
if sha1(manifest_bytes).hexdigest() != manifest_hash:
raise ValueError('Manifest sha hash mismatch!')
return manifest_bytes, base_urls
return manifest_bytes, base_urls, manifest_is_preloaded, manifest_secrets
def get_uri_manifest(self, uri):
if uri.startswith('http'):
@ -1359,11 +1361,13 @@ class LegendaryCore:
if override_manifest:
self.log.info(f'Overriding manifest with "{override_manifest}"')
new_manifest_data, _base_urls = self.get_uri_manifest(override_manifest)
# FIXME: Populate manifest secrets
manifest_secrets = dict()
# if override manifest has a base URL use that instead
if _base_urls:
base_urls = _base_urls
else:
new_manifest_data, base_urls = self.get_cdn_manifest(game, platform, disable_https=disable_https)
new_manifest_data, base_urls, _, manifest_secrets = self.get_cdn_manifest(game, platform, disable_https=disable_https)
# overwrite base urls in metadata with current ones to avoid using old/dead CDNs
game.base_urls = base_urls
# save base urls to game metadata
@ -1371,9 +1375,12 @@ class LegendaryCore:
self.log.info('Parsing game manifest...')
new_manifest = self.load_manifest(new_manifest_data)
if not new_manifest.decrypt(manifest_secrets):
raise ValueError('Decrypting manifest failed, key was missing, preloading isnt implemented yet')
self.log.debug(f'Base urls: {base_urls}')
# save manifest with version name as well for testing/downgrading/etc.
self.lgd.save_manifest(game.app_name, new_manifest_data,
self.lgd.save_manifest(game.app_name, new_manifest,
version=new_manifest.meta.build_version,
platform=platform)
@ -1496,7 +1503,7 @@ class LegendaryCore:
if not max_workers:
max_workers = self.lgd.config.getint('Legendary', 'max_workers', fallback=0)
dlm = DLManager(install_path, base_url, resume_file=resume_file, status_q=status_q,
dlm = DLManager(install_path, base_url, manifest_secrets, resume_file=resume_file, status_q=status_q,
max_shared_memory=max_shm * 1024 * 1024, max_workers=max_workers,
dl_timeout=dl_timeout, bind_ip=bind_ip)
anlres = dlm.run_analysis(manifest=new_manifest, old_manifest=old_manifest,
@ -1761,9 +1768,10 @@ class LegendaryCore:
if not needs_verify:
self.log.debug(f'No in-progress installation found, assuming complete...')
manifest_secrets = dict()
if not manifest_data:
self.log.info(f'Downloading latest manifest for "{game.app_name}"')
manifest_data, base_urls = self.get_cdn_manifest(game)
manifest_data, base_urls, _, manifest_secrets = self.get_cdn_manifest(game)
if not game.base_urls:
game.base_urls = base_urls
self.lgd.set_game_meta(game.app_name, game)
@ -1773,7 +1781,8 @@ class LegendaryCore:
# parse and save manifest to disk for verification step of import
new_manifest = self.load_manifest(manifest_data)
self.lgd.save_manifest(game.app_name, manifest_data,
new_manifest.decrypt(manifest_secrets)
self.lgd.save_manifest(game.app_name, new_manifest,
version=new_manifest.meta.build_version, platform=platform)
install_size = sum(fm.file_size for fm in new_manifest.file_manifest_list.elements)
@ -1848,7 +1857,7 @@ class LegendaryCore:
with open(manifest_filename, 'rb') as f:
manifest_data = f.read()
new_manifest = self.load_manifest(manifest_data)
self.lgd.save_manifest(lgd_igame.app_name, manifest_data,
self.lgd.save_manifest(lgd_igame.app_name, new_manifest,
version=new_manifest.meta.build_version,
platform='Windows')
@ -2040,7 +2049,7 @@ class LegendaryCore:
if not self.logged_in:
self.egs.start_session(client_credentials=True)
_manifest, base_urls = self.get_cdn_manifest(EOSOverlayApp)
_manifest, base_urls, _, manifest_secrets = self.get_cdn_manifest(EOSOverlayApp)
manifest = self.load_manifest(_manifest)
if igame := self.lgd.get_overlay_install_info():
@ -2048,7 +2057,7 @@ class LegendaryCore:
else:
path = path or os.path.join(self.get_default_install_dir(), '.overlay')
dlm = DLManager(path, base_urls[0])
dlm = DLManager(path, base_urls[0], manifest_secrets)
analysis_result = dlm.run_analysis(manifest=manifest)
install_size = analysis_result.install_size
@ -2097,7 +2106,7 @@ class LegendaryCore:
if os.path.exists(path):
raise FileExistsError(f'Bottle {bottle_name} already exists')
dlm = DLManager(path, base_url)
dlm = DLManager(path, base_url, dict())
analysis_result = dlm.run_analysis(manifest=manifest)
install_size = analysis_result.install_size

View file

@ -20,7 +20,7 @@ from legendary.models.manifest import ManifestComparison, Manifest
class DLManager(Process):
def __init__(self, download_dir, base_url, cache_dir=None, status_q=None,
def __init__(self, download_dir, base_url, manifest_secrets: dict, cache_dir=None, status_q=None,
max_workers=0, update_interval=1.0, dl_timeout=10, resume_file=None,
max_shared_memory=1024 * 1024 * 1024, bind_ip=None):
super().__init__(name='DLManager')
@ -30,6 +30,7 @@ class DLManager(Process):
self.base_url = base_url
self.dl_dir = download_dir
self.cache_dir = cache_dir or os.path.join(download_dir, '.cache')
self.manifest_secrets = manifest_secrets
# All the queues!
self.logging_queue = None
@ -666,7 +667,7 @@ class DLManager(Process):
w = DLWorker(f'DLWorker {i + 1}', self.dl_worker_queue, self.dl_result_q,
self.shared_memory.name, logging_queue=self.logging_queue,
dl_timeout=self.dl_timeout, bind_addr=bind_ip)
dl_timeout=self.dl_timeout, bind_addr=bind_ip, secrets=self.manifest_secrets)
self.children.append(w)
w.start()

View file

@ -35,10 +35,11 @@ class BindingHTTPAdapter(HTTPAdapter):
class DLWorker(Process):
def __init__(self, name, queue, out_queue, shm, max_retries=7,
logging_queue=None, dl_timeout=10, bind_addr=None):
logging_queue=None, dl_timeout=10, bind_addr=None, secrets=dict()):
super().__init__(name=name)
self.q = queue
self.o_q = out_queue
self.secrets = secrets
self.session = requests.session()
self.session.headers.update({
'User-Agent': 'EpicGamesLauncher/11.0.1-14907503+++Portal+Release-Live Windows/10.0.19041.1.256.64bit'
@ -107,7 +108,7 @@ class DLWorker(Process):
continue
else:
compressed = len(r.content)
chunk = Chunk.read_buffer(r.content)
chunk = Chunk.read_buffer(r.content, self.secrets)
break
else:
raise TimeoutError('Max retries reached')

View file

@ -235,9 +235,9 @@ class LGDLFS:
except FileNotFoundError: # all other errors should propagate
return None
def save_manifest(self, app_name, manifest_data, version, platform='Windows'):
def save_manifest(self, app_name, manifest, version, platform='Windows'):
with open(self._get_manifest_filename(app_name, version, platform), 'wb') as f:
f.write(manifest_data)
manifest.write(f)
def get_game_meta(self, app_name):
if _meta := self._game_metadata.get(app_name, None):

View file

@ -6,6 +6,7 @@ import zlib
from hashlib import sha1
from io import BytesIO
from uuid import uuid4
from Cryptodome.Cipher import AES
from legendary.utils.rolling_hash import get_hash
@ -26,6 +27,10 @@ class Chunk:
self.sha_hash = None
self.uncompressed_size = 1024 * 1024
self.secret_guid = None
self.secret_key = None
self.encryption_tag = None
self._guid_str = ''
self._guid_num = 0
self._bio = None
@ -35,12 +40,15 @@ class Chunk:
def data(self):
if self._data:
return self._data
data = self._bio.read()
if self.encrypted:
cipher = AES.new(bytes.fromhex(self.secret_key), AES.MODE_GCM, nonce=self.sha_hash[:12])
data = cipher.decrypt_and_verify(data, self.encryption_tag)
if self.compressed:
self._data = zlib.decompress(self._bio.read())
else:
self._data = self._bio.read()
data = zlib.decompress(data)
self._data = data
# close BytesIO with raw data since we no longer need it
self._bio.close()
self._bio = None
@ -78,14 +86,18 @@ class Chunk:
@property
def compressed(self):
return self.stored_as & 0x1
@property
def encrypted(self):
return self.stored_as & 0x2
@classmethod
def read_buffer(cls, data):
def read_buffer(cls, data, secrets):
_sio = BytesIO(data)
return cls.read(_sio)
return cls.read(_sio, secrets)
@classmethod
def read(cls, bio):
def read(cls, bio, secrets=dict()):
head_start = bio.tell()
if struct.unpack('<I', bio.read(4))[0] != cls.header_magic:
@ -106,6 +118,11 @@ class Chunk:
if _chunk.header_version >= 3:
_chunk.uncompressed_size = struct.unpack('<I', bio.read(4))[0]
if _chunk.header_version >= 4:
_chunk.secret_guid = struct.unpack('<IIII', bio.read(16))
_chunk.secret_key = secrets.get(''.join('{:08X}'.format(g) for g in _chunk.secret_guid))
_chunk.encryption_tag = bio.read(16)
if bio.tell() - head_start != _chunk.header_size:
raise ValueError('Did not read entire chunk header!')
@ -122,9 +139,10 @@ class Chunk:
self.compressed_size = len(self._data)
bio.write(struct.pack('<I', self.header_magic))
# we only serialize the latest version so version/size are hardcoded to 3/66
bio.write(struct.pack('<I', 3))
bio.write(struct.pack('<I', 66))
# we only serialize the latest version so version/size are hardcoded to 4/98
header_size = 98 if self.header_version >= 4 else 66
bio.write(struct.pack('<I', self.header_version))
bio.write(struct.pack('<I', header_size))
bio.write(struct.pack('<I', self.compressed_size))
bio.write(struct.pack('<IIII', *self.guid))
bio.write(struct.pack('<Q', self.hash))
@ -135,7 +153,13 @@ class Chunk:
bio.write(struct.pack('B', self.hash_type))
# header version 3 stuff
bio.write(struct.pack('<I', self.uncompressed_size))
if self.header_version >= 3:
bio.write(struct.pack('<I', self.uncompressed_size))
# header version 4
if self.header_version >= 4:
bio.write(struct.pack('<IIII', *self.secret_guid))
bio.write(self.encryption_tag)
# finally, add the data
bio.write(self._data)

View file

@ -192,6 +192,7 @@ class InstalledGame:
uninstaller: Optional[Dict] = None
requires_ot: bool = False
save_path: Optional[str] = None
is_preloaded: bool = False
@classmethod
def from_json(cls, json):
@ -218,6 +219,7 @@ class InstalledGame:
tmp.install_size = json.get('install_size', 0)
tmp.egl_guid = json.get('egl_guid', '')
tmp.install_tags = json.get('install_tags', [])
tmp.is_preloaded = json.get('is_preloaded', False)
return tmp

View file

@ -5,7 +5,9 @@ from __future__ import annotations
import hashlib
import logging
import struct
import base64
import zlib
from Cryptodome.Cipher import AES
from base64 import b64encode
from io import BytesIO
@ -52,7 +54,9 @@ def write_fstring(bio, string):
def get_chunk_dir(version):
# The lowest version I've ever seen was 12 (Unreal Tournament), but for completeness sake leave all of them in
if version >= 15:
if version >= 22:
return 'ChunksV5'
elif version >= 15:
return 'ChunksV4'
elif version >= 6:
return 'ChunksV3'
@ -73,6 +77,8 @@ class Manifest:
self.sha_hash = ''
self.stored_as = 0
self.version = 18
self.secret_guid = (0,0,0,0)
self.encryption_tag = b''
self.data = b''
# remainder
@ -80,11 +86,49 @@ class Manifest:
self.chunk_data_list: Optional[CDL] = None
self.file_manifest_list: Optional[FML] = None
self.custom_fields: Optional[CustomFields] = None
self.encrypted_data: Optional[EncryptedData] = None
@property
def compressed(self):
return self.stored_as & 0x1
@property
def encrypted(self):
return self.stored_as & 0x2
def decrypt(self, secrets):
if not self.encrypted:
return True
secret_str = ''.join('{:08X}'.format(guid) for guid in self.secret_guid)
secret = secrets.get(secret_str)
if secret is None:
return False
cipher = AES.new(bytes.fromhex(secret), AES.MODE_GCM, nonce=self.encrypted_data.encrypted_header.iv)
decrypted = cipher.decrypt_and_verify(self.encrypted_data.ciphertext, self.encryption_tag)
if self.encrypted_data.encrypted_header.compressed:
decrypted = zlib.decompress(decrypted)
bio = BytesIO(decrypted)
self.meta.launch_exe = read_fstring(bio)
self.meta.launch_command = read_fstring(bio)
prereq_count = struct.unpack('<I', bio.read(4))[0]
self.meta.prereq_ids = [read_fstring(bio) for _ in range(prereq_count)]
self.meta.prereq_name = read_fstring(bio)
self.meta.prereq_path = read_fstring(bio)
self.meta.prereq_args = read_fstring(bio)
self.meta.uninstall_action_path = read_fstring(bio)
self.meta.uninstall_action_args = read_fstring(bio)
for file in self.file_manifest_list.elements:
file.filename = read_fstring(bio)
file.symlink_target = read_fstring(bio)
self.stored_as ^= 0x2
self.secret_guid = (0,0,0,0)
self.encrypted_data.reset()
return True
@classmethod
def read_all(cls, data):
_m = cls.read(data)
@ -94,6 +138,7 @@ class Manifest:
_m.chunk_data_list = CDL.read(_tmp, _m.meta.feature_level)
_m.file_manifest_list = FML.read(_tmp)
_m.custom_fields = CustomFields.read(_tmp)
_m.encrypted_data = EncryptedData.read(_tmp, _m.meta.feature_level)
if unhandled_data := _tmp.read():
logger.warning(f'Did not read {len(unhandled_data)} remaining bytes in manifest! '
@ -119,6 +164,9 @@ class Manifest:
_manifest.sha_hash = bio.read(20)
_manifest.stored_as = struct.unpack('B', bio.read(1))[0]
_manifest.version = struct.unpack('<I', bio.read(4))[0]
if _manifest.version >= 22:
_manifest.secret_guid = struct.unpack('<IIII', bio.read(16))
_manifest.encryption_tag = bio.read(16)
if bio.tell() != _manifest.header_size:
logger.warning(f'Did not read entire header {bio.tell()} != {_manifest.header_size}! '
@ -152,10 +200,10 @@ class Manifest:
target_version = max(18, target_version)
# Downgrade manifest if unknown newer version
if target_version > 21:
if target_version > 24:
logger.warning(f'Trying to serialise an unknown target version: {target_version},'
f'clamping to 21.')
target_version = 21
f'clamping to 24.')
target_version = 24
# Ensure metadata will be correct
self.meta.feature_level = target_version
@ -164,6 +212,8 @@ class Manifest:
self.chunk_data_list.write(body_bio)
self.file_manifest_list.write(body_bio)
self.custom_fields.write(body_bio)
if target_version >= 22:
self.encrypted_data.write(body_bio)
self.data = body_bio.getvalue()
self.size_uncompressed = self.size_compressed = len(self.data)
@ -183,6 +233,10 @@ class Manifest:
bio.write(self.sha_hash)
bio.write(struct.pack('B', self.stored_as))
bio.write(struct.pack('<I', target_version))
if target_version >= 22:
bio.write(struct.pack('<IIII', *self.secret_guid))
bio.write(self.encryption_tag)
bio.write(self.data)
return bio.tell() if fp else bio.getvalue()
@ -433,6 +487,16 @@ class CDL:
for chunk in _cdl.elements:
chunk.file_size = struct.unpack('<q', bio.read(8))[0]
if manifest_version >= 22:
for chunk in _cdl.elements:
chunk.secret_guid = struct.unpack('<IIII', bio.read(16))
for chunk in _cdl.elements:
chunk.window_size_compressed = struct.unpack('<I', bio.read(4))[0]
for chunk in _cdl.elements:
chunk.encryption_tag = bio.read(16)
if (size_read := bio.tell() - cdl_start) != _cdl.size:
logger.warning(f'Did not read entire chunk data list! Version: {_cdl.version}, '
f'{_cdl.size - size_read} bytes missing, skipping...')
@ -461,6 +525,16 @@ class CDL:
for chunk in self.elements:
bio.write(struct.pack('<q', chunk.file_size))
if self._manifest_version >= 22:
for chunk in self.elements:
bio.write(struct.pack('<IIII', *chunk.secret_guid))
for chunk in self.elements:
bio.write(struct.pack('<I', chunk.window_size_compressed))
for chunk in self.elements:
bio.write(chunk.encryption_tag)
cdl_end = bio.tell()
bio.seek(cdl_start)
bio.write(struct.pack('<I', cdl_end - cdl_start))
@ -474,6 +548,9 @@ class ChunkInfo:
self.sha_hash = b''
self.window_size = 0
self.file_size = 0
self.secret_guid = None
self.window_size_compressed = 0
self.encryption_tag = b''
self._manifest_version = manifest_version
# caches for things that are "expensive" to compute
@ -518,6 +595,13 @@ class ChunkInfo:
@property
def path(self):
if self._manifest_version >= 22:
secret_b64 = base64.urlsafe_b64encode(struct.pack('<IIII', *self.secret_guid)).decode().strip('=')
hash_b64 = base64.urlsafe_b64encode(struct.pack('<Q', self.hash)).decode().strip('=')
guid_b64 = base64.urlsafe_b64encode(struct.pack('<IIII', *self.guid)).decode().strip('=')
return '{}/{}/{:02d}/{}_{}.chunk'.format(
get_chunk_dir(self._manifest_version), secret_b64,
self.group_num, hash_b64, guid_b64)
return '{}/{:02d}/{:016X}_{}.chunk'.format(
get_chunk_dir(self._manifest_version), self.group_num,
self.hash, ''.join('{:08X}'.format(g) for g in self.guid))
@ -810,6 +894,110 @@ class CustomFields:
bio.seek(cf_end)
class EncryptedData:
def __init__(self):
self.size = 0
self.version = 0
self.encrypted_header = EncryptedDataHeader()
self.ciphertext = b''
def reset(self):
self.size = 0
self.version = 0
self.ciphertext = b''
self.encrypted_header = EncryptedDataHeader()
@classmethod
def read(cls, bio, feature_level):
_ed = cls()
if feature_level < 24:
return _ed
ed_start = bio.tell()
_ed.size = struct.unpack('<I', bio.read(4))[0]
_ed.version = struct.unpack('B', bio.read(1))[0]
cipher_size = struct.unpack('<I', bio.read(4))[0]
data = BytesIO(bio.read(cipher_size))
_ed.encrypted_header = EncryptedDataHeader.read(data)
ciphertext_size = struct.unpack('<I', data.read(4))[0]
_ed.ciphertext = data.read(ciphertext_size)
if (size_read := bio.tell() - ed_start) != _ed.size:
logger.warning(f'Did not read entire encrypted data part! Version: {_ed.version}, '
f'{_ed.size - size_read} bytes missing, skipping...')
bio.seek(_ed.size - size_read, 1)
_ed.version = 0
return _ed
def write(self, bio):
ed_start = bio.tell()
bio.write(struct.pack('<I', 0)) # placeholder size
bio.write(struct.pack('B', self.version))
# FIXME: Ensure all sizes are properly set in the header
cipher_start = bio.tell()
bio.write(struct.pack('<I', 0)) # cipher size
self.encrypted_header.write(bio)
bio.write(struct.pack('<I', len(self.ciphertext)))
bio.write(self.ciphertext)
ed_end = bio.tell()
bio.seek(cipher_start)
bio.write(struct.pack('<I', ed_end - cipher_start))
bio.seek(ed_start)
bio.write(struct.pack('<I', ed_end - ed_start))
bio.seek(ed_end)
class EncryptedDataHeader:
def __init__(self):
self.size = 0
self.version = 0
self.stored_as = 0
self.data_uncompressed = 0
self.data_compressed = 0
self.iv = b''
@property
def compressed(self):
return self.stored_as & 0x1
@classmethod
def read(cls, bio):
_edh = cls()
edh_start = bio.tell()
_edh.size = struct.unpack('<I', bio.read(4))[0]
_edh.version = struct.unpack('<I', bio.read(4))[0]
_edh.stored_as = struct.unpack('B', bio.read(1))[0]
_edh.data_uncompressed = struct.unpack('<I', bio.read(4))[0]
_edh.data_compressed = struct.unpack('<I', bio.read(4))[0]
iv_len = struct.unpack('<I', bio.read(4))[0]
_edh.iv = bio.read(iv_len)
if (size_read := bio.tell() - edh_start) != _edh.size:
logger.warning(f'Did not read entire encrypted header data part! Version: {_edh.version}, '
f'{_edh.size - size_read} bytes missing, skipping...')
bio.seek(_edh.size - size_read, 1)
_edh.version = 0
return _edh
def write(self, bio):
edh_start = bio.tell()
bio.write(struct.pack('<I', 0))
bio.write(struct.pack('<I', self.version))
bio.write(struct.pack('B', self.stored_as))
bio.write(struct.pack('<I', self.data_uncompressed))
bio.write(struct.pack('<I', self.data_compressed))
bio.write(struct.pack('<I', len(self.iv)))
bio.write(self.iv)
end = bio.tell()
bio.seek(edh_start)
bio.write(struct.pack('<I', end - edh_start))
bio.seek(end)
class ManifestComparison:
def __init__(self):
self.added = set()

View file

@ -1,2 +1,3 @@
requests<3.0
filelock
pycryptodomex

View file

@ -38,7 +38,8 @@ setup(
'requests<3.0',
'setuptools',
'wheel',
'filelock'
'filelock',
'pycryptodomex'
],
extras_require=dict(
webview=['pywebview>=3.4'],