base-discord-bot/cogs/activities.py

207 lines
7.8 KiB
Python

import datetime
import discord
from discord.ext import commands
import os
import pathlib
import sqlite3
class Activities(commands.Cog):
"""Related commands."""
__slots__ = ("nerd", "nerds", "fword", "fwords")
def __init__(self, bot):
self.bot = bot
# Initialize the databse
self.db_path = pathlib.Path(os.getenv('DB_PATH', 'activities.db'))
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_activity (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
before_activity_type TEXT,
before_activity_name TEXT,
after_activity_type TEXT,
after_activity_name TEXT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
async def __local_check(self, ctx):
"""A local check which applies to all commands in this cog."""
if not ctx.guild:
raise commands.NoPrivateMessage
return True
async def __error(self, ctx, error):
"""A local error handler for all errors arising from commands in this cog."""
if isinstance(error, commands.NoPrivateMessage):
try:
return await ctx.send('This command can not be used in Private Messages.')
except discord.HTTPException:
pass
print('Ignoring exception in command {}:'.format(ctx.command), file=sys.stderr)
traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
def log_activity(self,
user_id: int,
before_activity_type: str = None,
before_activity_name: str = None,
after_activity_type: str = None,
after_activity_name: str = None):
"""Log an activity into the activities database for stats."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
cursor.execute("""
INSERT INTO user_activity (user_id, before_activity_type,
before_activity_name, after_activity_type, after_activity_name)
VALUES (?, ?, ?, ?, ?)
""", (user_id, before_activity_type, before_activity_name,
after_activity_type, after_activity_name))
conn.commit()
except sqlite3.Error as e:
print(f"Database error: {e}")
finally:
conn.close()
@commands.Cog.listener()
async def on_presence_update(self, before: discord.Member, after: discord.Member):
# Ignore changes to any bots
if after.bot:
return
if isinstance(before.activity, discord.Game):
before_type = "game"
before_name = before.activity.name
elif isinstance(before.activity, discord.Spotify):
before_type = "spotify"
before_name = before.activity.artist
elif isinstance(before.activity, discord.Activity):
before_type = "activity"
before_name = before.activity.name
else:
before_type = "other"
before_name = "other"
if isinstance(after.activity, discord.Game):
after_type = "game"
after_name = after.activity.name
elif isinstance(after.activity, discord.Spotify):
after_type = "spotify"
after_name = after.activity.artist
elif isinstance(after.activity, discord.Activity):
after_type = "activity"
after_name = after.activity.name
else:
after_type = "other"
after_name = "other"
self.log_activity(
user_id=before.id,
before_activity_type=before_type,
before_activity_name=before_name,
after_activity_type=after_type,
after_activity_name=after_name
)
@commands.command(name='nerd', aliases=['nerdscale'],
description="Find how nerdy a user is.")
async def nerd_(self, ctx, member: discord.Member = None):
"""Checks the play history of all users in the Discord server, and
based on how many hours they have in League of Legends compared to
other users, declare how nerdy they are."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Start by getting all unique user IDs
if member:
user_ids = [member.id]
else:
user_ids = []
cursor.execute("SELECT DISTINCT user_id FROM user_activity")
user_ids = [int(r[0]) for r in cursor.fetchall()]
# For each user, compute timedelta for time playing League
league_stats = {}
for user_id in user_ids:
# Get all entries that mention league
cursor.execute("""
SELECT
before_activity_name,
after_activity_name,
timestamp
FROM
user_activity
WHERE
user_id = (?) AND
timestamp > datetime('now', '-1 month')
""", (user_id,))
league_activities = cursor.fetchall()
# Sort by timestamp
league_activities = sorted(league_activities, key=lambda x: x[2])
# Compare each status change to find each time playing League
total_time = datetime.timedelta()
for first, second in zip(league_activities, league_activities[1:]):
if "league of legends" in first[1].lower().strip() and \
"league of legends" in second[0].lower().strip():
total_time += \
datetime.datetime.fromisoformat(second[2]) - \
datetime.datetime.fromisoformat(first[2])
# Save total time in League
league_stats[user_id] = total_time
# Sort all users by time in League
league_stats = dict(sorted(
league_stats.items(), key=lambda x: x[1],
reverse=True
))
# Print all stats for testing
print("League Stats:")
for user_id, time in league_stats.items():
print(user_id, ":", str(time))
# Get top user
user_id, time = next(iter(league_stats.items()))
time_val = None
time_units = None
if time.total_seconds() // 3600 > 0:
time_val = int(time.total_seconds() // 3600)
time_units = "hours" if time_val > 1 else "hour"
else:
time_val = int(time.total_seconds() // 60)
time_units = "minutes" if time_val > 1 else "minute"
# Send Discord message to clown on user
response = ""
if member:
if time_val != 0:
descriptor = ""
if time_units == "hours":
descriptor = "a massive fucking nerd"
elif time_units == "hour":
descriptor = "a huge nerd"
else:
descriptor = "a nerd"
response = f"<@{user_id}> has played League for {time_val} "\
f"{time_units} in the past month, making them "\
f"{descriptor}."
else:
response = f"<@{user_id}> doesn't have any time in League. "\
f"They're not a nerd."
else:
response = (
f"<@{user_id}> has played League for {time_val} {time_units} "\
f"in the past month, making them the biggest nerd."
)
await ctx.send(response)
async def setup(bot):
await bot.add_cog(Activities(bot))