Change robocop_ng to have state directory differ from working directory

This commit is contained in:
Mary 2023-04-05 12:10:18 +02:00
parent 8463b9b2fb
commit 48f9cc5cde
34 changed files with 349 additions and 308 deletions

View file

@ -9,4 +9,4 @@ COPY . .
WORKDIR /usr/src/app
CMD [ "python", "-m", "robocop_ng" ]
CMD [ "python", "-m", "robocop_ng", "/state" ]

View file

@ -7,7 +7,14 @@ import aiohttp
import discord
from discord.ext import commands
from robocop_ng import config
if len(sys.argv[1:]) != 1:
sys.stderr.write("usage: <state_dir>")
sys.exit(1)
state_dir = os.path.abspath(sys.argv[1])
sys.path.append(state_dir)
import config
# TODO: check __name__ for __main__ nerd
@ -50,6 +57,11 @@ wanted_jsons = [
"data/persistent_roles.json",
]
for wanted_json_idx in range(len(wanted_jsons)):
wanted_jsons[wanted_json_idx] = os.path.join(
state_dir, wanted_jsons[wanted_json_idx]
)
intents = discord.Intents.all()
intents.typing = False
@ -61,6 +73,7 @@ bot.help_command = commands.DefaultHelpCommand(dm_help=True)
bot.log = log
bot.config = config
bot.script_name = script_name
bot.state_dir = state_dir
bot.wanted_jsons = wanted_jsons

View file

@ -6,7 +6,6 @@ import discord
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_bot_manager
@ -94,7 +93,7 @@ class Admin(Cog):
async def cog_load_actions(self, cog_name):
if cog_name == "verification":
verif_channel = self.bot.get_channel(config.welcome_channel)
verif_channel = self.bot.get_channel(self.bot.config.welcome_channel)
await self.bot.do_resetalgo(verif_channel, "cog load")
@commands.guild_only()
@ -109,7 +108,7 @@ class Admin(Cog):
cogs_to_reload = re.findall(r"cogs/([a-z_]*).py[ ]*\|", git_output)
for cog in cogs_to_reload:
cog_name = "robocop_ng.cogs." + cog
if cog_name not in config.initial_cogs:
if cog_name not in self.bot.config.initial_cogs:
continue
try:

View file

@ -4,8 +4,6 @@ import discord
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
class Basic(Cog):
def __init__(self, bot):
@ -39,7 +37,9 @@ class Basic(Cog):
async def robocop(self, ctx):
"""Shows a quick embed with bot info."""
embed = discord.Embed(
title="Robocop-NG", url=config.source_url, description=config.embed_desc
title="Robocop-NG",
url=self.bot.config.source_url,
description=self.bot.config.embed_desc,
)
embed.set_thumbnail(url=str(self.bot.user.display_avatar))

View file

@ -1,8 +1,6 @@
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
class BasicReswitched(Cog):
def __init__(self, bot):
@ -12,7 +10,7 @@ class BasicReswitched(Cog):
@commands.command()
async def communitycount(self, ctx):
"""Prints the community member count of the server."""
community = ctx.guild.get_role(config.named_roles["community"])
community = ctx.guild.get_role(self.bot.config.named_roles["community"])
await ctx.send(
f"{ctx.guild.name} has {len(community.members)} community members!"
)
@ -21,7 +19,7 @@ class BasicReswitched(Cog):
@commands.command()
async def hackercount(self, ctx):
"""Prints the hacker member count of the server."""
h4x0r = ctx.guild.get_role(config.named_roles["hacker"])
h4x0r = ctx.guild.get_role(self.bot.config.named_roles["hacker"])
await ctx.send(
f"{ctx.guild.name} has {len(h4x0r.members)} people with hacker role!"
)

View file

@ -1,29 +1,30 @@
import json
import os
import discord
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_collaborator
class Invites(Cog):
def __init__(self, bot):
self.bot = bot
self.invites_json_path = os.path.join(self.bot.state_dir, "data/invites.json")
@commands.command()
@commands.guild_only()
@commands.check(check_if_collaborator)
async def invite(self, ctx):
welcome_channel = self.bot.get_channel(config.welcome_channel)
welcome_channel = self.bot.get_channel(self.bot.config.welcome_channel)
author = ctx.message.author
reason = f"Created by {str(author)} ({author.id})"
invite = await welcome_channel.create_invite(
max_age=0, max_uses=1, temporary=True, unique=True, reason=reason
)
with open("data/invites.json", "r") as f:
with open(self.invites_json_path, "r") as f:
invites = json.load(f)
invites[invite.id] = {
@ -33,7 +34,7 @@ class Invites(Cog):
"code": invite.code,
}
with open("data/invites.json", "w") as f:
with open(self.invites_json_path, "w") as f:
f.write(json.dumps(invites))
await ctx.message.add_reaction("🆗")

View file

@ -2,8 +2,6 @@ import discord
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
class Links(Cog):
"""
@ -38,13 +36,13 @@ class Links(Cog):
@commands.command(hidden=True, aliases=["guides", "link"])
async def guide(self, ctx):
"""Link to the guides"""
await ctx.send(config.links_guide_text)
await ctx.send(self.bot.config.links_guide_text)
@commands.command()
async def source(self, ctx):
"""Gives link to source code."""
await ctx.send(
f"You can find my source at {config.source_url}. "
f"You can find my source at {self.bot.config.source_url}. "
"Serious PRs and issues welcome!"
)
@ -55,7 +53,7 @@ class Links(Cog):
targetuser = ctx.author
await ctx.send(
f"{targetuser.mention}: A link to the rules "
f"can be found here: {config.rules_url}"
f"can be found here: {self.bot.config.rules_url}"
)
@commands.command()

View file

@ -5,8 +5,6 @@ import discord
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
class Lists(Cog):
"""
@ -19,7 +17,7 @@ class Lists(Cog):
# Helpers
def check_if_target_is_staff(self, target):
return any(r.id in config.staff_role_ids for r in target.roles)
return any(r.id in self.bot.config.staff_role_ids for r in target.roles)
def is_edit(self, emoji):
return str(emoji)[0] == "" or str(emoji)[0] == "📝"
@ -84,7 +82,7 @@ class Lists(Cog):
fields = embeds[0].fields
for field in fields:
if field.name == "Message ID":
files_channel = self.bot.get_channel(config.list_files_channel)
files_channel = self.bot.get_channel(self.bot.config.list_files_channel)
file_message = await files_channel.fetch_message(int(field.value))
await file_message.delete()
@ -133,7 +131,7 @@ class Lists(Cog):
await ctx.send(f"Number must be greater than 0.")
return
if channel.id not in config.list_channels:
if channel.id not in self.bot.config.list_channels:
await ctx.send(f"{channel.mention} is not a list channel.")
return
@ -160,7 +158,7 @@ class Lists(Cog):
await self.bot.wait_until_ready()
# We only care about reactions in Rules, and Support FAQ
if payload.channel_id not in config.list_channels:
if payload.channel_id not in self.bot.config.list_channels:
return
channel = self.bot.get_channel(payload.channel_id)
@ -201,8 +199,8 @@ class Lists(Cog):
await r.remove(user)
# When editing we want to provide the user a copy of the raw text.
if self.is_edit(reaction.emoji) and config.list_files_channel != 0:
files_channel = self.bot.get_channel(config.list_files_channel)
if self.is_edit(reaction.emoji) and self.bot.config.list_files_channel != 0:
files_channel = self.bot.get_channel(self.bot.config.list_files_channel)
file = discord.File(
io.BytesIO(message.content.encode("utf-8")),
filename=f"{message.id}.txt",
@ -221,7 +219,7 @@ class Lists(Cog):
await self.bot.wait_until_ready()
# We only care about reactions in Rules, and Support FAQ
if payload.channel_id not in config.list_channels:
if payload.channel_id not in self.bot.config.list_channels:
return
channel = self.bot.get_channel(payload.channel_id)
@ -232,7 +230,7 @@ class Lists(Cog):
return
# We want to remove the embed we added.
if self.is_edit(payload.emoji) and config.list_files_channel != 0:
if self.is_edit(payload.emoji) and self.bot.config.list_files_channel != 0:
await self.clean_up_raw_text_file_message(message)
@Cog.listener()
@ -240,7 +238,7 @@ class Lists(Cog):
await self.bot.wait_until_ready()
# We only care about messages in Rules, and Support FAQ
if message.channel.id not in config.list_channels:
if message.channel.id not in self.bot.config.list_channels:
return
# We don"t care about messages from bots.
@ -252,7 +250,7 @@ class Lists(Cog):
await message.delete()
return
log_channel = self.bot.get_channel(config.log_channel)
log_channel = self.bot.get_channel(self.bot.config.log_channel)
channel = message.channel
content = message.content
user = message.author
@ -300,7 +298,7 @@ class Lists(Cog):
targeted_message = targeted_reaction.message
if self.is_edit(targeted_reaction):
if config.list_files_channel != 0:
if self.bot.config.list_files_channel != 0:
await self.clean_up_raw_text_file_message(targeted_message)
await targeted_message.edit(content=content)
await targeted_reaction.remove(user)

View file

@ -2,7 +2,6 @@ import discord
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_staff
@ -24,7 +23,7 @@ class Lockdown(Cog):
pass
async def unlock_for_staff(self, channel: discord.TextChannel, issuer):
for role in config.staff_role_ids:
for role in self.bot.config.staff_role_ids:
await self.set_sendmessage(channel, role, True, issuer)
@commands.guild_only()
@ -36,15 +35,15 @@ class Lockdown(Cog):
Defaults to current channel."""
if not channel:
channel = ctx.channel
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
roles = None
for key, lockdown_conf in config.lockdown_configs.items():
for key, lockdown_conf in self.bot.config.lockdown_configs.items():
if channel.id in lockdown_conf["channels"]:
roles = lockdown_conf["roles"]
if roles is None:
roles = config.lockdown_configs["default"]["roles"]
roles = self.bot.config.lockdown_configs["default"]["roles"]
for role in roles:
await self.set_sendmessage(channel, role, False, ctx.author)
@ -76,15 +75,15 @@ class Lockdown(Cog):
"""Unlocks speaking in current channel, staff only."""
if not channel:
channel = ctx.channel
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
roles = None
for key, lockdown_conf in config.lockdown_configs.items():
for key, lockdown_conf in self.bot.config.lockdown_configs.items():
if channel.id in lockdown_conf["channels"]:
roles = lockdown_conf["roles"]
if roles is None:
roles = config.lockdown_configs["default"]["roles"]
roles = self.bot.config.lockdown_configs["default"]["roles"]
await self.unlock_for_staff(channel, ctx.author)

View file

@ -5,8 +5,6 @@ import aiohttp
from discord import Colour, Embed
from discord.ext.commands import Cog
from robocop_ng import config
logging.basicConfig(
format="%(asctime)s (%(levelname)s) %(message)s (Line %(lineno)d)",
level=logging.INFO,
@ -16,7 +14,7 @@ logging.basicConfig(
class LogFileReader(Cog):
def __init__(self, bot):
self.bot = bot
self.bot_log_allowed_channels = config.bot_log_allowed_channels
self.bot_log_allowed_channels = self.bot.config.bot_log_allowed_channels
self.ryujinx_blue = Colour(0x4A90E2)
self.uploaded_log_info = []
@ -620,14 +618,16 @@ class LogFileReader(Cog):
is_channel_allowed = False
for allowed_channel_id in config.bot_log_allowed_channels.values():
for (
allowed_channel_id
) in self.bot.config.bot_log_allowed_channels.values():
if message.channel.id == allowed_channel_id:
is_channel_allowed = True
break
if is_channel_allowed:
if re.match(pr_version, self.embed["emu_info"]["ryu_version"]):
pr_version_warning = f"**⚠️ PR build logs should be posted in <#{config.bot_log_allowed_channels['pr-testing']}> if reporting bugs or tests**"
pr_version_warning = f"**⚠️ PR build logs should be posted in <#{self.bot.config.bot_log_allowed_channels['pr-testing']}> if reporting bugs or tests**"
self.embed["game_info"]["notes"].append(pr_version_warning)
if re.match(
@ -776,13 +776,13 @@ class LogFileReader(Cog):
description="\n".join(
(
f"Please upload Ryujinx log files to the correct location:\n",
f'<#{config.bot_log_allowed_channels["windows-support"]}>: Windows help and troubleshooting',
f'<#{config.bot_log_allowed_channels["linux-support"]}>: Linux help and troubleshooting',
f'<#{config.bot_log_allowed_channels["macos-support"]}>: macOS help and troubleshooting',
f'<#{config.bot_log_allowed_channels["patreon-support"]}>: Help and troubleshooting for Patreon subscribers',
f'<#{config.bot_log_allowed_channels["development"]}>: Ryujinx development discussion',
f'<#{config.bot_log_allowed_channels["pr-testing"]}>: Discussion of in-progress pull request builds',
f'<#{config.bot_log_allowed_channels["linux-master-race"]}>: Linux support and discussion',
f'<#{self.bot.config.bot_log_allowed_channels["windows-support"]}>: Windows help and troubleshooting',
f'<#{self.bot.config.bot_log_allowed_channels["linux-support"]}>: Linux help and troubleshooting',
f'<#{self.bot.config.bot_log_allowed_channels["macos-support"]}>: macOS help and troubleshooting',
f'<#{self.bot.config.bot_log_allowed_channels["patreon-support"]}>: Help and troubleshooting for Patreon subscribers',
f'<#{self.bot.config.bot_log_allowed_channels["development"]}>: Ryujinx development discussion',
f'<#{self.bot.config.bot_log_allowed_channels["pr-testing"]}>: Discussion of in-progress pull request builds',
f'<#{self.bot.config.bot_log_allowed_channels["linux-master-race"]}>: Linux support and discussion',
)
),
colour=self.ryujinx_blue,

View file

@ -1,12 +1,13 @@
import json
import os
import re
import discord
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_staff
from robocop_ng.helpers.restrictions import get_user_restrictions
from robocop_ng.helpers.userlogs import get_userlog
class Logs(Cog):
@ -16,6 +17,7 @@ class Logs(Cog):
def __init__(self, bot):
self.bot = bot
self.invites_json_path = os.path.join(self.bot.state_dir, "data/invites.json")
self.invite_re = re.compile(
r"((discord\.gg|discordapp\.com/" r"+invite)/+[a-zA-Z0-9-]+)", re.IGNORECASE
)
@ -23,7 +25,7 @@ class Logs(Cog):
self.clean_re = re.compile(r"[^a-zA-Z0-9_ ]+", re.UNICODE)
# All lower case, no spaces, nothing non-alphanumeric
susp_hellgex = "|".join(
[r"\W*".join(list(word)) for word in config.suspect_words]
[r"\W*".join(list(word)) for word in self.bot.config.suspect_words]
)
self.susp_hellgex = re.compile(susp_hellgex, re.IGNORECASE)
@ -31,15 +33,15 @@ class Logs(Cog):
async def on_member_join(self, member):
await self.bot.wait_until_ready()
if member.guild.id not in config.guild_whitelist:
if member.guild.id not in self.bot.config.guild_whitelist:
return
log_channel = self.bot.get_channel(config.log_channel)
log_channel = self.bot.get_channel(self.bot.config.log_channel)
# We use this a lot, might as well get it once
escaped_name = self.bot.escape_message(member)
# Attempt to correlate the user joining with an invite
with open("data/invites.json", "r") as f:
with open(self.invites_json_path, "r") as f:
invites = json.load(f)
real_invites = await member.guild.invites()
@ -74,7 +76,7 @@ class Logs(Cog):
del invites[id]
# Save invites data.
with open("data/invites.json", "w") as f:
with open(self.invites_json_path, "w") as f:
f.write(json.dumps(invites))
# Prepare the invite correlation message
@ -88,7 +90,7 @@ class Logs(Cog):
# Check if user account is older than 15 minutes
age = member.joined_at - member.created_at
if age < config.min_age:
if age < self.bot.config.min_age:
try:
await member.send(
f"Your account is too new to "
@ -126,13 +128,12 @@ class Logs(Cog):
# Handles user restrictions
# Basically, gives back muted role to users that leave with it.
rsts = get_user_restrictions(member.id)
rsts = get_user_restrictions(self.bot, member.id)
roles = [discord.utils.get(member.guild.roles, id=rst) for rst in rsts]
await member.add_roles(*roles)
# Real hell zone.
with open("data/userlog.json", "r") as f:
warns = json.load(f)
warns = get_userlog()
try:
if len(warns[str(member.id)]["warns"]) == 0:
await log_channel.send(msg)
@ -170,16 +171,17 @@ class Logs(Cog):
msg += f"\n- Has invite: https://{invite[0]}"
alert = True
for susp_word in config.suspect_words:
for susp_word in self.bot.config.suspect_words:
if susp_word in cleancont and not any(
ok_word in cleancont for ok_word in config.suspect_ignored_words
ok_word in cleancont
for ok_word in self.bot.config.suspect_ignored_words
):
msg += f"\n- Contains suspicious word: `{susp_word}`"
alert = True
if alert:
msg += f"\n\nJump: <{message.jump_url}>"
spy_channel = self.bot.get_channel(config.spylog_channel)
spy_channel = self.bot.get_channel(self.bot.config.spylog_channel)
# Bad Code :tm:, blame retr0id
message_clean = message.content.replace("*", "").replace("_", "")
@ -205,13 +207,13 @@ class Logs(Cog):
f"R11 violating name by {message.author.mention} " f"({message.author.id})."
)
spy_channel = self.bot.get_channel(config.spylog_channel)
spy_channel = self.bot.get_channel(self.bot.config.spylog_channel)
await spy_channel.send(msg)
@Cog.listener()
async def on_message(self, message):
await self.bot.wait_until_ready()
if message.channel.id not in config.spy_channels:
if message.channel.id not in self.bot.config.spy_channels:
return
await self.do_spy(message)
@ -219,7 +221,7 @@ class Logs(Cog):
@Cog.listener()
async def on_message_edit(self, before, after):
await self.bot.wait_until_ready()
if after.channel.id not in config.spy_channels or after.author.bot:
if after.channel.id not in self.bot.config.spy_channels or after.author.bot:
return
# If content is the same, just skip over it
@ -233,7 +235,7 @@ class Logs(Cog):
before_content = before.clean_content.replace("`", "`\u200d")
after_content = after.clean_content.replace("`", "`\u200d")
log_channel = self.bot.get_channel(config.log_channel)
log_channel = self.bot.get_channel(self.bot.config.log_channel)
msg = (
"📝 **Message edit**: \n"
@ -252,10 +254,10 @@ class Logs(Cog):
@Cog.listener()
async def on_message_delete(self, message):
await self.bot.wait_until_ready()
if message.channel.id not in config.spy_channels or message.author.bot:
if message.channel.id not in self.bot.config.spy_channels or message.author.bot:
return
log_channel = self.bot.get_channel(config.log_channel)
log_channel = self.bot.get_channel(self.bot.config.log_channel)
msg = (
"🗑️ **Message delete**: \n"
f"from {self.bot.escape_message(message.author.name)} "
@ -274,10 +276,10 @@ class Logs(Cog):
async def on_member_remove(self, member):
await self.bot.wait_until_ready()
if member.guild.id not in config.guild_whitelist:
if member.guild.id not in self.bot.config.guild_whitelist:
return
log_channel = self.bot.get_channel(config.log_channel)
log_channel = self.bot.get_channel(self.bot.config.log_channel)
msg = (
f"⬅️ **Leave**: {member.mention} | "
f"{self.bot.escape_message(member)}\n"
@ -289,10 +291,10 @@ class Logs(Cog):
async def on_member_ban(self, guild, member):
await self.bot.wait_until_ready()
if guild.id not in config.guild_whitelist:
if guild.id not in self.bot.config.guild_whitelist:
return
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
msg = (
f"⛔ **Ban**: {member.mention} | "
f"{self.bot.escape_message(member)}\n"
@ -304,10 +306,10 @@ class Logs(Cog):
async def on_member_unban(self, guild, user):
await self.bot.wait_until_ready()
if guild.id not in config.guild_whitelist:
if guild.id not in self.bot.config.guild_whitelist:
return
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
msg = (
f"⚠️ **Unban**: {user.mention} | "
f"{self.bot.escape_message(user)}\n"
@ -328,11 +330,11 @@ class Logs(Cog):
async def on_member_update(self, member_before, member_after):
await self.bot.wait_until_ready()
if member_after.guild.id not in config.guild_whitelist:
if member_after.guild.id not in self.bot.config.guild_whitelist:
return
msg = ""
log_channel = self.bot.get_channel(config.log_channel)
log_channel = self.bot.get_channel(self.bot.config.log_channel)
if member_before.roles != member_after.roles:
# role removal
role_removal = []

View file

@ -18,12 +18,15 @@ from robocop_ng.helpers.macros import (
class Macro(Cog):
def __init__(self, bot):
self.bot = bot
@commands.cooldown(3, 30, BucketType.member)
@commands.command(aliases=["m"])
async def macro(self, ctx: Context, target: Optional[discord.Member], key: str):
await ctx.message.delete()
if len(key) > 0:
text = get_macro(key)
text = get_macro(self.bot, key)
if text is not None:
if target is not None:
await ctx.send(f"{target.mention}:\n{text}")
@ -42,7 +45,7 @@ class Macro(Cog):
@commands.check(check_if_staff)
@commands.command(name="macroadd", aliases=["ma", "addmacro", "add_macro"])
async def add_macro(self, ctx: Context, key: str, *, text: str):
if add_macro(key, text):
if add_macro(self.bot, key, text):
await ctx.send(f"Macro '{key}' added!")
else:
await ctx.send(f"Error: Macro '{key}' already exists.")
@ -53,7 +56,7 @@ class Macro(Cog):
if len(new_keys) == 0:
await ctx.send("Error: You need to add at least one alias.")
else:
if add_aliases(existing_key, list(new_keys)):
if add_aliases(self.bot, existing_key, list(new_keys)):
await ctx.send(
f"Added {len(new_keys)} aliases to macro '{existing_key}'!"
)
@ -63,7 +66,7 @@ class Macro(Cog):
@commands.check(check_if_staff)
@commands.command(name="macroedit", aliases=["me", "editmacro", "edit_macro"])
async def edit_macro(self, ctx: Context, key: str, *, text: str):
if edit_macro(key, text):
if edit_macro(self.bot, key, text):
await ctx.send(f"Macro '{key}' edited!")
else:
await ctx.send(f"Error: Macro '{key}' not found.")
@ -86,7 +89,7 @@ class Macro(Cog):
if len(remove_keys) == 0:
await ctx.send("Error: You need to remove at least one alias.")
else:
if remove_aliases(existing_key, list(remove_keys)):
if remove_aliases(self.bot, existing_key, list(remove_keys)):
await ctx.send(
f"Removed {len(remove_keys)} aliases from macro '{existing_key}'!"
)
@ -109,7 +112,7 @@ class Macro(Cog):
],
)
async def remove_macro(self, ctx: Context, key: str):
if remove_macro(key):
if remove_macro(self.bot, key):
await ctx.send(f"Macro '{key}' removed!")
else:
await ctx.send(f"Error: Macro '{key}' not found.")
@ -117,7 +120,7 @@ class Macro(Cog):
@commands.check(check_if_staff)
@commands.command(name="aliasclear", aliases=["clearalias", "clear_alias"])
async def clear_alias_macro(self, ctx: Context, existing_key: str):
if clear_aliases(existing_key):
if clear_aliases(self.bot, existing_key):
await ctx.send(f"Removed all aliases of macro '{existing_key}'!")
else:
await ctx.send(f"Error: No aliases found for macro '{existing_key}'.")
@ -125,7 +128,7 @@ class Macro(Cog):
@commands.cooldown(3, 30, BucketType.channel)
@commands.command(name="macros", aliases=["ml", "listmacros", "list_macros"])
async def list_macros(self, ctx: Context):
macros = get_macros_dict()
macros = get_macros_dict(self.bot)
if len(macros["macros"]) > 0:
macros = [f"- {key}\n" for key in sorted(macros["macros"].keys())]
message = "📝 **Macros**:\n"
@ -138,7 +141,7 @@ class Macro(Cog):
@commands.cooldown(3, 30, BucketType.channel)
@commands.command(name="aliases", aliases=["listaliases", "list_aliases"])
async def list_aliases(self, ctx: Context, existing_key: str):
macros = get_macros_dict()
macros = get_macros_dict(self.bot)
existing_key = existing_key.lower()
if existing_key in macros["aliases"].keys():
message = f"📝 **Aliases for '{existing_key}'**:\n"

View file

@ -5,7 +5,6 @@ import discord
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_staff, check_if_bot_manager
from robocop_ng.helpers.restrictions import add_restriction, remove_restriction
from robocop_ng.helpers.userlogs import userlog
@ -16,7 +15,7 @@ class Mod(Cog):
self.bot = bot
def check_if_target_is_staff(self, target):
return any(r.id in config.staff_role_ids for r in target.roles)
return any(r.id in self.bot.config.staff_role_ids for r in target.roles)
@commands.guild_only()
@commands.check(check_if_bot_manager)
@ -27,7 +26,7 @@ class Mod(Cog):
await ctx.guild.edit(icon=img_bytes, reason=str(ctx.author))
await ctx.send(f"Done!")
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
log_msg = (
f"✏️ **Guild Icon Update**: {ctx.author} changed the guild icon."
f"\n🔗 __Jump__: <{ctx.message.jump_url}>"
@ -62,7 +61,7 @@ class Mod(Cog):
"I can't mute this user as they're a member of staff."
)
userlog(target.id, ctx.author, reason, "mutes", target.name)
userlog(self.bot, target.id, ctx.author, reason, "mutes", target.name)
safe_name = await commands.clean_content(escape_markdown=True).convert(
ctx, str(target)
@ -79,7 +78,7 @@ class Mod(Cog):
# or has DMs disabled
pass
mute_role = ctx.guild.get_role(config.mute_role)
mute_role = ctx.guild.get_role(self.bot.config.mute_role)
await target.add_roles(mute_role, reason=str(ctx.author))
@ -99,10 +98,10 @@ class Mod(Cog):
chan_message += f"\n🔗 __Jump__: <{ctx.message.jump_url}>"
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
await log_channel.send(chan_message)
await ctx.send(f"{target.mention} can no longer speak.")
add_restriction(target.id, config.mute_role)
add_restriction(self.bot, target.id, self.bot.config.mute_role)
@commands.guild_only()
@commands.check(check_if_staff)
@ -113,7 +112,7 @@ class Mod(Cog):
ctx, str(target)
)
mute_role = ctx.guild.get_role(config.mute_role)
mute_role = ctx.guild.get_role(self.bot.config.mute_role)
await target.remove_roles(mute_role, reason=str(ctx.author))
chan_message = (
@ -124,10 +123,10 @@ class Mod(Cog):
chan_message += f"\n🔗 __Jump__: <{ctx.message.jump_url}>"
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
await log_channel.send(chan_message)
await ctx.send(f"{target.mention} can now speak again.")
remove_restriction(target.id, config.mute_role)
remove_restriction(self.bot, target.id, self.bot.config.mute_role)
@commands.guild_only()
@commands.bot_has_permissions(kick_members=True)
@ -156,7 +155,7 @@ class Mod(Cog):
"I can't kick this user as they're a member of staff."
)
userlog(target.id, ctx.author, reason, "kicks", target.name)
userlog(self.bot, target.id, ctx.author, reason, "kicks", target.name)
safe_name = await commands.clean_content(escape_markdown=True).convert(
ctx, str(target)
@ -195,7 +194,7 @@ class Mod(Cog):
chan_message += f"\n🔗 __Jump__: <{ctx.message.jump_url}>"
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
await log_channel.send(chan_message)
await ctx.send(f"👢 {safe_name}, 👍.")
@ -228,7 +227,7 @@ class Mod(Cog):
elif self.check_if_target_is_staff(target):
return await ctx.send("I can't ban this user as they're a member of staff.")
userlog(target.id, ctx.author, reason, "bans", target.name)
userlog(self.bot, target.id, ctx.author, reason, "bans", target.name)
safe_name = await commands.clean_content(escape_markdown=True).convert(
ctx, str(target)
@ -265,7 +264,7 @@ class Mod(Cog):
chan_message += f"\n🔗 __Jump__: <{ctx.message.jump_url}>"
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
await log_channel.send(chan_message)
await ctx.send(f"{safe_name} is now b&. 👍")
@ -305,7 +304,7 @@ class Mod(Cog):
"Message delete day count needs to be between 0 and 7 days."
)
userlog(target.id, ctx.author, reason, "bans", target.name)
userlog(self.bot, target.id, ctx.author, reason, "bans", target.name)
safe_name = await commands.clean_content(escape_markdown=True).convert(
ctx, str(target)
@ -343,7 +342,7 @@ class Mod(Cog):
chan_message += f"\n🔗 __Jump__: <{ctx.message.jump_url}>"
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
await log_channel.send(chan_message)
await ctx.send(
f"{safe_name} is now b&, with {day_count} days of messages deleted. 👍"
@ -367,7 +366,7 @@ class Mod(Cog):
elif target_member and self.check_if_target_is_staff(target_member):
return await ctx.send("I can't ban this user as they're a member of staff.")
userlog(target, ctx.author, reason, "bans", target_user.name)
userlog(self.bot, target, ctx.author, reason, "bans", target_user.name)
safe_name = await commands.clean_content(escape_markdown=True).convert(
ctx, str(target)
@ -392,7 +391,7 @@ class Mod(Cog):
chan_message += f"\n🔗 __Jump__: <{ctx.message.jump_url}>"
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
await log_channel.send(chan_message)
await ctx.send(f"{safe_name} is now b&. 👍")
@ -421,7 +420,7 @@ class Mod(Cog):
)
continue
userlog(target, ctx.author, f"massban", "bans", target_user.name)
userlog(self.bot, target, ctx.author, f"massban", "bans", target_user.name)
safe_name = await commands.clean_content(escape_markdown=True).convert(
ctx, str(target)
@ -441,7 +440,7 @@ class Mod(Cog):
chan_message += f"\n🔗 __Jump__: <{ctx.message.jump_url}>"
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
await log_channel.send(chan_message)
await ctx.send(f"All {len(targets_int)} users are now b&. 👍")
@ -474,7 +473,7 @@ class Mod(Cog):
chan_message += f"\n🔗 __Jump__: <{ctx.message.jump_url}>"
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
await log_channel.send(chan_message)
await ctx.send(f"{safe_name} is now unb&.")
@ -494,7 +493,7 @@ class Mod(Cog):
elif self.check_if_target_is_staff(target):
return await ctx.send("I can't ban this user as they're a member of staff.")
userlog(target.id, ctx.author, reason, "bans", target.name)
userlog(self.bot, target.id, ctx.author, reason, "bans", target.name)
safe_name = await commands.clean_content(escape_markdown=True).convert(
ctx, str(target)
@ -519,7 +518,7 @@ class Mod(Cog):
chan_message += f"\n🔗 __Jump__: <{ctx.message.jump_url}>"
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
await log_channel.send(chan_message)
@commands.guild_only()
@ -529,9 +528,10 @@ class Mod(Cog):
self, ctx, target: Optional[discord.Member], role: str = "community"
):
"""Add a role to a user (default: community), staff only."""
if role not in config.named_roles:
if role not in self.bot.config.named_roles:
return await ctx.send(
"No such role! Available roles: " + ",".join(config.named_roles)
"No such role! Available roles: "
+ ",".join(self.bot.config.named_roles)
)
if target is None and ctx.message.reference is None:
@ -544,8 +544,8 @@ class Mod(Cog):
ctx.message.reference.message_id
).author
log_channel = self.bot.get_channel(config.modlog_channel)
target_role = ctx.guild.get_role(config.named_roles[role])
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
target_role = ctx.guild.get_role(self.bot.config.named_roles[role])
if target_role in target.roles:
return await ctx.send("Target already has this role.")
@ -567,9 +567,10 @@ class Mod(Cog):
self, ctx, target: Optional[discord.Member], role: str = "community"
):
"""Remove a role from a user (default: community), staff only."""
if role not in config.named_roles:
if role not in self.bot.config.named_roles:
return await ctx.send(
"No such role! Available roles: " + ",".join(config.named_roles)
"No such role! Available roles: "
+ ",".join(self.bot.config.named_roles)
)
if target is None and ctx.message.reference is None:
@ -582,8 +583,8 @@ class Mod(Cog):
ctx.message.reference.message_id
).author
log_channel = self.bot.get_channel(config.modlog_channel)
target_role = ctx.guild.get_role(config.named_roles[role])
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
target_role = ctx.guild.get_role(self.bot.config.named_roles[role])
if target_role not in target.roles:
return await ctx.send("Target doesn't have this role.")
@ -603,7 +604,7 @@ class Mod(Cog):
@commands.command(aliases=["clear"])
async def purge(self, ctx, limit: int, channel: discord.TextChannel = None):
"""Clears a given number of messages, staff only."""
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
if not channel:
channel = ctx.channel
await channel.purge(limit=limit)
@ -639,8 +640,10 @@ class Mod(Cog):
"I can't warn this user as they're a member of staff."
)
log_channel = self.bot.get_channel(config.modlog_channel)
warn_count = userlog(target.id, ctx.author, reason, "warns", target.name)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
warn_count = userlog(
self.bot, target.id, ctx.author, reason, "warns", target.name
)
safe_name = await commands.clean_content(escape_markdown=True).convert(
ctx, str(target)
@ -655,7 +658,7 @@ class Mod(Cog):
if reason:
msg += " The given reason is: " + reason
msg += (
f"\n\nPlease read the rules in {config.rules_url}. "
f"\n\nPlease read the rules in {self.bot.config.rules_url}. "
f"This is warn #{warn_count}."
)
if warn_count == 2:
@ -718,7 +721,7 @@ class Mod(Cog):
"I can't warn this user as they're a member of staff."
)
warn_count = userlog(target, ctx.author, reason, "warns", target_user)
warn_count = userlog(self.bot, target, ctx.author, reason, "warns", target_user)
safe_name = await commands.clean_content(escape_markdown=True).convert(
ctx, str(target)
@ -731,7 +734,14 @@ class Mod(Cog):
)
if warn_count == 4:
userlog(target, ctx.author, "exceeded warn limit", "bans", target_user.name)
userlog(
self.bot,
target,
ctx.author,
"exceeded warn limit",
"bans",
target_user.name,
)
chan_msg += "**This resulted in an auto-hackban.**\n"
await ctx.guild.ban(
target_user,
@ -750,7 +760,7 @@ class Mod(Cog):
chan_msg += f"\n🔗 __Jump__: <{ctx.message.jump_url}>"
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
await log_channel.send(chan_msg)
await ctx.send(f"{safe_name} warned. " f"User has {warn_count} warning(s).")

View file

@ -15,7 +15,7 @@ class ModNote(Cog):
@commands.command(aliases=["addnote"])
async def note(self, ctx, target: discord.Member, *, note: str = ""):
"""Adds a note to a user, staff only."""
userlog(target.id, ctx.author, note, "notes", target.name)
userlog(self.bot, target.id, ctx.author, note, "notes", target.name)
await ctx.send(f"{ctx.author.mention}: noted!")
@commands.guild_only()
@ -23,7 +23,7 @@ class ModNote(Cog):
@commands.command(aliases=["addnoteid"])
async def noteid(self, ctx, target: int, *, note: str = ""):
"""Adds a note to a user by userid, staff only."""
userlog(target, ctx.author, note, "notes")
userlog(self.bot, target, ctx.author, note, "notes")
await ctx.send(f"{ctx.author.mention}: noted!")

View file

@ -4,7 +4,6 @@ import discord
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_staff
@ -24,7 +23,7 @@ class ModReact(Cog):
limit: int = 50,
):
"""Clears reacts from a given user in the given channel, staff only."""
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
if not channel:
channel = ctx.channel
count = 0
@ -49,7 +48,7 @@ class ModReact(Cog):
self, ctx, *, limit: int = 50, channel: discord.TextChannel = None
):
"""Clears all reacts in a given channel, staff only. Use with care."""
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
if not channel:
channel = ctx.channel
count = 0

View file

@ -1,7 +1,6 @@
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_staff
@ -13,10 +12,10 @@ class ModReswitched(Cog):
@commands.command(aliases=["pingmods", "summonmods"])
async def pingmod(self, ctx):
"""Pings mods, only use when there's an emergency."""
can_ping = any(r.id in config.pingmods_allow for r in ctx.author.roles)
can_ping = any(r.id in self.bot.config.pingmods_allow for r in ctx.author.roles)
if can_ping:
await ctx.send(
f"<@&{config.pingmods_role}>: {ctx.author.mention} needs assistance."
f"<@&{self.bot.config.pingmods_role}>: {ctx.author.mention} needs assistance."
)
else:
await ctx.send(
@ -28,7 +27,7 @@ class ModReswitched(Cog):
@commands.command(aliases=["togglemod"])
async def modtoggle(self, ctx):
"""Toggles your mod role, staff only."""
target_role = ctx.guild.get_role(config.modtoggle_role)
target_role = ctx.guild.get_role(self.bot.config.modtoggle_role)
if target_role in ctx.author.roles:
await ctx.author.remove_roles(

View file

@ -5,7 +5,6 @@ import discord
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_staff
from robocop_ng.helpers.restrictions import add_restriction
from robocop_ng.helpers.robocronp import add_job
@ -17,7 +16,7 @@ class ModTimed(Cog):
self.bot = bot
def check_if_target_is_staff(self, target):
return any(r.id in config.staff_role_ids for r in target.roles)
return any(r.id in self.bot.config.staff_role_ids for r in target.roles)
@commands.guild_only()
@commands.bot_has_permissions(ban_members=True)
@ -49,6 +48,7 @@ class ModTimed(Cog):
)
userlog(
self.bot,
target.id,
ctx.author,
f"{reason} (Timed, until " f"{duration_text})",
@ -89,9 +89,9 @@ class ModTimed(Cog):
" as the reason is automatically sent to the user."
)
add_job("unban", target.id, {"guild": ctx.guild.id}, expiry_timestamp)
add_job(self.bot, "unban", target.id, {"guild": ctx.guild.id}, expiry_timestamp)
log_channel = self.bot.get_channel(config.log_channel)
log_channel = self.bot.get_channel(self.bot.config.log_channel)
await log_channel.send(chan_message)
await ctx.send(f"{safe_name} is now b&. " f"It will expire {duration_text}. 👍")
@ -126,6 +126,7 @@ class ModTimed(Cog):
)
userlog(
self.bot,
target.id,
ctx.author,
f"{reason} (Timed, until " f"{duration_text})",
@ -149,7 +150,7 @@ class ModTimed(Cog):
# or has DMs disabled
pass
mute_role = ctx.guild.get_role(config.mute_role)
mute_role = ctx.guild.get_role(self.bot.config.mute_role)
await target.add_roles(mute_role, reason=str(ctx.author))
@ -167,14 +168,16 @@ class ModTimed(Cog):
" as the reason is automatically sent to the user."
)
add_job("unmute", target.id, {"guild": ctx.guild.id}, expiry_timestamp)
add_job(
self.bot, "unmute", target.id, {"guild": ctx.guild.id}, expiry_timestamp
)
log_channel = self.bot.get_channel(config.log_channel)
log_channel = self.bot.get_channel(self.bot.config.log_channel)
await log_channel.send(chan_message)
await ctx.send(
f"{target.mention} can no longer speak. " f"It will expire {duration_text}."
)
add_restriction(target.id, config.mute_role)
add_restriction(self.bot, target.id, self.bot.config.mute_role)
async def setup(bot):

View file

@ -4,7 +4,6 @@ import discord
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_staff
from robocop_ng.helpers.userlogs import get_userlog, set_userlog, userlog_event_types
@ -22,7 +21,7 @@ class ModUserlog(Cog):
wanted_events = [event]
embed = discord.Embed(color=discord.Color.dark_red())
embed.set_author(name=f"Userlog for {name}")
userlog = get_userlog()
userlog = get_userlog(self.bot)
if uid not in userlog:
embed.description = f"There are none!{own_note} (no entry)"
@ -55,18 +54,18 @@ class ModUserlog(Cog):
return embed
def clear_event_from_id(self, uid: str, event_type):
userlog = get_userlog()
userlog = get_userlog(self.bot)
if uid not in userlog:
return f"<@{uid}> has no {event_type}!"
event_count = len(userlog[uid][event_type])
if not event_count:
return f"<@{uid}> has no {event_type}!"
userlog[uid][event_type] = []
set_userlog(json.dumps(userlog))
set_userlog(self.bot, json.dumps(userlog))
return f"<@{uid}> no longer has any {event_type}!"
def delete_event_from_id(self, uid: str, idx: int, event_type):
userlog = get_userlog()
userlog = get_userlog(self.bot)
if uid not in userlog:
return f"<@{uid}> has no {event_type}!"
event_count = len(userlog[uid][event_type])
@ -85,7 +84,7 @@ class ModUserlog(Cog):
f"Reason: {event['reason']}",
)
del userlog[uid][event_type][idx - 1]
set_userlog(json.dumps(userlog))
set_userlog(self.bot, json.dumps(userlog))
return embed
@commands.guild_only()
@ -137,7 +136,7 @@ class ModUserlog(Cog):
@commands.command(aliases=["clearwarns"])
async def clearevent(self, ctx, target: discord.Member, event="warns"):
"""Clears all events of given type for a user, staff only."""
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
msg = self.clear_event_from_id(str(target.id), event)
safe_name = await commands.clean_content(escape_markdown=True).convert(
ctx, str(target)
@ -156,7 +155,7 @@ class ModUserlog(Cog):
@commands.command(aliases=["clearwarnsid"])
async def cleareventid(self, ctx, target: int, event="warns"):
"""Clears all events of given type for a userid, staff only."""
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
msg = self.clear_event_from_id(str(target), event)
await ctx.send(msg)
msg = (
@ -171,7 +170,7 @@ class ModUserlog(Cog):
@commands.command(aliases=["delwarn"])
async def delevent(self, ctx, target: discord.Member, idx: int, event="warns"):
"""Removes a specific event from a user, staff only."""
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
del_event = self.delete_event_from_id(str(target.id), idx, event)
event_name = userlog_event_types[event].lower()
# This is hell.
@ -195,7 +194,7 @@ class ModUserlog(Cog):
@commands.command(aliases=["delwarnid"])
async def deleventid(self, ctx, target: int, idx: int, event="warns"):
"""Removes a specific event from a userid, staff only."""
log_channel = self.bot.get_channel(config.modlog_channel)
log_channel = self.bot.get_channel(self.bot.config.modlog_channel)
del_event = self.delete_event_from_id(str(target), idx, event)
event_name = userlog_event_types[event].lower()
# This is hell.

View file

@ -15,7 +15,7 @@ class ModWatch(Cog):
@commands.command()
async def watch(self, ctx, target: discord.Member, *, note: str = ""):
"""Puts a user under watch, staff only."""
setwatch(target.id, ctx.author, True, target.name)
setwatch(self.bot, target.id, ctx.author, True, target.name)
await ctx.send(f"{ctx.author.mention}: user is now on watch.")
@commands.guild_only()
@ -23,7 +23,7 @@ class ModWatch(Cog):
@commands.command()
async def watchid(self, ctx, target: int, *, note: str = ""):
"""Puts a user under watch by userid, staff only."""
setwatch(target, ctx.author, True, target.name)
setwatch(self.bot, target, ctx.author, True, target.name)
await ctx.send(f"{target.mention}: user is now on watch.")
@commands.guild_only()
@ -31,7 +31,7 @@ class ModWatch(Cog):
@commands.command()
async def unwatch(self, ctx, target: discord.Member, *, note: str = ""):
"""Removes a user from watch, staff only."""
setwatch(target.id, ctx.author, False, target.name)
setwatch(self.bot, target.id, ctx.author, False, target.name)
await ctx.send(f"{ctx.author.mention}: user is now not on watch.")
@commands.guild_only()
@ -39,7 +39,7 @@ class ModWatch(Cog):
@commands.command()
async def unwatchid(self, ctx, target: int, *, note: str = ""):
"""Removes a user from watch by userid, staff only."""
setwatch(target, ctx.author, False, target.name)
setwatch(self.bot, target, ctx.author, False, target.name)
await ctx.send(f"{target.mention}: user is now not on watch.")

View file

@ -5,7 +5,6 @@ from discord.enums import MessageType
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_collaborator
from robocop_ng.helpers.checks import check_if_pin_channel
@ -60,13 +59,13 @@ class Pin(Cog):
return (data["id"], data["files"]["pinboard.md"]["content"])
async def add_pin_to_pinboard(self, channel, data):
if config.github_oauth_token == "":
if self.bot.config.github_oauth_token == "":
# Don't add to gist pinboard if we don't have an oauth token
return
async with aiohttp.ClientSession() as session:
gh = gidgethub.aiohttp.GitHubAPI(
session, "RoboCop-NG", oauth_token=config.github_oauth_token
session, "RoboCop-NG", oauth_token=self.bot.config.github_oauth_token
)
(id, content) = await self.get_pinboard(gh, channel)
content += "- " + data + "\n"
@ -103,7 +102,7 @@ class Pin(Cog):
return
# Check that reaction pinning is allowd in this channel
if payload.channel_id not in config.allowed_pin_channels:
if payload.channel_id not in self.bot.config.allowed_pin_channels:
return
target_guild = self.bot.get_guild(payload.guild_id)
@ -112,7 +111,7 @@ class Pin(Cog):
# Check that the user is allowed to reaction-pin
target_user = target_guild.get_member(payload.user_id)
for role in config.staff_role_ids + config.allowed_pin_roles:
for role in self.bot.config.staff_role_ids + self.bot.config.allowed_pin_roles:
if role in [role.id for role in target_user.roles]:
target_chan = self.bot.get_channel(payload.channel_id)
target_msg = await target_chan.get_message(payload.message_id)

View file

@ -17,7 +17,7 @@ class Remind(Cog):
@commands.command()
async def remindlist(self, ctx):
"""Lists your reminders."""
ctab = get_crontab()
ctab = get_crontab(self.bot)
uid = str(ctx.author.id)
embed = discord.Embed(title=f"Active robocronp jobs")
for jobtimestamp in ctab["remind"]:
@ -61,6 +61,7 @@ class Remind(Cog):
added_on = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S (UTC)")
add_job(
self.bot,
"remind",
ctx.author.id,
{"text": safe_text, "added": added_on},

View file

@ -5,7 +5,6 @@ import discord
from discord.ext import commands, tasks
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_staff
from robocop_ng.helpers.restrictions import remove_restriction
from robocop_ng.helpers.robocronp import get_crontab, delete_job
@ -26,7 +25,7 @@ class Robocronp(Cog):
async def send_data(self):
await self.bot.wait_until_ready()
data_files = [discord.File(fpath) for fpath in self.bot.wanted_jsons]
log_channel = await self.bot.get_channel_safe(config.botlog_channel)
log_channel = await self.bot.get_channel_safe(self.bot.config.botlog_channel)
await log_channel.send("Hourly data backups:", files=data_files)
@commands.guild_only()
@ -34,7 +33,7 @@ class Robocronp(Cog):
@commands.command()
async def listjobs(self, ctx):
"""Lists timed robocronp jobs, staff only."""
ctab = get_crontab()
ctab = get_crontab(self.bot)
embed = discord.Embed(title=f"Active robocronp jobs")
for jobtype in ctab:
for jobtimestamp in ctab[jobtype]:
@ -59,31 +58,31 @@ class Robocronp(Cog):
- job name (userid, like 420332322307571713)
You can get all 3 from listjobs command."""
delete_job(timestamp, job_type, job_name)
delete_job(self.bot, timestamp, job_type, job_name)
await ctx.send(f"{ctx.author.mention}: Deleted!")
async def do_jobs(self, ctab, jobtype, timestamp):
await self.bot.wait_until_ready()
log_channel = await self.bot.get_channel_safe(config.botlog_channel)
log_channel = await self.bot.get_channel_safe(self.bot.config.botlog_channel)
for job_name in ctab[jobtype][timestamp]:
try:
job_details = ctab[jobtype][timestamp][job_name]
if jobtype == "unban":
target_user = await self.bot.fetch_user(job_name)
target_guild = self.bot.get_guild(job_details["guild"])
delete_job(timestamp, jobtype, job_name)
delete_job(self.bot, timestamp, jobtype, job_name)
await target_guild.unban(
target_user, reason="Robocronp: Timed ban expired."
)
elif jobtype == "unmute":
remove_restriction(job_name, config.mute_role)
remove_restriction(self.bot, job_name, self.bot.config.mute_role)
target_guild = self.bot.get_guild(job_details["guild"])
target_member = target_guild.get_member(int(job_name))
target_role = target_guild.get_role(config.mute_role)
target_role = target_guild.get_role(self.bot.config.mute_role)
await target_member.remove_roles(
target_role, reason="Robocronp: Timed mute expired."
)
delete_job(timestamp, jobtype, job_name)
delete_job(self.bot, timestamp, jobtype, job_name)
elif jobtype == "remind":
text = job_details["text"]
added_on = job_details["added"]
@ -92,10 +91,10 @@ class Robocronp(Cog):
await target.send(
f"You asked to be reminded about `{text}` on {added_on}."
)
delete_job(timestamp, jobtype, job_name)
delete_job(self.bot, timestamp, jobtype, job_name)
except:
# Don't kill cronjobs if something goes wrong.
delete_job(timestamp, jobtype, job_name)
delete_job(self.bot, timestamp, jobtype, job_name)
await log_channel.send(
"Crondo has errored, job deleted: ```"
f"{traceback.format_exc()}```"
@ -103,7 +102,7 @@ class Robocronp(Cog):
async def clean_channel(self, channel_id):
await self.bot.wait_until_ready()
log_channel = await self.bot.get_channel_safe(config.botlog_channel)
log_channel = await self.bot.get_channel_safe(self.bot.config.botlog_channel)
channel = await self.bot.get_channel_safe(channel_id)
try:
done_cleaning = False
@ -125,9 +124,9 @@ class Robocronp(Cog):
@tasks.loop(minutes=1)
async def minutely(self):
await self.bot.wait_until_ready()
log_channel = await self.bot.get_channel_safe(config.botlog_channel)
log_channel = await self.bot.get_channel_safe(self.bot.config.botlog_channel)
try:
ctab = get_crontab()
ctab = get_crontab(self.bot)
timestamp = time.time()
for jobtype in ctab:
for jobtimestamp in ctab[jobtype]:
@ -135,7 +134,7 @@ class Robocronp(Cog):
await self.do_jobs(ctab, jobtype, jobtimestamp)
# Handle clean channels
for clean_channel in config.minutely_clean_channels:
for clean_channel in self.bot.config.minutely_clean_channels:
await self.clean_channel(clean_channel)
except:
# Don't kill cronjobs if something goes wrong.
@ -146,11 +145,11 @@ class Robocronp(Cog):
@tasks.loop(hours=1)
async def hourly(self):
await self.bot.wait_until_ready()
log_channel = await self.bot.get_channel_safe(config.botlog_channel)
log_channel = await self.bot.get_channel_safe(self.bot.config.botlog_channel)
try:
await self.send_data()
# Handle clean channels
for clean_channel in config.hourly_clean_channels:
for clean_channel in self.bot.config.hourly_clean_channels:
await self.clean_channel(clean_channel)
except:
# Don't kill cronjobs if something goes wrong.
@ -161,11 +160,13 @@ class Robocronp(Cog):
@tasks.loop(hours=24)
async def daily(self):
await self.bot.wait_until_ready()
log_channel = await self.bot.get_channel_safe(config.botlog_channel)
log_channel = await self.bot.get_channel_safe(self.bot.config.botlog_channel)
try:
# Reset verification and algorithm
if "cogs.verification" in config.initial_cogs:
verif_channel = await self.bot.get_channel_safe(config.welcome_channel)
if "cogs.verification" in self.bot.config.initial_cogs:
verif_channel = await self.bot.get_channel_safe(
self.bot.config.welcome_channel
)
await self.bot.do_resetalgo(verif_channel, "daily robocronp")
except:
# Don't kill cronjobs if something goes wrong.

View file

@ -22,11 +22,11 @@ class RolePersistence(Cog):
save_roles.append(role.id)
if len(save_roles) > 0:
add_user_roles(payload.user.id, save_roles)
add_user_roles(self.bot, payload.user.id, save_roles)
@Cog.listener()
async def on_member_join(self, member: Member):
user_roles = get_user_roles(member.id)
user_roles = get_user_roles(self.bot, member.id)
if len(user_roles) > 0:
user_roles = [
member.guild.get_role(int(role))

View file

@ -6,7 +6,6 @@ import discord
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_staff
@ -14,10 +13,12 @@ class RyujinxReactionRoles(Cog):
def __init__(self, bot):
self.bot = bot
self.channel_id = (
config.reaction_roles_channel_id
self.bot.config.reaction_roles_channel_id
) # The channel to send the reaction role message. (self-roles channel)
self.file = "data/reactionroles.json" # the file to store the required reaction role data. (message id of the RR message.)
self.file = os.path.join(
self.bot.state_dir, "data/reactionroles.json"
) # the file to store the required reaction role data. (message id of the RR message.)
self.msg_id = None
self.m = None # the msg object
@ -33,7 +34,7 @@ class RyujinxReactionRoles(Cog):
if emoji_name[0] == "<":
emoji_name = emoji_name[1:-1]
if target_role_id in config.staff_role_ids:
if target_role_id in self.bot.config.staff_role_ids:
return await ctx.send("Error: Dangerous role found!")
target_role = ctx.guild.get_role(target_role_id)
@ -155,7 +156,6 @@ class RyujinxReactionRoles(Cog):
def load_reaction_config(self):
if not os.path.exists(self.file):
self.bot.log.error("HERE?!")
with open(self.file, "w") as f:
json.dump({}, f)

View file

@ -2,7 +2,6 @@ import discord
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_staff
@ -18,10 +17,10 @@ class RyujinxVerification(Cog):
async def on_member_join(self, member):
await self.bot.wait_until_ready()
if member.guild.id not in config.guild_whitelist:
if member.guild.id not in self.bot.config.guild_whitelist:
return
join_channel = self.bot.get_channel(config.welcome_channel)
join_channel = self.bot.get_channel(self.bot.config.welcome_channel)
if join_channel is not None:
await join_channel.send(
@ -32,14 +31,14 @@ class RyujinxVerification(Cog):
async def process_message(self, message):
"""Process the verification process"""
if message.channel.id == config.welcome_channel:
if message.channel.id == self.bot.config.welcome_channel:
# Assign common stuff into variables to make stuff less of a mess
mcl = message.content.lower()
# Get the role we will give in case of success
success_role = message.guild.get_role(config.participant_role)
success_role = message.guild.get_role(self.bot.config.participant_role)
if config.verification_string == mcl:
if self.bot.config.verification_string == mcl:
await message.author.add_roles(success_role)
await message.delete()
@ -69,22 +68,22 @@ class RyujinxVerification(Cog):
async def on_member_join(self, member):
await self.bot.wait_until_ready()
if member.guild.id not in config.guild_whitelist:
if member.guild.id not in self.bot.config.guild_whitelist:
return
join_channel = self.bot.get_channel(config.welcome_channel)
join_channel = self.bot.get_channel(self.bot.config.welcome_channel)
if join_channel is not None:
await join_channel.send(config.join_message.format(member))
await join_channel.send(self.bot.config.join_message.format(member))
@commands.check(check_if_staff)
@commands.command()
async def reset(self, ctx, limit: int = 100, force: bool = False):
"""Wipes messages and pastes the welcome message again. Staff only."""
if ctx.message.channel.id != config.welcome_channel and not force:
if ctx.message.channel.id != self.bot.config.welcome_channel and not force:
await ctx.send(
f"This command is limited to"
f" <#{config.welcome_channel}>, unless forced."
f" <#{self.bot.config.welcome_channel}>, unless forced."
)
return
await self.do_reset(ctx.channel, ctx.author.mention, limit)

View file

@ -1,7 +1,6 @@
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_staff_or_ot
@ -16,8 +15,8 @@ class SAR(Cog):
"""Lists self assignable roles."""
return await ctx.send(
"Self assignable roles in this guild: "
+ ",".join(config.self_assignable_roles)
+ f"\n\nRun `{config.prefixes[0]}iam role_name_goes_here` to get or remove one."
+ ",".join(self.bot.config.self_assignable_roles)
+ f"\n\nRun `{self.bot.config.prefixes[0]}iam role_name_goes_here` to get or remove one."
)
@commands.cooldown(1, 30, type=commands.BucketType.user)
@ -26,12 +25,12 @@ class SAR(Cog):
@commands.check(check_if_staff_or_ot)
async def iam(self, ctx, role: str):
"""Gets you a self assignable role."""
if role not in config.self_assignable_roles:
if role not in self.bot.config.self_assignable_roles:
return await ctx.send(
"There's no self assignable role with that name. Run .sar to see what you can self assign."
)
target_role = ctx.guild.get_role(config.self_assignable_roles[role])
target_role = ctx.guild.get_role(self.bot.config.self_assignable_roles[role])
if target_role in ctx.author.roles:
await ctx.author.remove_roles(target_role, reason=str(ctx.author))

View file

@ -8,14 +8,13 @@ import discord
from discord.ext import commands
from discord.ext.commands import Cog
from robocop_ng import config
from robocop_ng.helpers.checks import check_if_staff
class Verification(Cog):
def __init__(self, bot):
self.bot = bot
self.hash_choice = random.choice(config.welcome_hashes)
self.hash_choice = random.choice(self.bot.config.welcome_hashes)
# Export reset channel functions
self.bot.do_reset = self.do_reset
@ -24,10 +23,10 @@ class Verification(Cog):
async def do_reset(self, channel, author, limit: int = 100):
await channel.purge(limit=limit)
await channel.send(config.welcome_header)
await channel.send(self.bot.config.welcome_header)
rules = [
"**{}**. {}".format(i, cleandoc(r))
for i, r in enumerate(config.welcome_rules, 1)
for i, r in enumerate(self.bot.config.welcome_rules, 1)
]
rule_choice = random.randint(2, len(rules))
hash_choice_str = self.hash_choice.upper()
@ -35,12 +34,14 @@ class Verification(Cog):
hash_choice_str += "-512"
elif hash_choice_str == "BLAKE2S":
hash_choice_str += "-256"
rules[rule_choice - 1] += "\n" + config.hidden_term_line.format(hash_choice_str)
rules[rule_choice - 1] += "\n" + self.bot.config.hidden_term_line.format(
hash_choice_str
)
msg = (
f"🗑 **Reset**: {author} cleared {limit} messages " f" in {channel.mention}"
)
msg += f"\n💬 __Current challenge location__: under rule {rule_choice}"
log_channel = self.bot.get_channel(config.log_channel)
log_channel = self.bot.get_channel(self.bot.config.log_channel)
await log_channel.send(msg)
# find rule that puts us over 2,000 characters, if any
@ -62,19 +63,19 @@ class Verification(Cog):
await channel.send(item)
await asyncio.sleep(1)
for x in config.welcome_footer:
for x in self.bot.config.welcome_footer:
await channel.send(cleandoc(x))
await asyncio.sleep(1)
async def do_resetalgo(self, channel, author, limit: int = 100):
# randomize hash_choice on reset
self.hash_choice = random.choice(tuple(config.welcome_hashes))
self.hash_choice = random.choice(tuple(self.bot.config.welcome_hashes))
msg = (
f"📘 **Reset Algorithm**: {author} reset " f"algorithm in {channel.mention}"
)
msg += f"\n💬 __Current algorithm__: {self.hash_choice.upper()}"
log_channel = self.bot.get_channel(config.log_channel)
log_channel = self.bot.get_channel(self.bot.config.log_channel)
await log_channel.send(msg)
await self.do_reset(channel, author)
@ -83,10 +84,10 @@ class Verification(Cog):
@commands.command()
async def reset(self, ctx, limit: int = 100, force: bool = False):
"""Wipes messages and pastes the welcome message again. Staff only."""
if ctx.message.channel.id != config.welcome_channel and not force:
if ctx.message.channel.id != self.bot.config.welcome_channel and not force:
await ctx.send(
f"This command is limited to"
f" <#{config.welcome_channel}>, unless forced."
f" <#{self.bot.config.welcome_channel}>, unless forced."
)
return
await self.do_reset(ctx.channel, ctx.author.mention, limit)
@ -95,10 +96,10 @@ class Verification(Cog):
@commands.command()
async def resetalgo(self, ctx, limit: int = 100, force: bool = False):
"""Resets the verification algorithm and does what reset does. Staff only."""
if ctx.message.channel.id != config.welcome_channel and not force:
if ctx.message.channel.id != self.bot.config.welcome_channel and not force:
await ctx.send(
f"This command is limited to"
f" <#{config.welcome_channel}>, unless forced."
f" <#{self.bot.config.welcome_channel}>, unless forced."
)
return
@ -109,7 +110,7 @@ class Verification(Cog):
Not really a rewrite but more of a port
Git blame tells me that I should blame/credit Robin Lambertz"""
if message.channel.id == config.welcome_channel:
if message.channel.id == self.bot.config.welcome_channel:
# Assign common stuff into variables to make stuff less of a mess
member = message.author
full_name = str(member)
@ -136,7 +137,7 @@ class Verification(Cog):
return await chan.send(snark)
# Get the role we will give in case of success
success_role = guild.get_role(config.named_roles["participant"])
success_role = guild.get_role(self.bot.config.named_roles["participant"])
# Get a list of stuff we'll allow and will consider close
allowed_names = [f"@{full_name}", full_name, str(member.id)]
@ -169,11 +170,13 @@ class Verification(Cog):
)
# Detect if the user uses the wrong hash algorithm
wrong_hash_algos = list(set(config.welcome_hashes) - {self.hash_choice})
wrong_hash_algos = list(
set(self.bot.config.welcome_hashes) - {self.hash_choice}
)
for algo in wrong_hash_algos:
for name in itertools.chain(allowed_names, close_names):
if hashlib.new(algo, name.encode("utf-8")).hexdigest() in mcl:
log_channel = self.bot.get_channel(config.log_channel)
log_channel = self.bot.get_channel(self.bot.config.log_channel)
await log_channel.send(
f"User {message.author.mention} tried verification with algo {algo} instead of {self.hash_choice}."
)

View file

@ -6,8 +6,6 @@ import secrets
from discord.ext.commands import Cog
from robocop_ng import config
class YubicoOTP(Cog):
def __init__(self, bot):
@ -58,7 +56,7 @@ class YubicoOTP(Cog):
return int("".join(hexconv), 16)
def calc_signature(self, text):
key = base64.b64decode(config.yubico_otp_secret)
key = base64.b64decode(self.bot.config.yubico_otp_secret)
signature_bytes = hmac.digest(key, text.encode(), "SHA1")
return base64.b64encode(signature_bytes).decode()
@ -74,10 +72,10 @@ class YubicoOTP(Cog):
async def validate_yubico_otp(self, otp):
nonce = secrets.token_hex(15) # Random number in the valid range
params = f"id={config.yubico_otp_client_id}&nonce={nonce}&otp={otp}"
params = f"id={self.bot.config.yubico_otp_client_id}&nonce={nonce}&otp={otp}"
# If secret is supplied, sign our request
if config.yubico_otp_secret:
if self.bot.config.yubico_otp_secret:
params += "&h=" + self.calc_signature(params)
for api_server in self.api_servers:
@ -101,7 +99,7 @@ class YubicoOTP(Cog):
assert datafields["nonce"] == nonce
# Verify signature if secret is present
if config.yubico_otp_secret:
if self.bot.config.yubico_otp_secret:
assert self.validate_response_signature(datafields)
# If we got a success, then return True

View file

@ -1,4 +1,4 @@
from robocop_ng import config
import config
def check_if_staff(ctx):

View file

@ -2,12 +2,14 @@ import json
import os
from typing import Optional, Union
MACROS_FILE = "data/macros.json"
def get_crontab_path(bot):
return os.path.join(bot.state_dir, "data/macros.json")
def get_macros_dict() -> dict[str, dict[str, Union[list[str], str]]]:
if os.path.isfile(MACROS_FILE):
with open(MACROS_FILE, "r") as f:
def get_macros_dict(bot) -> dict[str, dict[str, Union[list[str], str]]]:
if os.path.isfile(get_crontab_path(bot)):
with open(get_crontab_path(bot), "r") as f:
macros = json.load(f)
# Migration code
@ -37,10 +39,10 @@ def get_macros_dict() -> dict[str, dict[str, Union[list[str], str]]]:
def is_macro_key_available(
key: str, macros: dict[str, dict[str, Union[list[str], str]]] = None
bot, key: str, macros: dict[str, dict[str, Union[list[str], str]]] = None
) -> bool:
if macros is None:
macros = get_macros_dict()
macros = get_macros_dict(bot)
if key in macros["macros"].keys():
return False
for aliases in macros["aliases"].values():
@ -49,13 +51,13 @@ def is_macro_key_available(
return True
def set_macros(contents: dict[str, dict[str, Union[list[str], str]]]):
with open(MACROS_FILE, "w") as f:
def set_macros(bot, contents: dict[str, dict[str, Union[list[str], str]]]):
with open(get_crontab_path(bot), "w") as f:
json.dump(contents, f)
def get_macro(key: str) -> Optional[str]:
macros = get_macros_dict()
def get_macro(bot, key: str) -> Optional[str]:
macros = get_macros_dict(bot)
key = key.lower()
if key in macros["macros"].keys():
return macros["macros"][key]
@ -65,43 +67,43 @@ def get_macro(key: str) -> Optional[str]:
return None
def add_macro(key: str, message: str) -> bool:
macros = get_macros_dict()
def add_macro(bot, key: str, message: str) -> bool:
macros = get_macros_dict(bot)
key = key.lower()
if is_macro_key_available(key, macros):
if is_macro_key_available(bot, key, macros):
macros["macros"][key] = message
set_macros(macros)
set_macros(bot, macros)
return True
return False
def add_aliases(key: str, aliases: list[str]) -> bool:
macros = get_macros_dict()
def add_aliases(bot, key: str, aliases: list[str]) -> bool:
macros = get_macros_dict(bot)
key = key.lower()
success = False
if key in macros["macros"].keys():
for alias in aliases:
alias = alias.lower()
if is_macro_key_available(alias, macros):
if is_macro_key_available(bot, alias, macros):
macros["aliases"][key].append(alias)
success = True
if success:
set_macros(macros)
set_macros(bot, macros)
return success
def edit_macro(key: str, message: str) -> bool:
macros = get_macros_dict()
def edit_macro(bot, key: str, message: str) -> bool:
macros = get_macros_dict(bot)
key = key.lower()
if key in macros["macros"].keys():
macros["macros"][key] = message
set_macros(macros)
set_macros(bot, macros)
return True
return False
def remove_aliases(key: str, aliases: list[str]) -> bool:
macros = get_macros_dict()
def remove_aliases(bot, key: str, aliases: list[str]) -> bool:
macros = get_macros_dict(bot)
key = key.lower()
success = False
if key not in macros["aliases"].keys():
@ -114,25 +116,25 @@ def remove_aliases(key: str, aliases: list[str]) -> bool:
del macros["aliases"][key]
success = True
if success:
set_macros(macros)
set_macros(bot, macros)
return success
def remove_macro(key: str) -> bool:
macros = get_macros_dict()
def remove_macro(bot, key: str) -> bool:
macros = get_macros_dict(bot)
key = key.lower()
if key in macros["macros"].keys():
del macros["macros"][key]
set_macros(macros)
set_macros(bot, macros)
return True
return False
def clear_aliases(key: str) -> bool:
macros = get_macros_dict()
def clear_aliases(bot, key: str) -> bool:
macros = get_macros_dict(bot)
key = key.lower()
if key in macros["macros"].keys() and key in macros["aliases"].keys():
del macros["aliases"][key]
set_macros(macros)
set_macros(bot, macros)
return True
return False

View file

@ -1,42 +1,47 @@
import json
import os
def get_restrictions():
with open("data/restrictions.json", "r") as f:
def get_restrictions_path(bot):
return os.path.join(bot.state_dir, "data/restrictions.json")
def get_restrictions(bot):
with open(get_restrictions_path(bot), "r") as f:
return json.load(f)
def set_restrictions(contents):
with open("data/restrictions.json", "w") as f:
def set_restrictions(bot, contents):
with open(get_restrictions_path(bot), "w") as f:
f.write(contents)
def get_user_restrictions(uid):
def get_user_restrictions(bot, uid):
uid = str(uid)
with open("data/restrictions.json", "r") as f:
with open(get_restrictions_path(bot), "r") as f:
rsts = json.load(f)
if uid in rsts:
return rsts[uid]
return []
def add_restriction(uid, rst):
def add_restriction(bot, uid, rst):
# mostly from kurisu source, credits go to ihaveamac
uid = str(uid)
rsts = get_restrictions()
rsts = get_restrictions(bot)
if uid not in rsts:
rsts[uid] = []
if rst not in rsts[uid]:
rsts[uid].append(rst)
set_restrictions(json.dumps(rsts))
set_restrictions(bot, json.dumps(rsts))
def remove_restriction(uid, rst):
def remove_restriction(bot, uid, rst):
# mostly from kurisu source, credits go to ihaveamac
uid = str(uid)
rsts = get_restrictions()
rsts = get_restrictions(bot)
if uid not in rsts:
rsts[uid] = []
if rst in rsts[uid]:
rsts[uid].remove(rst)
set_restrictions(json.dumps(rsts))
set_restrictions(bot, json.dumps(rsts))

View file

@ -1,21 +1,26 @@
import json
import math
import os
def get_crontab():
with open("data/robocronptab.json", "r") as f:
def get_crontab_path(bot):
return os.path.join(bot.state_dir, "data/robocronptab.json")
def get_crontab(bot):
with open(get_crontab_path(bot), "r") as f:
return json.load(f)
def set_crontab(contents):
with open("data/robocronptab.json", "w") as f:
def set_crontab(bot, contents):
with open(get_crontab_path(bot), "w") as f:
f.write(contents)
def add_job(job_type, job_name, job_details, timestamp):
def add_job(bot, job_type, job_name, job_details, timestamp):
timestamp = str(math.floor(timestamp))
job_name = str(job_name)
ctab = get_crontab()
ctab = get_crontab(bot)
if job_type not in ctab:
ctab[job_type] = {}
@ -24,14 +29,14 @@ def add_job(job_type, job_name, job_details, timestamp):
ctab[job_type][timestamp] = {}
ctab[job_type][timestamp][job_name] = job_details
set_crontab(json.dumps(ctab))
set_crontab(bot, json.dumps(ctab))
def delete_job(timestamp, job_type, job_name):
def delete_job(bot, timestamp, job_type, job_name):
timestamp = str(timestamp)
job_name = str(job_name)
ctab = get_crontab()
ctab = get_crontab(bot)
del ctab[job_type][timestamp][job_name]
set_crontab(json.dumps(ctab))
set_crontab(bot, json.dumps(ctab))

View file

@ -1,33 +1,36 @@
import json
import os.path
PERSISTENT_ROLES_FILE = "data/persistent_roles.json"
import os
def get_persistent_roles() -> dict[str, list[str]]:
if os.path.isfile(PERSISTENT_ROLES_FILE):
with open(PERSISTENT_ROLES_FILE, "r") as f:
def get_persistent_roles_path(bot):
return os.path.join(bot.state_dir, "data/persistent_roles.json")
def get_persistent_roles(bot) -> dict[str, list[str]]:
if os.path.isfile(get_persistent_roles_path(bot)):
with open(get_persistent_roles_path(bot), "r") as f:
return json.load(f)
return {}
def set_persistent_roles(contents: dict[str, list[str]]):
with open(PERSISTENT_ROLES_FILE, "w") as f:
def set_persistent_roles(bot, contents: dict[str, list[str]]):
with open(get_persistent_roles_path(bot), "w") as f:
json.dump(contents, f)
def add_user_roles(uid: int, roles: list[int]):
def add_user_roles(bot, uid: int, roles: list[int]):
uid = str(uid)
roles = [str(x) for x in roles]
persistent_roles = get_persistent_roles()
persistent_roles = get_persistent_roles(bot)
persistent_roles[uid] = roles
set_persistent_roles(persistent_roles)
set_persistent_roles(bot, persistent_roles)
def get_user_roles(uid: int) -> list[str]:
def get_user_roles(bot, uid: int) -> list[str]:
uid = str(uid)
with open(PERSISTENT_ROLES_FILE, "r") as f:
with open(get_persistent_roles_path(bot), "r") as f:
roles = json.load(f)
if uid in roles:
return roles[uid]

View file

@ -1,4 +1,5 @@
import json
import os
import time
userlog_event_types = {
@ -10,18 +11,22 @@ userlog_event_types = {
}
def get_userlog():
with open("data/userlog.json", "r") as f:
def get_userlog_path(bot):
return os.path.join(bot.state_dir, "data/userlog.json")
def get_userlog(bot):
with open(get_userlog_path(bot), "r") as f:
return json.load(f)
def set_userlog(contents):
with open("data/userlog.json", "w") as f:
def set_userlog(bot, contents):
with open(get_userlog_path(bot), "w") as f:
f.write(contents)
def fill_userlog(userid, uname):
userlogs = get_userlog()
def fill_userlog(bot, userid, uname):
userlogs = get_userlog(bot)
uid = str(userid)
if uid not in userlogs:
userlogs[uid] = {
@ -39,8 +44,8 @@ def fill_userlog(userid, uname):
return userlogs, uid
def userlog(uid, issuer, reason, event_type, uname: str = ""):
userlogs, uid = fill_userlog(uid, uname)
def userlog(bot, uid, issuer, reason, event_type, uname: str = ""):
userlogs, uid = fill_userlog(bot, uid, uname)
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
log_data = {
@ -52,13 +57,13 @@ def userlog(uid, issuer, reason, event_type, uname: str = ""):
if event_type not in userlogs[uid]:
userlogs[uid][event_type] = []
userlogs[uid][event_type].append(log_data)
set_userlog(json.dumps(userlogs))
set_userlog(bot, json.dumps(userlogs))
return len(userlogs[uid][event_type])
def setwatch(uid, issuer, watch_state, uname: str = ""):
userlogs, uid = fill_userlog(uid, uname)
def setwatch(bot, uid, issuer, watch_state, uname: str = ""):
userlogs, uid = fill_userlog(bot, uid, uname)
userlogs[uid]["watch"] = watch_state
set_userlog(json.dumps(userlogs))
set_userlog(bot, json.dumps(userlogs))
return