diff --git a/robocop_ng/__main__.py b/robocop_ng/__main__.py index d375121..681754f 100755 --- a/robocop_ng/__main__.py +++ b/robocop_ng/__main__.py @@ -55,6 +55,7 @@ wanted_jsons = [ "data/invites.json", "data/macros.json", "data/persistent_roles.json", + "data/disabled_tids.json", ] for wanted_json_idx in range(len(wanted_jsons)): @@ -77,10 +78,10 @@ bot.state_dir = state_dir bot.wanted_jsons = wanted_jsons -async def get_channel_safe(self, id): - res = self.get_channel(id) +async def get_channel_safe(self, channel_id: int): + res = self.get_channel(channel_id) if res is None: - res = await self.fetch_channel(id) + res = await self.fetch_channel(channel_id) return res diff --git a/robocop_ng/cogs/logfilereader.py b/robocop_ng/cogs/logfilereader.py index 47bc6c2..7008c11 100644 --- a/robocop_ng/cogs/logfilereader.py +++ b/robocop_ng/cogs/logfilereader.py @@ -2,8 +2,18 @@ import logging import re import aiohttp -from discord import Colour, Embed -from discord.ext.commands import Cog +from discord import Colour, Embed, Message, Attachment +from discord.ext import commands +from discord.ext.commands import Cog, Context + +from robocop_ng.helpers.checks import check_if_staff +from robocop_ng.helpers.disabled_tids import ( + add_disabled_tid, + is_tid_valid, + remove_disabled_tid, + get_disabled_tids, + is_tid_disabled, +) logging.basicConfig( format="%(asctime)s (%(levelname)s) %(message)s (Line %(lineno)d)", @@ -12,12 +22,28 @@ logging.basicConfig( class LogFileReader(Cog): + @staticmethod + def is_valid_log(attachment: Attachment) -> tuple[bool, bool]: + filename = attachment.filename + # Any message over 2000 chars is uploaded as message.txt, so this is accounted for + ryujinx_log_file_regex = re.compile(r"^Ryujinx_.*\.log|message\.txt$") + log_file = re.compile(r"^.*\.log|.*\.txt$") + is_ryujinx_log_file = re.match(ryujinx_log_file_regex, filename) is not None + is_log_file = re.match(log_file, filename) is not None + + return is_log_file, is_ryujinx_log_file + def __init__(self, bot): self.bot = bot self.bot_log_allowed_channels = self.bot.config.bot_log_allowed_channels + self.disallowed_named_roles = ["pirate"] self.ryujinx_blue = Colour(0x4A90E2) self.uploaded_log_info = [] + self.disallowed_roles = [ + self.bot.config.named_roles[x] for x in self.disallowed_named_roles + ] + async def download_file(self, log_url): async with aiohttp.ClientSession() as session: # Grabs first and last few bytes of log file to prevent abuse from large files @@ -70,6 +96,20 @@ class LogFileReader(Cog): log_file_header_regex = re.compile(r"\d{2}:\d{2}:\d{2}\.\d{3}.*", re.DOTALL) log_file = re.search(log_file_header_regex, log_file).group(0) + def is_tid_blocked(log_file=log_file): + game_name = re.search( + r"Loader [A-Za-z]*: Application Loaded:\s([^;\n\r]*)", + log_file, + re.MULTILINE, + ) + if game_name is not None and len(game_name.groups()) > 0: + game_name = game_name.group(1).rstrip() + tid = re.match(r".* \[([a-zA-Z0-9]*)\]", game_name) + if tid is not None: + tid = tid.group(1).strip() + return is_tid_disabled(self.bot, tid) + return False + def get_hardware_info(log_file=log_file): for setting in self.embed["hardware_info"]: try: @@ -615,6 +655,7 @@ class LogFileReader(Cog): old_mainline_version = re.compile(r"^\d\.\d\.(\d){4}$") pr_version = re.compile(r"^\d\.\d\.\d\+([a-f]|\d){7}$") ldn_version = re.compile(r"^\d\.\d\.\d\-ldn\d+\.\d+(?:\.\d+|$)") + mac_version = re.compile(r"^\d\.\d\.\d\-macos\d+\.\d+(?:\.\d+|$)") is_channel_allowed = False @@ -645,6 +686,7 @@ class LogFileReader(Cog): or re.match( old_mainline_version, self.embed["emu_info"]["ryu_version"] ) + or re.match(mac_version, self.embed["emu_info"]["ryu_version"]) or re.match(ldn_version, self.embed["emu_info"]["ryu_version"]) or re.match(pr_version, self.embed["emu_info"]["ryu_version"]) or re.match("Unknown", self.embed["emu_info"]["ryu_version"]) @@ -672,95 +714,189 @@ class LogFileReader(Cog): except AttributeError: pass + if is_tid_blocked(): + warn_message = await message.reply( + f".warn This log contains a blocked title id." + ) + await self.bot.invoke(await self.bot.get_context(warn_message)) + + pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"]) + message.author.add_roles(pirate_role) + + embed = Embed( + title="⛔ Blocked game detected ⛔", + colour=Colour(0xFF0000), + description="This log contains a blocked title id and has been removed.\n" + "The user has been warned and the pirate role was applied.", + ) + embed.set_footer(text=f"Log uploaded by {author_name}") + + await message.delete() + return embed + get_hardware_info() get_ryujinx_info() game_notes = analyse_log() return format_log_embed() + @commands.check(check_if_staff) + @commands.command( + aliases=["disallow_log_tid", "forbid_log_tid", "block_tid", "blocktid"] + ) + async def disable_log_tid(self, ctx: Context, tid: str, note=""): + if not is_tid_valid(tid): + return await ctx.send("The specified TID is invalid.") + + if add_disabled_tid(self.bot, tid, note): + return await ctx.send(f"TID '{tid}' is now blocked!") + else: + return await ctx.send(f"TID '{tid}' is already blocked.") + + @commands.check(check_if_staff) + @commands.command( + aliases=[ + "allow_log_tid", + "unblock_log_tid", + "unblock_tid", + "allow_tid", + "unblocktid", + ] + ) + async def enable_log_tid(self, ctx: Context, tid: str): + if not is_tid_valid(tid): + return await ctx.send("The specified TID is invalid.") + + if remove_disabled_tid(self.bot, tid): + return await ctx.send(f"TID '{tid}' is now unblocked!") + else: + return await ctx.send(f"TID '{tid}' is not blocked.") + + @commands.check(check_if_staff) + @commands.command( + aliases=[ + "blocked_tids", + "listblockedtids", + "list_blocked_log_tids", + "list_blocked_tids", + ] + ) + async def list_disabled_tids(self, ctx: Context): + disabled_tids = get_disabled_tids(self.bot) + message = "**Blocking analysis of the following TIDs:**\n" + for tid, note in disabled_tids.items(): + message += f"- [{tid.upper()}]: {note}\n" if note != "" else f"- [{tid}]\n" + return await ctx.send(message) + + async def analyse_log_message(self, message: Message, attachment_index=0): + author_id = message.author.id + author_mention = message.author.mention + filename = message.attachments[attachment_index].filename + filesize = message.attachments[attachment_index].size + # Any message over 2000 chars is uploaded as message.txt, so this is accounted for + log_file_link = message.jump_url + + for role in message.author.roles: + if role.id in self.disallowed_roles: + return await message.channel.send( + "I'm not allowed to analyse this log." + ) + + uploaded_logs_exist = [ + True for elem in self.uploaded_log_info if filename in elem.values() + ] + if not any(uploaded_logs_exist): + reply_message = await message.channel.send("Log detected, parsing...") + try: + embed = await self.log_file_read(message) + if "Ryujinx_" in filename: + self.uploaded_log_info.append( + { + "filename": filename, + "file_size": filesize, + "link": log_file_link, + "author": author_id, + } + ) + # Avoid duplicate log file analysis, at least temporarily; keep track of the last few filenames of uploaded logs + # this should help support channels not be flooded with too many log files + # fmt: off + self.uploaded_log_info = self.uploaded_log_info[-5:] + # fmt: on + return await reply_message.edit(content=None, embed=embed) + except UnicodeDecodeError: + return await message.channel.send( + content=author_mention, + embed=Embed( + description=f"This log file appears to be invalid. Please re-check and re-upload your log file.", + colour=self.ryujinx_blue, + ), + ) + except Exception as error: + await reply_message.edit( + content=f"Error: Couldn't parse log; parser threw `{type(error).__name__}` exception." + ) + print(logging.warning(error)) + else: + duplicate_log_file = next( + ( + elem + for elem in self.uploaded_log_info + if elem["filename"] == filename + and elem["file_size"] == filesize + and elem["author"] == author_id + ), + None, + ) + await message.channel.send( + content=author_mention, + embed=Embed( + description=f"The log file `{filename}` appears to be a duplicate [already uploaded here]({duplicate_log_file['link']}). Please upload a more recent file.", + colour=self.ryujinx_blue, + ), + ) + + @commands.check(check_if_staff) + @commands.command( + aliases=["analyselog", "analyse_log", "analyze", "analyzelog", "analyze_log"] + ) + async def analyse(self, ctx: Context, attachment_number=1): + await ctx.message.delete() + if ctx.message.reference is not None: + message = await ctx.fetch_message(ctx.message.reference.message_id) + if len(message.attachments) >= attachment_number: + attachment = message.attachments[attachment_number - 1] + is_log_file, _ = self.is_valid_log(attachment) + + if is_log_file: + return await self.analyse_log_message( + message, attachment_number - 1 + ) + else: + return await ctx.send( + f"The attached log file '{attachment.filename}' is not valid.", + reference=ctx.message.reference, + ) + + return await ctx.send( + "Please use `.analyse` as a reply to a message with an attached log file." + ) + @Cog.listener() - async def on_message(self, message): + async def on_message(self, message: Message): await self.bot.wait_until_ready() if message.author.bot: return - try: - author_id = message.author.id - author_mention = message.author.mention - filename = message.attachments[0].filename - filesize = message.attachments[0].size - # Any message over 2000 chars is uploaded as message.txt, so this is accounted for - ryujinx_log_file_regex = re.compile(r"^Ryujinx_.*\.log|message\.txt$") - log_file = re.compile(r"^.*\.log|.*\.txt$") - log_file_link = message.jump_url - is_ryujinx_log_file = re.match(ryujinx_log_file_regex, filename) - is_log_file = re.match(log_file, filename) + for attachment in message.attachments: + is_log_file, is_ryujinx_log_file = self.is_valid_log(attachment) - if ( - message.channel.id in self.bot_log_allowed_channels.values() - and is_ryujinx_log_file - ): - uploaded_logs_exist = [ - True for elem in self.uploaded_log_info if filename in elem.values() - ] - if not any(uploaded_logs_exist): - reply_message = await message.channel.send( - "Log detected, parsing..." - ) - try: - embed = await self.log_file_read(message) - if "Ryujinx_" in filename: - self.uploaded_log_info.append( - { - "filename": filename, - "file_size": filesize, - "link": log_file_link, - "author": author_id, - } - ) - # Avoid duplicate log file analysis, at least temporarily; keep track of the last few filenames of uploaded logs - # this should help support channels not be flooded with too many log files - # fmt: off - self.uploaded_log_info = self.uploaded_log_info[-5:] - # fmt: on - return await reply_message.edit(content=None, embed=embed) - except UnicodeDecodeError: - return await message.channel.send( - content=author_mention, - embed=Embed( - description=f"This log file appears to be invalid. Please re-check and re-upload your log file.", - colour=self.ryujinx_blue, - ), - ) - except Exception as error: - await reply_message.edit( - content=f"Error: Couldn't parse log; parser threw `{type(error).__name__}` exception." - ) - print(logging.warning(error)) - else: - duplicate_log_file = next( - ( - elem - for elem in self.uploaded_log_info - if elem["filename"] == filename - and elem["file_size"] == filesize - and elem["author"] == author_id - ), - None, - ) - await message.channel.send( - content=author_mention, - embed=Embed( - description=f"The log file `{filename}` appears to be a duplicate [already uploaded here]({duplicate_log_file['link']}). Please upload a more recent file.", - colour=self.ryujinx_blue, - ), - ) - elif ( - is_log_file - and not is_ryujinx_log_file - and message.channel.id in self.bot_log_allowed_channels.values() - ): + if message.channel.id in self.bot_log_allowed_channels.values(): + return await self.analyse_log_message( + message, message.attachments.index(attachment) + ) + elif is_log_file and not is_ryujinx_log_file: return await message.channel.send( - content=author_mention, + content=message.author.mention, embed=Embed( description=f"Your file does not match the Ryujinx log format. Please check your file.", colour=self.ryujinx_blue, @@ -771,7 +907,7 @@ class LogFileReader(Cog): and not message.channel.id in self.bot_log_allowed_channels.values() ): return await message.author.send( - content=author_mention, + content=message.author.mention, embed=Embed( description="\n".join( ( @@ -782,14 +918,11 @@ class LogFileReader(Cog): f'<#{self.bot.config.bot_log_allowed_channels["patreon-support"]}>: Help and troubleshooting for Patreon subscribers', f'<#{self.bot.config.bot_log_allowed_channels["development"]}>: Ryujinx development discussion', f'<#{self.bot.config.bot_log_allowed_channels["pr-testing"]}>: Discussion of in-progress pull request builds', - f'<#{self.bot.config.bot_log_allowed_channels["linux-master-race"]}>: Linux support and discussion', ) ), colour=self.ryujinx_blue, ), ) - except IndexError: - pass async def setup(bot): diff --git a/robocop_ng/config_template.py b/robocop_ng/config_template.py index e56d069..4bedbd2 100644 --- a/robocop_ng/config_template.py +++ b/robocop_ng/config_template.py @@ -1,5 +1,5 @@ -import hashlib import datetime +import hashlib # Basic bot config, insert your token here, update description if you want prefixes = [".", "!"] @@ -69,6 +69,7 @@ named_roles = { "community": 420010997877833731, "hacker": 364508795038072833, "participant": 434353085926866946, + "pirate": 0, } # The bot manager and staff roles diff --git a/robocop_ng/helpers/disabled_tids.py b/robocop_ng/helpers/disabled_tids.py new file mode 100644 index 0000000..51a1e70 --- /dev/null +++ b/robocop_ng/helpers/disabled_tids.py @@ -0,0 +1,48 @@ +import json +import os + + +def get_disabled_tids_path(bot) -> str: + return os.path.join(bot.state_dir, "data/disabled_tids.json") + + +def is_tid_valid(tid: str) -> bool: + return len(tid) == 16 and tid.isalnum() + + +def get_disabled_tids(bot) -> dict[str, str]: + if os.path.isfile(get_disabled_tids_path(bot)): + with open(get_disabled_tids_path(bot), "r") as f: + return json.load(f) + return {} + + +def set_disabled_tids(bot, contents: dict[str, str]): + with open(get_disabled_tids_path(bot), "w") as f: + json.dump(contents, f) + + +def is_tid_disabled(bot, tid: str) -> bool: + disabled_tids = get_disabled_tids(bot) + tid = tid.lower() + return tid in disabled_tids.keys() + + +def add_disabled_tid(bot, tid: str, note="") -> bool: + disabled_tids = get_disabled_tids(bot) + tid = tid.lower() + if tid not in disabled_tids.keys(): + disabled_tids[tid] = note + set_disabled_tids(bot, disabled_tids) + return True + return False + + +def remove_disabled_tid(bot, tid: str) -> bool: + disabled_tids = get_disabled_tids(bot) + tid = tid.lower() + if tid in disabled_tids.keys(): + del disabled_tids[tid] + set_disabled_tids(bot, disabled_tids) + return True + return False diff --git a/robocop_ng/helpers/macros.py b/robocop_ng/helpers/macros.py index 1e836d2..299a77e 100644 --- a/robocop_ng/helpers/macros.py +++ b/robocop_ng/helpers/macros.py @@ -3,13 +3,13 @@ import os from typing import Optional, Union -def get_crontab_path(bot): +def get_macros_path(bot): return os.path.join(bot.state_dir, "data/macros.json") def get_macros_dict(bot) -> dict[str, dict[str, Union[list[str], str]]]: - if os.path.isfile(get_crontab_path(bot)): - with open(get_crontab_path(bot), "r") as f: + if os.path.isfile(get_macros_path(bot)): + with open(get_macros_path(bot), "r") as f: macros = json.load(f) # Migration code @@ -31,7 +31,7 @@ def get_macros_dict(bot) -> dict[str, dict[str, Union[list[str], str]]]: del new_macros["macros"][key] duplicate_num += 1 - set_macros(new_macros) + set_macros(bot, new_macros) return new_macros return macros @@ -52,7 +52,7 @@ def is_macro_key_available( def set_macros(bot, contents: dict[str, dict[str, Union[list[str], str]]]): - with open(get_crontab_path(bot), "w") as f: + with open(get_macros_path(bot), "w") as f: json.dump(contents, f)