ryuko-ng/robocop_ng/cogs/logfilereader.py
TSRBerry 9cd2d6550d
Small logfilereader fixes and improvements (#49)
* Fix cheat_information regex

* Add warning about default user profiles to notes

* Fix disable_ro_section issues

* Apply black formatting
2023-05-04 00:40:41 +02:00

1203 lines
54 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import re
from typing import Optional
import aiohttp
from discord import Colour, Embed, Message, Attachment
from discord.ext import commands
from discord.ext.commands import Cog, Context
from robocop_ng.helpers.checks import check_if_staff
from robocop_ng.helpers.disabled_ids import (
add_disabled_app_id,
is_app_id_valid,
remove_disabled_app_id,
get_disabled_ids,
is_app_id_disabled,
is_build_id_valid,
add_disabled_build_id,
remove_disabled_build_id,
is_build_id_disabled,
is_ro_section_disabled,
is_ro_section_valid,
add_disabled_ro_section,
remove_disabled_ro_section,
)
# Module-level logging setup: records are rendered as
# "<time> (<level>) <message> (Line <lineno>)" and the threshold is INFO,
# so the logging.info / warning / error calls made by this cog are emitted.
logging.basicConfig(
    format="%(asctime)s (%(levelname)s) %(message)s (Line %(lineno)d)",
    level=logging.INFO,
)
class LogFileReader(Cog):
@staticmethod
def is_valid_log_name(attachment: Attachment) -> tuple[bool, bool]:
filename = attachment.filename
ryujinx_log_file_regex = re.compile(r"^Ryujinx_.*\.log$")
log_file = re.compile(r"^.*\.log|.*\.txt$")
is_ryujinx_log_file = re.match(ryujinx_log_file_regex, filename) is not None
is_log_file = re.match(log_file, filename) is not None
return is_log_file, is_ryujinx_log_file
def __init__(self, bot):
    """Attach the cog to *bot* and cache the values read from ``bot.config``.

    Sets up the allowed log channels, the embed colour, the list of recently
    uploaded logs and the role ids (looked up by name in
    ``config.named_roles``) whose holders are refused analysis.
    """
    self.bot = bot
    config = self.bot.config
    self.bot_log_allowed_channels = config.bot_log_allowed_channels
    self.disallowed_named_roles = ["pirate"]
    self.ryujinx_blue = Colour(0x4A90E2)
    self.uploaded_log_info = []
    # Translate every disallowed role name into its configured role id.
    role_ids = []
    for role_name in self.disallowed_named_roles:
        role_ids.append(self.bot.config.named_roles[role_name])
    self.disallowed_roles = role_ids
async def download_file(self, log_url):
    """Request *log_url* and return the response body decoded as UTF-8.

    A ``Range`` header is sent so that only the first and last few bytes of
    the file are requested, to prevent abuse from large files.
    """
    range_request = {"Range": "bytes=0-60000, -6000"}
    async with aiohttp.ClientSession() as session, session.get(
        log_url, headers=range_request
    ) as response:
        body = await response.text("UTF-8")
    return body
def get_main_ro_section(self, log_file: str) -> Optional[dict[str, str]]:
    """Extract the ``PrintRoSectionInfo: main:`` block from *log_file*.

    Lines following the marker are consumed while they start (after
    stripping) with ``Module:``, ``SDK Libraries:`` or ``SDK ``; the first
    line of any other kind ends the block.

    Returns:
        ``{"module": <str>, "sdk_libraries": <list of str>}``, or ``None``
        when the marker is not present.
    """
    marker_match = re.search(
        r"PrintRoSectionInfo: main:[\r\n]*(.*)", log_file, re.DOTALL
    )
    if marker_match is None or len(marker_match.groups()) == 0:
        return None

    module_name = ""
    libraries = []
    for raw_line in marker_match.group(1).splitlines():
        entry = raw_line.strip()
        if entry.startswith("Module:"):
            # Skip "Module:" plus the separator that follows it.
            module_name = entry[8:]
            continue
        if entry.startswith("SDK Libraries:"):
            # The first library shares the line: "SDK Libraries: SDK <name>".
            libraries.append(entry[19:])
            continue
        if entry.startswith("SDK "):
            libraries.append(entry[4:])
            continue
        break
    return {"module": module_name, "sdk_libraries": libraries}
def get_app_info(
    self, log_file: str
) -> Optional[tuple[str, str, list[str], dict[str, str]]]:
    """Collect the identifiers of the application recorded in *log_file*.

    Returns:
        ``(app_id, app_id_from_bids, build_ids, main_ro_section)`` where
        ``app_id`` comes from the bracketed id in the "Application Loaded"
        line (``None`` when that line has no bracketed id),
        ``app_id_from_bids`` and ``build_ids`` come from the
        "Build ids found for title" section, and ``main_ro_section`` is the
        result of ``get_main_ro_section``. ``None`` is returned when either
        the "Application Loaded" line or the build id section is missing.
    """
    loaded_match = re.search(
        r"Loader [A-Za-z]*: Application Loaded:\s([^;\n\r]*)",
        log_file,
        re.MULTILINE,
    )
    if loaded_match is None or len(loaded_match.groups()) == 0:
        return None

    game_name = loaded_match.group(1).rstrip()
    bracket_match = re.match(r".* \[([a-zA-Z0-9]*)\]", game_name)
    app_id = bracket_match.group(1).strip().upper() if bracket_match else None

    bids_match = re.search(
        r"Build ids found for title ([a-zA-Z0-9]*):[\n\r]*(.*)",
        log_file,
        re.DOTALL,
    )
    if bids_match is None or len(bids_match.groups()) == 0:
        return None

    app_id_from_bids = bids_match.group(1).strip().upper()
    # Everything after the section header is scanned; only lines that look
    # like build ids according to is_build_id_valid are kept.
    build_ids = []
    for candidate in bids_match.group(2).splitlines():
        if is_build_id_valid(candidate.strip()):
            build_ids.append(candidate.strip().upper())

    # TODO: Check if self.get_main_ro_section() is None and return an error
    main_ro_section = self.get_main_ro_section(log_file)
    return (app_id, app_id_from_bids, build_ids, main_ro_section)
def is_log_valid(self, log_file: str) -> bool:
app_info = self.get_app_info(log_file)
if app_info is None:
return True
app_id, another_app_id, _, _ = app_info
return app_id == another_app_id
def is_game_blocked(self, log_file: str) -> bool:
app_info = self.get_app_info(log_file)
if app_info is None:
return False
app_id, another_app_id, build_ids, main_ro_section = app_info
if is_app_id_disabled(self.bot, app_id) or is_app_id_disabled(
self.bot, another_app_id
):
return True
for bid in build_ids:
if is_build_id_disabled(self.bot, bid):
return True
return is_ro_section_disabled(self.bot, main_ro_section)
async def blocked_game_action(self, message: Message) -> Embed:
    """Handle a log that contains a blocked game and return the notice embed.

    Steps, in order: invoke the bot's ``warn`` command through a reply to
    *message* (or log an error when that command is not registered), add the
    configured "pirate" role to the author, build the notice embed and
    delete *message*.
    """
    warn_command = self.bot.get_command("warn")
    if warn_command is None:
        logging.error(
            f"Couldn't find 'warn' command. Unable to warn {message.author}."
        )
    else:
        # The reply is only used to obtain a context in which the warn
        # command can be invoked.
        warn_message = await message.reply(".warn This log contains a blocked game.")
        warn_context = await self.bot.get_context(warn_message)
        await warn_context.invoke(
            warn_command,
            target=None,
            reason="This log contains a blocked game.",
        )

    pirate_role_id = self.bot.config.named_roles["pirate"]
    await message.author.add_roles(message.guild.get_role(pirate_role_id))

    notice = Embed(
        title="⛔ Blocked game detected ⛔",
        colour=Colour(0xFF0000),
        description="This log contains a blocked game and has been removed.\n"
        "The user has been warned and the pirate role was applied.",
    )
    notice.set_footer(text=f"Log uploaded by @{message.author.name}")
    await message.delete()
    return notice
async def log_file_read(self, message):
    """Download the first attachment of *message* and build the reply embed.

    The log text is fetched with ``download_file`` and trimmed so that it
    starts at the first ``HH:MM:SS.mmm`` timestamp. The outcome is decided
    in this order:

    1. no timestamp found -> "invalid log" embed;
    2. ``is_game_blocked`` -> ``blocked_game_action`` is awaited and its
       embed returned;
    3. the author holds one of ``self.disallowed_roles`` -> refusal embed;
    4. ``is_log_valid`` fails -> "modified log" embed;
    5. otherwise hardware, emulator, settings, error and note information is
       parsed into ``self.embed`` and rendered as the analysis embed.

    Args:
        message: the message carrying the log; only ``attachments[0]`` is
            read, together with the author and the channel id.

    Returns:
        The ``Embed`` to display for this log.
    """
    attached_log = message.attachments[0]
    author_name = f"@{message.author.name}"
    log_file = await self.download_file(attached_log.url)

    # The shared state is reset only AFTER the download await. Everything
    # below runs without suspending until the result is returned, so a second
    # upload handled while this one was downloading cannot end up writing
    # into (or reading from) the same dictionary.
    self.embed = {
        "hardware_info": {
            "cpu": "Unknown",
            "gpu": "Unknown",
            "ram": "Unknown",
            "os": "Unknown",
        },
        "emu_info": {
            "ryu_version": "Unknown",
            "ryu_firmware": "Unknown",
            "logs_enabled": None,
        },
        "game_info": {
            "game_name": "Unknown",
            "errors": "No errors found in log",
            "mods": "No mods found",
            "cheats": "No cheats found",
            "notes": [],
        },
        "settings": {
            "audio_backend": "Unknown",
            "backend_threading": "Unknown",
            "docked": "Unknown",
            "expand_ram": "Unknown",
            "fs_integrity": "Unknown",
            "graphics_backend": "Unknown",
            "ignore_missing_services": "Unknown",
            "memory_manager": "Unknown",
            "pptc": "Unknown",
            "shader_cache": "Unknown",
            "vsync": "Unknown",
            "resolution_scale": "Unknown",
            "anisotropic_filtering": "Unknown",
            "aspect_ratio": "Unknown",
            "texture_recompression": "Unknown",
        },
    }

    # Large files show a header value when not downloaded completely
    # this regex makes sure that the log text to read starts from the first timestamp, ignoring headers
    log_file_header_regex = re.compile(r"\d{2}:\d{2}:\d{2}\.\d{3}.*", re.DOTALL)
    log_file_match = re.search(log_file_header_regex, log_file)
    if log_file_match is None:
        return Embed(
            colour=self.ryujinx_blue,
            description="This log file appears to be invalid. Please make sure to upload a Ryujinx log file.",
        )
    log_file = log_file_match.group(0)

    def get_hardware_info():
        """Fill ``self.embed["hardware_info"]``; missing entries stay "Unknown"."""
        # setting -> (pattern, index of the capture group holding the value)
        patterns = {
            "cpu": (r"CPU:\s([^;\n\r]*)", 1),
            "gpu": (r"PrintGpuInformation:\s([^;\n\r]*)", 1),
            "ram": (r"RAM:(\sTotal)?\s([^;\n\r]*)", 2),
            "os": (r"Operating System:\s([^;\n\r]*)", 1),
        }
        for setting, (pattern, group) in patterns.items():
            found = re.search(pattern, log_file, re.MULTILINE)
            if found is not None:
                self.embed["hardware_info"][setting] = found.group(group).rstrip()

    def get_ryujinx_info():
        """Fill ``self.embed["emu_info"]``; missing entries keep their default."""
        emu_info = self.embed["emu_info"]
        log_lines = log_file.splitlines()
        # Both values are the last whitespace-separated token of the first
        # line that contains the marker.
        for setting, marker in (
            ("ryu_version", "Ryujinx Version:"),
            ("ryu_firmware", "Firmware Version:"),
        ):
            value = next(
                (line.split()[-1] for line in log_lines if marker in line), None
            )
            if value is not None:
                emu_info[setting] = value
        logs_match = re.search(r"Logs Enabled:\s([^;\n\r]*)", log_file, re.MULTILINE)
        if logs_match is not None:
            emu_info["logs_enabled"] = logs_match.group(1).rstrip()

    def format_log_embed(game_notes):
        """Render ``self.embed`` and *game_notes* (list of str or None) as an Embed."""
        hardware = self.embed["hardware_info"]
        emulator = self.embed["emu_info"]
        settings = self.embed["settings"]
        game_info = self.embed["game_info"]

        cleaned_game_name = re.sub(r"\s\[(64|32)-bit\]$", "", game_info["game_name"])
        game_info["game_name"] = cleaned_game_name

        hardware_info = " | ".join(
            (
                f"**CPU:** {hardware['cpu']}",
                f"**GPU:** {hardware['gpu']}",
                f"**RAM:** {hardware['ram']}",
                f"**OS:** {hardware['os']}",
            )
        )
        system_settings_info = "\n".join(
            (
                f"**Audio Backend:** `{settings['audio_backend']}`",
                f"**Console Mode:** `{settings['docked']}`",
                f"**PPTC Cache:** `{settings['pptc']}`",
                f"**Shader Cache:** `{settings['shader_cache']}`",
                f"**V-Sync:** `{settings['vsync']}`",
            )
        )
        graphics_settings_info = "\n".join(
            (
                f"**Graphics Backend:** `{settings['graphics_backend']}`",
                f"**Resolution:** `{settings['resolution_scale']}`",
                f"**Anisotropic Filtering:** `{settings['anisotropic_filtering']}`",
                f"**Aspect Ratio:** `{settings['aspect_ratio']}`",
                f"**Texture Recompression:** `{settings['texture_recompression']}`",
            )
        )
        ryujinx_info = " | ".join(
            (
                f"**Version:** {emulator['ryu_version']}",
                f"**Firmware:** {emulator['ryu_firmware']}",
            )
        )

        log_embed = Embed(title=f"{cleaned_game_name}", colour=self.ryujinx_blue)
        log_embed.set_footer(text=f"Log uploaded by {author_name}")
        log_embed.add_field(
            name="General Info",
            value=" | ".join((ryujinx_info, hardware_info)),
            inline=False,
        )
        log_embed.add_field(
            name="System Settings",
            value=system_settings_info,
            inline=True,
        )
        log_embed.add_field(
            name="Graphics Settings",
            value=graphics_settings_info,
            inline=True,
        )

        # Steps shared by both "how to get a proper log" guides; only the
        # introduction and the final step differ between them.
        common_steps = (
            "1) In Logging settings, ensure `Enable Logging to File` is checked.",
            "2) Ensure the following default logs are enabled: `Info`, `Warning`, `Error`, `Guest` and `Stub`.",
            "3) Start a game up.",
            "4) Play until your issue occurs.",
        )
        no_game = cleaned_game_name == "Unknown"
        no_errors = game_info["errors"] == "No errors found in log"

        if no_game and no_errors:
            log_embed.add_field(
                name="Empty Log",
                value="\n".join(
                    (
                        "The log file appears to be empty. To get a proper log, follow these steps:",
                        *common_steps,
                        "5) Upload the latest log file which is larger than 2KB.",
                    )
                ),
                inline=False,
            )
        # NOTE: this is a separate if/else, not an elif of the test above, so
        # an empty log gets the "Empty Log" field AND the regular fields of
        # the else branch.
        if no_game and not no_errors:
            log_embed.add_field(
                name="Latest Error Snippet",
                value=game_info["errors"],
                inline=False,
            )
            log_embed.add_field(
                name="No Game Boot Detected",
                value="\n".join(
                    (
                        "No game boot has been detected in log file. To get a proper log, follow these steps:",
                        *common_steps,
                        "5) Upload the latest log file which is larger than 3KB.",
                    )
                ),
                inline=False,
            )
        else:
            log_embed.add_field(
                name="Latest Error Snippet",
                value=game_info["errors"],
                inline=False,
            )
            log_embed.add_field(name="Mods", value=game_info["mods"], inline=False)
            log_embed.add_field(name="Cheats", value=game_info["cheats"], inline=False)
            # game_notes is None when analyse_log gave up; an empty list
            # would produce an empty field value, so both get the fallback.
            notes_value = "\n".join(game_notes) if game_notes else "Nothing to note"
            log_embed.add_field(
                name="Notes",
                value=notes_value,
                inline=False,
            )
        return log_embed

    def analyse_log():
        """Parse settings, errors, mods, cheats and warnings out of the log.

        Returns the ordered list of notes, or None when an AttributeError
        interrupts the analysis.
        """
        try:
            settings = self.embed["settings"]
            notes = self.embed["game_info"]["notes"]
            log_lines = log_file.splitlines()

            setting_map = {
                "anisotropic_filtering": "MaxAnisotropy",
                "aspect_ratio": "AspectRatio",
                "audio_backend": "AudioBackend",
                "backend_threading": "BackendThreading",
                "docked": "EnableDockedMode",
                "expand_ram": "ExpandRam",
                "fs_integrity": "EnableFsIntegrityChecks",
                "graphics_backend": "GraphicsBackend",
                "ignore_missing_services": "IgnoreMissingServices",
                "memory_manager": "MemoryManagerMode",
                "pptc": "EnablePtc",
                "resolution_scale": "ResScale",
                "shader_cache": "EnableShaderCache",
                "texture_recompression": "EnableTextureRecompression",
                "vsync": "EnableVsync",
            }
            resolution_map = {
                "-1": "Custom",
                "1": "Native (720p/1080p)",
                "2": "2x (1440p/2160p)",
                "3": "3x (2160p/3240p)",
                "4": "4x (2880p/4320p)",
            }
            anisotropic_map = {
                "-1": "Auto",
                "2": "2x",
                "4": "4x",
                "8": "8x",
                "16": "16x",
            }
            aspect_map = {
                "Fixed4x3": "4:3",
                "Fixed16x9": "16:9",
                "Fixed16x10": "16:10",
                "Fixed21x9": "21:9",
                "Fixed32x9": "32:9",
                "Stretched": "Stretch to Fit Window",
            }
            # Boolean settings stored as "Enabled"/"Disabled". fs_integrity is
            # included because its warning below compares against "Disabled".
            toggle_settings = (
                "pptc",
                "shader_cache",
                "texture_recompression",
                "vsync",
                "fs_integrity",
            )

            def get_setting(name, setting_string):
                """Store the most recently logged value of one setting.

                Raises IndexError when the log has no LogValueChange line
                for *setting_string*.
                """
                setting_value = [
                    line.split()[-1]
                    for line in log_lines
                    if re.search(rf"LogValueChange: ({setting_string})\s", line)
                ][-1]
                if setting_value and settings.get(name):
                    settings[name] = setting_value
                    if name == "docked":
                        settings[name] = (
                            "Docked" if setting_value == "True" else "Handheld"
                        )
                    # Values missing from a map are shown as logged instead of
                    # raising KeyError and aborting the whole analysis.
                    if name == "resolution_scale":
                        settings[name] = resolution_map.get(setting_value, setting_value)
                    if name == "anisotropic_filtering":
                        settings[name] = anisotropic_map.get(setting_value, setting_value)
                    if name == "aspect_ratio":
                        settings[name] = aspect_map.get(setting_value, setting_value)
                    if name in toggle_settings:
                        settings[name] = (
                            "Enabled" if setting_value == "True" else "Disabled"
                        )
                return settings[name]

            # Some log info may be missing for users that use older versions of Ryujinx,
            # so reading the settings is not always possible; such settings stay "Unknown".
            for setting_name in settings:
                try:
                    get_setting(setting_name, setting_map[setting_name])
                except (AttributeError, IndexError) as error:
                    logging.info(
                        f"Settings exception: {setting_name}: {type(error).__name__}"
                    )
                    continue

            def analyse_error_message():
                """Group |E| lines with their indented follow-up lines and
                look for known failure signatures among them."""
                errors = []
                curr_error_lines = []
                for line in log_lines:
                    if line == "":
                        continue
                    if "|E|" in line:
                        curr_error_lines = [line]
                        errors.append(curr_error_lines)
                    elif line[0] == " ":
                        curr_error_lines.append(line)

                def error_search(search_terms):
                    return any(
                        term in "\n".join(error_lines)
                        for term in search_terms
                        for error_lines in errors
                    )

                # First two lines of the latest error, None without errors.
                last_errors = "\n".join(errors[-1][:2]) if errors else None
                return (
                    last_errors,
                    error_search(["Cache collision found"]),
                    error_search(
                        [
                            "ResultFsInvalidIvfcHash",
                            "ResultFsNonRealDataVerificationFailed",
                        ]
                    ),
                    error_search(
                        [
                            "Ryujinx.Graphics.Gpu.Shader.ShaderCache.Initialize()",
                            "System.IO.InvalidDataException: End of Central Directory record could not be found",
                            "ICSharpCode.SharpZipLib.Zip.ZipException: Cannot find central directory",
                        ]
                    ),
                    error_search(["MissingKeyException"]),
                    error_search(["ResultFsPermissionDenied"]),
                    error_search(["ResultFsTargetNotFound"]),
                    error_search(["ServiceNotImplementedException"]),
                    error_search(["ErrorOutOfDeviceMemory"]),
                )

            # Finds the latest error denoted by |E| in the log and its first line
            # Also warns of common issues
            (
                last_error_snippet,
                shader_cache_warn,
                dump_hash_warning,
                shader_cache_corruption_warn,
                update_keys_warn,
                file_permissions_warn,
                file_not_found_warn,
                missing_services_warn,
                vulkan_out_of_memory_warn,
            ) = analyse_error_message()
            if last_error_snippet:
                self.embed["game_info"]["errors"] = f"```{last_error_snippet}```"

            # Game name parsed last so that user settings are visible with empty log
            game_name_match = re.search(
                r"Loader [A-Za-z]*: Application Loaded:\s([^;\n\r]*)",
                log_file,
                re.MULTILINE,
            )
            if game_name_match is not None:
                self.embed["game_info"]["game_name"] = game_name_match.group(1).rstrip()

            # (condition, note) pairs, appended in this order.
            error_notes = (
                (
                    shader_cache_warn,
                    "⚠️ Cache collision detected. Investigate possible shader cache issues",
                ),
                (
                    shader_cache_corruption_warn,
                    "⚠️ Cache corruption detected. Investigate possible shader cache issues",
                ),
                (
                    dump_hash_warning,
                    "⚠️ Dump error detected. Investigate possible bad game/firmware dump issues",
                ),
                (
                    update_keys_warn,
                    "⚠️ Keys or firmware out of date, consider updating them",
                ),
                (
                    file_permissions_warn,
                    "⚠️ File permission error. Consider deleting save directory and allowing Ryujinx to make a new one",
                ),
                (
                    file_not_found_warn,
                    "⚠️ Save not found error. Consider starting game without a save file or using a new save file",
                ),
                (
                    missing_services_warn
                    and settings["ignore_missing_services"] == "False",
                    "⚠️ Consider enabling `Ignore Missing Services` in Ryujinx settings",
                ),
                (
                    vulkan_out_of_memory_warn
                    and settings["texture_recompression"] == "Disabled",
                    "⚠️ Consider enabling `Texture Recompression` in Ryujinx settings",
                ),
            )
            for triggered, text in error_notes:
                if triggered:
                    notes.append(text)

            # The log text starts with a timestamp (see the header regex
            # above), so at least one match exists here.
            timestamp_regex = re.compile(r"\d{2}:\d{2}:\d{2}\.\d{3}")
            latest_timestamp = re.findall(timestamp_regex, log_file)[-1]
            if latest_timestamp:
                notes.append(f" Time elapsed: `{latest_timestamp}`")

            def mods_information():
                """Return the de-duplicated list of found mods, or None."""
                mods_regex = re.compile(r"Found mod\s\'(.+?)\'\s(\[.+?\])")
                matches = re.findall(mods_regex, log_file)
                if matches:
                    mods_status = [
                        f" {mod} ({'ExeFS' if status == '[E]' else 'RomFS'})"
                        for mod, status in matches
                    ]
                    # Remove duplicated mods from output
                    return list(dict.fromkeys(mods_status))
                return None

            def cheat_information():
                """Return the de-duplicated list of installed cheats, or None."""
                cheat_regex = re.compile(r"Installing cheat\s\'<?(.+?)>?\'")
                matches = re.findall(cheat_regex, log_file)
                if matches:
                    # dict.fromkeys drops duplicates like a set would, but
                    # keeps the order in which the cheats appear in the log.
                    return list(dict.fromkeys(f" {match}" for match in matches))
                return None

            game_mods = mods_information()
            if game_mods:
                self.embed["game_info"]["mods"] = "\n".join(game_mods)
            game_cheats = cheat_information()
            if game_cheats:
                self.embed["game_info"]["cheats"] = "\n".join(game_cheats)

            if (
                re.search(r"UserId: 00000000000000010000000000000000", log_file)
                is not None
            ):
                notes.append(
                    "⚠️ Default user profile in use, consider creating a custom one."
                )

            controllers_regex = re.compile(r"Hid Configure: ([^\r\n]+)")
            controllers = re.findall(controllers_regex, log_file)
            if controllers:
                input_status = [f" {match}" for match in controllers]
                # Hid Configure lines can appear multiple times, so converting to dict keys removes duplicate entries,
                # also maintains the list order
                input_status = list(dict.fromkeys(input_status))
                notes.append("\n".join(input_status))
            # If emulator crashes on startup without game load, there is no need to show controller notification at all
            if not controllers and self.embed["game_info"]["game_name"] != "Unknown":
                notes.append("⚠️ No controller information found")

            ram_available_regex = re.compile(
                r"Application\sPrint:\sRAM:(?:.*Available\s)(\d+)"
            )
            ram_match = re.search(ram_available_regex, log_file)
            if ram_match is not None:
                ram_available = ram_match[1]
                if int(ram_available) < 8000:
                    notes.append(
                        f"⚠️ Less than 8GB RAM available ({str(ram_available)} MB)"
                    )

            if (
                "Windows" in self.embed["hardware_info"]["os"]
                and settings["graphics_backend"] != "Vulkan"
            ):
                if "Intel" in self.embed["hardware_info"]["gpu"]:
                    notes.append(
                        "**⚠️ Intel iGPU users should consider using Vulkan graphics backend**"
                    )
                if "AMD" in self.embed["hardware_info"]["gpu"]:
                    notes.append(
                        "**⚠️ AMD GPU users should consider using Vulkan graphics backend**"
                    )

            logs_enabled = self.embed["emu_info"]["logs_enabled"]
            if logs_enabled is not None:
                default_logs = ["Info", "Warning", "Error", "Guest", "Stub"]
                user_logs = logs_enabled.rstrip().replace(" ", "").split(",")
                if "Debug" in user_logs:
                    notes.append(
                        "⚠️ **Debug logs enabled will have a negative impact on performance**"
                    )
                # Reported in the order of default_logs so the output is stable.
                disabled_logs = [log for log in default_logs if log not in user_logs]
                if disabled_logs:
                    log_string = "\n".join(
                        f"⚠️ {log} log is not enabled" for log in disabled_logs
                    )
                else:
                    log_string = "✅ Default logs enabled"
                notes.append(log_string)

            if self.embed["emu_info"]["ryu_firmware"] == "Unknown":
                notes.append("**❌ Nintendo Switch firmware not found**")

            # (setting, value that triggers the note, note), appended in order.
            setting_notes = (
                (
                    "audio_backend",
                    "Dummy",
                    "⚠️ Dummy audio backend, consider changing to SDL2 or OpenAL",
                ),
                ("pptc", "Disabled", "🔴 **PPTC cache should be enabled**"),
                ("shader_cache", "Disabled", "🔴 **Shader cache should be enabled**"),
                (
                    "expand_ram",
                    "True",
                    "⚠️ `Use alternative memory layout` should only be enabled for 4K mods",
                ),
                (
                    "memory_manager",
                    "SoftwarePageTable",
                    "🔴 **`Software` setting in Memory Manager Mode will give slower performance than the default setting of `Host unchecked`**",
                ),
                (
                    "ignore_missing_services",
                    "True",
                    "⚠️ `Ignore Missing Services` being enabled can cause instability",
                ),
                (
                    "vsync",
                    "Disabled",
                    "⚠️ V-Sync disabled can cause instability like games running faster than intended or longer load times",
                ),
                (
                    "fs_integrity",
                    "Disabled",
                    "⚠️ Disabling file integrity checks may cause corrupted dumps to not be detected",
                ),
                (
                    "backend_threading",
                    "Off",
                    "🔴 **Graphics Backend Multithreading should be set to `Auto`**",
                ),
            )
            for setting_name, trigger, text in setting_notes:
                if settings[setting_name] == trigger:
                    notes.append(text)

            mainline_version = re.compile(r"^\d\.\d\.\d+$")
            old_mainline_version = re.compile(r"^\d\.\d\.(\d){4}$")
            pr_version = re.compile(r"^\d\.\d\.\d\+([a-f]|\d){7}$")
            ldn_version = re.compile(r"^\d\.\d\.\d\-ldn\d+\.\d+(?:\.\d+|$)")
            mac_version = re.compile(r"^\d\.\d\.\d\-macos\d+(?:\.\d+(?:\.\d+|$)|$)")

            ryu_version = self.embed["emu_info"]["ryu_version"]
            is_channel_allowed = (
                message.channel.id
                in self.bot.config.bot_log_allowed_channels.values()
            )
            if is_channel_allowed:
                if re.match(pr_version, ryu_version):
                    notes.append(
                        f"**⚠️ PR build logs should be posted in <#{self.bot.config.bot_log_allowed_channels['pr-testing']}> if reporting bugs or tests**"
                    )
                if re.match(old_mainline_version, ryu_version):
                    notes.append(
                        "**🔴 Old Ryujinx version, please re-download from the Ryujinx website as auto-updates will not work on this version**"
                    )
                known_version_formats = (
                    mainline_version,
                    old_mainline_version,
                    mac_version,
                    ldn_version,
                    pr_version,
                    "Unknown",
                )
                if not any(
                    re.match(version_format, ryu_version)
                    for version_format in known_version_formats
                ):
                    notes.append("**⚠️ Custom builds are not officially supported**")

            def severity(log_note_string):
                # NOTE(review): as written, the empty-string entries are
                # contained in every note, so every note gets severity 0 and
                # only the alphabetical order below takes effect. The symbols
                # may have been lost from this list - confirm against the
                # intended severity symbols before changing it.
                symbols = ["", "🔴", "⚠️", "", ""]
                return next(
                    i
                    for i, symbol in enumerate(symbols)
                    if symbol in log_note_string
                )

            # Warnings split on the string after the warning symbol for alphabetical ordering
            # Severity key then orders alphabetically sorted warnings to show most severe first
            ordered_game_notes = sorted(
                sorted(notes, key=lambda x: x.split()[1]), key=severity
            )
            return ordered_game_notes
        except AttributeError as error:
            # Best effort: the embed is still built, with "Nothing to note".
            logging.warning(f"Log analysis aborted: {type(error).__name__}: {error}")
            return None

    if self.is_game_blocked(log_file):
        return await self.blocked_game_action(message)

    for role in message.author.roles:
        if role.id in self.disallowed_roles:
            embed = Embed(
                colour=Colour(0xFF0000),
                description="I'm not allowed to analyse this log.",
            )
            embed.set_footer(text=f"Log uploaded by {author_name}")
            return embed

    if not self.is_log_valid(log_file):
        embed = Embed(
            title="⚠️ Modified log detected ⚠️",
            colour=Colour(0xFCFC00),
            description="This log contains manually modified information and won't be analysed.",
        )
        embed.set_footer(text=f"Log uploaded by {author_name}")
        return embed

    get_hardware_info()
    get_ryujinx_info()
    game_notes = analyse_log()
    return format_log_embed(game_notes)
@commands.check(check_if_staff)
@commands.command(
    aliases=["disallow_log_id", "forbid_log_id", "block_id", "blockid"]
)
async def disable_log_id(
    self, ctx: Context, block_id_type: str, block_id: str, note=""
):
    """Staff command: add an application id or build id to the block list.

    ``block_id_type`` is compared case-insensitively against the accepted
    spellings for application ids and build ids. The id is validated first,
    then added together with *note*, and the outcome is sent to *ctx*.
    """
    id_type = block_id_type.lower()

    if id_type in ("app", "app_id", "appid", "tid", "title_id"):
        if not is_app_id_valid(block_id):
            return await ctx.send("The specified app id is invalid.")
        if not add_disabled_app_id(self.bot, block_id, note):
            return await ctx.send(
                f"Application id '{block_id}' is already blocked."
            )
        return await ctx.send(f"Application id '{block_id}' is now blocked!")

    if id_type in ("build", "build_id", "bid"):
        if not is_build_id_valid(block_id):
            return await ctx.send("The specified build id is invalid.")
        if not add_disabled_build_id(self.bot, block_id, note):
            return await ctx.send(f"Build id '{block_id}' is already blocked.")
        return await ctx.send(f"Build id '{block_id}' is now blocked!")

    return await ctx.send(
        "The specified id type is invalid. Valid id types are: ['app_id', 'build_id']"
    )
@commands.check(check_if_staff)
@commands.command(
aliases=[
"allow_log_id",
"unblock_log_id",
"unblock_id",
"allow_id",
"unblockid",
]
)
async def enable_log_id(self, ctx: Context, block_id_type: str, block_id: str):
match block_id_type.lower():
case "app" | "app_id" | "appid" | "tid" | "title_id":
if not is_app_id_valid(block_id):
return await ctx.send("The specified app id is invalid.")
if remove_disabled_app_id(self.bot, block_id):
return await ctx.send(
f"Application id '{block_id}' is now unblocked!"
)
else:
return await ctx.send(
f"Application id '{block_id}' is not blocked."
)
case "build" | "build_id", "bid":
if not is_build_id_valid(block_id):
return await ctx.send("The specified build id is invalid.")
if remove_disabled_build_id(self.bot, block_id):
return await ctx.send(f"Build id '{block_id}' is now unblocked!")
else:
return await ctx.send(f"Build id '{block_id}' is not blocked.")
case _:
return await ctx.send(
"The specified id type is invalid. Valid id types are: ['app_id', 'build_id']"
)
@commands.check(check_if_staff)
@commands.command(
    aliases=[
        "blocked_ids",
        "listblockedids",
        "list_blocked_log_ids",
        "list_blocked_ids",
    ]
)
async def list_disabled_ids(self, ctx: Context):
    """Staff command: send the list of blocked ids and read-only sections.

    Application ids and build ids are listed (upper-cased) with their note
    when one is stored; read-only sections are listed by their note, which
    is the key they are stored under.
    """
    disabled_ids = get_disabled_ids(self.bot)
    parts = ["**Blocking analysis of the following IDs:**\n"]

    for id_type, name in {
        "app_id": "Application IDs",
        "build_id": "Build IDs",
    }.items():
        entries = disabled_ids[id_type]
        if not entries:
            continue
        parts.append(f"- {name}:\n")
        for disabled_id, note in entries.items():
            # Ids are upper-cased whether or not a note is attached.
            parts.append(
                f" - [{disabled_id.upper()}]: {note}\n"
                if note != ""
                else f" - [{disabled_id.upper()}]\n"
            )
        parts.append("\n")

    ro_sections = disabled_ids["ro_section"]
    if ro_sections:
        parts.append("- Read-only sections:\n")
        # Each entry must be appended to the message; a bare f-string
        # expression builds the text and then discards it.
        for note in ro_sections:
            parts.append(f" - [{note}]\n")

    return await ctx.send("".join(parts))
@commands.check(check_if_staff)
@commands.command(
    aliases=[
        "disallow_ro_section",
        "forbid_ro_section",
        "block_ro_section",
        "blockrosection",
    ]
)
async def disable_ro_section(
    self, ctx: Context, note: str, *, ro_section_snippet: str
):
    """Staff command: block the read-only section pasted in the message.

    Args:
        ctx: invocation context; every outcome is reported through it.
        note: name under which the section is stored.
        ro_section_snippet: the pasted log excerpt; surrounding backticks
            and blank lines are ignored, and the
            ``PrintRoSectionInfo: main:`` header is added when missing.
    """
    snippet_lines = [
        line
        for line in ro_section_snippet.strip("`").splitlines()
        if len(line.strip()) > 0
    ]
    # A snippet made only of backticks / blank lines leaves nothing to parse.
    if not snippet_lines:
        return await ctx.send("The specified read-only section is invalid.")

    if re.search(r"PrintRoSectionInfo: main:", snippet_lines[0]) is None:
        snippet_lines.insert(0, "PrintRoSectionInfo: main:")

    ro_section = self.get_main_ro_section("\n".join(snippet_lines))
    if ro_section is None or not is_ro_section_valid(ro_section):
        # Report the failure instead of finishing without any reply.
        return await ctx.send("The specified read-only section is invalid.")

    if add_disabled_ro_section(self.bot, note, ro_section):
        return await ctx.send(
            f"The specified read-only section '{note}' is now blocked."
        )
    return await ctx.send(
        f"The specified read-only section '{note}' is already blocked."
    )
@commands.check(check_if_staff)
@commands.command(
    aliases=[
        "allow_ro_section",
        "unblock_ro_section",
        "allow_rosection",
        "unblockrosection",
    ]
)
async def enable_ro_section(self, ctx: Context, note: str):
    """Staff command: unblock the read-only section stored under *note*.

    The result of ``remove_disabled_ro_section`` decides which of the two
    confirmation messages is sent to *ctx*.
    """
    was_blocked = remove_disabled_ro_section(self.bot, note)
    if not was_blocked:
        return await ctx.send(f"The read-only section for '{note}' is not blocked.")
    return await ctx.send(
        f"The read-only section for '{note}' is now unblocked!"
    )
@commands.check(check_if_staff)
@commands.command(
aliases=[
"get_blocked_ro_section",
"disabled_ro_section",
"blocked_ro_section" "list_disabled_ro_section",
"list_blocked_ro_section",
]
)
async def get_disabled_ro_section(self, ctx: Context, note: str):
disabled_ids = get_disabled_ids(self.bot)
key_note = note.lower()
if key_note in disabled_ids["ro_section"].keys():
message = f"**Disabled read-only section for '{note}'**:\n"
message += "```\n"
for key, content in disabled_ids["ro_section"][key_note].items():
match key:
case "module":
message += f"Module: {content}\n"
case "sdk_libraries":
message += f"SDK Libraries: \n"
for entry in content:
message += f" SDK {entry}\n"
message += "\n"
message += "```"
return await ctx.send(message)
else:
return await ctx.send("The specified read-only section is not blocked.")
async def analyse_log_message(self, message: Message, attachment_index=0):
    """Analyse an attached log file and reply with the resulting embed.

    A log counts as a duplicate only when a recently tracked upload has the
    same filename, the same size and the same author; in that case the
    author is pointed at the earlier upload. Any other file is parsed via
    ``self.log_file_read`` and the result replaces a temporary
    "parsing" reply.

    Previously the gate matched on filename alone while the lookup of the
    earlier upload required filename, size and author. A file that shared
    only its name with a tracked upload therefore reached the duplicate
    branch with no match and crashed on ``None['link']``.

    :param message: The message carrying the log attachment.
    :param attachment_index: Index into ``message.attachments`` of the log
        whose filename and size are used for duplicate tracking.
    :return: The edited reply message on success, otherwise ``None``.
    """
    attachment = message.attachments[attachment_index]
    author_id = message.author.id
    author_mention = message.author.mention
    filename = attachment.filename
    filesize = attachment.size
    # Any message over 2000 chars is uploaded as message.txt, so this is accounted for
    log_file_link = message.jump_url

    duplicate_log_file = next(
        (
            elem
            for elem in self.uploaded_log_info
            if elem["filename"] == filename
            and elem["file_size"] == filesize
            and elem["author"] == author_id
        ),
        None,
    )
    if duplicate_log_file is not None:
        await message.channel.send(
            content=author_mention,
            embed=Embed(
                description=f"The log file `{filename}` appears to be a duplicate [already uploaded here]({duplicate_log_file['link']}). Please upload a more recent file.",
                colour=self.ryujinx_blue,
            ),
        )
        return

    reply_message = await message.channel.send(
        "Log detected, parsing...", reference=message
    )
    try:
        embed = await self.log_file_read(message)
        if "Ryujinx_" in filename:
            self.uploaded_log_info.append(
                {
                    "filename": filename,
                    "file_size": filesize,
                    "link": log_file_link,
                    "author": author_id,
                }
            )
        # Avoid duplicate log file analysis, at least temporarily; keep track of the last few filenames of uploaded logs
        # this should help support channels not be flooded with too many log files
        # fmt: off
        self.uploaded_log_info = self.uploaded_log_info[-5:]
        # fmt: on
        return await reply_message.edit(content=None, embed=embed)
    except UnicodeDecodeError as error:
        await reply_message.edit(
            content=author_mention,
            embed=Embed(
                description="This log file appears to be invalid. Please re-check and re-upload your log file.",
                colour=self.ryujinx_blue,
            ),
        )
        logging.warning(error)
    except Exception as error:
        # Deliberate catch-all: report the failure in the channel instead of
        # letting the exception propagate out of the handler.
        await reply_message.edit(
            content=f"Error: Couldn't parse log; parser threw `{type(error).__name__}` exception."
        )
        logging.warning(error)
@commands.check(check_if_staff)
@commands.command(
    aliases=["analyselog", "analyse_log", "analyze", "analyzelog", "analyze_log"]
)
async def analyse(self, ctx: Context, attachment_number=1):
    """Analyse a log attached to the message this command replies to.

    The command is gated by ``check_if_staff``. The invoking message is
    deleted first. If it was a reply, the referenced message is fetched and
    the attachment selected by the 1-based *attachment_number* is analysed,
    provided its name passes ``is_valid_log_name``.

    :param attachment_number: 1-based position of the attachment to analyse.
        Values below 1 or beyond the number of attachments get the usage
        hint. Previously 0 silently selected the last attachment and larger
        negative values could raise ``IndexError``.
    """
    await ctx.message.delete()
    reference = ctx.message.reference
    if reference is not None:
        message = await ctx.fetch_message(reference.message_id)
        if 1 <= attachment_number <= len(message.attachments):
            attachment_index = attachment_number - 1
            attachment = message.attachments[attachment_index]
            is_log_file, _ = self.is_valid_log_name(attachment)
            if is_log_file:
                return await self.analyse_log_message(message, attachment_index)
            return await ctx.send(
                f"The attached log file '{attachment.filename}' is not valid.",
                reference=reference,
            )
    return await ctx.send(
        "Please use `.analyse` as a reply to a message with an attached log file."
    )
@Cog.listener()
async def on_message(self, message: Message):
    """Inspect the attachments of every non-bot message for log files.

    For each attachment, in order:

    * A log/text file that is not named like a Ryujinx log is downloaded via
      ``self.download_file`` and passed to ``self.is_game_blocked``. If that
      check is positive, the embed from ``self.blocked_game_action`` is sent
      and handling stops.
    * A Ryujinx log posted in an allowed channel is handed to
      ``analyse_log_message``.
    * A Ryujinx log posted anywhere else makes the bot DM the author a list
      of the channels where logs may be uploaded.

    The non-Ryujinx branch used to download ``message.attachments[0]`` on
    every iteration, so only the first attachment was ever scanned. It now
    downloads the attachment being iterated.
    """
    await self.bot.wait_until_ready()
    if message.author.bot:
        return

    # Large files show a header value when not downloaded completely
    # this regex makes sure that the log text to read starts from the first timestamp, ignoring headers
    log_file_header_regex = re.compile(r"\d{2}:\d{2}:\d{2}\.\d{3}.*", re.DOTALL)

    for attachment_index, attachment in enumerate(message.attachments):
        is_log_file, is_ryujinx_log_file = self.is_valid_log_name(attachment)
        if not is_log_file:
            continue

        if not is_ryujinx_log_file:
            log_file = await self.download_file(attachment.url)
            log_file_match = re.search(log_file_header_regex, log_file)
            if log_file_match:
                log_file = log_file_match.group(0)
            if self.is_game_blocked(log_file):
                return await message.channel.send(
                    content=None, embed=await self.blocked_game_action(message)
                )
        elif message.channel.id in self.bot_log_allowed_channels.values():
            return await self.analyse_log_message(message, attachment_index)
        else:
            return await message.author.send(
                content=message.author.mention,
                embed=Embed(
                    description="\n".join(
                        (
                            f"Please upload Ryujinx log files to the correct location:\n",
                            f'<#{self.bot.config.bot_log_allowed_channels["windows-support"]}>: Windows help and troubleshooting',
                            f'<#{self.bot.config.bot_log_allowed_channels["linux-support"]}>: Linux help and troubleshooting',
                            f'<#{self.bot.config.bot_log_allowed_channels["macos-support"]}>: macOS help and troubleshooting',
                            f'<#{self.bot.config.bot_log_allowed_channels["patreon-support"]}>: Help and troubleshooting for Patreon subscribers',
                            f'<#{self.bot.config.bot_log_allowed_channels["development"]}>: Ryujinx development discussion',
                            f'<#{self.bot.config.bot_log_allowed_channels["pr-testing"]}>: Discussion of in-progress pull request builds',
                        )
                    ),
                    colour=self.ryujinx_blue,
                ),
            )
async def setup(bot):
    """Create a ``LogFileReader`` cog for *bot* and add it to the bot."""
    log_file_reader = LogFileReader(bot)
    await bot.add_cog(log_file_reader)