mirror of
https://github.com/Ryujinx/ryuko-ng.git
synced 2025-01-05 14:45:28 +00:00
599 lines
24 KiB
Python
599 lines
24 KiB
Python
import logging
|
|
import re
|
|
|
|
import aiohttp
|
|
from discord import Colour, Embed, Message, Attachment
|
|
from discord.ext import commands
|
|
from discord.ext.commands import Cog, Context
|
|
|
|
from robocop_ng.helpers.checks import check_if_staff
|
|
from robocop_ng.helpers.disabled_ids import (
|
|
add_disabled_app_id,
|
|
is_app_id_valid,
|
|
remove_disabled_app_id,
|
|
get_disabled_ids,
|
|
is_app_id_disabled,
|
|
is_build_id_valid,
|
|
add_disabled_build_id,
|
|
remove_disabled_build_id,
|
|
is_build_id_disabled,
|
|
is_ro_section_disabled,
|
|
is_ro_section_valid,
|
|
add_disabled_ro_section,
|
|
remove_disabled_ro_section,
|
|
remove_disable_id,
|
|
)
|
|
from robocop_ng.helpers.ryujinx_log_analyser import LogAnalyser
|
|
|
|
logging.basicConfig(
|
|
format="%(asctime)s (%(levelname)s) %(message)s (Line %(lineno)d)",
|
|
level=logging.INFO,
|
|
)
|
|
|
|
|
|
class LogFileReader(Cog):
|
|
@staticmethod
|
|
def is_valid_log_name(attachment: Attachment) -> tuple[bool, bool]:
|
|
filename = attachment.filename
|
|
ryujinx_log_file_regex = re.compile(r"^Ryujinx_.*\.log$")
|
|
log_file = re.compile(r"^.*\.log|.*\.txt$")
|
|
is_ryujinx_log_file = re.match(ryujinx_log_file_regex, filename) is not None
|
|
is_log_file = re.match(log_file, filename) is not None
|
|
|
|
return is_log_file, is_ryujinx_log_file
|
|
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.bot_log_allowed_channels = self.bot.config.bot_log_allowed_channels
|
|
self.disallowed_named_roles = ["pirate"]
|
|
self.ryujinx_blue = Colour(0x4A90E2)
|
|
self.uploaded_log_info = []
|
|
|
|
self.disallowed_roles = [
|
|
self.bot.config.named_roles[x] for x in self.disallowed_named_roles
|
|
]
|
|
|
|
@staticmethod
|
|
async def download_file(log_url):
|
|
async with aiohttp.ClientSession() as session:
|
|
# Grabs first and last few bytes of log file to prevent abuse from large files
|
|
headers = {"Range": "bytes=0-60000, -6000"}
|
|
async with session.get(log_url, headers=headers) as response:
|
|
return await response.text("UTF-8")
|
|
|
|
@staticmethod
|
|
def is_log_valid(log_file: str) -> bool:
|
|
app_info = LogAnalyser.get_app_info(log_file)
|
|
is_homebrew = LogAnalyser.is_homebrew(log_file)
|
|
if app_info is None or is_homebrew:
|
|
return True
|
|
game_name, app_id, another_app_id, build_ids, main_ro_section = app_info
|
|
if (
|
|
game_name is None
|
|
or app_id is None
|
|
or another_app_id is None
|
|
or build_ids is None
|
|
or main_ro_section is None
|
|
):
|
|
return False
|
|
return app_id == another_app_id
|
|
|
|
def is_game_blocked(self, log_file: str) -> bool:
|
|
app_info = LogAnalyser.get_app_info(log_file)
|
|
if app_info is None:
|
|
return False
|
|
game_name, app_id, another_app_id, build_ids, main_ro_section = app_info
|
|
if is_app_id_disabled(self.bot, app_id) or is_app_id_disabled(
|
|
self.bot, another_app_id
|
|
):
|
|
return True
|
|
for bid in build_ids:
|
|
if is_build_id_disabled(self.bot, bid):
|
|
return True
|
|
return is_ro_section_disabled(self.bot, main_ro_section)
|
|
|
|
async def blocked_game_action(self, message: Message) -> Embed:
|
|
warn_command = self.bot.get_command("warn")
|
|
if warn_command is not None:
|
|
warn_message = await message.reply(
|
|
".warn This log contains a blocked game."
|
|
)
|
|
warn_context = await self.bot.get_context(warn_message)
|
|
await warn_context.invoke(
|
|
warn_command,
|
|
target=None,
|
|
reason="This log contains a blocked game.",
|
|
)
|
|
else:
|
|
logging.error(
|
|
f"Couldn't find 'warn' command. Unable to warn {message.author}."
|
|
)
|
|
|
|
pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"])
|
|
await message.author.add_roles(pirate_role)
|
|
|
|
embed = Embed(
|
|
title="⛔ Blocked game detected ⛔",
|
|
colour=Colour(0xFF0000),
|
|
description="This log contains a blocked game and has been removed.\n"
|
|
"The user has been warned and the pirate role was applied.",
|
|
)
|
|
embed.set_footer(text=f"Log uploaded by @{message.author.name}")
|
|
await message.delete()
|
|
return embed
|
|
|
|
def format_analysed_log(self, author_name: str, analysed_log):
|
|
cleaned_game_name = re.sub(
|
|
r"\s\[(64|32)-bit\]$", "", analysed_log["game_info"]["game_name"]
|
|
)
|
|
analysed_log["game_info"]["game_name"] = cleaned_game_name
|
|
|
|
hardware_info = " | ".join(
|
|
(
|
|
f"**CPU:** {analysed_log['hardware_info']['cpu']}",
|
|
f"**GPU:** {analysed_log['hardware_info']['gpu']}",
|
|
f"**RAM:** {analysed_log['hardware_info']['ram']}",
|
|
f"**OS:** {analysed_log['hardware_info']['os']}",
|
|
)
|
|
)
|
|
|
|
system_settings_info = "\n".join(
|
|
(
|
|
f"**Audio Backend:** `{analysed_log['settings']['audio_backend']}`",
|
|
f"**Console Mode:** `{analysed_log['settings']['docked']}`",
|
|
f"**PPTC Cache:** `{analysed_log['settings']['pptc']}`",
|
|
f"**Shader Cache:** `{analysed_log['settings']['shader_cache']}`",
|
|
f"**V-Sync:** `{analysed_log['settings']['vsync']}`",
|
|
)
|
|
)
|
|
|
|
graphics_settings_info = "\n".join(
|
|
(
|
|
f"**Graphics Backend:** `{analysed_log['settings']['graphics_backend']}`",
|
|
f"**Resolution:** `{analysed_log['settings']['resolution_scale']}`",
|
|
f"**Anisotropic Filtering:** `{analysed_log['settings']['anisotropic_filtering']}`",
|
|
f"**Aspect Ratio:** `{analysed_log['settings']['aspect_ratio']}`",
|
|
f"**Texture Recompression:** `{analysed_log['settings']['texture_recompression']}`",
|
|
)
|
|
)
|
|
|
|
ryujinx_info = " | ".join(
|
|
(
|
|
f"**Version:** {analysed_log['emu_info']['ryu_version']}",
|
|
f"**Firmware:** {analysed_log['emu_info']['ryu_firmware']}",
|
|
)
|
|
)
|
|
|
|
log_embed = Embed(title=f"{cleaned_game_name}", colour=self.ryujinx_blue)
|
|
log_embed.set_footer(text=f"Log uploaded by {author_name}")
|
|
log_embed.add_field(
|
|
name="General Info",
|
|
value=" | ".join((ryujinx_info, hardware_info)),
|
|
inline=False,
|
|
)
|
|
log_embed.add_field(
|
|
name="System Settings",
|
|
value=system_settings_info,
|
|
inline=True,
|
|
)
|
|
log_embed.add_field(
|
|
name="Graphics Settings",
|
|
value=graphics_settings_info,
|
|
inline=True,
|
|
)
|
|
if (
|
|
cleaned_game_name == "Unknown"
|
|
and analysed_log["game_info"]["errors"] == "No errors found in log"
|
|
):
|
|
log_embed.add_field(
|
|
name="Empty Log",
|
|
value=f"""The log file appears to be empty. To get a proper log, follow these steps:
|
|
1) In Logging settings, ensure `Enable Logging to File` is checked.
|
|
2) Ensure the following default logs are enabled: `Info`, `Warning`, `Error`, `Guest` and `Stub`.
|
|
3) Start a game up.
|
|
4) Play until your issue occurs.
|
|
5) Upload the latest log file which is larger than 3KB.""",
|
|
inline=False,
|
|
)
|
|
if (
|
|
cleaned_game_name == "Unknown"
|
|
and analysed_log["game_info"]["errors"] != "No errors found in log"
|
|
):
|
|
log_embed.add_field(
|
|
name="Latest Error Snippet",
|
|
value=analysed_log["game_info"]["errors"],
|
|
inline=False,
|
|
)
|
|
log_embed.add_field(
|
|
name="No Game Boot Detected",
|
|
value=f"""No game boot has been detected in log file. To get a proper log, follow these steps:
|
|
1) In Logging settings, ensure `Enable Logging to File` is checked.
|
|
2) Ensure the following default logs are enabled: `Info`, `Warning`, `Error`, `Guest` and `Stub`.
|
|
3) Start a game up.
|
|
4) Play until your issue occurs.
|
|
5) Upload the latest log file which is larger than 3KB.""",
|
|
inline=False,
|
|
)
|
|
else:
|
|
log_embed.add_field(
|
|
name="Latest Error Snippet",
|
|
value=analysed_log["game_info"]["errors"],
|
|
inline=False,
|
|
)
|
|
log_embed.add_field(
|
|
name="Mods", value=analysed_log["game_info"]["mods"], inline=False
|
|
)
|
|
log_embed.add_field(
|
|
name="Cheats", value=analysed_log["game_info"]["cheats"], inline=False
|
|
)
|
|
|
|
log_embed.add_field(
|
|
name="Notes",
|
|
value=analysed_log["game_info"]["notes"],
|
|
inline=False,
|
|
)
|
|
|
|
return log_embed
|
|
|
|
async def log_file_read(self, message):
|
|
attached_log = message.attachments[0]
|
|
author_name = f"@{message.author.name}"
|
|
log_file = await self.download_file(attached_log.url)
|
|
|
|
if self.is_game_blocked(log_file):
|
|
return await self.blocked_game_action(message)
|
|
|
|
for role in message.author.roles:
|
|
if role.id in self.disallowed_roles:
|
|
embed = Embed(
|
|
colour=Colour(0xFF0000),
|
|
description="I'm not allowed to analyse this log.",
|
|
)
|
|
embed.set_footer(text=f"Log uploaded by {author_name}")
|
|
return embed
|
|
|
|
if not self.is_log_valid(log_file):
|
|
embed = Embed(
|
|
title="⚠️ Modified log detected ⚠️",
|
|
colour=Colour(0xFCFC00),
|
|
description="This log contains manually modified information and won't be analysed.",
|
|
)
|
|
embed.set_footer(text=f"Log uploaded by {author_name}")
|
|
return embed
|
|
|
|
try:
|
|
analyser = LogAnalyser(log_file)
|
|
except ValueError:
|
|
return Embed(
|
|
colour=self.ryujinx_blue,
|
|
description="This log file appears to be invalid. Please make sure to upload a Ryujinx log file.",
|
|
)
|
|
|
|
is_channel_allowed = False
|
|
for allowed_channel_id in self.bot.config.bot_log_allowed_channels.values():
|
|
if message.channel.id == allowed_channel_id:
|
|
is_channel_allowed = True
|
|
break
|
|
|
|
return self.format_analysed_log(
|
|
author_name,
|
|
analyser.analyse_discord(
|
|
is_channel_allowed,
|
|
self.bot.config.bot_log_allowed_channels["pr-testing"],
|
|
),
|
|
)
|
|
|
|
@commands.check(check_if_staff)
|
|
@commands.command(
|
|
aliases=["disallow_log_id", "forbid_log_id", "block_id", "blockid"]
|
|
)
|
|
async def disable_log_id(
|
|
self, ctx: Context, disable_id: str, block_id_type: str, *, block_id: str
|
|
):
|
|
match block_id_type.lower():
|
|
case "app" | "app_id" | "appid" | "tid" | "title_id":
|
|
if not is_app_id_valid(block_id):
|
|
return await ctx.send("The specified app id is invalid.")
|
|
|
|
if add_disabled_app_id(self.bot, disable_id, block_id):
|
|
return await ctx.send(
|
|
f"Application id '{block_id}' is now blocked!"
|
|
)
|
|
else:
|
|
return await ctx.send(
|
|
f"Application id '{block_id}' is already blocked."
|
|
)
|
|
case "build" | "build_id" | "bid":
|
|
if not is_build_id_valid(block_id):
|
|
return await ctx.send("The specified build id is invalid.")
|
|
|
|
if add_disabled_build_id(self.bot, disable_id, block_id):
|
|
return await ctx.send(f"Build id '{block_id}' is now blocked!")
|
|
else:
|
|
return await ctx.send(f"Build id '{block_id}' is already blocked.")
|
|
case "ro_section" | "rosection":
|
|
ro_section_snippet = block_id.strip("`").splitlines()
|
|
ro_section_snippet = [
|
|
line for line in ro_section_snippet if len(line.strip()) > 0
|
|
]
|
|
|
|
ro_section_info_regex = re.search(
|
|
r"PrintRoSectionInfo: main:", ro_section_snippet[0]
|
|
)
|
|
if ro_section_info_regex is None:
|
|
ro_section_snippet.insert(0, "PrintRoSectionInfo: main:")
|
|
|
|
ro_section = LogAnalyser.get_main_ro_section(
|
|
"\n".join(ro_section_snippet)
|
|
)
|
|
if ro_section is not None and is_ro_section_valid(ro_section):
|
|
if add_disabled_ro_section(self.bot, disable_id, ro_section):
|
|
return await ctx.send(
|
|
f"The specified read-only section for '{disable_id}' is now blocked."
|
|
)
|
|
else:
|
|
return await ctx.send(
|
|
f"The specified read-only section for '{disable_id}' is already blocked."
|
|
)
|
|
case _:
|
|
return await ctx.send(
|
|
"The specified id type is invalid. Valid id types are: ['app_id', 'build_id', 'ro_section']"
|
|
)
|
|
|
|
@commands.check(check_if_staff)
|
|
@commands.command(
|
|
aliases=[
|
|
"allow_log_id",
|
|
"unblock_log_id",
|
|
"unblock_id",
|
|
"allow_id",
|
|
"unblockid",
|
|
]
|
|
)
|
|
async def enable_log_id(self, ctx: Context, disable_id: str, block_id_type="all"):
|
|
match block_id_type.lower():
|
|
case "all":
|
|
if remove_disable_id(self.bot, disable_id):
|
|
return await ctx.send(
|
|
f"All ids for '{disable_id}' are now unblocked!"
|
|
)
|
|
else:
|
|
return await ctx.send(f"No blocked ids for '{disable_id}' found.")
|
|
case "app" | "app_id" | "appid" | "tid" | "title_id":
|
|
if remove_disabled_app_id(self.bot, disable_id):
|
|
return await ctx.send(
|
|
f"Application id for '{disable_id}' is now unblocked!"
|
|
)
|
|
else:
|
|
return await ctx.send(
|
|
f"No blocked application id for '{disable_id}' found."
|
|
)
|
|
case "build" | "build_id" | "bid":
|
|
if remove_disabled_build_id(self.bot, disable_id):
|
|
return await ctx.send(
|
|
f"Build id for '{disable_id}' is now unblocked!"
|
|
)
|
|
else:
|
|
return await ctx.send(f"No blocked build id '{disable_id}' found.")
|
|
case "ro_section" | "rosection":
|
|
if remove_disabled_ro_section(self.bot, disable_id):
|
|
return await ctx.send(
|
|
f"Read-only section for '{disable_id}' is now unblocked!"
|
|
)
|
|
else:
|
|
return await ctx.send(
|
|
f"No blocked read-only section for '{disable_id}' found."
|
|
)
|
|
case _:
|
|
return await ctx.send(
|
|
"The specified id type is invalid. Valid id types are: ['all', 'app_id', 'build_id', 'ro_section']"
|
|
)
|
|
|
|
@commands.check(check_if_staff)
|
|
@commands.command(
|
|
aliases=[
|
|
"disabled_ids",
|
|
"blocked_ids",
|
|
"listblockedids",
|
|
"list_blocked_log_ids",
|
|
"list_blocked_ids",
|
|
]
|
|
)
|
|
async def list_disabled_ids(self, ctx: Context):
|
|
disabled_ids = get_disabled_ids(self.bot)
|
|
id_types = {"app_id": "AppID", "build_id": "BID", "ro_section": "RoSection"}
|
|
|
|
message = "**Blocking analysis of the following IDs:**\n"
|
|
for name, entry in disabled_ids.items():
|
|
message += f"- {name}:\n"
|
|
for id_type, title in id_types.items():
|
|
if len(entry[id_type]) > 0:
|
|
if id_type != "ro_section":
|
|
message += f" - __{title}__: {entry[id_type]}\n"
|
|
else:
|
|
message += f" - __{title}__\n"
|
|
message += "\n"
|
|
return await ctx.send(message)
|
|
|
|
@commands.check(check_if_staff)
|
|
@commands.command(
|
|
aliases=[
|
|
"get_blocked_ro_section",
|
|
"disabled_ro_section",
|
|
"blocked_ro_section",
|
|
"list_disabled_ro_section",
|
|
"list_blocked_ro_section",
|
|
]
|
|
)
|
|
async def get_disabled_ro_section(self, ctx: Context, disable_id: str):
|
|
disabled_ids = get_disabled_ids(self.bot)
|
|
disable_id = disable_id.lower()
|
|
if (
|
|
disable_id in disabled_ids.keys()
|
|
and len(disabled_ids[disable_id]["ro_section"]) > 0
|
|
):
|
|
message = f"**Blocked read-only section for '{disable_id}'**:\n"
|
|
message += "```\n"
|
|
for key, content in disabled_ids[disable_id]["ro_section"].items():
|
|
match key:
|
|
case "module":
|
|
message += f"Module: {content}\n"
|
|
case "sdk_libraries":
|
|
message += f"SDK Libraries: \n"
|
|
for entry in content:
|
|
message += f" SDK {entry}\n"
|
|
message += "\n"
|
|
message += "```"
|
|
return await ctx.send(message)
|
|
else:
|
|
return await ctx.send(f"No read-only section blocked for '{disable_id}'.")
|
|
|
|
async def analyse_log_message(self, message: Message, attachment_index=0):
|
|
author_id = message.author.id
|
|
author_mention = message.author.mention
|
|
filename = message.attachments[attachment_index].filename
|
|
filesize = message.attachments[attachment_index].size
|
|
# Any message over 2000 chars is uploaded as message.txt, so this is accounted for
|
|
log_file_link = message.jump_url
|
|
|
|
uploaded_logs_exist = [
|
|
True for elem in self.uploaded_log_info if filename in elem.values()
|
|
]
|
|
if not any(uploaded_logs_exist):
|
|
reply_message = await message.channel.send(
|
|
"Log detected, parsing...", reference=message
|
|
)
|
|
try:
|
|
embed = await self.log_file_read(message)
|
|
if "Ryujinx_" in filename:
|
|
self.uploaded_log_info.append(
|
|
{
|
|
"filename": filename,
|
|
"file_size": filesize,
|
|
"link": log_file_link,
|
|
"author": author_id,
|
|
}
|
|
)
|
|
# Avoid duplicate log file analysis, at least temporarily; keep track of the last few filenames of uploaded logs
|
|
# this should help support channels not be flooded with too many log files
|
|
# fmt: off
|
|
self.uploaded_log_info = self.uploaded_log_info[-5:]
|
|
# fmt: on
|
|
return await reply_message.edit(content=None, embed=embed)
|
|
except UnicodeDecodeError as error:
|
|
await reply_message.edit(
|
|
content=author_mention,
|
|
embed=Embed(
|
|
description="This log file appears to be invalid. Please re-check and re-upload your log file.",
|
|
colour=self.ryujinx_blue,
|
|
),
|
|
)
|
|
logging.warning(error)
|
|
except Exception as error:
|
|
await reply_message.edit(
|
|
content=f"Error: Couldn't parse log; parser threw `{type(error).__name__}` exception."
|
|
)
|
|
logging.warning(error)
|
|
else:
|
|
duplicate_log_file = next(
|
|
(
|
|
elem
|
|
for elem in self.uploaded_log_info
|
|
if elem["filename"] == filename
|
|
and elem["file_size"] == filesize
|
|
and elem["author"] == author_id
|
|
),
|
|
None,
|
|
)
|
|
await message.channel.send(
|
|
content=author_mention,
|
|
embed=Embed(
|
|
description=f"The log file `{filename}` appears to be a duplicate [already uploaded here]({duplicate_log_file['link']}). Please upload a more recent file.",
|
|
colour=self.ryujinx_blue,
|
|
),
|
|
)
|
|
|
|
@commands.check(check_if_staff)
|
|
@commands.command(
|
|
aliases=["analyselog", "analyse_log", "analyze", "analyzelog", "analyze_log"]
|
|
)
|
|
async def analyse(self, ctx: Context, attachment_number=1):
|
|
await ctx.message.delete()
|
|
if ctx.message.reference is not None:
|
|
message = await ctx.fetch_message(ctx.message.reference.message_id)
|
|
if len(message.attachments) >= attachment_number:
|
|
attachment = message.attachments[attachment_number - 1]
|
|
is_log_file, _ = self.is_valid_log_name(attachment)
|
|
|
|
if is_log_file:
|
|
return await self.analyse_log_message(
|
|
message, attachment_number - 1
|
|
)
|
|
else:
|
|
return await ctx.send(
|
|
f"The attached log file '{attachment.filename}' is not valid.",
|
|
reference=ctx.message.reference,
|
|
)
|
|
|
|
return await ctx.send(
|
|
"Please use `.analyse` as a reply to a message with an attached log file."
|
|
)
|
|
|
|
@Cog.listener()
|
|
async def on_message(self, message: Message):
|
|
await self.bot.wait_until_ready()
|
|
if message.author.bot:
|
|
return
|
|
for attachment in message.attachments:
|
|
is_log_file, is_ryujinx_log_file = self.is_valid_log_name(attachment)
|
|
|
|
if is_log_file and not is_ryujinx_log_file:
|
|
attached_log = message.attachments[0]
|
|
log_file = await self.download_file(attached_log.url)
|
|
# Large files show a header value when not downloaded completely
|
|
# this regex makes sure that the log text to read starts from the first timestamp, ignoring headers
|
|
log_file_header_regex = re.compile(
|
|
r"\d{2}:\d{2}:\d{2}\.\d{3}.*", re.DOTALL
|
|
)
|
|
log_file_match = re.search(log_file_header_regex, log_file)
|
|
if log_file_match:
|
|
log_file = log_file_match.group(0)
|
|
if self.is_game_blocked(log_file):
|
|
return await message.channel.send(
|
|
content=None, embed=await self.blocked_game_action(message)
|
|
)
|
|
elif (
|
|
is_log_file
|
|
and is_ryujinx_log_file
|
|
and message.channel.id in self.bot_log_allowed_channels.values()
|
|
):
|
|
return await self.analyse_log_message(
|
|
message, message.attachments.index(attachment)
|
|
)
|
|
elif (
|
|
is_log_file
|
|
and is_ryujinx_log_file
|
|
and message.channel.id not in self.bot_log_allowed_channels.values()
|
|
):
|
|
return await message.author.send(
|
|
content=message.author.mention,
|
|
embed=Embed(
|
|
description="\n".join(
|
|
(
|
|
f"Please upload Ryujinx log files to the correct location:\n",
|
|
f'<#{self.bot.config.bot_log_allowed_channels["windows-support"]}>: Windows help and troubleshooting',
|
|
f'<#{self.bot.config.bot_log_allowed_channels["linux-support"]}>: Linux help and troubleshooting',
|
|
f'<#{self.bot.config.bot_log_allowed_channels["macos-support"]}>: macOS help and troubleshooting',
|
|
f'<#{self.bot.config.bot_log_allowed_channels["patreon-support"]}>: Help and troubleshooting for Patreon subscribers',
|
|
f'<#{self.bot.config.bot_log_allowed_channels["development"]}>: Ryujinx development discussion',
|
|
f'<#{self.bot.config.bot_log_allowed_channels["pr-testing"]}>: Discussion of in-progress pull request builds',
|
|
)
|
|
),
|
|
colour=self.ryujinx_blue,
|
|
),
|
|
)
|
|
|
|
|
|
async def setup(bot):
|
|
await bot.add_cog(LogFileReader(bot))
|