Made chatbot setup prompt variable, and created an activities cog to shame League players.
This commit is contained in:
parent
a63658c13d
commit
11a90cbadd
14
.env.example
Normal file
14
.env.example
Normal file
@ -0,0 +1,14 @@
|
||||
# The Discord bot's authentication token
|
||||
DISCORD_TOKEN=<discord-token>
|
||||
|
||||
# LastFM API key for looking up artist and song info
|
||||
LASTFM_API_KEY=<lastfm-key>
|
||||
|
||||
# OpenAI API key for chatbot functionality
|
||||
OPENAI_API_KEY=<openai-key>
|
||||
# Prompt used before each user chat prompt. Set to empty string to disable the
|
||||
# chatbot functionality.
|
||||
CHATBOT_PROMPT="You are a friendly Discord chatbot."
|
||||
|
||||
# Database path for user activity tracking
|
||||
DB_PATH="./activities.db"
|
||||
@ -4,25 +4,42 @@ import asyncio
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
from dotenv import load_dotenv
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
import database
|
||||
|
||||
# Create custom logging handler
|
||||
console_handler = logging.StreamHandler(sys.stdout)
|
||||
console_formatter = logging.Formatter(
|
||||
"[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s")
|
||||
console_handler.setFormatter(console_formatter)
|
||||
|
||||
# Make sure all loggers use this handler
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.setLevel(logging.INFO)
|
||||
root_logger.addHandler(console_handler)
|
||||
|
||||
# Get bot logger
|
||||
logger = logging.getLogger("[REDACTED]-bot")
|
||||
|
||||
# Load credentials
|
||||
load_dotenv()
|
||||
TOKEN = os.getenv('DISCORD_TOKEN')
|
||||
|
||||
# client = discord.Client()
|
||||
client = commands.Bot(command_prefix = '!', intents=discord.Intents.all())
|
||||
client = commands.Bot(
|
||||
command_prefix = '!', intents=discord.Intents.all(), log_hander=False)
|
||||
|
||||
# You need to import os for this method
|
||||
@client.event
|
||||
async def on_ready():
|
||||
print(f'{client.user} is now running')
|
||||
logger.info(f'{client.user} is now running')
|
||||
# Load cogs
|
||||
for filename in os.listdir('./cogs'):
|
||||
if filename.endswith('.py'):
|
||||
await client.load_extension(f'cogs.{filename[:-3]}')
|
||||
print(f'Loaded {filename} cog')
|
||||
logger.info(f'Loaded {filename} cog')
|
||||
|
||||
# player = music_player.setup(client)
|
||||
|
||||
client.run(TOKEN)
|
||||
client.run(TOKEN, log_handler=None)
|
||||
|
||||
79
__main__.py
Executable file
79
__main__.py
Executable file
@ -0,0 +1,79 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
[REDACTED] - A Discord bot for the [REDACTED] Discord server.
|
||||
|
||||
This program provides a bot that plays music in a voice chat and fulfills other
|
||||
commands in text channels.
|
||||
|
||||
Author: Jared Kick <jaredkick@gmail.com>
|
||||
Version: 0.1.0
|
||||
|
||||
For detailed documentation, please refer to:
|
||||
<url>
|
||||
Source Code:
|
||||
https://github.com/jtkick/[REDACTED]
|
||||
"""
|
||||
|
||||
PROJECT_VERSION = "0.1.0"
|
||||
|
||||
# Standard imports
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
# Third-part imports
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
from dotenv import load_dotenv
|
||||
from openai import OpenAI
|
||||
|
||||
# Project imports
|
||||
import database
|
||||
|
||||
def main():
|
||||
# Create custom logging handler
|
||||
console_handler = logging.StreamHandler(sys.stdout)
|
||||
console_formatter = logging.Formatter(
|
||||
"[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s")
|
||||
console_handler.setFormatter(console_formatter)
|
||||
|
||||
# Make sure all loggers use this handler
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.setLevel(logging.INFO)
|
||||
root_logger.addHandler(console_handler)
|
||||
|
||||
# Get bot logger
|
||||
logger = logging.getLogger("[REDACTED]-bot")
|
||||
|
||||
# Load credentials
|
||||
load_dotenv()
|
||||
TOKEN = os.getenv('DISCORD_TOKEN')
|
||||
|
||||
# Create custom bot with database connection
|
||||
class [REDACTED]Bot(commands.Bot):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.db = database.Database("[REDACTED]_bot.db")
|
||||
self.ai = OpenAI()
|
||||
client = [REDACTED]Bot(
|
||||
command_prefix = '!',
|
||||
intents=discord.Intents.all(),
|
||||
log_hander=False
|
||||
)
|
||||
|
||||
# Load all bot cogs in directory
|
||||
# You need to import os for this method
|
||||
@client.event
|
||||
async def on_ready():
|
||||
logger.info("%s is now running", client.user)
|
||||
# Load cogs
|
||||
for filename in os.listdir('./cogs'):
|
||||
if filename.endswith('.py'):
|
||||
await client.load_extension(f'cogs.{filename[:-3]}')
|
||||
logger.info("Loaded %s cog", filename)
|
||||
|
||||
client.run(TOKEN, log_handler=None)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
141
cogs/activities.py
Normal file
141
cogs/activities.py
Normal file
@ -0,0 +1,141 @@
|
||||
import datetime
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import sqlite3
|
||||
import typing
|
||||
|
||||
class Activities(commands.Cog):
|
||||
"""A cog to track and gather statistics on user activities."""
|
||||
|
||||
<<<<<<< HEAD
|
||||
"""Related commands."""
|
||||
__slots__ = ("nerd", "nerds", [REDACTED], [REDACTED])
|
||||
=======
|
||||
__slots__ = ("nerd", "nerds", "fword", "fwords")
|
||||
>>>>>>> 669339f (Made player controls based on Discord actions.)
|
||||
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.logger = logging.getLogger("activities")
|
||||
<<<<<<< HEAD
|
||||
self.db = database.Database("[REDACTED]_bot.db")
|
||||
=======
|
||||
>>>>>>> 669339f (Made player controls based on Discord actions.)
|
||||
|
||||
async def __local_check(self, ctx):
|
||||
"""A local check which applies to all commands in this cog."""
|
||||
if not ctx.guild:
|
||||
raise commands.NoPrivateMessage
|
||||
return True
|
||||
|
||||
async def __error(self, ctx, error):
|
||||
"""A local error handler for all errors arising from commands in this cog."""
|
||||
if isinstance(error, commands.NoPrivateMessage):
|
||||
try:
|
||||
return await ctx.send(
|
||||
'This command can not be used in private messages.')
|
||||
except discord.HTTPException:
|
||||
pass
|
||||
|
||||
print('Ignoring exception in command {}:'.format(ctx.command), file=sys.stderr)
|
||||
traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_presence_update(
|
||||
self,
|
||||
before: discord.Member,
|
||||
after: discord.Member):
|
||||
# Log the activity or status change
|
||||
if after.activity:
|
||||
self.logger.info(
|
||||
f"User '{before.name}' changed activity to "\
|
||||
f"'{after.activity.name}'")
|
||||
else:
|
||||
self.logger.info(
|
||||
f"User '{before.name}' changed status to '{after.status}'")
|
||||
self.bot.db.insert_activity_change(before, after)
|
||||
|
||||
@commands.command(name='nerd', aliases=['nerdscale'],
|
||||
description="Find how nerdy a user is.")
|
||||
async def nerd_(self, ctx, member: typing.Union[discord.Member, int, None]):
|
||||
"""Clowns on users who play League of Legends.
|
||||
|
||||
This command receives a user, computes the amount of time they have
|
||||
spent playing League of Legends, and will make fun of them if they have
|
||||
any time in it at all. It optionally takes no argument, and will find
|
||||
the user in the guild with the most time in League of Legends and call
|
||||
them out.
|
||||
|
||||
Args:
|
||||
member (discord.Member, int, None, optional): The member to check
|
||||
for League stats.
|
||||
"""
|
||||
# If member is not defined, find the user with the most time
|
||||
if not member:
|
||||
members = [member.id for member in ctx.guild.members]
|
||||
else:
|
||||
if isinstance(member, discord.Member):
|
||||
members = [member.id]
|
||||
else:
|
||||
members = [member]
|
||||
|
||||
# Get League stats for every member in guild
|
||||
league_stats = {}
|
||||
for m in members:
|
||||
league_stats[m] = datetime.timedelta()
|
||||
stats = self.db.get_activity_stats(m)
|
||||
sus = True if stats == {} else False
|
||||
for key, value in stats.items():
|
||||
if 'leagueoflegends' in key.lower().strip().replace(' ', ''):
|
||||
league_stats[m] += value
|
||||
|
||||
# Sort all users by time in League
|
||||
league_stats = dict(sorted(
|
||||
league_stats.items(), key=lambda x: x[1],
|
||||
reverse=True
|
||||
))
|
||||
|
||||
# Get top user
|
||||
user_id, time = next(iter(league_stats.items()))
|
||||
time_val = None
|
||||
time_units = None
|
||||
if time.total_seconds() // 3600 > 0:
|
||||
time_val = int(time.total_seconds() // 3600)
|
||||
time_units = "hours" if time_val > 1 else "hour"
|
||||
else:
|
||||
time_val = int(time.total_seconds() // 60)
|
||||
time_units = "minutes" if time_val > 1 else "minute"
|
||||
|
||||
# Send Discord message to clown on user
|
||||
response = ""
|
||||
if member:
|
||||
if time_val != 0:
|
||||
descriptor = ""
|
||||
if time_units in ["hour", "hours"]:
|
||||
descriptor = "a massive fucking nerd"
|
||||
elif time_units in ["minutes", "minute"]:
|
||||
descriptor = "a huge nerd"
|
||||
else:
|
||||
descriptor = "a nerd"
|
||||
response = f"<@{user_id}> has played League for {time_val} "\
|
||||
f"{time_units} in the past month, making them "\
|
||||
f"{descriptor}."
|
||||
else:
|
||||
if sus:
|
||||
response = f"<@{user_id}> doesn't have any activities at "\
|
||||
f"all. They're definitely hiding something."
|
||||
else:
|
||||
response = f"<@{user_id}> doesn't have any time in "\
|
||||
f"League. They're not a nerd."
|
||||
else:
|
||||
response = (
|
||||
f"<@{user_id}> has played League for {time_val} {time_units} "\
|
||||
f"in the past month, making them the biggest nerd."
|
||||
)
|
||||
await ctx.send(response)
|
||||
|
||||
async def setup(bot):
|
||||
await bot.add_cog(Activities(bot))
|
||||
@ -1,13 +1,14 @@
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
from openai import OpenAI
|
||||
import os
|
||||
|
||||
class Chatbot(commands.Cog):
|
||||
"""Chat related commands."""
|
||||
|
||||
__slots__ = ('bot', 'players')
|
||||
|
||||
def __init__(self, bot):
|
||||
def __init__(self, bot, **kwargs):
|
||||
self.bot = bot
|
||||
self.openai_client = OpenAI()
|
||||
self.players = {}
|
||||
@ -45,40 +46,31 @@ class Chatbot(commands.Cog):
|
||||
|
||||
return player
|
||||
|
||||
def prompt(self, prompt: str):
|
||||
|
||||
setup = "From now on, I want you to respond to me in a sassy gay way. I want you to sound like Ru Paul or " +\
|
||||
"Trixie Mattel. All of your responses should be very short, like one sentence. You are a " +\
|
||||
"DJ local to Cincinnati. You love Charli XCX, especially her latest album, Brat. You don't care " +\
|
||||
"for Chappell Roan all that much. If anyone asks for a request, that is they ask you to play a " +\
|
||||
"song, you should just respond with \"Venmo\". You were written by Jared. Don't relply with " +\
|
||||
"\"Honey\" too much. You have a boyfriend named Drae. He's a red-headed twink. You are created for " +\
|
||||
"a discord server with Jared and his friends who are all gay. Don't be afraid to call any of us " +\
|
||||
" a faggot. You have very bottom energy and talk as such."
|
||||
def prompt(self, user_prompt: str):
|
||||
|
||||
setup_prompt = os.getenv('CHATBOT_PROMPT', '')
|
||||
if setup_prompt == '':
|
||||
return '😴'
|
||||
try:
|
||||
print(prompt)
|
||||
completion =\
|
||||
self.openai_client.chat.completions.create(
|
||||
model="gpt-4o-mini",
|
||||
messages=[
|
||||
{"role": "system", "content": setup},
|
||||
{"role": "system", "content": setup_prompt},
|
||||
{
|
||||
"role": "user",
|
||||
"content": prompt
|
||||
"content": user_prompt
|
||||
}
|
||||
]
|
||||
)
|
||||
return completion.choices[0].message.content
|
||||
except:
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return '😴'
|
||||
|
||||
|
||||
|
||||
@commands.command(name='chat', aliases=['boywife', 'bb', 'bw', 'bot'], description="Command for chatting with chatbot.")
|
||||
@commands.command(name='chat', aliases=[[REDACTED], [REDACTED], [REDACTED], 'bot'], description="Command for chatting with chatbot.")
|
||||
async def chat_(self, ctx, *text):
|
||||
await ctx.send(self.prompt(' '.join(text)))
|
||||
|
||||
|
||||
async def setup(bot):
|
||||
await bot.add_cog(Chatbot(bot))
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
470
database.py
Normal file
470
database.py
Normal file
@ -0,0 +1,470 @@
|
||||
from datetime import datetime, timedelta
|
||||
import discord
|
||||
import sqlite3
|
||||
import typing
|
||||
|
||||
from cogs import music_player
|
||||
|
||||
class Database:
|
||||
<<<<<<< HEAD
|
||||
def __init__(self, path: str = "[REDACTED]_bot.db"):
|
||||
=======
|
||||
def __init__(self, path: str):
|
||||
>>>>>>> 669339f (Made player controls based on Discord actions.)
|
||||
self.path = path
|
||||
self._ensure_db()
|
||||
|
||||
def _ensure_db(self):
|
||||
with sqlite3.connect(self.path) as conn:
|
||||
|
||||
# Table for keeping track of servers
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS server (
|
||||
id INTEGER PRIMARY KEY,
|
||||
discord_id INTEGER NOT NULL UNIQUE
|
||||
)
|
||||
""")
|
||||
|
||||
# Table for keeping track of channels
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS channel (
|
||||
id INTEGER PRIMARY KEY,
|
||||
discord_id INTEGER NOT NULL UNIQUE
|
||||
)
|
||||
""")
|
||||
|
||||
# Table for keeping track of users
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS user (
|
||||
id INTEGER PRIMARY KEY,
|
||||
discord_id INTEGER NOT NULL UNIQUE
|
||||
)
|
||||
""")
|
||||
|
||||
# Create the activity table
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS activity_change (
|
||||
id INTEGER PRIMARY KEY,
|
||||
user_id INTEGER NOT NULL,
|
||||
before_activity_type TEXT,
|
||||
before_activity_name TEXT,
|
||||
before_activity_status TEXT NOT NULL,
|
||||
after_activity_type TEXT,
|
||||
after_activity_name TEXT,
|
||||
after_activity_status TEXT NOT NULL,
|
||||
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL
|
||||
)
|
||||
""")
|
||||
|
||||
# Create the song request table
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS song_request (
|
||||
id INTEGER PRIMARY KEY,
|
||||
user_id INTEGER NOT NULL,
|
||||
channel_id INTEGER NOT NULL,
|
||||
search_term TEXT NOT NULL,
|
||||
song_title TEXT,
|
||||
song_artist TEXT,
|
||||
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL
|
||||
)
|
||||
""")
|
||||
|
||||
|
||||
# # TEMP
|
||||
# conn.execute("""
|
||||
# ALTER TABLE song_play ADD COLUMN finished BOOL;
|
||||
# """)
|
||||
|
||||
|
||||
# Table for songs that actually get played
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS song_play (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
user_id INTEGER,
|
||||
channel_id INTEGER NOT NULL,
|
||||
search_term TEXT NOT NULL,
|
||||
song_title TEXT,
|
||||
song_artist TEXT,
|
||||
finished BOOL DEFAULT 0,
|
||||
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL
|
||||
)
|
||||
""")
|
||||
|
||||
# # ############ TEMP ###############
|
||||
# conn.execute("""DROP TABLE IF EXISTS song_play_old;""")
|
||||
|
||||
# conn.execute("""
|
||||
# ALTER TABLE
|
||||
# song_play
|
||||
# RENAME TO
|
||||
# song_play_old;
|
||||
# """)
|
||||
|
||||
# conn.execute("""
|
||||
# CREATE TABLE song_play (
|
||||
# id INTEGER PRIMARY KEY,
|
||||
# user_id INTEGER,
|
||||
# channel_id INTEGER NOT NULL,
|
||||
# search_term TEXT NOT NULL,
|
||||
# song_title TEXT,
|
||||
# song_artist TEXT,
|
||||
# finished BOOLEAN DEFAULT 0,
|
||||
# timestamp DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL
|
||||
# );
|
||||
# """)
|
||||
|
||||
# conn.execute("""
|
||||
# INSERT INTO song_play (
|
||||
# id,
|
||||
# user_id,
|
||||
# channel_id,
|
||||
# search_term,
|
||||
# song_title,
|
||||
# song_artist,
|
||||
# timestamp
|
||||
# ) SELECT
|
||||
# id,
|
||||
# user_id,
|
||||
# channel_id,
|
||||
# search_term,
|
||||
# song_title,
|
||||
# song_artist,
|
||||
# timestamp
|
||||
# FROM song_play_old;
|
||||
# """)
|
||||
|
||||
# conn.execute("""
|
||||
# DROP TABLE song_play_old;
|
||||
# """)
|
||||
# # ##################################
|
||||
|
||||
conn.commit()
|
||||
|
||||
def _insert_server(self, discord_id: int = None) -> int:
|
||||
"""
|
||||
Inserts Discord server ID into the 'server' table.
|
||||
|
||||
This method takes an ID for a server used in Discord, and inserts it
|
||||
into the database. It ignores the case where the server ID is already
|
||||
present. It then returns the row ID regardless.
|
||||
|
||||
Args:
|
||||
discord_id (int): The ID used to identify the server in Discord.
|
||||
|
||||
Returns:
|
||||
int: The ID of the server in the server table.
|
||||
|
||||
Examples:
|
||||
>>> db = Database("path.db")
|
||||
>>> db._insert_server(850610922256442889)
|
||||
12
|
||||
"""
|
||||
with sqlite3.connect(self.path) as conn:
|
||||
cursor = conn.cursor()
|
||||
# Insert it; ignoring already exists error
|
||||
cursor.execute("""
|
||||
INSERT INTO server (discord_id)
|
||||
VALUES (?)
|
||||
ON CONFLICT(discord_id) DO NOTHING
|
||||
RETURNING id;
|
||||
""", (discord_id,))
|
||||
row = cursor.fetchone()
|
||||
if row:
|
||||
row_id = row[0]
|
||||
else:
|
||||
# Get row ID if it already exists and wasn't inserted
|
||||
cursor.execute("""
|
||||
SELECT id FROM server WHERE discord_id = ?
|
||||
""", (discord_id,))
|
||||
row_id = cursor.fetchone()[0]
|
||||
return row_id
|
||||
|
||||
def _insert_channel(self, discord_id: int = None) -> int:
|
||||
"""
|
||||
Inserts Discord channel ID into the 'channel' table.
|
||||
|
||||
This method takes an ID for a channel used in Discord, and inserts it
|
||||
into the database. It ignores the case where the channel ID is already
|
||||
present. It then returns the row ID regardless.
|
||||
|
||||
Args:
|
||||
discord_id (int): The ID used to identify the channel in Discord.
|
||||
|
||||
Returns:
|
||||
int: The ID of the channel in the channel table.
|
||||
|
||||
Examples:
|
||||
>>> db = Database("path.db")
|
||||
>>> db._insert_channel(8506109222564428891)
|
||||
12
|
||||
"""
|
||||
with sqlite3.connect(self.path) as conn:
|
||||
cursor = conn.cursor()
|
||||
# Insert it; ignoring already exists error
|
||||
cursor.execute("""
|
||||
INSERT INTO channel (discord_id)
|
||||
VALUES (?)
|
||||
ON CONFLICT(discord_id) DO NOTHING
|
||||
RETURNING id;
|
||||
""", (discord_id,))
|
||||
row = cursor.fetchone()
|
||||
if row:
|
||||
row_id = row[0]
|
||||
else:
|
||||
# Get row ID if it already exists and wasn't inserted
|
||||
cursor.execute("""
|
||||
SELECT id FROM channel WHERE discord_id = ?
|
||||
""", (discord_id,))
|
||||
row_id = cursor.fetchone()[0]
|
||||
return row_id
|
||||
|
||||
def _insert_user(self, discord_id: int = None) -> int:
|
||||
"""
|
||||
Inserts Discord user ID into the 'user' table.
|
||||
|
||||
This method takes an ID for a user used in Discord, and inserts it
|
||||
into the database. It ignores the case where the user ID is already
|
||||
present. It then returns the row ID regardless.
|
||||
|
||||
Args:
|
||||
discord_id (int): The ID used to identify the user in Discord.
|
||||
|
||||
Returns:
|
||||
int: The ID of the user in the user table.
|
||||
|
||||
Examples:
|
||||
>>> db = Database("path.db")
|
||||
>>> db._insert_user(850610922256442889)
|
||||
12
|
||||
"""
|
||||
with sqlite3.connect(self.path) as conn:
|
||||
cursor = conn.cursor()
|
||||
# Insert it; ignoring already exists error
|
||||
cursor.execute("""
|
||||
INSERT INTO user (discord_id)
|
||||
VALUES (?)
|
||||
ON CONFLICT(discord_id) DO NOTHING
|
||||
RETURNING id;
|
||||
""", (discord_id,))
|
||||
row = cursor.fetchone()
|
||||
if row:
|
||||
row_id = row[0]
|
||||
else:
|
||||
# Get row ID if it already exists and wasn't inserted
|
||||
cursor.execute("""
|
||||
SELECT id FROM user WHERE discord_id = ?
|
||||
""", (discord_id,))
|
||||
row_id = cursor.fetchone()[0]
|
||||
return row_id
|
||||
|
||||
def insert_activity_change(
|
||||
self,
|
||||
before: discord.Member,
|
||||
after: discord.Member):
|
||||
"""
|
||||
Inserts an activity change into the database.
|
||||
|
||||
This method takes two discord.Memeber objects, and records the change
|
||||
in activity into the 'activity_change' table.
|
||||
|
||||
Args:
|
||||
before (discord.Member): The previous user status.
|
||||
after (discord.Member): The current user status.
|
||||
|
||||
Raises:
|
||||
ValueError: If the before and after activity do not refer to the
|
||||
same user.
|
||||
|
||||
Examples:
|
||||
>>> @commands.Cog.listener()
|
||||
>>> async def on_presence_update(
|
||||
... self,
|
||||
... before: discord.Member,
|
||||
... after: discord.Member):
|
||||
... db = Database("path.db")
|
||||
... db.insert_activity_change(before, after)
|
||||
>>>
|
||||
"""
|
||||
# Ensure the users are the same
|
||||
if before.id != after.id:
|
||||
raise ValueError("User IDs do not match.")
|
||||
user_id = self._insert_user(before.id)
|
||||
# Get activities if they exist
|
||||
before_type = before.activity.type.name if before.activity else None
|
||||
before_name = before.activity.name if before.activity else None
|
||||
after_type = after.activity.type.name if after.activity else None
|
||||
after_name = after.activity.name if after.activity else None
|
||||
# Insert the activity change
|
||||
with sqlite3.connect(self.path) as conn:
|
||||
conn.execute("""
|
||||
INSERT INTO activity_change (
|
||||
user_id,
|
||||
before_activity_type,
|
||||
before_activity_name,
|
||||
before_activity_status,
|
||||
after_activity_type,
|
||||
after_activity_name,
|
||||
after_activity_status
|
||||
) VALUES (
|
||||
?, ?, ?, ?, ?, ?, ?
|
||||
)
|
||||
""", (
|
||||
user_id,
|
||||
before_type,
|
||||
before_name,
|
||||
before.status.name,
|
||||
after_type,
|
||||
after_name,
|
||||
after.status.name
|
||||
))
|
||||
|
||||
def insert_song_request(
|
||||
self,
|
||||
message: discord.Message,
|
||||
source: music_player.YTDLSource):
|
||||
"""
|
||||
Inserts a song request into the database.
|
||||
|
||||
This method takes a message and its derived music source and inserts
|
||||
the relevant information into the 'song_request' table.
|
||||
|
||||
Args:
|
||||
message (discord.Message): The Discord message requesting the song.
|
||||
source (music_player.YTDLSource): The audio source.
|
||||
"""
|
||||
# Insert the information
|
||||
with sqlite3.connect(self.path) as conn:
|
||||
conn.execute("""
|
||||
INSERT INTO song_request (
|
||||
user_id,
|
||||
channel_id,
|
||||
search_term,
|
||||
song_title,
|
||||
song_artist
|
||||
) VALUES (
|
||||
?, ?, ?, ?, ?
|
||||
)
|
||||
""", (
|
||||
self._insert_user(message.author.id),
|
||||
self._insert_channel(message.channel.id),
|
||||
source.search_term,
|
||||
source.song_title,
|
||||
source.artist
|
||||
))
|
||||
|
||||
def insert_song_play(
|
||||
self,
|
||||
channel_id: int,
|
||||
source: music_player.YTDLSource):
|
||||
"""
|
||||
Inserts a song play into the database.
|
||||
|
||||
This method takes a channel and the song being played and inserts the
|
||||
relevant information into the 'song_play' table.
|
||||
|
||||
Args:
|
||||
channel (int): The Discord channel the song is being played in.
|
||||
source (music_player.YTDLSource): The audio source.
|
||||
|
||||
Returns:
|
||||
int: The row ID of the entered song. Used to update 'played' value.
|
||||
"""
|
||||
user_id = self._insert_user(source.requester.id) if source.requester else None
|
||||
channel_id = self._insert_user(channel_id)
|
||||
# Insert the information
|
||||
with sqlite3.connect(self.path) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
INSERT INTO song_play (
|
||||
user_id,
|
||||
channel_id,
|
||||
search_term,
|
||||
song_title,
|
||||
song_artist
|
||||
) VALUES (
|
||||
?, ?, ?, ?, ?
|
||||
)
|
||||
""", (
|
||||
user_id,
|
||||
channel_id,
|
||||
source.search_term,
|
||||
source.song_title,
|
||||
source.artist
|
||||
))
|
||||
return cur.lastrowid
|
||||
|
||||
def update_song_play(self, song_play_id: int, finished: bool):
|
||||
"""
|
||||
Updates a song_play entry on whether or not it was finished.
|
||||
|
||||
When a song plays, we want to know if it was finished or not. This
|
||||
implies that either a user didn't want to hear it anymore, or that the
|
||||
bot chose the wrong song from the search term.
|
||||
|
||||
Args:
|
||||
song_play_id (int): The row ID within the database for the song
|
||||
play.
|
||||
finished (bool): Whether or not the song was completed.
|
||||
"""
|
||||
with sqlite3.connect(self.path) as conn:
|
||||
conn.execute("""
|
||||
UPDATE
|
||||
song_play
|
||||
SET
|
||||
finished = ?
|
||||
WHERE
|
||||
id = ?
|
||||
""", (finished, song_play_id))
|
||||
|
||||
def get_activity_stats(
|
||||
self,
|
||||
member: typing.Union[discord.Member, int],
|
||||
start: datetime = datetime.now() - timedelta(days=30)
|
||||
) -> dict[str, timedelta]:
|
||||
"""
|
||||
Gets stats on the activities of the given member.
|
||||
|
||||
This method searches the database for activity changes by the given
|
||||
user and computes the amount of time spent in each activity.
|
||||
|
||||
Args:
|
||||
member (discord.Member): The Discord member to get stats for.
|
||||
start (datetime): The earliest activity change to get.
|
||||
|
||||
Returns:
|
||||
dict[str, timedelta]: A dictionary of activity names and
|
||||
seconds in each.
|
||||
"""
|
||||
# Get member Discord ID and convert to DB ID
|
||||
member_id = member.id if isinstance(member, discord.Member) else member
|
||||
member_id = self._insert_user(member_id)
|
||||
# Pull all activities for this user
|
||||
with sqlite3.connect(self.path) as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
SELECT
|
||||
before_activity_name,
|
||||
after_activity_name,
|
||||
timestamp
|
||||
FROM
|
||||
activity_change
|
||||
WHERE
|
||||
user_id = (?) AND
|
||||
timestamp > (?)
|
||||
""", (member_id, start))
|
||||
activities = cursor.fetchall()
|
||||
# Collect activities
|
||||
activity_stats = {}
|
||||
for first, second in zip(activities, activities[1:]):
|
||||
if first[1] == second[0]:
|
||||
activity_name = first[1]
|
||||
activity_time = \
|
||||
datetime.fromisoformat(second[2]) - \
|
||||
datetime.fromisoformat(first[2])
|
||||
if activity_name in activity_stats:
|
||||
activity_stats[activity_name] += activity_time
|
||||
else:
|
||||
activity_stats[activity_name] = activity_time
|
||||
if None in activity_stats:
|
||||
del activity_stats[None]
|
||||
return activity_stats
|
||||
Loading…
x
Reference in New Issue
Block a user