Browse Source

Re-write Wizards

wizards v1.5.0
Cassidy Dingenskirchen 2 months ago
parent
commit
547751aba7
Signed by: StDingenskirchen GPG Key ID: 14FE9712CC42FE8B
5 changed files with 593 additions and 491 deletions
  1. +2
    -1
      .pylintrc
  2. +278
    -272
      blimp/cogs/wizard.py
  3. +5
    -0
      blimp/customizations.py
  4. +307
    -217
      blimp/progress.py
  5. +1
    -1
      pyproject.toml

+ 2
- 1
.pylintrc View File

@ -6,4 +6,5 @@ init-hook="from pylint.config import find_pylintrc; import os, sys; sys.path.app
# missing-module-docstring: effectively all modules here just contain one class
# bad-continuation: black wants it the other way
# logging-fstring-interpolation: better readability, main concern is irrelevant to this
disable=missing-module-docstring,bad-continuation,logging-fstring-interpolation
# unsubscriptable-object: false positive for 3.9 type annotations, cf. https://github.com/PyCQA/pylint/issues/3882
disable=missing-module-docstring,bad-continuation,logging-fstring-interpolation,unsubscriptable-object

+ 278
- 272
blimp/cogs/wizard.py View File

@ -1,21 +1,9 @@
import json
import discord
from discord.ext import commands
from ..customizations import Blimp, Unauthorized
from ..progress import (
AutoProgress,
display_emoji,
message_id_to_message,
wait_for_bool,
wait_for_category,
wait_for_channel,
wait_for_emoji,
wait_for_message_id,
wait_for_number,
wait_for_role,
)
from ..progress import CanceledError, ProgressII, display
class Wizard(Blimp.Cog):
@ -32,103 +20,76 @@ class Wizard(Blimp.Cog):
if not ctx.privileged_modify(ctx.guild):
raise Unauthorized()
progress = AutoProgress(
progress = ProgressII(
ctx,
"Create or Update a Board",
ctx.bot.get_command("board" + ctx.bot.suffix).help,
(
"Channel",
"Please start by typing which channel you wish to create or edit a Board in.",
wait_for_channel(ctx),
lambda channel: channel.mention,
),
ctx.bot.get_command(f"board{ctx.bot.suffix}").help,
)
results = await progress.start()
if not results:
return
await progress.update()
try:
channel = await progress.input(
"Channel",
"Please start by typing the channel you wish to create or edit a Board in.",
ProgressII.InputKindOption.CHANNEL,
None,
)
old = ctx.database.execute(
"SELECT * FROM board_configuration WHERE oid=:oid",
{"oid": ctx.objects.by_data(tc=results["Channel"].id)},
).fetchone()
old = ctx.database.execute(
"SELECT * FROM board_configuration WHERE oid=:oid",
{"oid": ctx.objects.by_data(tc=channel.id)},
).fetchone()
data = None
if old:
data = json.loads(old["data"])
if old:
await progress.edit_last_stage(
progress.edit_last_field(
None,
f"{results['Channel'].mention} — Board configuration already exists, updating.",
f"{channel.mention} — "
+ (
"Board configuration already exists, updating."
if old
else "No prior configuration, creating new Board."
),
False,
)
data = json.loads(old["data"])
results |= await progress.proceed(
(
"Emoji",
"Now, please type which emoji should cause messages to get posted into the "
"Board. If you don't care about any particular emoji, answer `any`.\n"
f"The current emoji is {ctx.bot.get_emoji(data[0]) or data[0]}.",
wait_for_emoji,
display_emoji,
),
(
"Minimum Count",
"Almost done! Please type how many emoji of the same type should be "
"required before a message gets reposted.\n"
f"The current minimum count is {data[1]}.",
wait_for_number(),
str,
),
(
"Old Messages",
"Finally, please type if messages that are older than now should be able "
"to get reposted. This may be undesirable e.g. if you are migrating from "
"another bot's starboard.\nCurrently, messages "
+ ("can't " if old["post_age_limit"] else "can ")
+ "be reposted.",
wait_for_bool(),
str,
),
emoji = await progress.input(
"Emoji",
"Now, please type which emoji should cause messages to get posted into the Board. "
"You can also type `any`, in which case BLIMP will only consider the reaction count"
"and not the emoji itself.",
ProgressII.InputKindOption.EMOJI,
(ctx.bot.get_emoji(data[0]) or data[0]) if old else None,
)
else:
await progress.edit_last_stage(
None,
f"{results['Channel'].mention} — No prior configuration, creating new Board.",
False,
min_count = await progress.input(
"Minimum Count",
"Almost done! Please type how many emoji of the same type should be required before"
"a message gets reposted.",
ProgressII.InputKindOption.INTEGER,
data[1] if old else None,
)
results |= await progress.proceed(
(
"Emoji",
"Now, please type which emoji should cause messages to get posted into the "
"Board. If you don't care about any particular emoji, answer `any`.",
wait_for_emoji,
display_emoji,
),
(
"Minimum Count",
"Almost done! Please type how many emoji of the same type should be required "
"before a message gets reposted.",
wait_for_number(),
str,
),
(
"Old Messages",
"Finally, please type if messages that are older than now should be able to "
"get reposted. This may be undesirable e.g. if you are migrating from another "
"bot's starboard.",
wait_for_bool(),
str,
),
old_messages = await progress.input(
"Old Messages",
"Finally, please type if messages that are older than now should be able to get "
"reposted. This may be undesirable e.g. if you are migrating from another bot's "
"starboard.",
ProgressII.InputKindOption.BOOL,
old["post_age_limit"] if old else None,
)
if not "Old Messages" in results:
return
command = (
f"board{ctx.bot.suffix} update {channel.mention} {display(emoji)} {min_count} "
# "not" because the actual db value is if they _shouldn't_ be reposted
+ str(not old_messages)
)
command = (
f"board{ctx.bot.suffix} update {results['Channel'].mention} "
f"{display_emoji(results['Emoji'])} {results['Minimum Count']} "
+ str(not results["Old Messages"])
)
await progress.confirm_execute(command)
await progress.confirm_execute(command)
except CanceledError:
pass
@commands.command(parent=wizard)
async def kiosk(self, ctx: Blimp.Context):
@ -137,149 +98,130 @@ class Wizard(Blimp.Cog):
if not ctx.privileged_modify(ctx.guild):
raise Unauthorized()
progress = AutoProgress(
progress = ProgressII(
ctx,
"Create or Update a Kiosk",
ctx.bot.get_command("kiosk" + ctx.bot.suffix).help,
(
"Message",
"Please start by linking the message you want to edit as a Kiosk.",
wait_for_message_id(ctx),
lambda tup: "Fetching message…",
),
)
results = await progress.start()
if not results:
return
await progress.update()
try:
message = await message_id_to_message(ctx, results["Message"])
except discord.HTTPException:
progress.embed.color = ctx.Color.BAD
await progress.edit_last_stage("❌ Message", "Unknown message.", None)
return
message_link = await ctx.bot.represent_object({"m": results["Message"]})
await progress.edit_last_stage(None, message_link, None)
row = ctx.database.execute(
"SELECT * FROM rolekiosk_entries WHERE oid=:oid",
{"oid": ctx.objects.by_data(m=results["Message"])},
).fetchone()
role_pairs = []
if row:
await progress.edit_last_stage(
message = await progress.input(
"Message",
"Please start by linking the message you want to edit as a Kiosk.",
ProgressII.InputKindOption.MESSAGE,
None,
f"{message_link} — Kiosk configuration already exists, updating.",
False,
)
data = json.loads(row["data"])
results |= await progress.proceed(
(
cid_mid = [message.channel.id, message.id]
old = ctx.database.execute(
"SELECT * FROM rolekiosk_entries WHERE oid=:oid",
{"oid": ctx.objects.by_data(m=cid_mid)},
).fetchone()
data = None
if old:
data = json.loads(old["data"])
message_link = await ctx.bot.represent_object({"m": cid_mid})
progress.edit_last_field(
None,
f"{message_link} — "
+ (
"Kiosk configuration already exists, updating."
if old
else "No prior configuration, creating new Kiosk."
),
None,
)
role_pairs = []
if old:
do_append = await progress.input_choice(
"Append?",
f"The following {len(data)} reaction-role pairs already exist in this kiosk:\n"
+ ctx.bot.get_cog("Kiosk").render_emoji_pairs(data, " ")
+ "\nPlease type whether you want to `append` to this list or `overwrite` it.",
lambda string: (
(string == "append")
if string in ("append", "overwrite")
else None
),
str,
("append", "overwrite"),
"append",
)
)
if "Append?" in results:
if results["Append?"]:
if do_append == "append":
role_pairs.extend(data)
else:
return
else:
await progress.edit_last_stage(
None,
f"{message_link} — No prior configuration, creating new Kiosk.",
progress.add_field(
"➡️ Pending Configuration",
ctx.bot.get_cog("Kiosk").render_emoji_pairs(role_pairs, " ") or "",
False,
)
await progress.add_stage(
"➡️ Pending Configuration",
ctx.bot.get_cog("Kiosk").render_emoji_pairs(role_pairs, " ") or "",
)
await progress.update()
while True:
if len(role_pairs) == 20:
await ctx.reply(
"Discord doesn't support more than twenty reactions per message, no further "
"pairs will be accepted."
)
break
while True:
if len(role_pairs) == 20:
await ctx.reply(
"Discord doesn't support more than twenty reactions per message, no "
"further pairs will be accepted."
)
break
emoji_key = f"Emoji {len(role_pairs)+1}"
role_key = f"Role {len(role_pairs)+1}"
emoji_name = f"Emoji {len(role_pairs)+1}"
role_name = f"Role {len(role_pairs)+1}"
pair_dict = await progress.proceed(
(
emoji_key,
emoji = await progress.input(
emoji_name,
"Please type which emoji the Kiosk should offer for the next role.\n"
f"Type 'done{ctx.bot.suffix}' to confirm the above configuration and continue.",
wait_for_emoji,
display_emoji,
f"Type `done{ctx.bot.suffix}` to confirm the above configuration and continue.",
ProgressII.InputKindOption.EMOJI,
None,
)
)
if not emoji_key in pair_dict:
return
if f"done{ctx.bot.suffix}" in pair_dict.values():
break
pair_dict |= await progress.proceed(
(
role_key,
if emoji == f"done{ctx.bot.suffix}":
break
role = await progress.input(
role_name,
"Please type which role this emoji should grant users.",
wait_for_role(ctx),
lambda role: role.mention,
ProgressII.InputKindOption.ROLE,
None,
)
)
if not role_key in pair_dict:
return
progress.delete_last_stage()
progress.delete_last_stage()
progress.delete_last_field()
progress.delete_last_field()
if not ctx.privileged_modify(role):
await ctx.reply(
f"You can't assign {role.mention} yourself. Skipping.",
color=ctx.Color.BAD,
)
continue
if not ctx.privileged_modify(pair_dict[role_key]):
await ctx.reply(
f"You can't assign {pair_dict[role_key].mention} yourself. Skipping.",
color=ctx.Color.BAD,
if role >= ctx.guild.me.top_role:
await ctx.reply(
f"{role.mention} is above BLIMP's highest role and therefore "
"can't be used in Kiosks. Skipping.",
color=ctx.Color.BAD,
)
continue
role_pairs.append(
(display(emoji), role),
)
continue
if pair_dict[role_key] >= ctx.guild.me.top_role:
await ctx.reply(
f"{pair_dict[role_key].mention} is above BLIMP's highest role and therefore "
"can't be used in Kiosks. Skipping.",
color=ctx.Color.BAD,
progress.edit_last_field(
None,
ctx.bot.get_cog("Kiosk").render_emoji_pairs(role_pairs, " "),
False,
)
continue
role_pairs.append(
(display_emoji(pair_dict[emoji_key]), pair_dict[role_key]),
)
progress.delete_last_field()
progress.edit_last_field("✅ Pending Configuration", None, None)
await progress.edit_last_stage(
None,
ctx.bot.get_cog("Kiosk").render_emoji_pairs(role_pairs, " "),
False,
await progress.confirm_execute(
f"kiosk{ctx.bot.suffix} update {message.channel.id}-{message.id} "
+ (ctx.bot.get_cog("Kiosk").render_emoji_pairs(role_pairs, " "))
)
progress.delete_last_stage()
await progress.edit_last_stage("✅ Pending Configuration", None, None)
command = f"kiosk{ctx.bot.suffix} update {message.channel.id}-{message.id} " + (
ctx.bot.get_cog("Kiosk").render_emoji_pairs(role_pairs, " ")
)
await progress.confirm_execute(command)
except CanceledError:
pass
@commands.command(parent=wizard)
async def tickets(self, ctx: Blimp.Context):
@ -288,86 +230,150 @@ class Wizard(Blimp.Cog):
if not ctx.privileged_modify(ctx.guild):
raise Unauthorized()
progress = AutoProgress(
progress = ProgressII(
ctx,
"Create or Update a Ticket Category",
ctx.bot.get_command("ticket" + ctx.bot.suffix).help,
(
"Category",
"All Tickets are bound to a category and inherit permissions from it. Please type "
"in which category you want to create or modify Tickets configuration.",
wait_for_category(ctx),
lambda cat: cat.mention,
),
ctx.bot.get_command(f"ticket{ctx.bot.suffix}").help,
)
await progress.update()
try:
category = await progress.input(
"Category",
"All Tickets are bound to a category and inherit permissions and settings from it. "
"Please type which category you want to select for Tickets.",
ProgressII.InputKindOption.CATEGORY,
None,
)
results = await progress.start()
if not results:
return
old = ctx.database.execute(
"SELECT * FROM ticket_categories WHERE category_oid=:category_oid",
{"category_oid": ctx.objects.by_data(cc=category.id)},
).fetchone()
old_category = ctx.database.execute(
"SELECT * FROM ticket_categories WHERE category_oid=:category_oid",
{"category_oid": ctx.objects.by_data(cc=results["Category"].id)},
).fetchone()
if old_category:
await progress.edit_last_stage(
progress.edit_last_field(
None,
f"{results['Category'].mention} — Ticket configuration already exists, updating.",
f"{category.mention} — "
+ (
"Ticket configuration already exists, updating."
if old
else "No prior configuration, creating new Ticket category."
),
False,
)
results |= await progress.proceed(
(
"Last Ticket Number",
skip_category = False
if old:
skip_category = await progress.input(
"Skip To Classes",
"Do you want to skip editing the Category itself (this includes most settings) "
"and proceed to editing the Ticket classes offered instead?",
ProgressII.InputKindOption.BOOL,
True,
)
if not skip_category:
count = await progress.input(
"Most Recent Ticket Number",
"Please type which number the ticket opened most recently has. The next ticket "
+ "will have a number exactly one higher.\nCurrently set to "
+ str(old_category["count"])
+ ".",
wait_for_number(),
str,
),
(
"will have a number exactly one higher.",
ProgressII.InputKindOption.INTEGER,
old["count"] if old else 0,
)
transcript_channel = await progress.input(
"Transcript Channel",
"Please type the channel that transcripts should get posted into when a ticket "
"is deleted.\nCurrently they get posted into "
+ await ctx.bot.represent_object(
ctx.objects.by_oid(old_category["transcript_channel_oid"])
"is deleted.",
ProgressII.InputKindOption.CHANNEL,
ctx.bot.get_channel(
ctx.objects.by_oid(old["transcript_channel_oid"])["tc"]
)
+ ".",
wait_for_channel(ctx),
lambda channel: channel.mention,
),
(
"DM Transcripts",
if old
else None,
)
dm_transcript = await progress.input(
"DM Transcripts?",
"Please type if the transcripts should be sent to all users added to the "
+ "ticket in addition to being posted in the transcript channel.\nCurrently "
+ "this "
+ (
"is the case."
if old_category["dm_transcript"]
else "isn't the case."
),
wait_for_bool(),
str,
),
(
"Are Owners Staff?",
"Please type if the Ticket's owner should be able to perform staff actions, "
"i.e. adding and removing members as well as deleting the ticket.\nCurrently "
+ "this "
+ (
"is the case."
if old_category["can_creator_close"]
else "isn't the case."
),
wait_for_bool(),
str,
),
(
"ticket in addition to being posted in the transcript channel.",
ProgressII.InputKindOption.BOOL,
old["dm_transcript"] if old else None,
)
creator_staff = await progress.input(
"Are Creators Staff?",
"Please type if the Ticket's creator should be able to perform staff actions, "
"i.e. adding and removing members as well as deleting the ticket.",
ProgressII.InputKindOption.BOOL,
old["can_creator_close"] if old else None,
)
per_user_limit = await progress.input(
"Per-User Limit",
"Please type how many Tickets in this category non-staff users are allowed to "
+ "open at the same time. If you don't want to set a limit, type `unlimited`."
+ f"\nCurrently {old_category['per_user_limit'] or 'unlimited'}.",
lambda s: wait_for_number()(s) or (s if s == "unlimited" else None),
str,
),
)
"open at the same time. If you don't want to set a limit, type `-1`.",
ProgressII.InputKindOption.INTEGER,
(old["per_user_limit"] or -1) if old else None,
)
if per_user_limit == -1:
per_user_limit = ""
await progress.confirm_execute(
f"ticket{ctx.bot.suffix} updatecategory {category.id} {count} "
f"{transcript_channel.mention} {creator_staff} {dm_transcript} "
f"{per_user_limit}"
)
rows = ctx.database.execute(
"SELECT * FROM ticket_classes WHERE category_oid=:category_oid",
{"category_oid": ctx.objects.by_data(cc=category.id)},
).fetchall()
ticket_classes = {}
for row in rows:
ticket_classes[row["name"]] = row["description"]
while True:
progress.embed.color = ctx.Color.AUTOMATIC_BLUE
choice = await progress.input(
"Edit Class",
f"This Category has the following {len(ticket_classes)} classes:\n"
f"{', '.join([k for k, v in ticket_classes.items()])}\nPlease type which Class "
"you'd like to edit.\nCreate a new Class by typing a name that doesn't exist "
f"yet.\nTo exit this Wizard, type `done{ctx.bot.suffix}`.",
ProgressII.InputKindOption.STRING,
None,
)
if choice == f"done{ctx.bot.suffix}":
progress.delete_last_field()
if not ticket_classes:
await ctx.reply(
"You need at least one Class per Category. Please create one.",
color=ctx.Color.BAD,
)
continue
progress.embed.color = ctx.Color.GOOD
await progress.offer_cleanup()
return
ticket_classes[choice] = await progress.input(
f"Initial Message for {choice}",
"Please type which text BLIMP should post if a ticket in this category gets "
f"created. [Advanced message formatting]({ctx.bot.config['info']['manual']}"
"#advanced-message-formatting) is available.",
ProgressII.InputKindOption.STRING,
ticket_classes.get(choice),
)
progress.delete_last_field()
progress.delete_last_field()
await progress.confirm_execute(
f"ticket{ctx.bot.suffix} updateclass {category.id} {choice} "
f"{ticket_classes[choice]}"
)
except CanceledError:
pass

+ 5
- 0
blimp/customizations.py View File

@ -51,6 +51,11 @@ def maybe(
return instead
async def cid_mid_to_message(ctx: Blimp.Context, tup: Tuple) -> discord.Message:
channel = ctx.bot.get_channel(tup[0])
return await channel.fetch_message(tup[1])
class Blimp(commands.Bot):
"""
Instead of using a prefix like... normal bots, Blimp checks if the first


+ 307
- 217
blimp/progress.py View File

@ -1,6 +1,7 @@
import asyncio
import re
from typing import Any, Callable, Dict, Optional, Tuple, Union
from enum import Enum, auto
from typing import Any, Optional, Tuple, Union
import discord
from discord.ext import commands
@ -10,61 +11,192 @@ from .cogs.alias import (
find_aliased_channel_id,
find_aliased_message_id,
)
from .customizations import Blimp, maybe
from .customizations import Blimp, cid_mid_to_message, maybe
class CanceledError(RuntimeError):
"The not-quite error that the user has canceled a Wizard (or that it's timed out)."
def display(
what: Union[
str,
int,
bool,
discord.CategoryChannel,
discord.TextChannel,
discord.Message,
re.Match,
discord.Role,
tuple,
],
no_mentions: bool = False,
) -> str:
"""Display some object for the purposes of Wizards. If used for displaying embed field names,
no_mentions must be True because they don't support mentions. Most of the options for `what`
ought to be obvious; re.Match is intended to be the result of an emoji match from
ProgressII.InputKindOption.EMOJI.parse() while the 2-tuple is intended to be the result of a
message match from ProgressII.InputKindOption.MESSAGE.parse()."""
if isinstance(what, (discord.TextChannel, discord.Role)) and not no_mentions:
return what.mention
if isinstance(what, re.Match):
return what[0]
if isinstance(what, tuple) and len(what) == 2:
return "Fetching message…"
return str(what)
class ProgressII:
"Progress but better."
class InputKindOption(Enum):
"A pending input for ProgressII.input()."
STRING = auto()
INTEGER = auto()
BOOL = auto()
CATEGORY = auto()
CHANNEL = auto()
MESSAGE = auto()
EMOJI = auto()
ROLE = auto()
def parse( # pylint: disable=too-many-return-statements, too-many-branches
self, ctx: Blimp.Context, text: str
) -> Any:
"Return a meaningful object parsed from `text` based on self's kind or None."
if self == self.STRING:
return text
if self == self.INTEGER:
return maybe(lambda: int(text), ValueError, None)
if self == self.BOOL:
comp = text.casefold()
if comp in ("yes", "y", "true", "1", "#t", "oui"):
return True
if comp in ("no", "n", "false", "0", "-1", "#f"):
return False
return None
if self == self.CATEGORY:
aliased_cat = maybe(
lambda: find_aliased_category_id(ctx, text), commands.BadArgument
)
if aliased_cat:
return ctx.bot.get_channel(aliased_cat)
return discord.utils.find(
lambda c: text == str(c.id) or text == c.mention or text == c.name,
ctx.guild.categories,
)
if self == self.CHANNEL:
aliased_channel = maybe(
lambda: find_aliased_channel_id(ctx, text), commands.BadArgument
)
if aliased_channel:
return ctx.bot.get_channel(aliased_cat)
return discord.utils.find(
lambda c: text == str(c.id) or text == c.mention or text == c.name,
ctx.guild.channels,
)
if self == self.MESSAGE:
aliased_message = maybe(
lambda: find_aliased_message_id(ctx, text), commands.BadArgument
)
if aliased_message:
return aliased_message
link_or_shift_click_match = re.search(
r"(\d{15,21})[/-](\d{15,21})$", text
)
if link_or_shift_click_match:
return tuple(
[
int(link_or_shift_click_match[1]),
int(link_or_shift_click_match[2]),
]
)
just_id_match = re.search(r"(\d{15,21})$", text)
if just_id_match:
return tuple([ctx.channel.id, int(just_id_match[1])])
return None
if self == self.EMOJI:
return re.search(r"<a?:([^:]+):(\d+)>", text) or text
class Progress:
"""A Progress instance represents an in-progress interaction with the user based on a single
embedded message that updates as the user progresses through the "script" of the interaction."""
if self == self.ROLE:
return discord.utils.find(
lambda c: text == str(c.id) or text == c.mention or text == c.name,
ctx.guild.roles,
)
return None
def __init__(self, ctx: Blimp.Context, title: str, description: str):
self.ctx = ctx
self.message = None
self.input_messages = []
self.embed = discord.Embed(
color=ctx.Color.AUTOMATIC_BLUE,
title=title,
description="\n".join([f"> {line}" for line in description.splitlines()]),
).set_footer(
text="Times out five minutes after last input, cancel manually with the special "
+ "command 'cancel"
+ self.ctx.bot.suffix
+ "'. Replying too quickly can break the embed, so give yourself time."
description="\n".join([f"> {line}" for line in description.splitlines()])
+ "\n\n• Automatic timeout after five minutes without input"
+ f"\n• Cancel at any time using `cancel{ctx.bot.suffix}`"
+ f"\n• Accept default values using `ok{ctx.bot.suffix}`",
)
self.message = None
async def start(self):
"Begin the interaction by post the message."
self.message = await self.ctx.send(embed=self.embed)
@property
def fields(self):
"Shorthand for ProgressII.embed.fields"
return self.embed.fields
async def add_stage(self, name: str, value: str):
"Confirm that this part of the interaction has completed and add a new stage."
def add_field(self, name: str, value: str, inline: bool):
"Shorthand for ProgressII.embed.add_field()"
self.embed.add_field(name=name, value=value, inline=inline)
self.embed.add_field(name=name, value=value, inline=False)
await self.message.edit(embed=self.embed)
def delete_last_stage(self):
def delete_last_field(self):
"Removes the last field from self.fields."
self.embed.remove_field(len(self.embed.fields) - 1)
async def edit_last_stage(
def edit_last_field(
self, name: Optional[str], value: Optional[str], inline: Optional[bool]
):
"""Edit the last stage's field to e.g. signal completion. Passing `None` for any value means
"no change"."""
field = self.embed.fields[-1]
self.delete_last_stage()
self.embed.add_field(
name=name or field.name,
value=value or field.value,
inline=field.inline if inline is None else inline,
"Edit the last field. Any value provided as None is left unchanged from the original."
field = self.fields[len(self.fields) - 1]
self.delete_last_field()
self.add_field(
name=name if name is not None else field.name,
value=value if value is not None else field.value,
inline=inline if inline is not None else field.inline,
)
await self.message.edit(embed=self.embed)
async def wait_for(self, transformer: Callable):
"""Wait for messages from the current user in the current channel until one appears such
that if its content is processed with `transformer(msg.content)`, `transformer` returns a
value that's not None. That value will be returned. `transformer` should not have side
effects."""
async def update(self):
"Push our version of the embed to discord, creating a new message if it doesn't exist yet."
if self.message:
await self.message.edit(embed=self.embed)
else:
self.message = await self.ctx.send(embed=self.embed)
async def input(
self,
name: str,
description: str,
kind: InputKindOption,
default: Optional[Any] = None,
) -> Any:
"Accept a value from the user. Raises CanceledError if user cancels or input times out."
def predicate(msg: discord.Message):
if not (msg.channel == self.ctx.channel and msg.author == self.ctx.author):
@ -73,7 +205,26 @@ class Progress:
if msg.content == f"cancel{self.ctx.bot.suffix}":
return True
return transformer(msg.content) is not None
if default is not None and msg.content == f"ok{self.ctx.bot.suffix}":
return True
return kind.parse(self.ctx, msg.content) is not None
if default and kind == self.InputKindOption.BOOL:
default = bool(default)
self.embed.set_footer(text=discord.Embed.Empty)
if default is not None:
self.embed.set_footer(
text=f"↑ Default value, accept with 'ok{self.ctx.bot.suffix}'"
)
self.add_field(
f"➡️ {name}",
description + (f"\n\n```{default}\n```" if default is not None else ""),
False,
)
await self.update()
try:
message = await self.ctx.bot.wait_for(
@ -82,206 +233,145 @@ class Progress:
timeout=300.0,
)
self.input_messages.append(message)
if message.content == f"cancel{self.ctx.bot.suffix}":
self.embed.color = self.ctx.Color.BAD
await self.add_stage("❌ Canceled", "No further input will be accepted.")
return None
return transformer(message.content)
except asyncio.TimeoutError:
self.embed.color = self.ctx.Color.BAD
await self.add_stage("❌ Timeout", "No further input will be accepted.")
return None
async def confirm_execute(self, command: str):
"""A shorthand method that:
- Adds a stage that requests confirmation to execute `command`
- Executes `command` if the user inputs a true bool"""
await self.add_stage(
"➡️ Confirm",
f"**Confirm that you want this command issued in your name:**\n{command}",
)
do_it = await self.wait_for(wait_for_bool())
if do_it is None:
return
if do_it:
await self.ctx.invoke_command(command)
self.embed.color = self.ctx.Color.GOOD
await self.edit_last_stage("✅ Confirm", f"**Executed:**\n{command}", False)
else:
self.add_field(
"❌ Canceled", "No further input will be accepted.", False
)
await self.update()
raise CanceledError()
parsed = kind.parse(self.ctx, message.content)
if default is not None and message.content == f"ok{self.ctx.bot.suffix}":
parsed = kind.parse(self.ctx, display(default))
self.edit_last_field(f"✅ {name}", display(parsed), True)
if kind == self.InputKindOption.MESSAGE:
try:
message = await cid_mid_to_message(self.ctx, parsed)
message_link = await self.ctx.bot.represent_object({"m": parsed})
self.edit_last_field(None, message_link, None)
await self.update()
return message
except discord.HTTPException as ex:
self.embed.color = self.ctx.Color.BAD
self.edit_last_field(f"❌ {name}", "Unknown message.", None)
await self.update()
raise CanceledError() from ex
await self.update()
return parsed
except asyncio.TimeoutError as ex:
self.embed.color = self.ctx.Color.BAD
await self.edit_last_stage(
"❌ Confirm", f"**Didn't execute:**\n{command}", False
)
self.add_field("❌ Timeout", "No further input will be accepted.", False)
await self.update()
raise CanceledError() from ex
async def input_choice(
self, name: str, description: str, options: Tuple[str], default: Optional[str]
) -> str:
"""Similar to ProgressII.input(), but accept one of a set of String choices instead of
general data."""
class AutoProgress(Progress):
"""AutoProgress offers the same capabilities as `Progress` but also allows you to specify a dict
at creation time that's used in `start` to obtain initial values from the user without having to
write out the code yourself. The structure of the `steps` variadic argument is
```
proceed(
("Key To Show The User", "Description", transformer_for_wait_for, str),
("Next Key", "More Description", another_transformer, displayer)
)
```
Values are processed by `displayer` before being written into a completed input stage. A good
default for this is `str` for normal Python behavior.
`start` returns a dict of the collected values with the same keys as in `steps` or `None`, if
input fails."""
def __init__(self, ctx: Blimp.Context, title: str, description: str, *steps):
super(AutoProgress, self).__init__(ctx, title, description)
self.steps = steps
async def start(self) -> Optional[Dict[str, Any]]:
await super(AutoProgress, self).start()
return await self.proceed(*self.steps)
async def proceed(self, *steps) -> Optional[Dict[str, Any]]:
"Ask the user to input the `steps` in order of definition."
result = {}
for key, desc, transformer, displayer in steps:
await self.add_stage(f"➡️ {key}", desc)
user_input = await self.wait_for(transformer)
if user_input is None:
return {}
await self.edit_last_stage(
f"✅ {key}",
displayer(user_input),
True,
)
result[key] = user_input
def predicate(msg: discord.Message):
if not (msg.channel == self.ctx.channel and msg.author == self.ctx.author):
return False
return result
if msg.content == f"cancel{self.ctx.bot.suffix}":
return True
if default is not None and msg.content == f"ok{self.ctx.bot.suffix}":
return True
def wait_for_channel(
ctx: Blimp.Context,
) -> Callable[[str], Optional[discord.TextChannel]]:
"Returns a `Progress.wait_for` transformer that matches text channels."
return msg.content.casefold() in options
def impl(string: str) -> Optional[discord.TextChannel]:
aliased_channel = maybe(
lambda: find_aliased_channel_id(ctx, string), commands.BadArgument
)
if aliased_channel:
return ctx.bot.get_channel(aliased_channel)
self.embed.set_footer(text=discord.Embed.Empty)
if default is not None:
self.embed.set_footer(
text=f"↑ Default value, accept with 'ok{self.ctx.bot.suffix}'"
)
return discord.utils.find(
lambda c: string == str(c.id) or string == c.mention or string == c.name,
ctx.guild.channels,
self.add_field(
f"➡️ {name}",
description + (f"\n\n```{default}\n```" if default is not None else ""),
False,
)
await self.update()
return impl
try:
message = await self.ctx.bot.wait_for(
"message",
check=predicate,
timeout=300.0,
)
def wait_for_category(
ctx: Blimp.Context,
) -> Callable[[str], Optional[discord.CategoryChannel]]:
"Returns a `Progress.wait_for` transformer that matches category channels."
self.input_messages.append(message)
def impl(string: str) -> Optional[discord.CategoryChannel]:
aliased_cat = maybe(
lambda: find_aliased_category_id(ctx, string), commands.BadArgument
)
if aliased_cat:
return ctx.bot.get_channel(aliased_cat)
if message.content == f"cancel{self.ctx.bot.suffix}":
self.embed.color = self.ctx.Color.BAD
self.add_field(
"❌ Canceled", "No further input will be accepted.", False
)
await self.update()
raise CanceledError()
return discord.utils.find(
lambda c: string == str(c.id) or string == c.mention or string == c.name,
ctx.guild.categories,
)
option = message.content
return impl
if default is not None and message.content == f"ok{self.ctx.bot.suffix}":
option = default
self.edit_last_field(f"✅ {name}", option, True)
def wait_for_role(
ctx: Blimp.Context,
) -> Callable[[str], Optional[discord.Role]]:
"Returns a `Progress.wait_for` transformer that matches roles."
await self.update()
return option
def impl(string: str) -> Optional[discord.TextChannel]:
return discord.utils.find(
lambda c: string == str(c.id) or string == c.mention or string == c.name,
ctx.guild.roles,
except asyncio.TimeoutError as ex:
self.embed.color = self.ctx.Color.BAD
self.add_field("❌ Timeout", "No further input will be accepted.", False)
await self.update()
raise CanceledError() from ex
async def offer_cleanup(self):
"""Ask the user if they want to have all their messages accepted by the kiosk, since the
last offer to do so, deleted."""
do_cleanup = await self.input(
"Cleanup",
"Should all messages you typed for this Wizard be deleted?",
self.InputKindOption.BOOL,
True,
)
if do_cleanup:
with self.ctx.typing():
for message in self.input_messages:
await message.delete()
self.input_messages = []
self.delete_last_field()
await self.update()
return impl
def wait_for_message_id(
ctx: Blimp.Context,
) -> Callable[[str], Optional[Tuple[int, int]]]:
"Returns a `Progress.wait_for` transformer that matches things that look like message IDs."
async def confirm_execute(self, command: str):
"Ask the user if they want to execute a `command` and do so if they agree."
def impl(string: str) -> Optional[Tuple[int, int]]:
aliased_message = maybe(
lambda: find_aliased_message_id(ctx, string), commands.BadArgument
dewit = await self.input(
"Confirm",
f"**Do you want this command executed in your name?**\n{command}",
self.InputKindOption.BOOL,
True,
)
if aliased_message:
return aliased_message
link_or_shift_click_match = re.search(r"(\d{15,21})[/-](\d{15,21})$", string)
if link_or_shift_click_match:
return tuple(
[int(link_or_shift_click_match[1]), int(link_or_shift_click_match[2])]
)
just_id_match = re.search(r"(\d{15,21})$", string)
if just_id_match:
return tuple([ctx.channel.id, int(just_id_match[1])])
return None
return impl
async def message_id_to_message(
ctx: Blimp.Context, tup: Tuple[int, int]
) -> discord.Message:
"""Tries to turn a (channelid, messageid) tuple as returned by wait_for_message_id into a
Message object."""
return await ctx.bot.get_channel(tup[0]).fetch_message(tup[1])
def wait_for_number() -> Callable[[str], Optional[int]]:
"Returns a `Progress.wait_for` transformer that matches numbers."
def impl(string: str) -> Optional[int]:
return maybe(lambda: int(string), ValueError)
return impl
def wait_for_bool() -> Callable[[str], Optional[int]]:
"""Returns a `Progress.wait_for` transformer that matches booleans (somewhat naturally expressed
ones too)."""
def impl(string: str) -> Optional[bool]:
comp = string.casefold()
if comp in ("yes", "y", "true", "1", "#t", "oui"):
return True
if comp in ("no", "n", "false", "0", "-1", "#f"):
return False
return impl
def wait_for_emoji(string: str) -> Union[re.Match, str]:
"""`Progress.wait_for` transformer for 'parsing' emoji (everything passes the test since we
can't actually verify if they work)"""
return re.search(r"<a?:([^:]+):(\d+)>", string) or string
def display_emoji(result: Union[re.Match, str]) -> str:
"Try to display an emoji found by `wait_for_emoji`"
if dewit:
with self.ctx.typing():
await self.ctx.invoke_command(command)
self.embed.color = self.ctx.Color.GOOD
self.edit_last_field(None, f"**Executed:**\n{command}", False)
await self.update()
else:
self.embed.color = self.ctx.Color.BAD
self.edit_last_field("❌ Confirm", f"**Didn't Execute:**\n{command}", False)
return result[0] if isinstance(result, re.Match) else str(result)
await self.offer_cleanup()

+ 1
- 1
pyproject.toml View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "blimp"
version = "1.4.4"
version = "1.5.0"
description = "The BLIMP Levitating Intercommunication Management Programme, a Discord bot."
authors = ["Cassidy Dingenskirchen <admin@15318.de>"]
license = "AGPL-3.0"


Loading…
Cancel
Save