diff --git a/robocop_ng/cogs/logfilereader.py b/robocop_ng/cogs/logfilereader.py index a24f823..eddb925 100644 --- a/robocop_ng/cogs/logfilereader.py +++ b/robocop_ng/cogs/logfilereader.py @@ -1,6 +1,5 @@ import logging import re -from typing import Optional import aiohttp from discord import Colour, Embed, Message, Attachment @@ -22,7 +21,9 @@ from robocop_ng.helpers.disabled_ids import ( is_ro_section_valid, add_disabled_ro_section, remove_disabled_ro_section, + remove_disable_id, ) +from robocop_ng.helpers.ryujinx_log_analyser import LogAnalyser logging.basicConfig( format="%(asctime)s (%(levelname)s) %(message)s (Line %(lineno)d)", @@ -52,81 +53,35 @@ class LogFileReader(Cog): self.bot.config.named_roles[x] for x in self.disallowed_named_roles ] - async def download_file(self, log_url): + @staticmethod + async def download_file(log_url): async with aiohttp.ClientSession() as session: # Grabs first and last few bytes of log file to prevent abuse from large files headers = {"Range": "bytes=0-60000, -6000"} async with session.get(log_url, headers=headers) as response: return await response.text("UTF-8") - def get_main_ro_section(self, log_file: str) -> Optional[dict[str, str]]: - ro_section_regex = re.search( - r"PrintRoSectionInfo: main:[\r\n]*(.*)", log_file, re.DOTALL - ) - if ro_section_regex is not None and len(ro_section_regex.groups()) > 0: - ro_section = {"module": "", "sdk_libraries": []} - for line in ro_section_regex.group(1).splitlines(): - line = line.strip() - if line.startswith("Module:"): - ro_section["module"] = line[8:] - elif line.startswith("SDK Libraries:"): - ro_section["sdk_libraries"].append(line[19:]) - elif line.startswith("SDK "): - ro_section["sdk_libraries"].append(line[4:]) - else: - break - return ro_section - return None - - def get_app_info( - self, log_file: str - ) -> Optional[tuple[str, str, list[str], dict[str, str]]]: - game_name = re.search( - r"Loader [A-Za-z]*: Application Loaded:\s([^;\n\r]*)", - log_file, - re.MULTILINE, - ) - if game_name is not None and len(game_name.groups()) > 0: - game_name = game_name.group(1).rstrip() - app_id_regex = re.match(r".* \[([a-zA-Z0-9]*)\]", game_name) - if app_id_regex: - app_id = app_id_regex.group(1).strip().upper() - else: - app_id = None - bids_regex = re.search( - r"Build ids found for title ([a-zA-Z0-9]*):[\n\r]*(.*)", - log_file, - re.DOTALL, - ) - if bids_regex is not None and len(bids_regex.groups()) > 0: - app_id_from_bids = bids_regex.group(1).strip().upper() - build_ids = [ - bid.strip().upper() - for bid in bids_regex.group(2).splitlines() - if is_build_id_valid(bid.strip()) - ] - - # TODO: Check if self.get_main_ro_section() is None and return an error - return ( - app_id, - app_id_from_bids, - build_ids, - self.get_main_ro_section(log_file), - ) - return None - - def is_log_valid(self, log_file: str) -> bool: - app_info = self.get_app_info(log_file) + @staticmethod + def is_log_valid(log_file: str) -> bool: + app_info = LogAnalyser.get_app_info(log_file) if app_info is None: return True - app_id, another_app_id, _, _ = app_info + game_name, app_id, another_app_id, build_ids, main_ro_section = app_info + if ( + game_name is None + or app_id is None + or another_app_id is None + or build_ids is None + or main_ro_section is None + ): + return False return app_id == another_app_id def is_game_blocked(self, log_file: str) -> bool: - app_info = self.get_app_info(log_file) + app_info = LogAnalyser.get_app_info(log_file) if app_info is None: return False - app_id, another_app_id, build_ids, main_ro_section = app_info + game_name, app_id, another_app_id, build_ids, main_ro_section = app_info if is_app_id_disabled(self.bot, app_id) or is_app_id_disabled( self.bot, another_app_id ): @@ -166,683 +121,123 @@ class LogFileReader(Cog): await message.delete() return embed + def format_analysed_log(self, author_name: str, analysed_log): + cleaned_game_name = re.sub( + r"\s\[(64|32)-bit\]$", "", analysed_log["game_info"]["game_name"] + ) + analysed_log["game_info"]["game_name"] = cleaned_game_name + + hardware_info = " | ".join( + ( + f"**CPU:** {analysed_log['hardware_info']['cpu']}", + f"**GPU:** {analysed_log['hardware_info']['gpu']}", + f"**RAM:** {analysed_log['hardware_info']['ram']}", + f"**OS:** {analysed_log['hardware_info']['os']}", + ) + ) + + system_settings_info = "\n".join( + ( + f"**Audio Backend:** `{analysed_log['settings']['audio_backend']}`", + f"**Console Mode:** `{analysed_log['settings']['docked']}`", + f"**PPTC Cache:** `{analysed_log['settings']['pptc']}`", + f"**Shader Cache:** `{analysed_log['settings']['shader_cache']}`", + f"**V-Sync:** `{analysed_log['settings']['vsync']}`", + ) + ) + + graphics_settings_info = "\n".join( + ( + f"**Graphics Backend:** `{analysed_log['settings']['graphics_backend']}`", + f"**Resolution:** `{analysed_log['settings']['resolution_scale']}`", + f"**Anisotropic Filtering:** `{analysed_log['settings']['anisotropic_filtering']}`", + f"**Aspect Ratio:** `{analysed_log['settings']['aspect_ratio']}`", + f"**Texture Recompression:** `{analysed_log['settings']['texture_recompression']}`", + ) + ) + + ryujinx_info = " | ".join( + ( + f"**Version:** {analysed_log['emu_info']['ryu_version']}", + f"**Firmware:** {analysed_log['emu_info']['ryu_firmware']}", + ) + ) + + log_embed = Embed(title=f"{cleaned_game_name}", colour=self.ryujinx_blue) + log_embed.set_footer(text=f"Log uploaded by {author_name}") + log_embed.add_field( + name="General Info", + value=" | ".join((ryujinx_info, hardware_info)), + inline=False, + ) + log_embed.add_field( + name="System Settings", + value=system_settings_info, + inline=True, + ) + log_embed.add_field( + name="Graphics Settings", + value=graphics_settings_info, + inline=True, + ) + if ( + cleaned_game_name == "Unknown" + and analysed_log["game_info"]["errors"] == "No errors found in log" + ): + log_embed.add_field( + name="Empty Log", + value=f"""The log file appears to be empty. To get a proper log, follow these steps: + 1) In Logging settings, ensure `Enable Logging to File` is checked. + 2) Ensure the following default logs are enabled: `Info`, `Warning`, `Error`, `Guest` and `Stub`. + 3) Start a game up. + 4) Play until your issue occurs. + 5) Upload the latest log file which is larger than 3KB.""", + inline=False, + ) + if ( + cleaned_game_name == "Unknown" + and analysed_log["game_info"]["errors"] != "No errors found in log" + ): + log_embed.add_field( + name="Latest Error Snippet", + value=analysed_log["game_info"]["errors"], + inline=False, + ) + log_embed.add_field( + name="No Game Boot Detected", + value=f"""No game boot has been detected in log file. To get a proper log, follow these steps: + 1) In Logging settings, ensure `Enable Logging to File` is checked. + 2) Ensure the following default logs are enabled: `Info`, `Warning`, `Error`, `Guest` and `Stub`. + 3) Start a game up. + 4) Play until your issue occurs. + 5) Upload the latest log file which is larger than 3KB.""", + inline=False, + ) + else: + log_embed.add_field( + name="Latest Error Snippet", + value=analysed_log["game_info"]["errors"], + inline=False, + ) + log_embed.add_field( + name="Mods", value=analysed_log["game_info"]["mods"], inline=False + ) + log_embed.add_field( + name="Cheats", value=analysed_log["game_info"]["cheats"], inline=False + ) + + log_embed.add_field( + name="Notes", + value=analysed_log["game_info"]["notes"], + inline=False, + ) + + return log_embed + async def log_file_read(self, message): - self.embed = { - "hardware_info": { - "cpu": "Unknown", - "gpu": "Unknown", - "ram": "Unknown", - "os": "Unknown", - }, - "emu_info": { - "ryu_version": "Unknown", - "ryu_firmware": "Unknown", - "logs_enabled": None, - }, - "game_info": { - "game_name": "Unknown", - "errors": "No errors found in log", - "mods": "No mods found", - "cheats": "No cheats found", - "notes": [], - }, - "settings": { - "audio_backend": "Unknown", - "backend_threading": "Unknown", - "docked": "Unknown", - "expand_ram": "Unknown", - "fs_integrity": "Unknown", - "graphics_backend": "Unknown", - "ignore_missing_services": "Unknown", - "memory_manager": "Unknown", - "pptc": "Unknown", - "shader_cache": "Unknown", - "vsync": "Unknown", - "resolution_scale": "Unknown", - "anisotropic_filtering": "Unknown", - "aspect_ratio": "Unknown", - "texture_recompression": "Unknown", - }, - } attached_log = message.attachments[0] author_name = f"@{message.author.name}" log_file = await self.download_file(attached_log.url) - # Large files show a header value when not downloaded completely - # this regex makes sure that the log text to read starts from the first timestamp, ignoring headers - log_file_header_regex = re.compile(r"\d{2}:\d{2}:\d{2}\.\d{3}.*", re.DOTALL) - log_file_match = re.search(log_file_header_regex, log_file) - - if log_file_match: - log_file = log_file_match.group(0) - else: - return Embed( - colour=self.ryujinx_blue, - description="This log file appears to be invalid. Please make sure to upload a Ryujinx log file.", - ) - - def get_hardware_info(log_file=log_file): - for setting in self.embed["hardware_info"]: - try: - if setting == "cpu": - self.embed["hardware_info"][setting] = ( - re.search(r"CPU:\s([^;\n\r]*)", log_file, re.MULTILINE) - .group(1) - .rstrip() - ) - if setting == "ram": - self.embed["hardware_info"][setting] = ( - re.search( - r"RAM:(\sTotal)?\s([^;\n\r]*)", log_file, re.MULTILINE - ) - .group(2) - .rstrip() - ) - if setting == "os": - self.embed["hardware_info"][setting] = ( - re.search( - r"Operating System:\s([^;\n\r]*)", - log_file, - re.MULTILINE, - ) - .group(1) - .rstrip() - ) - if setting == "gpu": - self.embed["hardware_info"][setting] = ( - re.search( - r"PrintGpuInformation:\s([^;\n\r]*)", - log_file, - re.MULTILINE, - ) - .group(1) - .rstrip() - ) - except AttributeError: - continue - - def get_ryujinx_info(log_file=log_file): - for setting in self.embed["emu_info"]: - try: - if setting == "ryu_version": - self.embed["emu_info"][setting] = [ - line.split()[-1] - for line in log_file.splitlines() - if "Ryujinx Version:" in line - ][0] - if setting == "logs_enabled": - self.embed["emu_info"][setting] = ( - re.search( - r"Logs Enabled:\s([^;\n\r]*)", log_file, re.MULTILINE - ) - .group(1) - .rstrip() - ) - if setting == "ryu_firmware": - self.embed["emu_info"]["ryu_firmware"] = [ - line.split()[-1] - for line in log_file.splitlines() - if "Firmware Version:" in line - ][0] - except (AttributeError, IndexError): - continue - - def format_log_embed(): - cleaned_game_name = re.sub( - r"\s\[(64|32)-bit\]$", "", self.embed["game_info"]["game_name"] - ) - self.embed["game_info"]["game_name"] = cleaned_game_name - - hardware_info = " | ".join( - ( - f"**CPU:** {self.embed['hardware_info']['cpu']}", - f"**GPU:** {self.embed['hardware_info']['gpu']}", - f"**RAM:** {self.embed['hardware_info']['ram']}", - f"**OS:** {self.embed['hardware_info']['os']}", - ) - ) - - system_settings_info = "\n".join( - ( - f"**Audio Backend:** `{self.embed['settings']['audio_backend']}`", - f"**Console Mode:** `{self.embed['settings']['docked']}`", - f"**PPTC Cache:** `{self.embed['settings']['pptc']}`", - f"**Shader Cache:** `{self.embed['settings']['shader_cache']}`", - f"**V-Sync:** `{self.embed['settings']['vsync']}`", - ) - ) - - graphics_settings_info = "\n".join( - ( - f"**Graphics Backend:** `{self.embed['settings']['graphics_backend']}`", - f"**Resolution:** `{self.embed['settings']['resolution_scale']}`", - f"**Anisotropic Filtering:** `{self.embed['settings']['anisotropic_filtering']}`", - f"**Aspect Ratio:** `{self.embed['settings']['aspect_ratio']}`", - f"**Texture Recompression:** `{self.embed['settings']['texture_recompression']}`", - ) - ) - - ryujinx_info = " | ".join( - ( - f"**Version:** {self.embed['emu_info']['ryu_version']}", - f"**Firmware:** {self.embed['emu_info']['ryu_firmware']}", - ) - ) - - log_embed = Embed(title=f"{cleaned_game_name}", colour=self.ryujinx_blue) - log_embed.set_footer(text=f"Log uploaded by {author_name}") - log_embed.add_field( - name="General Info", - value=" | ".join((ryujinx_info, hardware_info)), - inline=False, - ) - log_embed.add_field( - name="System Settings", - value=system_settings_info, - inline=True, - ) - log_embed.add_field( - name="Graphics Settings", - value=graphics_settings_info, - inline=True, - ) - if ( - cleaned_game_name == "Unknown" - and self.embed["game_info"]["errors"] == "No errors found in log" - ): - log_embed.add_field( - name="Empty Log", - value=f"""The log file appears to be empty. To get a proper log, follow these steps: -1) In Logging settings, ensure `Enable Logging to File` is checked. -2) Ensure the following default logs are enabled: `Info`, `Warning`, `Error`, `Guest` and `Stub`. -3) Start a game up. -4) Play until your issue occurs. -5) Upload the latest log file which is larger than 2KB.""", - inline=False, - ) - if ( - cleaned_game_name == "Unknown" - and self.embed["game_info"]["errors"] != "No errors found in log" - ): - log_embed.add_field( - name="Latest Error Snippet", - value=self.embed["game_info"]["errors"], - inline=False, - ) - log_embed.add_field( - name="No Game Boot Detected", - value=f"""No game boot has been detected in log file. To get a proper log, follow these steps: -1) In Logging settings, ensure `Enable Logging to File` is checked. -2) Ensure the following default logs are enabled: `Info`, `Warning`, `Error`, `Guest` and `Stub`. -3) Start a game up. -4) Play until your issue occurs. -5) Upload the latest log file which is larger than 3KB.""", - inline=False, - ) - else: - log_embed.add_field( - name="Latest Error Snippet", - value=self.embed["game_info"]["errors"], - inline=False, - ) - log_embed.add_field( - name="Mods", value=self.embed["game_info"]["mods"], inline=False - ) - log_embed.add_field( - name="Cheats", value=self.embed["game_info"]["cheats"], inline=False - ) - - try: - notes_value = "\n".join(game_notes) - except TypeError: - notes_value = "Nothing to note" - log_embed.add_field( - name="Notes", - value=notes_value, - inline=False, - ) - - return log_embed - - def analyse_log(log_file=log_file): - try: - for setting_name in self.embed["settings"]: - # Some log info may be missing for users that use older versions of Ryujinx, so reading the settings is not always possible. - # As settings are initialized with "Unknown" values, False should not be an issue for setting.get() - def get_setting(name, setting_string, log_file=log_file): - setting = self.embed["settings"] - setting_value = [ - line.split()[-1] - for line in log_file.splitlines() - if re.search(rf"LogValueChange: ({setting_string})\s", line) - ][-1] - if setting_value and setting.get(name): - setting[name] = setting_value - if name == "docked": - setting[ - name - ] = f"{'Docked' if setting_value == 'True' else 'Handheld'}" - if name == "resolution_scale": - resolution_map = { - "-1": "Custom", - "1": "Native (720p/1080p)", - "2": "2x (1440p/2160p)", - "3": "3x (2160p/3240p)", - "4": "4x (2880p/4320p)", - } - setting[name] = resolution_map[setting_value] - if name == "anisotropic_filtering": - anisotropic_map = { - "-1": "Auto", - "2": "2x", - "4": "4x", - "8": "8x", - "16": "16x", - } - setting[name] = anisotropic_map[setting_value] - if name == "aspect_ratio": - aspect_map = { - "Fixed4x3": "4:3", - "Fixed16x9": "16:9", - "Fixed16x10": "16:10", - "Fixed21x9": "21:9", - "Fixed32x9": "32:9", - "Stretched": "Stretch to Fit Window", - } - setting[name] = aspect_map[setting_value] - if name in [ - "pptc", - "shader_cache", - "texture_recompression", - "vsync", - ]: - setting[ - name - ] = f"{'Enabled' if setting_value == 'True' else 'Disabled'}" - return setting[name] - - setting_map = { - "anisotropic_filtering": "MaxAnisotropy", - "aspect_ratio": "AspectRatio", - "audio_backend": "AudioBackend", - "backend_threading": "BackendThreading", - "docked": "EnableDockedMode", - "expand_ram": "ExpandRam", - "fs_integrity": "EnableFsIntegrityChecks", - "graphics_backend": "GraphicsBackend", - "ignore_missing_services": "IgnoreMissingServices", - "memory_manager": "MemoryManagerMode", - "pptc": "EnablePtc", - "resolution_scale": "ResScale", - "shader_cache": "EnableShaderCache", - "texture_recompression": "EnableTextureRecompression", - "vsync": "EnableVsync", - } - try: - self.embed[setting_name] = get_setting( - setting_name, setting_map[setting_name], log_file=log_file - ) - except (AttributeError, IndexError) as error: - logging.info( - f"Settings exception: {setting_name}: {type(error).__name__}" - ) - continue - - def analyse_error_message(log_file=log_file): - try: - errors = [] - curr_error_lines = [] - for line in log_file.splitlines(): - if line == "": - continue - if "|E|" in line: - curr_error_lines = [line] - errors.append(curr_error_lines) - elif line[0] == " " or line == "": - curr_error_lines.append(line) - - def error_search(search_terms): - for term in search_terms: - for error_lines in errors: - line = "\n".join(error_lines) - if term in line: - return True - - return False - - shader_cache_collision = error_search(["Cache collision found"]) - dump_hash_error = error_search( - [ - "ResultFsInvalidIvfcHash", - "ResultFsNonRealDataVerificationFailed", - ] - ) - shader_cache_corruption = error_search( - [ - "Ryujinx.Graphics.Gpu.Shader.ShaderCache.Initialize()", - "System.IO.InvalidDataException: End of Central Directory record could not be found", - "ICSharpCode.SharpZipLib.Zip.ZipException: Cannot find central directory", - ] - ) - update_keys_error = error_search(["MissingKeyException"]) - file_permissions_error = error_search( - ["ResultFsPermissionDenied"] - ) - file_not_found_error = error_search(["ResultFsTargetNotFound"]) - missing_services_error = error_search( - ["ServiceNotImplementedException"] - ) - vulkan_out_of_memory_error = error_search( - ["ErrorOutOfDeviceMemory"] - ) - - last_errors = "\n".join( - errors[-1][:2] if "|E|" in errors[-1][0] else "" - ) - except IndexError: - last_errors = None - return ( - last_errors, - shader_cache_collision, - dump_hash_error, - shader_cache_corruption, - update_keys_error, - file_permissions_error, - file_not_found_error, - missing_services_error, - vulkan_out_of_memory_error, - ) - - # Finds the latest error denoted by |E| in the log and its first line - # Also warns of common issues - ( - last_error_snippet, - shader_cache_warn, - dump_hash_warning, - shader_cache_corruption_warn, - update_keys_warn, - file_permissions_warn, - file_not_found_warn, - missing_services_warn, - vulkan_out_of_memory_warn, - ) = analyse_error_message() - if last_error_snippet: - self.embed["game_info"]["errors"] = f"```{last_error_snippet}```" - else: - pass - # Game name parsed last so that user settings are visible with empty log - try: - self.embed["game_info"]["game_name"] = ( - re.search( - r"Loader [A-Za-z]*: Application Loaded:\s([^;\n\r]*)", - log_file, - re.MULTILINE, - ) - .group(1) - .rstrip() - ) - except AttributeError: - pass - - if shader_cache_warn: - shader_cache_warn = f"⚠️ Cache collision detected. Investigate possible shader cache issues" - self.embed["game_info"]["notes"].append(shader_cache_warn) - - if shader_cache_corruption_warn: - shader_cache_corruption_warn = f"⚠️ Cache corruption detected. Investigate possible shader cache issues" - self.embed["game_info"]["notes"].append( - shader_cache_corruption_warn - ) - - if dump_hash_warning: - dump_hash_warning = f"⚠️ Dump error detected. Investigate possible bad game/firmware dump issues" - self.embed["game_info"]["notes"].append(dump_hash_warning) - - if update_keys_warn: - update_keys_warn = ( - f"⚠️ Keys or firmware out of date, consider updating them" - ) - self.embed["game_info"]["notes"].append(update_keys_warn) - - if file_permissions_warn: - file_permissions_warn = f"⚠️ File permission error. Consider deleting save directory and allowing Ryujinx to make a new one" - self.embed["game_info"]["notes"].append(file_permissions_warn) - - if file_not_found_warn: - file_not_found_warn = f"⚠️ Save not found error. Consider starting game without a save file or using a new save file" - self.embed["game_info"]["notes"].append(file_not_found_warn) - - if ( - missing_services_warn - and self.embed["settings"]["ignore_missing_services"] == "False" - ): - missing_services_warn = f"⚠️ Consider enabling `Ignore Missing Services` in Ryujinx settings" - self.embed["game_info"]["notes"].append(missing_services_warn) - - if ( - vulkan_out_of_memory_warn - and self.embed["settings"]["texture_recompression"] == "Disabled" - ): - vulkan_out_of_memory_warn = f"⚠️ Consider enabling `Texture Recompression` in Ryujinx settings" - self.embed["game_info"]["notes"].append(vulkan_out_of_memory_warn) - - timestamp_regex = re.compile(r"\d{2}:\d{2}:\d{2}\.\d{3}") - latest_timestamp = re.findall(timestamp_regex, log_file)[-1] - if latest_timestamp: - timestamp_message = f"ℹ️ Time elapsed: `{latest_timestamp}`" - self.embed["game_info"]["notes"].append(timestamp_message) - - def mods_information(log_file=log_file): - mods_regex = re.compile(r"Found mod\s\'(.+?)\'\s(\[.+?\])") - matches = re.findall(mods_regex, log_file) - if matches: - mods = [ - {"mod": match[0], "status": match[1]} for match in matches - ] - mods_status = [ - f"ℹ️ {i['mod']} ({'ExeFS' if i['status'] == '[E]' else 'RomFS'})" - for i in mods - ] - # Remove duplicated mods from output - mods_status = list(dict.fromkeys(mods_status)) - return mods_status - - def cheat_information(log_file=log_file): - cheat_regex = re.compile(r"Installing cheat\s\'?\'") - matches = re.findall(cheat_regex, log_file) - if matches: - cheats = [f"ℹ️ {match}" for match in matches] - return list(set(cheats)) - - game_mods = mods_information() - if game_mods: - self.embed["game_info"]["mods"] = "\n".join(game_mods) - - game_cheats = cheat_information() - if game_cheats: - self.embed["game_info"]["cheats"] = "\n".join(game_cheats) - - if ( - re.search(r"UserId: 00000000000000010000000000000000", log_file) - is not None - ): - self.embed["game_info"]["notes"].append( - "⚠️ Default user profile in use, consider creating a custom one." - ) - - controllers_regex = re.compile(r"Hid Configure: ([^\r\n]+)") - controllers = re.findall(controllers_regex, log_file) - if controllers: - input_status = [f"ℹ {match}" for match in controllers] - # Hid Configure lines can appear multiple times, so converting to dict keys removes duplicate entries, - # also maintains the list order - input_status = list(dict.fromkeys(input_status)) - input_string = "\n".join(input_status) - self.embed["game_info"]["notes"].append(input_string) - # If emulator crashes on startup without game load, there is no need to show controller notification at all - if ( - not controllers - and self.embed["game_info"]["game_name"] != "Unknown" - ): - input_string = "⚠️ No controller information found" - self.embed["game_info"]["notes"].append(input_string) - - try: - ram_available_regex = re.compile( - r"Application\sPrint:\sRAM:(?:.*Available\s)(\d+)" - ) - ram_available = re.search(ram_available_regex, log_file)[1] - if int(ram_available) < 8000: - ram_warning = ( - f"⚠️ Less than 8GB RAM available ({str(ram_available)} MB)" - ) - self.embed["game_info"]["notes"].append(ram_warning) - except TypeError: - pass - - if ( - "Windows" in self.embed["hardware_info"]["os"] - and self.embed["settings"]["graphics_backend"] != "Vulkan" - ): - if "Intel" in self.embed["hardware_info"]["gpu"]: - intel_gpu_warning = "**⚠️ Intel iGPU users should consider using Vulkan graphics backend**" - self.embed["game_info"]["notes"].append(intel_gpu_warning) - if "AMD" in self.embed["hardware_info"]["gpu"]: - amd_gpu_warning = "**⚠️ AMD GPU users should consider using Vulkan graphics backend**" - self.embed["game_info"]["notes"].append(amd_gpu_warning) - - try: - default_logs = ["Info", "Warning", "Error", "Guest", "Stub"] - user_logs = ( - self.embed["emu_info"]["logs_enabled"] - .rstrip() - .replace(" ", "") - .split(",") - ) - if "Debug" in user_logs: - debug_warning = f"⚠️ **Debug logs enabled will have a negative impact on performance**" - self.embed["game_info"]["notes"].append(debug_warning) - disabled_logs = set(default_logs).difference(set(user_logs)) - if disabled_logs: - logs_status = [ - f"⚠️ {log} log is not enabled" for log in disabled_logs - ] - log_string = "\n".join(logs_status) - else: - log_string = "✅ Default logs enabled" - self.embed["game_info"]["notes"].append(log_string) - except AttributeError: - pass - - if self.embed["emu_info"]["ryu_firmware"] == "Unknown": - firmware_warning = f"**❌ Nintendo Switch firmware not found**" - self.embed["game_info"]["notes"].append(firmware_warning) - - if self.embed["settings"]["audio_backend"] == "Dummy": - dummy_warning = ( - f"⚠️ Dummy audio backend, consider changing to SDL2 or OpenAL" - ) - self.embed["game_info"]["notes"].append(dummy_warning) - - if self.embed["settings"]["pptc"] == "Disabled": - pptc_warning = f"🔴 **PPTC cache should be enabled**" - self.embed["game_info"]["notes"].append(pptc_warning) - - if self.embed["settings"]["shader_cache"] == "Disabled": - shader_warning = f"🔴 **Shader cache should be enabled**" - self.embed["game_info"]["notes"].append(shader_warning) - - if self.embed["settings"]["expand_ram"] == "True": - expand_ram_warning = f"⚠️ `Use alternative memory layout` should only be enabled for 4K mods" - self.embed["game_info"]["notes"].append(expand_ram_warning) - - if self.embed["settings"]["memory_manager"] == "SoftwarePageTable": - software_memory_manager_warning = "🔴 **`Software` setting in Memory Manager Mode will give slower performance than the default setting of `Host unchecked`**" - self.embed["game_info"]["notes"].append( - software_memory_manager_warning - ) - - if self.embed["settings"]["ignore_missing_services"] == "True": - ignore_missing_services_warning = "⚠️ `Ignore Missing Services` being enabled can cause instability" - self.embed["game_info"]["notes"].append( - ignore_missing_services_warning - ) - - if self.embed["settings"]["vsync"] == "Disabled": - vsync_warning = f"⚠️ V-Sync disabled can cause instability like games running faster than intended or longer load times" - self.embed["game_info"]["notes"].append(vsync_warning) - - if self.embed["settings"]["fs_integrity"] == "Disabled": - fs_integrity_warning = f"⚠️ Disabling file integrity checks may cause corrupted dumps to not be detected" - self.embed["game_info"]["notes"].append(fs_integrity_warning) - - if self.embed["settings"]["backend_threading"] == "Off": - backend_threading_warning = ( - f"🔴 **Graphics Backend Multithreading should be set to `Auto`**" - ) - self.embed["game_info"]["notes"].append(backend_threading_warning) - - mainline_version = re.compile(r"^\d\.\d\.\d+$") - old_mainline_version = re.compile(r"^\d\.\d\.(\d){4}$") - pr_version = re.compile(r"^\d\.\d\.\d\+([a-f]|\d){7}$") - ldn_version = re.compile(r"^\d\.\d\.\d\-ldn\d+\.\d+(?:\.\d+|$)") - mac_version = re.compile(r"^\d\.\d\.\d\-macos\d+(?:\.\d+(?:\.\d+|$)|$)") - - is_channel_allowed = False - - for ( - allowed_channel_id - ) in self.bot.config.bot_log_allowed_channels.values(): - if message.channel.id == allowed_channel_id: - is_channel_allowed = True - break - - if is_channel_allowed: - if re.match(pr_version, self.embed["emu_info"]["ryu_version"]): - pr_version_warning = f"**⚠️ PR build logs should be posted in <#{self.bot.config.bot_log_allowed_channels['pr-testing']}> if reporting bugs or tests**" - self.embed["game_info"]["notes"].append(pr_version_warning) - - if re.match( - old_mainline_version, self.embed["emu_info"]["ryu_version"] - ): - old_mainline_version_warning = f"**🔴 Old Ryujinx version, please re-download from the Ryujinx website as auto-updates will not work on this version**" - self.embed["game_info"]["notes"].append( - old_mainline_version_warning - ) - - if not ( - re.match( - mainline_version, self.embed["emu_info"]["ryu_version"] - ) - or re.match( - old_mainline_version, self.embed["emu_info"]["ryu_version"] - ) - or re.match(mac_version, self.embed["emu_info"]["ryu_version"]) - or re.match(ldn_version, self.embed["emu_info"]["ryu_version"]) - or re.match(pr_version, self.embed["emu_info"]["ryu_version"]) - or re.match("Unknown", self.embed["emu_info"]["ryu_version"]) - ): - custom_build_warning = ( - "**⚠️ Custom builds are not officially supported**" - ) - self.embed["game_info"]["notes"].append(custom_build_warning) - - def severity(log_note_string): - symbols = ["❌", "🔴", "⚠️", "ℹ", "✅"] - return next( - i - for i, symbol in enumerate(symbols) - if symbol in log_note_string - ) - - game_notes = [note for note in self.embed["game_info"]["notes"]] - # Warnings split on the string after the warning symbol for alphabetical ordering - # Severity key then orders alphabetically sorted warnings to show most severe first - ordered_game_notes = sorted( - sorted(game_notes, key=lambda x: x.split()[1]), key=severity - ) - return ordered_game_notes - except AttributeError: - pass if self.is_game_blocked(log_file): return await self.blocked_game_action(message) @@ -865,25 +260,41 @@ class LogFileReader(Cog): embed.set_footer(text=f"Log uploaded by {author_name}") return embed - get_hardware_info() - get_ryujinx_info() - game_notes = analyse_log() + try: + analyser = LogAnalyser(log_file) + except ValueError: + return Embed( + colour=self.ryujinx_blue, + description="This log file appears to be invalid. Please make sure to upload a Ryujinx log file.", + ) - return format_log_embed() + is_channel_allowed = False + for allowed_channel_id in self.bot.config.bot_log_allowed_channels.values(): + if message.channel.id == allowed_channel_id: + is_channel_allowed = True + break + + return self.format_analysed_log( + author_name, + analyser.analyse_discord( + is_channel_allowed, + self.bot.config.bot_log_allowed_channels["pr-testing"], + ), + ) @commands.check(check_if_staff) @commands.command( aliases=["disallow_log_id", "forbid_log_id", "block_id", "blockid"] ) async def disable_log_id( - self, ctx: Context, block_id_type: str, block_id: str, note="" + self, ctx: Context, disable_id: str, block_id_type: str, *, block_id: str ): match block_id_type.lower(): case "app" | "app_id" | "appid" | "tid" | "title_id": if not is_app_id_valid(block_id): return await ctx.send("The specified app id is invalid.") - if add_disabled_app_id(self.bot, block_id, note): + if add_disabled_app_id(self.bot, disable_id, block_id): return await ctx.send( f"Application id '{block_id}' is now blocked!" ) @@ -895,13 +306,37 @@ class LogFileReader(Cog): if not is_build_id_valid(block_id): return await ctx.send("The specified build id is invalid.") - if add_disabled_build_id(self.bot, block_id, note): + if add_disabled_build_id(self.bot, disable_id, block_id): return await ctx.send(f"Build id '{block_id}' is now blocked!") else: return await ctx.send(f"Build id '{block_id}' is already blocked.") + case "ro_section" | "rosection": + ro_section_snippet = block_id.strip("`").splitlines() + ro_section_snippet = [ + line for line in ro_section_snippet if len(line.strip()) > 0 + ] + + ro_section_info_regex = re.search( + r"PrintRoSectionInfo: main:", ro_section_snippet[0] + ) + if ro_section_info_regex is None: + ro_section_snippet.insert(0, "PrintRoSectionInfo: main:") + + ro_section = LogAnalyser.get_main_ro_section( + "\n".join(ro_section_snippet) + ) + if ro_section is not None and is_ro_section_valid(ro_section): + if add_disabled_ro_section(self.bot, disable_id, ro_section): + return await ctx.send( + f"The specified read-only section for '{disable_id}' is now blocked." + ) + else: + return await ctx.send( + f"The specified read-only section for '{disable_id}' is already blocked." + ) case _: return await ctx.send( - "The specified id type is invalid. Valid id types are: ['app_id', 'build_id']" + "The specified id type is invalid. Valid id types are: ['app_id', 'build_id', 'ro_section']" ) @commands.check(check_if_staff) @@ -914,36 +349,49 @@ class LogFileReader(Cog): "unblockid", ] ) - async def enable_log_id(self, ctx: Context, block_id_type: str, block_id: str): + async def enable_log_id(self, ctx: Context, disable_id: str, block_id_type="all"): match block_id_type.lower(): - case "app" | "app_id" | "appid" | "tid" | "title_id": - if not is_app_id_valid(block_id): - return await ctx.send("The specified app id is invalid.") - - if remove_disabled_app_id(self.bot, block_id): + case "all": + if remove_disable_id(self.bot, disable_id): return await ctx.send( - f"Application id '{block_id}' is now unblocked!" + f"All ids for '{disable_id}' are now unblocked!" + ) + else: + return await ctx.send(f"No blocked ids for '{disable_id}' found.") + case "app" | "app_id" | "appid" | "tid" | "title_id": + if remove_disabled_app_id(self.bot, disable_id): + return await ctx.send( + f"Application id for '{disable_id}' is now unblocked!" ) else: return await ctx.send( - f"Application id '{block_id}' is not blocked." + f"No blocked application id for '{disable_id}' found." ) case "build" | "build_id" | "bid": - if not is_build_id_valid(block_id): - return await ctx.send("The specified build id is invalid.") - - if remove_disabled_build_id(self.bot, block_id): - return await ctx.send(f"Build id '{block_id}' is now unblocked!") + if remove_disabled_build_id(self.bot, disable_id): + return await ctx.send( + f"Build id for '{disable_id}' is now unblocked!" + ) else: - return await ctx.send(f"Build id '{block_id}' is not blocked.") + return await ctx.send(f"No blocked build id '{disable_id}' found.") + case "ro_section" | "rosection": + if remove_disabled_ro_section(self.bot, disable_id): + return await ctx.send( + f"Read-only section for '{disable_id}' is now unblocked!" + ) + else: + return await ctx.send( + f"No blocked read-only section for '{disable_id}' found." + ) case _: return await ctx.send( - "The specified id type is invalid. Valid id types are: ['app_id', 'build_id']" + "The specified id type is invalid. Valid id types are: ['all', 'app_id', 'build_id', 'ro_section']" ) @commands.check(check_if_staff) @commands.command( aliases=[ + "disabled_ids", "blocked_ids", "listblockedids", "list_blocked_log_ids", @@ -952,93 +400,40 @@ class LogFileReader(Cog): ) async def list_disabled_ids(self, ctx: Context): disabled_ids = get_disabled_ids(self.bot) + id_types = {"app_id": "AppID", "build_id": "BID", "ro_section": "RoSection"} + message = "**Blocking analysis of the following IDs:**\n" - for id_type, name in { - "app_id": "Application IDs", - "build_id": "Build IDs", - }.items(): - if len(disabled_ids[id_type].keys()) > 0: - message += f"- {name}:\n" - for disabled_id, note in disabled_ids[id_type].items(): - message += ( - f" - [{disabled_id.upper()}]: {note}\n" - if note != "" - else f" - [{disabled_id}]\n" - ) - message += "\n" - if len(disabled_ids["ro_section"].keys()) > 0: - message += "- Read-only sections:\n" - for note in disabled_ids["ro_section"].keys(): - f"- [{note}]" + for name, entry in disabled_ids.items(): + message += f"- {name}:\n" + for id_type, title in id_types.items(): + if len(entry[id_type]) > 0: + if id_type != "ro_section": + message += f" - __{title}__: {entry[id_type]}\n" + else: + message += f" - __{title}__\n" + message += "\n" return await ctx.send(message) - @commands.check(check_if_staff) - @commands.command( - aliases=[ - "disallow_ro_section", - "forbid_ro_section", - "block_ro_section", - "blockrosection", - ] - ) - async def disable_ro_section( - self, ctx: Context, note: str, *, ro_section_snippet: str - ): - ro_section_snippet = ro_section_snippet.strip("`").splitlines() - ro_section_snippet = [ - line for line in ro_section_snippet if len(line.strip()) > 0 - ] - - ro_section_info_regex = re.search( - r"PrintRoSectionInfo: main:", ro_section_snippet[0] - ) - if ro_section_info_regex is None: - ro_section_snippet.insert(0, "PrintRoSectionInfo: main:") - - ro_section = self.get_main_ro_section("\n".join(ro_section_snippet)) - if ro_section is not None and is_ro_section_valid(ro_section): - if add_disabled_ro_section(self.bot, note, ro_section): - return await ctx.send( - f"The specified read-only section '{note}' is now blocked." - ) - else: - return await ctx.send( - f"The specified read-only section '{note}' is already blocked." - ) - - @commands.check(check_if_staff) - @commands.command( - aliases=[ - "allow_ro_section", - "unblock_ro_section", - "allow_rosection", - "unblockrosection", - ] - ) - async def enable_ro_section(self, ctx: Context, note: str): - if remove_disabled_ro_section(self.bot, note): - return await ctx.send( - f"The read-only section for '{note}' is now unblocked!" - ) - else: - return await ctx.send(f"The read-only section for '{note}' is not blocked.") - @commands.check(check_if_staff) @commands.command( aliases=[ "get_blocked_ro_section", "disabled_ro_section", - "blocked_ro_section" "list_disabled_ro_section", + "blocked_ro_section", + "list_disabled_ro_section", "list_blocked_ro_section", ] ) - async def get_disabled_ro_section(self, ctx: Context, note: str): + async def get_disabled_ro_section(self, ctx: Context, disable_id: str): disabled_ids = get_disabled_ids(self.bot) - key_note = note.lower() - if key_note in disabled_ids["ro_section"].keys(): - message = f"**Disabled read-only section for '{note}'**:\n" + disable_id = disable_id.lower() + if ( + disable_id in disabled_ids.keys() + and len(disabled_ids[disable_id]["ro_section"]) > 0 + ): + message = f"**Blocked read-only section for '{disable_id}'**:\n" message += "```\n" - for key, content in disabled_ids["ro_section"][key_note].items(): + for key, content in disabled_ids[disable_id]["ro_section"].items(): match key: case "module": message += f"Module: {content}\n" @@ -1050,7 +445,7 @@ class LogFileReader(Cog): message += "```" return await ctx.send(message) else: - return await ctx.send("The specified read-only section is not blocked.") + return await ctx.send(f"No read-only section blocked for '{disable_id}'.") async def analyse_log_message(self, message: Message, attachment_index=0): author_id = message.author.id diff --git a/robocop_ng/helpers/disabled_ids.py b/robocop_ng/helpers/disabled_ids.py index 1392620..76f5c55 100644 --- a/robocop_ng/helpers/disabled_ids.py +++ b/robocop_ng/helpers/disabled_ids.py @@ -4,11 +4,7 @@ from typing import Union def get_disabled_ids_path(bot) -> str: - old_filepath = os.path.join(bot.state_dir, "data/disabled_tids.json") - new_filepath = os.path.join(bot.state_dir, "data/disabled_ids.json") - if os.path.isfile(old_filepath): - os.rename(old_filepath, new_filepath) - return new_filepath + return os.path.join(bot.state_dir, "data/disabled_ids.json") def is_app_id_valid(app_id: str) -> bool: @@ -28,11 +24,25 @@ def get_disabled_ids(bot) -> dict[str, dict[str, Union[str, dict[str, str]]]]: with open(get_disabled_ids_path(bot), "r") as f: disabled_ids = json.load(f) # Migration code - if "app_id" not in disabled_ids.keys(): - disabled_ids = {"app_id": disabled_ids, "build_id": {}, "ro_section": {}} + if "app_id" in disabled_ids.keys(): + old_disabled_ids = disabled_ids.copy() + disabled_ids = {} + for key in disabled_ids["app_id"].values(): + disabled_ids[key.lower()] = { + "app_id": "", + "build_id": "", + "ro_section": {}, + } + for id_type in ["app_id", "build_id"]: + for value, key in old_disabled_ids[id_type].items(): + disabled_ids[key.lower()][id_type] = value + for key, value in old_disabled_ids["ro_section"].items(): + disabled_ids[key.lower()]["ro_section"] = value + set_disabled_ids(bot, disabled_ids) + return disabled_ids - return {"app_id": {}, "build_id": {}, "ro_section": {}} + return {} def set_disabled_ids(bot, contents: dict[str, dict[str, Union[str, dict[str, str]]]]): @@ -40,102 +50,141 @@ def set_disabled_ids(bot, contents: dict[str, dict[str, Union[str, dict[str, str json.dump(contents, f) +def add_disable_id_if_necessary( + disable_id: str, disabled_ids: dict[str, dict[str, Union[str, dict[str, str]]]] +): + if disable_id not in disabled_ids.keys(): + disabled_ids[disable_id] = {"app_id": "", "build_id": "", "ro_section": {}} + + def is_app_id_disabled(bot, app_id: str) -> bool: - disabled_ids = get_disabled_ids(bot) + disabled_app_ids = [ + entry["app_id"] + for entry in get_disabled_ids(bot).values() + if len(entry["app_id"]) > 0 + ] app_id = app_id.lower() - return app_id in disabled_ids["app_id"].keys() + return app_id in disabled_app_ids def is_build_id_disabled(bot, build_id: str) -> bool: - disabled_ids = get_disabled_ids(bot) + disabled_build_ids = [ + entry["build_id"] + for entry in get_disabled_ids(bot).values() + if len(entry["build_id"]) > 0 + ] build_id = build_id.lower() if len(build_id) < 64: build_id += "0" * (64 - len(build_id)) - return build_id in disabled_ids["build_id"].keys() + return build_id in disabled_build_ids def is_ro_section_disabled(bot, ro_section: dict[str, Union[str, list[str]]]) -> bool: - disabled_ids = get_disabled_ids(bot) + disabled_ro_sections = [ + entry["ro_section"] + for entry in get_disabled_ids(bot).values() + if len(entry["ro_section"]) > 0 + ] matches = [] - for note, entry in disabled_ids["ro_section"].items(): - for key, content in entry.items(): + for disabled_ro_section in disabled_ro_sections: + for key, content in disabled_ro_section.items(): if key == "module": matches.append(ro_section[key].lower() == content.lower()) else: matches.append(ro_section[key] == content) - if all(matches): - return True - else: - matches = [] - return False + if all(matches) and len(matches) > 0: + return True + else: + matches = [] + return False -def add_disabled_app_id(bot, app_id: str, note="") -> bool: +def remove_disable_id(bot, disable_id: str) -> bool: disabled_ids = get_disabled_ids(bot) - app_id = app_id.lower() - if app_id not in disabled_ids["app_id"].keys(): - disabled_ids["app_id"][app_id] = note + if disable_id in disabled_ids.keys(): + del disabled_ids[disable_id] set_disabled_ids(bot, disabled_ids) return True return False -def remove_disabled_app_id(bot, app_id: str) -> bool: +def add_disabled_app_id(bot, disable_id: str, app_id: str) -> bool: disabled_ids = get_disabled_ids(bot) + disable_id = disable_id.lower() app_id = app_id.lower() - if app_id in disabled_ids["app_id"].keys(): - del disabled_ids["app_id"][app_id] + if not is_app_id_disabled(bot, app_id): + add_disable_id_if_necessary(disable_id, disabled_ids) + disabled_ids[disable_id]["app_id"] = app_id set_disabled_ids(bot, disabled_ids) return True return False -def add_disabled_build_id(bot, build_id: str, note="") -> bool: +def add_disabled_build_id(bot, disable_id: str, build_id: str) -> bool: disabled_ids = get_disabled_ids(bot) + disable_id = disable_id.lower() build_id = build_id.lower() if len(build_id) < 64: build_id += "0" * (64 - len(build_id)) - if build_id not in disabled_ids["build_id"].keys(): - disabled_ids["build_id"][build_id] = note + if not is_build_id_disabled(bot, build_id): + add_disable_id_if_necessary(disable_id, disabled_ids) + disabled_ids[disable_id]["build_id"] = build_id set_disabled_ids(bot, disabled_ids) return True return False -def remove_disabled_build_id(bot, build_id: str) -> bool: +def remove_disabled_app_id(bot, disable_id: str) -> bool: disabled_ids = get_disabled_ids(bot) - build_id = build_id.lower() - if len(build_id) < 64: - build_id += "0" * (64 - len(build_id)) - if build_id in disabled_ids["build_id"].keys(): - del disabled_ids["build_id"][build_id] + disable_id = disable_id.lower() + if ( + disable_id in disabled_ids.keys() + and len(disabled_ids[disable_id]["app_id"]) > 0 + ): + disabled_ids[disable_id]["app_id"] = "" + set_disabled_ids(bot, disabled_ids) + return True + return False + + +def remove_disabled_build_id(bot, disable_id: str) -> bool: + disabled_ids = get_disabled_ids(bot) + disable_id = disable_id.lower() + if ( + disable_id in disabled_ids.keys() + and len(disabled_ids[disable_id]["build_id"]) > 0 + ): + disabled_ids[disable_id]["build_id"] = "" set_disabled_ids(bot, disabled_ids) return True return False def add_disabled_ro_section( - bot, note: str, ro_section: dict[str, Union[str, list[str]]] + bot, disable_id: str, ro_section: dict[str, Union[str, list[str]]] ) -> bool: disabled_ids = get_disabled_ids(bot) - note = note.lower() - if note not in disabled_ids["ro_section"].keys(): - disabled_ids["ro_section"][note] = {} + disable_id = disable_id.lower() + add_disable_id_if_necessary(disable_id, disabled_ids) + if len(ro_section) > len(disabled_ids[disable_id]["ro_section"]): for key, content in ro_section.items(): if key == "module": - disabled_ids["ro_section"][note][key] = content.lower() + disabled_ids[disable_id]["ro_section"][key] = content.lower() else: - disabled_ids["ro_section"][note][key] = content + disabled_ids[disable_id]["ro_section"][key] = content set_disabled_ids(bot, disabled_ids) return True return False -def remove_disabled_ro_section(bot, note: str) -> bool: +def remove_disabled_ro_section(bot, disable_id: str) -> bool: disabled_ids = get_disabled_ids(bot) - note = note.lower() - if note in disabled_ids["ro_section"].keys(): - del disabled_ids["ro_section"][note] + disable_id = disable_id.lower() + if ( + disable_id in disabled_ids.keys() + and len(disabled_ids[disable_id]["ro_section"]) > 0 + ): + disabled_ids[disable_id]["ro_section"] = {} set_disabled_ids(bot, disabled_ids) return True return False diff --git a/robocop_ng/helpers/ryujinx_log_analyser.py b/robocop_ng/helpers/ryujinx_log_analyser.py new file mode 100644 index 0000000..8a153b5 --- /dev/null +++ b/robocop_ng/helpers/ryujinx_log_analyser.py @@ -0,0 +1,669 @@ +import re +from enum import IntEnum, auto +from typing import Optional, Union + +from robocop_ng.helpers.disabled_ids import is_build_id_valid + + +class CommonErrors(IntEnum): + SHADER_CACHE_COLLISION = auto() + DUMP_HASH = auto() + SHADER_CACHE_CORRUPTION = auto() + UPDATE_KEYS = auto() + FILE_PERMISSIONS = auto() + FILE_NOT_FOUND = auto() + MISSING_SERVICES = auto() + VULKAN_OUT_OF_MEMORY = auto() + + +class RyujinxVersion(IntEnum): + MASTER = auto() + OLD_MASTER = auto() + LDN = auto() + MAC = auto() + PR = auto() + CUSTOM = auto() + + +class LogAnalyser: + _log_text: str + _log_errors: list[list[str]] + _hardware_info: dict[str, Optional[str]] + _ram_available_mib: int + _emu_info: dict[str, Optional[str]] + _game_info: dict[str, Optional[str]] + _settings: dict[str, Optional[str]] + _notes: list[str] + + @staticmethod + def get_main_ro_section(log_file: str) -> Optional[dict[str, str]]: + ro_section_match = re.search( + r"PrintRoSectionInfo: main:[\r\n]*(.*)", log_file, re.DOTALL + ) + if ro_section_match is not None and len(ro_section_match.groups()) > 0: + ro_section = {"module": "", "sdk_libraries": []} + if ro_section_match.group(1) is None: + return None + for line in ro_section_match.group(1).splitlines(): + line = line.strip() + if line.startswith("Module:"): + ro_section["module"] = line[8:] + elif line.startswith("SDK Libraries:"): + ro_section["sdk_libraries"].append(line[19:]) + elif line.startswith("SDK "): + ro_section["sdk_libraries"].append(line[4:]) + else: + break + return ro_section + return None + + @staticmethod + def get_app_info( + log_file: str, + ) -> Optional[tuple[str, str, str, list[str], dict[str, str]]]: + game_name_match = re.search( + r"Loader [A-Za-z]*: Application Loaded:\s([^;\n\r]*)", + log_file, + re.MULTILINE, + ) + if game_name_match is not None and len(game_name_match.groups()) > 0: + game_name = None + app_id = None + if game_name_match.group(1) is not None: + game_name = game_name_match.group(1).rstrip() + app_id_match = re.match(r".* \[([a-zA-Z0-9]*)\]", game_name) + if app_id_match: + app_id = app_id_match.group(1).strip().upper() + bids_match = re.search( + r"Build ids found for title ([a-zA-Z0-9]*):[\n\r]*(.*)", + log_file, + re.DOTALL, + ) + if bids_match is not None and len(bids_match.groups()) > 0: + app_id_from_bids = None + build_ids = None + if bids_match.group(1) is not None: + app_id_from_bids = bids_match.group(1).strip().upper() + if bids_match.group(2) is not None: + build_ids = [ + bid.strip().upper() + for bid in bids_match.group(2).splitlines() + if is_build_id_valid(bid.strip()) + ] + + return ( + game_name, + app_id, + app_id_from_bids, + build_ids, + LogAnalyser.get_main_ro_section(log_file), + ) + return None + + @staticmethod + def contains_errors(search_terms, errors): + for term in search_terms: + for error_lines in errors: + line = "\n".join(error_lines) + if term in line: + return True + return False + + def __init__(self, log_text: Union[str, list[str]]): + self.__init_members() + + if isinstance(log_text, str): + self._log_text = log_text.replace("\r\n", "\n") + elif isinstance(log_text, list): + self._log_text = "\n".join(log_text) + else: + raise TypeError(log_text) + + # Large files show a header value when not downloaded completely + # this regex makes sure that the log text to read starts from the first timestamp, ignoring headers + log_file_header_regex = re.compile(r"\d{2}:\d{2}:\d{2}\.\d{3}.*", re.DOTALL) + log_file_match = re.search(log_file_header_regex, self._log_text) + if log_file_match and log_file_match.group(0) is not None: + self._log_text = log_file_match.group(0) + else: + raise ValueError("No log entries found.") + + self.__get_errors() + self.__get_settings_info() + self.__get_hardware_info() + self.__get_ryujinx_info() + self.__get_app_name() + self.__get_mods() + self.__get_cheats() + self.__get_notes() + + def __init_members(self): + self._hardware_info = { + "cpu": "Unknown", + "gpu": "Unknown", + "ram": "Unknown", + "os": "Unknown", + } + self._emu_info = { + "ryu_version": "Unknown", + "ryu_firmware": "Unknown", + "logs_enabled": None, + } + self._game_info = { + "game_name": "Unknown", + "errors": "No errors found in log", + "mods": "No mods found", + "cheats": "No cheats found", + } + self._settings = { + "audio_backend": "Unknown", + "backend_threading": "Unknown", + "docked": "Unknown", + "expand_ram": "Unknown", + "fs_integrity": "Unknown", + "graphics_backend": "Unknown", + "ignore_missing_services": "Unknown", + "memory_manager": "Unknown", + "pptc": "Unknown", + "shader_cache": "Unknown", + "vsync": "Unknown", + "resolution_scale": "Unknown", + "anisotropic_filtering": "Unknown", + "aspect_ratio": "Unknown", + "texture_recompression": "Unknown", + } + self._ram_available_mib = -1 + self._notes = [] + self._log_errors = [] + + def __get_errors(self): + errors = [] + curr_error_lines = [] + error_line = False + for line in self._log_text.splitlines(): + if len(line.strip()) == 0: + continue + if "|E|" in line: + curr_error_lines = [line] + errors.append(curr_error_lines) + error_line = True + elif error_line and line[0] == " ": + curr_error_lines.append(line) + if len(curr_error_lines) > 0: + errors.append(curr_error_lines) + + self._log_errors = errors + + def __get_hardware_info(self): + for setting in self._hardware_info.keys(): + match setting: + case "cpu": + cpu_match = re.search( + r"CPU:\s([^;\n\r]*)", self._log_text, re.MULTILINE + ) + if cpu_match is not None and cpu_match.group(1) is not None: + self._hardware_info[setting] = cpu_match.group(1).rstrip() + + case "ram": + ram_match = re.search( + r"RAM:\s(Total)?\s([^;\n\r]*)\s;\s(Available)?\s(\d+)", + self._log_text, + re.MULTILINE, + ) + if ram_match is not None and ram_match.group(2) is not None: + self._hardware_info[setting] = ram_match.group(2).rstrip() + ram_available = ram_match.group(4).rstrip() + if ram_available.isnumeric(): + self._ram_available_mib = int(ram_available) + + case "os": + os_match = re.search( + r"Operating System:\s([^;\n\r]*)", + self._log_text, + re.MULTILINE, + ) + if os_match is not None and os_match.group(1) is not None: + self._hardware_info[setting] = os_match.group(1).rstrip() + + case "gpu": + gpu_match = re.search( + r"PrintGpuInformation:\s([^;\n\r]*)", + self._log_text, + re.MULTILINE, + ) + if gpu_match is not None and gpu_match.group(1) is not None: + self._hardware_info[setting] = gpu_match.group(1).rstrip() + + case _: + raise NotImplementedError(setting) + + def __get_ryujinx_info(self): + for setting in self._emu_info.keys(): + match setting: + case "ryu_version": + for line in self._log_text.splitlines(): + if "Ryujinx Version:" in line: + self._emu_info[setting] = line.split()[-1].strip() + break + + case "logs_enabled": + logs_match = re.search( + r"Logs Enabled:\s([^;\n\r]*)", self._log_text, re.MULTILINE + ) + if logs_match is not None and logs_match.group(1) is not None: + self._emu_info[setting] = logs_match.group(1).rstrip() + + case "ryu_firmware": + for line in self._log_text.splitlines(): + if "Firmware Version:" in line: + self._emu_info[setting] = line.split()[-1].strip() + break + + case _: + raise NotImplementedError(setting) + + def __get_setting_value(self, name, key): + values = [ + line.split()[-1] + for line in self._log_text.splitlines() + if re.search(rf"LogValueChange: ({key})\s", line) + ] + if len(values) > 0: + value = values[-1] + else: + return None + + match name: + case "docked": + return "Docked" if value == "True" else "Handheld" + + case "resolution_scale": + resolution_map = { + "-1": "Custom", + "1": "Native (720p/1080p)", + "2": "2x (1440p/2160p)", + "3": "3x (2160p/3240p)", + "4": "4x (2880p/4320p)", + } + if value in resolution_map.keys(): + return resolution_map[value] + else: + return "Custom" + + case "anisotropic_filtering": + anisotropic_map = { + "-1": "Auto", + "2": "2x", + "4": "4x", + "8": "8x", + "16": "16x", + } + if value in anisotropic_map.keys(): + return anisotropic_map[value] + else: + return "Auto" + + case "aspect_ratio": + aspect_map = { + "Fixed4x3": "4:3", + "Fixed16x9": "16:9", + "Fixed16x10": "16:10", + "Fixed21x9": "21:9", + "Fixed32x9": "32:9", + "Stretched": "Stretch to Fit Window", + } + if value in aspect_map.keys(): + return aspect_map[value] + else: + return "Unknown" + + case "pptc" | "shader_cache" | "texture_recompression" | "vsync": + return "Enabled" if value == "True" else "Disabled" + + case _: + return value + + def __get_settings_info(self): + settings_map = { + "anisotropic_filtering": "MaxAnisotropy", + "aspect_ratio": "AspectRatio", + "audio_backend": "AudioBackend", + "backend_threading": "BackendThreading", + "docked": "EnableDockedMode", + "expand_ram": "ExpandRam", + "fs_integrity": "EnableFsIntegrityChecks", + "graphics_backend": "GraphicsBackend", + "ignore_missing_services": "IgnoreMissingServices", + "memory_manager": "MemoryManagerMode", + "pptc": "EnablePtc", + "resolution_scale": "ResScale", + "shader_cache": "EnableShaderCache", + "texture_recompression": "EnableTextureRecompression", + "vsync": "EnableVsync", + } + + for key in self._settings.keys(): + if key in settings_map: + self._settings[key] = self.__get_setting_value(key, settings_map[key]) + else: + raise NotImplementedError(key) + + def __get_mods(self): + mods_regex = re.compile(r"Found mod\s\'(.+?)\'\s(\[.+?\])") + matches = re.findall(mods_regex, self._log_text) + if matches: + mods = [{"mod": match[0], "status": match[1]} for match in matches] + mods_status = [ + f"ℹ️ {i['mod']} ({'ExeFS' if i['status'] == '[E]' else 'RomFS'})" + for i in mods + ] + # Remove duplicated mods from output + mods_status = list(dict.fromkeys(mods_status)) + + self._game_info["mods"] = "\n".join(mods_status) + + def __get_cheats(self): + cheat_regex = re.compile(r"Tampering program\s?") + matches = re.findall(cheat_regex, self._log_text) + if matches: + cheats = [f"ℹ️ {match}" for match in matches] + + self._game_info["cheats"] = "\n".join(cheats) + + def __get_app_name(self): + app_match = re.search( + r"Loader [A-Za-z]*: Application Loaded:\s([^;\n\r]*)", + self._log_text, + re.MULTILINE, + ) + if app_match is not None and app_match.group(1) is not None: + self._game_info["game_name"] = app_match.group(1).rstrip() + + def __get_controller_notes(self): + controllers_regex = re.compile(r"Hid Configure: ([^\r\n]+)") + controllers = re.findall(controllers_regex, self._log_text) + if controllers: + input_status = [f"ℹ {match}" for match in controllers] + # Hid Configure lines can appear multiple times, so converting to dict keys removes duplicate entries, + # also maintains the list order + input_status = list(dict.fromkeys(input_status)) + self._notes.append("\n".join(input_status)) + # If emulator crashes on startup without game load, there is no need to show controller notification at all + elif self._game_info["game_name"] != "Unknown": + self._notes.append("⚠️ No controller information found") + + def __get_os_notes(self): + if ( + "Windows" in self._hardware_info["os"] + and self._settings["graphics_backend"] != "Vulkan" + ): + if "Intel" in self._hardware_info["gpu"]: + self._notes.append( + "**⚠️ Intel iGPU users should consider using Vulkan graphics backend**" + ) + if "AMD" in self._hardware_info["gpu"]: + self._notes.append( + "**⚠️ AMD GPU users should consider using Vulkan graphics backend**" + ) + + def __get_log_notes(self): + default_logs = ["Info", "Warning", "Error", "Guest", "Stub"] + user_logs = [] + if self._emu_info["logs_enabled"] is not None: + user_logs = ( + self._emu_info["logs_enabled"].rstrip().replace(" ", "").split(",") + ) + + if "Debug" in user_logs: + self._notes.append( + "⚠️ **Debug logs enabled will have a negative impact on performance**" + ) + + disabled_logs = set(default_logs).difference(set(user_logs)) + if disabled_logs: + logs_status = [f"⚠️ {log} log is not enabled" for log in disabled_logs] + log_string = "\n".join(logs_status) + else: + log_string = "✅ Default logs enabled" + + self._notes.append(log_string) + + def __get_settings_notes(self): + if self._settings["audio_backend"] == "Dummy": + self._notes.append( + "⚠️ Dummy audio backend, consider changing to SDL2 or OpenAL" + ) + + if self._settings["pptc"] == "Disabled": + self._notes.append("🔴 **PPTC cache should be enabled**") + + if self._settings["shader_cache"] == "Disabled": + self._notes.append("🔴 **Shader cache should be enabled**") + + if self._settings["expand_ram"] == "True": + self._notes.append( + "⚠️ `Use alternative memory layout` should only be enabled for 4K mods" + ) + + if self._settings["memory_manager"] == "SoftwarePageTable": + self._notes.append( + "🔴 **`Software` setting in Memory Manager Mode will give slower performance than the default setting of `Host unchecked`**" + ) + + if self._settings["ignore_missing_services"] == "True": + self._notes.append( + "⚠️ `Ignore Missing Services` being enabled can cause instability" + ) + + if self._settings["vsync"] == "Disabled": + self._notes.append( + "⚠️ V-Sync disabled can cause instability like games running faster than intended or longer load times" + ) + + if self._settings["fs_integrity"] == "Disabled": + self._notes.append( + "⚠️ Disabling file integrity checks may cause corrupted dumps to not be detected" + ) + + if self._settings["backend_threading"] == "Off": + self._notes.append( + "🔴 **Graphics Backend Multithreading should be set to `Auto`**" + ) + + def __sort_notes(self): + def severity(log_note_string): + symbols = ["❌", "🔴", "⚠️", "ℹ", "✅"] + return next( + i for i, symbol in enumerate(symbols) if symbol in log_note_string + ) + + game_notes = [note for note in self._notes] + # Warnings split on the string after the warning symbol for alphabetical ordering + # Severity key then orders alphabetically sorted warnings to show most severe first + return sorted(sorted(game_notes, key=lambda x: x.split()[1]), key=severity) + + def __get_notes(self): + for common_error in self.get_common_errors(): + match common_error: + case CommonErrors.SHADER_CACHE_COLLISION: + self._notes.append( + "⚠️ Cache collision detected. Investigate possible shader cache issues" + ) + case CommonErrors.SHADER_CACHE_CORRUPTION: + self._notes.append( + "⚠️ Cache corruption detected. Investigate possible shader cache issues" + ) + case CommonErrors.DUMP_HASH: + self._notes.append( + "⚠️ Dump error detected. Investigate possible bad game/firmware dump issues" + ) + case CommonErrors.UPDATE_KEYS: + self._notes.append( + "⚠️ Keys or firmware out of date, consider updating them" + ) + case CommonErrors.FILE_PERMISSIONS: + self._notes.append( + "⚠️ File permission error. Consider deleting save directory and allowing Ryujinx to make a new one" + ) + case CommonErrors.FILE_NOT_FOUND: + self._notes.append( + "⚠️ Save not found error. Consider starting game without a save file or using a new save file⚠️ Save not found error. Consider starting game without a save file or using a new save file" + ) + case CommonErrors.MISSING_SERVICES: + if self._settings["ignore_missing_services"] == "False": + self._notes.append( + "⚠️ Consider enabling `Ignore Missing Services` in Ryujinx settings" + ) + case CommonErrors.VULKAN_OUT_OF_MEMORY: + if self._settings["texture_recompression"] == "Disabled": + self._notes.append( + "⚠️ Consider enabling `Texture Recompression` in Ryujinx settings" + ) + case _: + raise NotImplementedError(common_error) + + timestamp_regex = re.compile(r"\d{2}:\d{2}:\d{2}\.\d{3}") + latest_timestamp = re.findall(timestamp_regex, self._log_text)[-1] + if latest_timestamp: + timestamp_message = f"ℹ️ Time elapsed: `{latest_timestamp}`" + self._notes.append(timestamp_message) + + if self.is_default_user_profile(): + self._notes.append( + "⚠️ Default user profile in use, consider creating a custom one." + ) + + if 8192 > self._ram_available_mib > -1: + self._notes.append( + f"⚠️ Less than 8GB RAM available ({self._ram_available_mib} MB)" + ) + + self.__get_controller_notes() + self.__get_os_notes() + + if ( + self._emu_info["ryu_firmware"] == "Unknown" + and self._game_info["game_name"] != "Unknown" + ): + firmware_warning = f"**❌ Nintendo Switch firmware not found**" + self._notes.append(firmware_warning) + + self.__get_settings_notes() + if self.get_ryujinx_version() == RyujinxVersion.CUSTOM: + self._notes.append("**⚠️ Custom builds are not officially supported**") + + def get_ryujinx_version(self): + mainline_version = re.compile(r"^\d\.\d\.\d+$") + old_mainline_version = re.compile(r"^\d\.\d\.(\d){4}$") + pr_version = re.compile(r"^\d\.\d\.\d\+([a-f]|\d){7}$") + ldn_version = re.compile(r"^\d\.\d\.\d-ldn\d+\.\d+(?:\.\d+|$)") + mac_version = re.compile(r"^\d\.\d\.\d-macos\d+(?:\.\d+(?:\.\d+|$)|$)") + + if re.match(mainline_version, self._emu_info["ryu_version"]): + return RyujinxVersion.MASTER + elif re.match(old_mainline_version, self._emu_info["ryu_version"]): + return RyujinxVersion.OLD_MASTER + elif re.match(mac_version, self._emu_info["ryu_version"]): + return RyujinxVersion.MAC + elif re.match(ldn_version, self._emu_info["ryu_version"]): + return RyujinxVersion.LDN + elif re.match(pr_version, self._emu_info["ryu_version"]): + return RyujinxVersion.PR + else: + return RyujinxVersion.CUSTOM + + def is_default_user_profile(self) -> bool: + return ( + re.search(r"UserId: 00000000000000010000000000000000", self._log_text) + is not None + ) + + def get_last_error(self) -> Optional[list[str]]: + return self._log_errors[-1] if len(self._log_errors) > 0 else None + + def get_common_errors(self) -> list[CommonErrors]: + errors = [] + + if self.contains_errors(["Cache collision found"], self._log_errors): + errors.append(CommonErrors.SHADER_CACHE_COLLISION) + if self.contains_errors( + [ + "ResultFsInvalidIvfcHash", + "ResultFsNonRealDataVerificationFailed", + ], + self._log_errors, + ): + errors.append(CommonErrors.DUMP_HASH) + if self.contains_errors( + [ + "Ryujinx.Graphics.Gpu.Shader.ShaderCache.Initialize()", + "System.IO.InvalidDataException: End of Central Directory record could not be found", + "ICSharpCode.SharpZipLib.Zip.ZipException: Cannot find central directory", + ], + self._log_errors, + ): + errors.append(CommonErrors.SHADER_CACHE_CORRUPTION) + if self.contains_errors(["MissingKeyException"], self._log_errors): + errors.append(CommonErrors.UPDATE_KEYS) + if self.contains_errors(["ResultFsPermissionDenied"], self._log_errors): + errors.append(CommonErrors.FILE_PERMISSIONS) + if self.contains_errors(["ResultFsTargetNotFound"], self._log_errors): + errors.append(CommonErrors.FILE_NOT_FOUND) + if self.contains_errors(["ServiceNotImplementedException"], self._log_errors): + errors.append(CommonErrors.MISSING_SERVICES) + if self.contains_errors(["ErrorOutOfDeviceMemory"], self._log_errors): + errors.append(CommonErrors.VULKAN_OUT_OF_MEMORY) + + return errors + + def analyse_discord( + self, is_channel_allowed: bool, pr_channel: int + ) -> dict[str, dict[str, str]]: + last_error = self.get_last_error() + if last_error is not None: + last_error = last_error[:2] + self._game_info["errors"] = f"```\n{last_error}\n```" + else: + self._game_info["errors"] = "No errors found in log" + + # Limit mods and cheats to 5 entries + mods = self._game_info["mods"].splitlines() + cheats = self._game_info["cheats"].splitlines() + if len(mods) > 5: + limit_mods = mods[:5] + limit_mods.append(f"✂️ {len(mods) - 5} other mods") + self._game_info["mods"] = "\n".join(limit_mods) + if len(cheats) > 5: + limit_cheats = cheats[:5] + limit_cheats.append(f"✂️ {len(cheats) - 5} other cheats") + self._game_info["cheats"] = "\n".join(limit_cheats) + + if is_channel_allowed and self.get_ryujinx_version() == RyujinxVersion.PR: + self._notes.append( + f"**⚠️ PR build logs should be posted in <#{pr_channel}> if reporting bugs or tests**" + ) + + self._notes = self.__sort_notes() + full_game_info = self._game_info + full_game_info["notes"] = ( + "\n".join(self._notes) if len(self._notes) > 0 else "Nothing to note" + ) + + return { + "hardware_info": self._hardware_info, + "emu_info": self._emu_info, + "game_info": full_game_info, + "settings": self._settings, + } + + def analyse(self) -> dict[str, Union[dict[str, str], list[str], list[list[str]]]]: + self._notes = list(self.__sort_notes()) + + return { + "hardware_info": self._hardware_info, + "emu_info": self._emu_info, + "game_info": self._game_info, + "notes": self._notes, + "errors": self._log_errors, + "settings": self._settings, + }