diff --git a/robocop_ng/__main__.py b/robocop_ng/__main__.py index 796cc2c..b24b912 100755 --- a/robocop_ng/__main__.py +++ b/robocop_ng/__main__.py @@ -56,7 +56,7 @@ wanted_jsons = [ "data/invites.json", "data/macros.json", "data/persistent_roles.json", - "data/disabled_tids.json", + "data/disabled_ids.json", ] for wanted_json_idx in range(len(wanted_jsons)): diff --git a/robocop_ng/cogs/logfilereader.py b/robocop_ng/cogs/logfilereader.py index e566381..7d81710 100644 --- a/robocop_ng/cogs/logfilereader.py +++ b/robocop_ng/cogs/logfilereader.py @@ -1,5 +1,6 @@ import logging import re +from typing import Optional import aiohttp from discord import Colour, Embed, Message, Attachment @@ -7,12 +8,20 @@ from discord.ext import commands from discord.ext.commands import Cog, Context from robocop_ng.helpers.checks import check_if_staff -from robocop_ng.helpers.disabled_tids import ( - add_disabled_tid, - is_tid_valid, - remove_disabled_tid, - get_disabled_tids, - is_tid_disabled, +from robocop_ng.helpers.disabled_ids import ( + add_disabled_app_id, + is_app_id_valid, + remove_disabled_app_id, + get_disabled_ids, + is_app_id_disabled, + is_build_id_valid, + add_disabled_build_id, + remove_disabled_build_id, + is_build_id_disabled, + is_ro_section_disabled, + is_ro_section_valid, + add_disabled_ro_section, + remove_disabled_ro_section, ) logging.basicConfig( @@ -50,6 +59,113 @@ class LogFileReader(Cog): async with session.get(log_url, headers=headers) as response: return await response.text("UTF-8") + def get_main_ro_section(self, log_file: str) -> Optional[dict[str, str]]: + ro_section_regex = re.search( + r"PrintRoSectionInfo: main:[\r\n]*(.*)", log_file, re.DOTALL + ) + if ro_section_regex is not None and len(ro_section_regex.groups()) > 0: + ro_section = {"module": "", "sdk_libraries": []} + for line in ro_section_regex.group(1).splitlines(): + line = line.strip() + if line.startswith("Module:"): + ro_section["module"] = line[8:] + elif line.startswith("SDK Libraries:"): + ro_section["sdk_libraries"].append(line[19:]) + elif line.startswith("SDK "): + ro_section["sdk_libraries"].append(line[4:]) + else: + break + return ro_section + return None + + def get_app_info( + self, log_file: str + ) -> Optional[tuple[str, str, list[str], dict[str, str]]]: + game_name = re.search( + r"Loader [A-Za-z]*: Application Loaded:\s([^;\n\r]*)", + log_file, + re.MULTILINE, + ) + if game_name is not None and len(game_name.groups()) > 0: + game_name = game_name.group(1).rstrip() + app_id_regex = re.match(r".* \[([a-zA-Z0-9]*)\]", game_name) + if app_id_regex: + app_id = app_id_regex.group(1).strip() + else: + app_id = None + bids_regex = re.search( + r"Build ids found for title ([a-zA-Z0-9]*):[\n\r]*(.*)", + log_file, + re.DOTALL, + ) + if bids_regex is not None and len(bids_regex.groups()) > 0: + app_id_from_bids = bids_regex.group(1).strip() + build_ids = [ + bid.strip() + for bid in bids_regex.group(2).splitlines() + if is_build_id_valid(bid.strip()) + ] + + # TODO: Check if self.get_main_ro_section() is None and return an error + return ( + app_id, + app_id_from_bids, + build_ids, + self.get_main_ro_section(log_file), + ) + return None + + def is_log_valid(self, log_file: str) -> bool: + app_info = self.get_app_info(log_file) + if app_info is None: + return True + app_id, another_app_id, _, _ = app_info + return app_id == another_app_id + + def is_game_blocked(self, log_file: str) -> bool: + app_info = self.get_app_info(log_file) + if app_info is None: + return False + app_id, another_app_id, build_ids, main_ro_section = app_info + if is_app_id_disabled(self.bot, app_id) or is_app_id_disabled( + self.bot, another_app_id + ): + return True + for bid in build_ids: + if is_build_id_disabled(self.bot, bid): + return True + return is_ro_section_disabled(self.bot, main_ro_section) + + async def blocked_game_action(self, message: Message) -> Embed: + warn_command = self.bot.get_command("warn") + if warn_command is not None: + warn_message = await message.reply( + ".warn This log contains a blocked game." + ) + warn_context = await self.bot.get_context(warn_message) + await warn_context.invoke( + warn_command, + target=None, + reason="This log contains a blocked game.", + ) + else: + logging.error( + f"Couldn't find 'warn' command. Unable to warn {message.author}." + ) + + pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"]) + await message.author.add_roles(pirate_role) + + embed = Embed( + title="⛔ Blocked game detected ⛔", + colour=Colour(0xFF0000), + description="This log contains a blocked game and has been removed.\n" + "The user has been warned and the pirate role was applied.", + ) + embed.set_footer(text=f"Log uploaded by @{message.author.name}") + await message.delete() + return embed + async def log_file_read(self, message): self.embed = { "hardware_info": { @@ -67,6 +183,7 @@ class LogFileReader(Cog): "game_name": "Unknown", "errors": "No errors found in log", "mods": "No mods found", + "cheats": "No cheats found", "notes": [], }, "settings": { @@ -103,20 +220,6 @@ class LogFileReader(Cog): description="This log file appears to be invalid. Please make sure to upload a Ryujinx log file.", ) - def is_tid_blocked(log_file=log_file): - game_name = re.search( - r"Loader [A-Za-z]*: Application Loaded:\s([^;\n\r]*)", - log_file, - re.MULTILINE, - ) - if game_name is not None and len(game_name.groups()) > 0: - game_name = game_name.group(1).rstrip() - tid = re.match(r".* \[([a-zA-Z0-9]*)\]", game_name) - if tid is not None: - tid = tid.group(1).strip() - return is_tid_disabled(self.bot, tid) - return False - def get_hardware_info(log_file=log_file): for setting in self.embed["hardware_info"]: try: @@ -284,6 +387,9 @@ class LogFileReader(Cog): log_embed.add_field( name="Mods", value=self.embed["game_info"]["mods"], inline=False ) + log_embed.add_field( + name="Cheats", value=self.embed["game_info"]["cheats"], inline=False + ) try: notes_value = "\n".join(game_notes) @@ -540,11 +646,20 @@ class LogFileReader(Cog): mods_status = list(dict.fromkeys(mods_status)) return mods_status + def cheat_information(log_file=log_file): + cheat_regex = re.compile(r"Installing cheat\s\'(.+?)\'") + matches = re.findall(cheat_regex, log_file) + if matches: + cheats = [match[0] for match in matches] + return list(set(cheats)) + game_mods = mods_information() if game_mods: self.embed["game_info"]["mods"] = "\n".join(game_mods) - else: - pass + + game_cheats = cheat_information() + if game_cheats: + self.embed["game_info"]["cheats"] = "\n".join(game_cheats) controllers_regex = re.compile(r"Hid Configure: ([^\r\n]+)") controllers = re.findall(controllers_regex, log_file) @@ -721,35 +836,25 @@ class LogFileReader(Cog): except AttributeError: pass - if is_tid_blocked(): - warn_command = self.bot.get_command("warn") - if warn_command is not None: - warn_message = await message.reply( - ".warn This log contains a blocked title id." - ) - warn_context = await self.bot.get_context(warn_message) - await warn_context.invoke( - warn_command, - target=None, - reason="This log contains a blocked title id.", - ) - else: - logging.error( - f"Couldn't find 'warn' command. Unable to warn {message.author}." - ) + if self.is_game_blocked(log_file): + return await self.blocked_game_action(message) - pirate_role = message.guild.get_role(self.bot.config.named_roles["pirate"]) - await message.author.add_roles(pirate_role) + for role in message.author.roles: + if role.id in self.disallowed_roles: + embed = Embed( + colour=Colour(0xFF0000), + description="I'm not allowed to analyse this log.", + ) + embed.set_footer(text=f"Log uploaded by {author_name}") + return embed + if not self.is_log_valid(log_file): embed = Embed( - title="⛔ Blocked game detected ⛔", - colour=Colour(0xFF0000), - description="This log contains a blocked title id and has been removed.\n" - "The user has been warned and the pirate role was applied.", + title="⚠️ Modified log detected ⚠️", + colour=Colour(0xFCFC00), + description="This log contains manually modified information and won't be analysed.", ) embed.set_footer(text=f"Log uploaded by {author_name}") - - await message.delete() return embed get_hardware_info() @@ -760,52 +865,185 @@ class LogFileReader(Cog): @commands.check(check_if_staff) @commands.command( - aliases=["disallow_log_tid", "forbid_log_tid", "block_tid", "blocktid"] + aliases=["disallow_log_id", "forbid_log_id", "block_id", "blockid"] ) - async def disable_log_tid(self, ctx: Context, tid: str, note=""): - if not is_tid_valid(tid): - return await ctx.send("The specified TID is invalid.") + async def disable_log_id( + self, ctx: Context, block_id_type: str, block_id: str, note="" + ): + match block_id_type.lower(): + case "app" | "app_id" | "appid" | "tid" | "title_id": + if not is_app_id_valid(block_id): + return await ctx.send("The specified app id is invalid.") - if add_disabled_tid(self.bot, tid, note): - return await ctx.send(f"TID '{tid}' is now blocked!") - else: - return await ctx.send(f"TID '{tid}' is already blocked.") + if add_disabled_app_id(self.bot, block_id, note): + return await ctx.send( + f"Application id '{block_id}' is now blocked!" + ) + else: + return await ctx.send( + f"Application id '{block_id}' is already blocked." + ) + case "build" | "build_id", "bid": + if not is_build_id_valid(block_id): + return await ctx.send("The specified build id is invalid.") + + if add_disabled_build_id(self.bot, block_id, note): + return await ctx.send(f"Build id '{block_id}' is now blocked!") + else: + return await ctx.send(f"Build id '{block_id}' is already blocked.") + case _: + return await ctx.send( + "The specified id type is invalid. Valid id types are: ['app_id', 'build_id']" + ) @commands.check(check_if_staff) @commands.command( aliases=[ - "allow_log_tid", - "unblock_log_tid", - "unblock_tid", - "allow_tid", - "unblocktid", + "allow_log_id", + "unblock_log_id", + "unblock_id", + "allow_id", + "unblockid", ] ) - async def enable_log_tid(self, ctx: Context, tid: str): - if not is_tid_valid(tid): - return await ctx.send("The specified TID is invalid.") + async def enable_log_id(self, ctx: Context, block_id_type: str, block_id: str): + match block_id_type.lower(): + case "app" | "app_id" | "appid" | "tid" | "title_id": + if not is_app_id_valid(block_id): + return await ctx.send("The specified app id is invalid.") - if remove_disabled_tid(self.bot, tid): - return await ctx.send(f"TID '{tid}' is now unblocked!") - else: - return await ctx.send(f"TID '{tid}' is not blocked.") + if remove_disabled_app_id(self.bot, block_id): + return await ctx.send( + f"Application id '{block_id}' is now unblocked!" + ) + else: + return await ctx.send( + f"Application id '{block_id}' is not blocked." + ) + case "build" | "build_id", "bid": + if not is_build_id_valid(block_id): + return await ctx.send("The specified build id is invalid.") + + if remove_disabled_build_id(self.bot, block_id): + return await ctx.send(f"Build id '{block_id}' is now unblocked!") + else: + return await ctx.send(f"Build id '{block_id}' is not blocked.") + case _: + return await ctx.send( + "The specified id type is invalid. Valid id types are: ['app_id', 'build_id']" + ) @commands.check(check_if_staff) @commands.command( aliases=[ - "blocked_tids", - "listblockedtids", - "list_blocked_log_tids", - "list_blocked_tids", + "blocked_ids", + "listblockedids", + "list_blocked_log_ids", + "list_blocked_ids", ] ) - async def list_disabled_tids(self, ctx: Context): - disabled_tids = get_disabled_tids(self.bot) - message = "**Blocking analysis of the following TIDs:**\n" - for tid, note in disabled_tids.items(): - message += f"- [{tid.upper()}]: {note}\n" if note != "" else f"- [{tid}]\n" + async def list_disabled_ids(self, ctx: Context): + disabled_ids = get_disabled_ids(self.bot) + message = "**Blocking analysis of the following IDs:**\n" + for id_type, name in { + "app_id": "Application IDs", + "build_id": "Build IDs", + }.items(): + if len(disabled_ids[id_type].keys()) > 0: + message += f"- {name}:\n" + for disabled_id, note in disabled_ids[id_type].items(): + message += ( + f" - [{disabled_id.upper()}]: {note}\n" + if note != "" + else f" - [{disabled_id}]\n" + ) + message += "\n" + if len(disabled_ids["ro_section"].keys()) > 0: + message += "- Read-only sections:\n" + for note in disabled_ids["ro_section"].keys(): + f"- [{note}]" return await ctx.send(message) + @commands.check(check_if_staff) + @commands.command( + aliases=[ + "disallow_ro_section", + "forbid_ro_section", + "block_ro_section", + "blockrosection", + ] + ) + async def disable_ro_section( + self, ctx: Context, note: str, ro_section_snippet: str + ): + ro_section_snippet = ro_section_snippet.strip("`").splitlines() + ro_section_snippet = [ + line for line in ro_section_snippet if len(line.strip()) > 0 + ] + + ro_section_info_regex = re.search( + r"PrintRoSectionInfo: main:", ro_section_snippet[0] + ) + if ro_section_info_regex is None: + ro_section_snippet.insert(0, "PrintRoSectionInfo: main:") + + ro_section = self.get_main_ro_section("\n".join(ro_section_snippet)) + if ro_section is not None and is_ro_section_valid(ro_section): + if add_disabled_ro_section(self.bot, note, ro_section): + return await ctx.send( + f"The specified read-only section '{note}' is now blocked." + ) + else: + return await ctx.send( + f"The specified read-only section '{note}' is already blocked." + ) + + @commands.check(check_if_staff) + @commands.command( + aliases=[ + "allow_ro_section", + "unblock_ro_section", + "allow_rosection", + "unblockrosection", + ] + ) + async def enable_ro_section(self, ctx: Context, note: str): + if remove_disabled_ro_section(self.bot, note): + return await ctx.send( + f"The read-only section for '{note}' is now unblocked!" + ) + else: + return await ctx.send(f"The read-only section for '{note}' is not blocked.") + + @commands.check(check_if_staff) + @commands.command( + aliases=[ + "get_blocked_ro_section", + "disabled_ro_section", + "blocked_ro_section" "list_disabled_ro_section", + "list_blocked_ro_section", + ] + ) + async def get_disabled_ro_section(self, ctx: Context, note: str): + disabled_ids = get_disabled_ids(self.bot) + key_note = note.lower() + if key_note in disabled_ids["ro_section"].keys(): + message = f"**Disabled read-only section for '{note}'**:\n" + message += "```\n" + for key, content in disabled_ids["ro_section"][key_note].items(): + match key: + case "module": + message += f"Module: {content}\n" + case "sdk_libraries": + message += f"SDK Libraries: \n" + for entry in content: + message += f" SDK {entry}\n" + message += "\n" + message += "```" + return await ctx.send(message) + else: + return await ctx.send("The specified read-only section is not blocked.") + async def analyse_log_message(self, message: Message, attachment_index=0): author_id = message.author.id author_mention = message.author.mention @@ -814,12 +1052,6 @@ class LogFileReader(Cog): # Any message over 2000 chars is uploaded as message.txt, so this is accounted for log_file_link = message.jump_url - for role in message.author.roles: - if role.id in self.disallowed_roles: - return await message.channel.send( - "I'm not allowed to analyse this log." - ) - uploaded_logs_exist = [ True for elem in self.uploaded_log_info if filename in elem.values() ] @@ -911,7 +1143,22 @@ class LogFileReader(Cog): for attachment in message.attachments: is_log_file, is_ryujinx_log_file = self.is_valid_log_name(attachment) - if ( + if is_log_file and not is_ryujinx_log_file: + attached_log = message.attachments[0] + log_file = await self.download_file(attached_log.url) + # Large files show a header value when not downloaded completely + # this regex makes sure that the log text to read starts from the first timestamp, ignoring headers + log_file_header_regex = re.compile( + r"\d{2}:\d{2}:\d{2}\.\d{3}.*", re.DOTALL + ) + log_file_match = re.search(log_file_header_regex, log_file) + if log_file_match: + log_file = log_file_match.group(0) + if self.is_game_blocked(log_file): + return await message.channel.send( + content=None, embed=await self.blocked_game_action(message) + ) + elif ( is_log_file and is_ryujinx_log_file and message.channel.id in self.bot_log_allowed_channels.values() diff --git a/robocop_ng/cogs/macro.py b/robocop_ng/cogs/macro.py index ef9e955..b14cdcb 100644 --- a/robocop_ng/cogs/macro.py +++ b/robocop_ng/cogs/macro.py @@ -23,7 +23,9 @@ class Macro(Cog): @commands.cooldown(3, 30, BucketType.member) @commands.command(aliases=["m"]) - async def macro(self, ctx: Context, target: Optional[discord.Member], key: str): + async def macro( + self, ctx: Context, key: str, target: Optional[discord.Member] = None + ): await ctx.message.delete() if len(key) > 0: text = get_macro(self.bot, key) diff --git a/robocop_ng/helpers/disabled_ids.py b/robocop_ng/helpers/disabled_ids.py new file mode 100644 index 0000000..47f8ad8 --- /dev/null +++ b/robocop_ng/helpers/disabled_ids.py @@ -0,0 +1,133 @@ +import json +import os +from typing import Union + + +def get_disabled_ids_path(bot) -> str: + old_filepath = os.path.join(bot.state_dir, "data/disabled_tids.json") + new_filepath = os.path.join(bot.state_dir, "data/disabled_ids.json") + if os.path.isfile(old_filepath): + os.rename(old_filepath, new_filepath) + return new_filepath + + +def is_app_id_valid(app_id: str) -> bool: + return len(app_id) == 16 and app_id.isalnum() + + +def is_build_id_valid(build_id: str) -> bool: + return 32 <= len(build_id) <= 64 and build_id.isalnum() + + +def is_ro_section_valid(ro_section: dict[str, str]) -> bool: + return "module" in ro_section.keys() and "sdk_libraries" in ro_section.keys() + + +def get_disabled_ids(bot) -> dict[str, dict[str, Union[str, dict[str, str]]]]: + if os.path.isfile(get_disabled_ids_path(bot)): + with open(get_disabled_ids_path(bot), "r") as f: + disabled_ids = json.load(f) + # Migration code + if "app_id" not in disabled_ids.keys(): + disabled_ids = {"app_id": disabled_ids, "build_id": {}, "ro_section": {}} + return disabled_ids + + return {"app_id": {}, "build_id": {}, "ro_section": {}} + + +def set_disabled_ids(bot, contents: dict[str, dict[str, Union[str, dict[str, str]]]]): + with open(get_disabled_ids_path(bot), "w") as f: + json.dump(contents, f) + + +def is_app_id_disabled(bot, app_id: str) -> bool: + disabled_ids = get_disabled_ids(bot) + app_id = app_id.lower() + return app_id in disabled_ids["app_id"].keys() + + +def is_build_id_disabled(bot, build_id: str) -> bool: + disabled_ids = get_disabled_ids(bot) + build_id = build_id.lower() + if len(build_id) < 64: + build_id += "0" * (64 - len(build_id)) + return build_id in disabled_ids["build_id"].keys() + + +def is_ro_section_disabled(bot, ro_section: dict[str, str]) -> bool: + disabled_ids = get_disabled_ids(bot) + matches = [] + for note, entry in disabled_ids["ro_section"].items(): + for key, content in entry.items(): + matches.append(ro_section[key].lower() == content.lower()) + if all(matches): + return True + else: + matches = [] + return False + + +def add_disabled_app_id(bot, app_id: str, note="") -> bool: + disabled_ids = get_disabled_ids(bot) + app_id = app_id.lower() + if app_id not in disabled_ids["app_id"].keys(): + disabled_ids["app_id"][app_id] = note + set_disabled_ids(bot, disabled_ids) + return True + return False + + +def remove_disabled_app_id(bot, app_id: str) -> bool: + disabled_ids = get_disabled_ids(bot) + app_id = app_id.lower() + if app_id in disabled_ids["app_id"].keys(): + del disabled_ids["app_id"][app_id] + set_disabled_ids(bot, disabled_ids) + return True + return False + + +def add_disabled_build_id(bot, build_id: str, note="") -> bool: + disabled_ids = get_disabled_ids(bot) + build_id = build_id.lower() + if len(build_id) < 64: + build_id += "0" * (64 - len(build_id)) + if build_id not in disabled_ids["build_id"].keys(): + disabled_ids["build_id"][build_id] = note + set_disabled_ids(bot, disabled_ids) + return True + return False + + +def remove_disabled_build_id(bot, build_id: str) -> bool: + disabled_ids = get_disabled_ids(bot) + build_id = build_id.lower() + if len(build_id) < 64: + build_id += "0" * (64 - len(build_id)) + if build_id in disabled_ids["build_id"].keys(): + del disabled_ids["build_id"][build_id] + set_disabled_ids(bot, disabled_ids) + return True + return False + + +def add_disabled_ro_section(bot, note: str, ro_section: dict[str, str]) -> bool: + disabled_ids = get_disabled_ids(bot) + note = note.lower() + if note not in disabled_ids["ro_section"].keys(): + disabled_ids["ro_section"][note] = {} + for key, content in ro_section.items(): + disabled_ids["ro_section"][note][key] = content.lower() + set_disabled_ids(bot, disabled_ids) + return True + return False + + +def remove_disabled_ro_section(bot, note: str) -> bool: + disabled_ids = get_disabled_ids(bot) + note = note.lower() + if note in disabled_ids["ro_section"].keys(): + del disabled_ids["ro_section"][note] + set_disabled_ids(bot, disabled_ids) + return True + return False diff --git a/robocop_ng/helpers/disabled_tids.py b/robocop_ng/helpers/disabled_tids.py deleted file mode 100644 index 51a1e70..0000000 --- a/robocop_ng/helpers/disabled_tids.py +++ /dev/null @@ -1,48 +0,0 @@ -import json -import os - - -def get_disabled_tids_path(bot) -> str: - return os.path.join(bot.state_dir, "data/disabled_tids.json") - - -def is_tid_valid(tid: str) -> bool: - return len(tid) == 16 and tid.isalnum() - - -def get_disabled_tids(bot) -> dict[str, str]: - if os.path.isfile(get_disabled_tids_path(bot)): - with open(get_disabled_tids_path(bot), "r") as f: - return json.load(f) - return {} - - -def set_disabled_tids(bot, contents: dict[str, str]): - with open(get_disabled_tids_path(bot), "w") as f: - json.dump(contents, f) - - -def is_tid_disabled(bot, tid: str) -> bool: - disabled_tids = get_disabled_tids(bot) - tid = tid.lower() - return tid in disabled_tids.keys() - - -def add_disabled_tid(bot, tid: str, note="") -> bool: - disabled_tids = get_disabled_tids(bot) - tid = tid.lower() - if tid not in disabled_tids.keys(): - disabled_tids[tid] = note - set_disabled_tids(bot, disabled_tids) - return True - return False - - -def remove_disabled_tid(bot, tid: str) -> bool: - disabled_tids = get_disabled_tids(bot) - tid = tid.lower() - if tid in disabled_tids.keys(): - del disabled_tids[tid] - set_disabled_tids(bot, disabled_tids) - return True - return False