From 3b573a2878831bd00944f4c2ddd6db3c86110e09 Mon Sep 17 00:00:00 2001 From: Martin Szuc Date: Sun, 15 Dec 2024 23:39:45 +0100 Subject: [PATCH] chore: code cleanup --- CHANGELOG.md | 19 ++- src/cogs/help_cog.py | 85 +++++++---- src/communication/announcement.py | 57 ++++++-- src/database.py | 59 ++++++-- src/event_definitions.py | 57 ++++---- src/managers/event_manager.py | 173 ++++++++++++++++++----- src/managers/tts_manager.py | 92 +++++++++--- src/timer.py | 138 +++++++++++++----- src/timers/base.py | 84 ++++++++--- src/timers/glyph.py | 37 +++-- src/timers/mindful.py | 89 ++++++++---- src/timers/roshan.py | 58 ++++++-- src/timers/tormentor.py | 34 +++-- tests/communication/test_announcement.py | 2 + 14 files changed, 732 insertions(+), 252 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f322d56..47615ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,21 @@ -# [](/compare/v1.0.0...v) (2024-12-15) +# [](/compare/v1.1.0...v) (2024-12-15) + + +### Bug Fixes + +* python version 7248367 +* removed docker no-cache-dir a4ab127 +* TTS Messages in docker 5443994 + + +### Features + +* Queue for announcements 4be33d8 +* **test:** added multiple servers test case 707ec13 + + + +# [1.1.0](/compare/v1.0.0...v1.1.0) (2024-12-15) ### Bug Fixes diff --git a/src/cogs/help_cog.py b/src/cogs/help_cog.py index f92d40d..506c233 100644 --- a/src/cogs/help_cog.py +++ b/src/cogs/help_cog.py @@ -1,5 +1,3 @@ -# src/cogs/help_cog.py - import logging import discord @@ -9,25 +7,49 @@ class HelpCog(commands.Cog): - """A Cog for handling the help command.""" + """ + A Cog for handling the help command, providing users with information about available bot commands. + """ + + def __init__(self, bot: commands.Bot): + """ + Initialize the HelpCog with the bot instance. - def __init__(self, bot): + Args: + bot (commands.Bot): The Discord bot instance. + """ self.bot = bot self.logger = logging.getLogger('DotaDiscordBot') self.prefix = PREFIX - @commands.command(name="bot-help", aliases=['dota-help', 'dotahelp', 'pls', 'help']) - async def send_help(self, ctx): - """Show available commands with examples.""" - self.logger.info(f"Command '!bot-help' invoked by {ctx.author}") + @commands.command( + name="bot-help", + aliases=['dota-help', 'dotahelp', 'pls', 'help'], + brief="Displays the help message with available commands." + ) + async def send_help(self, ctx: commands.Context) -> None: + """ + Sends an embedded help message detailing all available commands and their usage. - embed = discord.Embed(title="Dota Timer Bot - Help", color=0x00ff00) + Args: + ctx (commands.Context): The context in which the command was invoked. + """ + self.logger.info(f"Command '!bot-help' invoked by {ctx.author} in guild ID {ctx.guild.id}") + # Create an embed to structure the help message. + embed = discord.Embed( + title="Dota Timer Bot - Help", + color=0x00ff00, + description="List of available commands and their descriptions." + ) + + # Define command categories and their respective commands. categories = { "📅 **Game Timer**": [ { "name": f"{self.prefix}start [mode]", - "value": "Starts the game timer.\n**Example:** `{0}start 45` or `{0}start -10:00 turbo`".format(self.prefix) + "value": "Starts the game timer.\n**Example:** `{0}start 45` or `{0}start -10:00 turbo`".format( + self.prefix) }, { "name": f"{self.prefix}stop", @@ -49,17 +71,20 @@ async def send_help(self, ctx): "🛡️ **Roshan Timer**": [ { "name": f"{self.prefix}rosh *(Aliases: `rs`, `rsdead`, `rs-dead`, `rsdied`, `rs-died`)*", - "value": "Logs Roshan's death and starts the respawn timer.\n**Example:** `{0}rosh`".format(self.prefix) + "value": "Logs Roshan's death and starts the respawn timer.\n**Example:** `{0}rosh`".format( + self.prefix) }, { "name": f"{self.prefix}cancel-rosh *(Aliases: `rsalive`, `rsback`, `rsb`)*", - "value": "Cancels the Roshan respawn timer if active.\n**Example:** `{0}cancel-rosh`".format(self.prefix) + "value": "Cancels the Roshan respawn timer if active.\n**Example:** `{0}cancel-rosh`".format( + self.prefix) } ], "🔮 **Glyph Timer**": [ { "name": f"{self.prefix}glyph *(Alias: `g`)*", - "value": "Starts a 5-minute cooldown timer for the enemy's glyph.\n**Example:** `{0}glyph`".format(self.prefix) + "value": "Starts a 5-minute cooldown timer for the enemy's glyph.\n**Example:** `{0}glyph`".format( + self.prefix) }, { "name": f"{self.prefix}cancel-glyph *(Alias: `cg`)*", @@ -69,7 +94,8 @@ async def send_help(self, ctx): "🐉 **Tormentor Timer**": [ { "name": f"{self.prefix}tormentor *(Aliases: `tm`, `torm`, `t`)*", - "value": "Logs Tormentor's death and starts the respawn timer.\n**Example:** `{0}tormentor`".format(self.prefix) + "value": "Logs Tormentor's death and starts the respawn timer.\n**Example:** `{0}tormentor`".format( + self.prefix) }, { "name": f"{self.prefix}cancel-torm *(Aliases: `ct`, `tormentorcancel`)*", @@ -79,17 +105,20 @@ async def send_help(self, ctx): "💬 **Mindful Messages**": [ { "name": f"{self.prefix}enable-mindful *(Aliases: `enable-pma`, `pma`)*", - "value": "Enables periodic mindful messages to encourage positive play.\n**Example:** `{0}enable-mindful`".format(self.prefix) + "value": "Enables periodic mindful messages to encourage positive play.\n**Example:** `{0}enable-mindful`".format( + self.prefix) }, { "name": f"{self.prefix}disable-mindful *(Aliases: `disable-pma`, `no-pma`)*", - "value": "Disables the periodic mindful messages.\n**Example:** `{0}disable-mindful`".format(self.prefix) + "value": "Disables the periodic mindful messages.\n**Example:** `{0}disable-mindful`".format( + self.prefix) } ], "⚙️ **Custom Events**": [ { "name": f"{self.prefix}add-event ", - "value": "Adds a custom event.\n**Static:** `{0}add-event static 10:00 \"Siege Creep incoming!\"`\n**Periodic:** `{0}add-event periodic 05:00 02:00 20:00 \"Bounty Runes soon!\"`".format(self.prefix) + "value": "Adds a custom event.\n**Static:** `{0}add-event static 10:00 \"Siege Creep incoming!\"`\n**Periodic:** `{0}add-event periodic 05:00 02:00 20:00 \"Bounty Runes soon!\"`".format( + self.prefix) }, { "name": f"{self.prefix}remove-event ", @@ -101,7 +130,8 @@ async def send_help(self, ctx): }, { "name": f"{self.prefix}reset-events", - "value": "Resets all events to their default values.\n**Example:** `{0}reset-events`".format(self.prefix) + "value": "Resets all events to their default values.\n**Example:** `{0}reset-events`".format( + self.prefix) } ], "ℹ️ **General**": [ @@ -112,16 +142,23 @@ async def send_help(self, ctx): ] } + # Add each category and its commands to the embed as separate fields. for category, commands_list in categories.items(): - value = "" + field_value = "" for cmd in commands_list: - value += f"**{cmd['name']}**\n{cmd['value']}\n\n" - embed.add_field(name=category, value=value, inline=False) + field_value += f"**{cmd['name']}**\n{cmd['value']}\n\n" + embed.add_field(name=category, value=field_value, inline=False) + # Send the embed to the context channel. await ctx.send(embed=embed) - self.logger.info(f"Help message sent to {ctx.author}") + self.logger.info(f"Help message sent to {ctx.author} in guild ID {ctx.guild.id}") + +async def setup(bot: commands.Bot) -> None: + """ + Function required for loading the Cog. -async def setup(bot): - """Function required for loading the Cog.""" + Args: + bot (commands.Bot): The Discord bot instance. + """ await bot.add_cog(HelpCog(bot)) diff --git a/src/communication/announcement.py b/src/communication/announcement.py index 953ae80..3275ef5 100644 --- a/src/communication/announcement.py +++ b/src/communication/announcement.py @@ -1,38 +1,62 @@ -import discord import asyncio + +import discord + from src.managers.tts_manager import TTSManager from src.utils.config import logger + class Announcement: - """A class to manage announcements in text and voice channels.""" + """ + Manages announcements in both text and voice channels for a Discord server. + + This class handles sending embed messages to text channels and managing + text-to-speech (TTS) announcements in voice channels via a queue system. + """ def __init__(self): + """ + Initialize the Announcement manager. + + Sets up the TTS manager, message queue, and initializes consumer task state. + """ self.tts_manager = TTSManager() self.queue = asyncio.Queue() self.consumer_task = None self._consumer_running = False - async def announce(self, game_timer, message): + async def announce(self, game_timer, message: str): """ Announce a message in both text and voice channels. + + Sends an embed message to the text channel and queues the message for + TTS playback in the voice channel if connected. + + Args: + game_timer: An instance containing game state and channel information. + message (str): The message to announce. """ - # Announce in text channel + # Send the message to the text channel await self._announce_text(game_timer, message) - # Instead of playing TTS immediately, put it into a queue - # if voice_client is connected and ready. + # Check if the voice client is connected before queuing the TTS message if game_timer.voice_client and game_timer.voice_client.is_connected(): - # Put the message into the queue for sequential playback await self.queue.put((game_timer, message)) - # Start the consumer if not already running + # Start the consumer task if it's not already running if not self._consumer_running: self._consumer_running = True self.consumer_task = asyncio.create_task(self._message_consumer()) else: logger.warning("Voice client is not connected; cannot announce message.") - async def _announce_text(self, game_timer, message): - """Announce a message in the text channel.""" + async def _announce_text(self, game_timer, message: str): + """ + Send an embed message to the designated text channel. + + Args: + game_timer: An instance containing game state and channel information. + message (str): The message to send as an embed. + """ if game_timer.channel: try: embed = discord.Embed(description=message, color=0x00ff00) @@ -44,18 +68,27 @@ async def _announce_text(self, game_timer, message): logger.warning("Cannot send message; text channel is not set.") async def _message_consumer(self): - """Continuously consume messages from the queue and play them sequentially.""" + """ + Continuously process messages from the queue and play them in the voice channel. + + This coroutine runs as a background task, ensuring that TTS messages are + played sequentially without overlapping. + """ try: while True: game_timer, msg = await self.queue.get() if game_timer.voice_client and game_timer.voice_client.is_connected(): try: await self.tts_manager.play_tts(game_timer.voice_client, msg) + logger.info(f"Played TTS message in voice channel: {msg}") except Exception as e: logger.error(f"Error during voice announcement: {e}", exc_info=True) - # Mark the task as done + else: + logger.warning("Voice client disconnected while processing queue.") + # Indicate that the queued task is done self.queue.task_done() except asyncio.CancelledError: logger.info("Message consumer task cancelled") finally: self._consumer_running = False + logger.debug("Message consumer has stopped running.") diff --git a/src/database.py b/src/database.py index 7d0b2ef..596ae11 100644 --- a/src/database.py +++ b/src/database.py @@ -1,22 +1,34 @@ -# database.py - from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.orm import sessionmaker, declarative_base from src.utils.config import DATABASE_URL -# Create the SQLAlchemy engine +# Initialize the SQLAlchemy engine with the provided database URL. +# The 'check_same_thread' parameter is set to False to allow usage with multiple threads. engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) -# Create a configured "Session" class +# Create a configured "Session" class for database interactions. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) -# Base class for declarative class definitions +# Base class for declarative class definitions. Base = declarative_base() + class ServerSettings(Base): - """Model for storing server-specific settings.""" + """ + Represents server-specific settings in the database. + + Attributes: + id (int): Primary key for the server settings record. + server_id (str): Unique identifier for the Discord server. + prefix (str): Command prefix used by the bot in the server. + timer_channel (str): Name of the text channel designated for timer announcements. + voice_channel (str): Name of the voice channel designated for TTS announcements. + tts_language (str): Language setting for text-to-speech announcements. + mindful_messages_enabled (int): Flag indicating if mindful messages are enabled (1) or disabled (0). + """ __tablename__ = "server_settings" + id = Column(Integer, primary_key=True, index=True) server_id = Column(String, unique=True, index=True, nullable=False) prefix = Column(String, default="!", nullable=False) @@ -25,25 +37,50 @@ class ServerSettings(Base): tts_language = Column(String, default="en-US-AriaNeural", nullable=False) mindful_messages_enabled = Column(Integer, default=0, nullable=False) + class StaticEvent(Base): - """Model for storing static events.""" + """ + Represents static events in the database. + + Attributes: + id (int): Primary key for the static event record. + guild_id (str): Identifier for the Discord guild/server. + mode (str): Game mode ('regular' or 'turbo') associated with the event. + time (int): Time in seconds when the event should trigger. + message (str): Message to announce when the event triggers. + """ __tablename__ = "static_events" + id = Column(Integer, primary_key=True, index=True) guild_id = Column(String, index=True, nullable=False) mode = Column(String, index=True, nullable=False) # 'regular' or 'turbo' time = Column(Integer, nullable=False) # Stored as seconds message = Column(String, nullable=False) + class PeriodicEvent(Base): - """Model for storing periodic events.""" + """ + Represents periodic events in the database. + + Attributes: + id (int): Primary key for the periodic event record. + guild_id (str): Identifier for the Discord guild/server. + mode (str): Game mode ('regular' or 'turbo') associated with the event. + start_time (int): Time in seconds when the event series starts. + interval (int): Interval in seconds between consecutive event triggers. + end_time (int): Time in seconds when the event series ends. + message (str): Message to announce when the event triggers. + """ __tablename__ = "periodic_events" + id = Column(Integer, primary_key=True, index=True) guild_id = Column(String, index=True, nullable=False) mode = Column(String, index=True, nullable=False) # 'regular' or 'turbo' start_time = Column(Integer, nullable=False) # in seconds - interval = Column(Integer, nullable=False) # in seconds - end_time = Column(Integer, nullable=False) # in seconds + interval = Column(Integer, nullable=False) # in seconds + end_time = Column(Integer, nullable=False) # in seconds message = Column(String, nullable=False) -# Create all tables in the database + +# Create all tables in the database based on the defined models. Base.metadata.create_all(bind=engine) diff --git a/src/event_definitions.py b/src/event_definitions.py index c5dc77e..287072a 100644 --- a/src/event_definitions.py +++ b/src/event_definitions.py @@ -1,75 +1,75 @@ -# event_definitions.py - from src.utils.utils import min_to_sec + +# Define static events for regular game mode. regular_static_events = [ {"mode": "regular", "time": min_to_sec("00:01"), "message": "Game has started"}, -# Creeps + # Creeps {"mode": "regular", "time": min_to_sec("01:40"), "message": "First Flagbearer in 20 seconds!"}, {"mode": "regular", "time": min_to_sec("02:30"), "message": "Glyph in 30 seconds!"}, {"mode": "regular", "time": min_to_sec("03:00"), "message": "Glyph is now available!"}, -# Tormentor + # Tormentor {"mode": "regular", "time": min_to_sec("19:00"), "message": "First Tormentor in 1 minute!"}, {"mode": "regular", "time": min_to_sec("20:00"), "message": "Tormentor has spawned!"}, -# Neutrals + # Neutrals {"mode": "regular", "time": min_to_sec("36:40"), "message": "New neutral items!"}, {"mode": "regular", "time": min_to_sec("59:30"), "message": "New neutral items in 30 seconds!"}, {"mode": "regular", "time": min_to_sec("60:00"), "message": "New neutral items!"}, -# Items + # Items {"mode": "regular", "time": min_to_sec("15:00"), "message": "Shard available!"}, ] - +# Define periodic events for regular game mode. regular_periodic_events = [ -# Runes + # Runes {"mode": "regular", "start_time": min_to_sec("05:40"), "interval": min_to_sec("02:00"), "end_time": min_to_sec("99:00"), "message": "Power Runes soon!"}, {"mode": "regular", "start_time": min_to_sec("06:00"), "interval": min_to_sec("07:00"), "end_time": min_to_sec("60:00"), "message": "XP Runes in 60 seconds!"}, {"mode": "regular", "start_time": min_to_sec("02:30"), "interval": min_to_sec("03:00"), "end_time": min_to_sec("60:00"), "message": "Gold Runes in 30 seconds!"}, -# Tormentor + # Tormentor {"mode": "regular", "start_time": min_to_sec("21:00"), "interval": min_to_sec("10:00"), "end_time": min_to_sec("70:00"), "message": "Don't forget Tormentor!"}, -# Creeps + # Creeps {"mode": "regular", "start_time": min_to_sec("03:00"), "interval": min_to_sec("01:00"), "end_time": min_to_sec("10:00"), "message": "Flagbearer just spawned!"}, {"mode": "regular", "start_time": min_to_sec("04:30"), "interval": min_to_sec("05:00"), "end_time": min_to_sec("99:00"), "message": "Siege Creep in 30 seconds!"}, {"mode": "regular", "start_time": min_to_sec("20:00"), "interval": min_to_sec("10:00"), "end_time": min_to_sec("99:00"), "message": "Roshan bottom side!"}, {"mode": "regular", "start_time": min_to_sec("25:00"), "interval": min_to_sec("10:00"), "end_time": min_to_sec("99:00"), "message": "Roshan top side!"}, -# Lotus + # Lotus {"mode": "regular", "start_time": min_to_sec("02:30"), "interval": min_to_sec("03:00"), "end_time": min_to_sec("16:00"), "message": "Lotus pool in 30 seconds!"}, {"mode": "regular", "start_time": min_to_sec("03:00"), "interval": min_to_sec("03:00"), "end_time": min_to_sec("16:00"), "message": "Lotus spawned!"}, -# Neutrals + # Neutrals {"mode": "regular", "start_time": min_to_sec("07:00"), "interval": min_to_sec("10:00"), "end_time": min_to_sec("27:05"), "message": "New neutral items available!"}, -# Items + # Items {"mode": "regular", "start_time": min_to_sec("02:15"), "interval": min_to_sec("02:15"), "end_time": min_to_sec("07:30"), "message": "Wards now available!"}, ] +# Define static events for turbo game mode. turbo_static_events = [ {"mode": "turbo", "time": min_to_sec("00:01"), "message": "Game has started"}, -# Neutrals + # Neutrals {"mode": "turbo", "time": min_to_sec("16:40"), "message": "New neutral items!"}, {"mode": "turbo", "time": min_to_sec("27:16"), "message": "New neutral items!"}, -# Items - {"mode": "turbo", "time": min_to_sec("7:30"), "message": "Shard available!"}, -# Tormentor + # Items + {"mode": "turbo", "time": min_to_sec("07:30"), "message": "Shard available!"}, + # Tormentor {"mode": "turbo", "time": min_to_sec("09:00"), "message": "Tormentor in 60 seconds!"}, {"mode": "turbo", "time": min_to_sec("10:00"), "message": "Tormentor spawned!"}, ] +# Define periodic events for turbo game mode. turbo_periodic_events = [ -# Runes + # Runes {"mode": "turbo", "start_time": min_to_sec("05:30"), "interval": min_to_sec("02:00"), "end_time": min_to_sec("99:00"), "message": "Power Runes in 30 seconds!!"}, - {"mode": "turbo", "start_time": min_to_sec("01:40"), "interval": min_to_sec("02:00"), "end_time": min_to_sec("4:10"), "message": "Water runes in 20 seconds!!"}, + {"mode": "turbo", "start_time": min_to_sec("01:40"), "interval": min_to_sec("02:00"), "end_time": min_to_sec("04:10"), "message": "Water runes in 20 seconds!!"}, {"mode": "turbo", "start_time": min_to_sec("06:00"), "interval": min_to_sec("07:00"), "end_time": min_to_sec("99:00"), "message": "XP Runes in 60 seconds!"}, {"mode": "turbo", "start_time": min_to_sec("06:30"), "interval": min_to_sec("07:00"), "end_time": min_to_sec("99:00"), "message": "XP Runes in 30 seconds!"}, -# Creeps + # Creeps {"mode": "turbo", "start_time": min_to_sec("05:00"), "interval": min_to_sec("05:00"), "end_time": min_to_sec("50:00"), "message": "Siege Creep in 30 seconds!"}, -# Lotus + # Lotus {"mode": "turbo", "start_time": min_to_sec("01:00"), "interval": min_to_sec("01:30"), "end_time": min_to_sec("13:00"), "message": "Lotus pool in 30 seconds!"}, -# Roshan - - {"mode": "turbo", "start_time": min_to_sec("10:00"), "interval": min_to_sec("10:00"), - "end_time": min_to_sec("99:00"), "message": "Roshan bottom side!"}, - {"mode": "turbo", "start_time": min_to_sec("15:00"), "interval": min_to_sec("10:00"), - "end_time": min_to_sec("99:00"), "message": "Roshan top side!"}, + # Roshan + {"mode": "turbo", "start_time": min_to_sec("10:00"), "interval": min_to_sec("10:00"), "end_time": min_to_sec("99:00"), "message": "Roshan bottom side!"}, + {"mode": "turbo", "start_time": min_to_sec("15:00"), "interval": min_to_sec("10:00"), "end_time": min_to_sec("99:00"), "message": "Roshan top side!"}, ] +# Define mindful messages to be periodically sent to encourage positive play. mindful_messages = [ {"message": "Remember to take a deep breath and stay calm!"}, {"message": "Focus on your strategy and enjoy the game!"}, @@ -83,6 +83,8 @@ {"message": "Stay hydrated, and keep your cool!"}, {"message": "Remember, it’s just a game. Have fun and relax!"}, ] + +# Define pre-mindful messages to introduce audio calming sounds. mindful_pre_messages = [ {"message": "Take a deep breath. Here are a few moments of soothing nature sounds, just for you."}, {"message": "For a brief pause, listen to these calming sounds from nature. Let yourself unwind."}, @@ -95,4 +97,3 @@ {"message": "Allow yourself to slow down. Here’s a snippet of nature’s calm to help you refocus and relax."}, {"message": "This is your reminder to breathe and let go. Listen to these sounds of nature to find calm."}, ] - diff --git a/src/managers/event_manager.py b/src/managers/event_manager.py index a8d9187..ca1f17a 100644 --- a/src/managers/event_manager.py +++ b/src/managers/event_manager.py @@ -1,7 +1,3 @@ -# src/managers/event_manager.py - -import random - from sqlalchemy.orm import sessionmaker from src.database import StaticEvent, PeriodicEvent, engine, ServerSettings @@ -10,23 +6,38 @@ regular_periodic_events, turbo_static_events, turbo_periodic_events, - mindful_messages ) from src.utils.config import logger -# Create a configured "Session" class +# Create a configured "Session" class for database interactions SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) logger.debug("Database session maker created.") class EventsManager: - """Class to manage static and periodic events for different game modes.""" + """ + Manages static and periodic events for different game modes within Discord guilds. + + This class handles CRUD operations for events and manages server settings related + to events and mindful messages. + """ def __init__(self): + """ + Initialize the EventsManager with a new database session. + """ self.session = SessionLocal() logger.debug("EventsManager session initialized.") - def guild_has_events(self, guild_id): - """Check if a guild already has any events.""" + def guild_has_events(self, guild_id: int) -> bool: + """ + Check if a guild already has any static or periodic events. + + Args: + guild_id (int): The ID of the Discord guild. + + Returns: + bool: True if the guild has at least one static or periodic event, False otherwise. + """ try: has_static = self.session.query(StaticEvent).filter_by(guild_id=str(guild_id)).first() is not None has_periodic = self.session.query(PeriodicEvent).filter_by(guild_id=str(guild_id)).first() is not None @@ -36,11 +47,16 @@ def guild_has_events(self, guild_id): logger.error(f"Error checking if guild ID {guild_id} has events: {e}", exc_info=True) return False - def populate_events_for_guild(self, guild_id): - """Populate the database with base events for a specific guild.""" + def populate_events_for_guild(self, guild_id: int) -> None: + """ + Populate the database with base static and periodic events for a specific guild. + + Args: + guild_id (int): The ID of the Discord guild. + """ logger.info(f"Populating events for guild ID {guild_id}.") try: - # Add static events + # Add static events from predefined regular and turbo lists for event_data in regular_static_events + turbo_static_events: static_event = StaticEvent( guild_id=str(guild_id), @@ -51,7 +67,7 @@ def populate_events_for_guild(self, guild_id): self.session.add(static_event) logger.debug(f"Added StaticEvent: {static_event}") - # Add periodic events + # Add periodic events from predefined regular and turbo lists for event_data in regular_periodic_events + turbo_periodic_events: periodic_event = PeriodicEvent( guild_id=str(guild_id), @@ -71,8 +87,17 @@ def populate_events_for_guild(self, guild_id): self.session.rollback() logger.error(f"Error populating events for guild ID {guild_id}: {e}", exc_info=True) - def get_static_events(self, guild_id, mode='regular'): - """Retrieve all static events for a guild and mode.""" + def get_static_events(self, guild_id: int, mode: str = 'regular') -> dict: + """ + Retrieve all static events for a guild and specified mode. + + Args: + guild_id (int): The ID of the Discord guild. + mode (str, optional): The game mode ('regular' or 'turbo'). Defaults to 'regular'. + + Returns: + dict: A dictionary of static events with event IDs as keys and their details as values. + """ try: events = self.session.query(StaticEvent).filter_by(guild_id=str(guild_id), mode=mode).all() event_dict = {event.id: {"time": event.time, "message": event.message} for event in events} @@ -82,8 +107,17 @@ def get_static_events(self, guild_id, mode='regular'): logger.error(f"Error retrieving static events for guild ID {guild_id}: {e}", exc_info=True) return {} - def get_periodic_events(self, guild_id, mode='regular'): - """Retrieve all periodic events for a guild and mode.""" + def get_periodic_events(self, guild_id: int, mode: str = 'regular') -> dict: + """ + Retrieve all periodic events for a guild and specified mode. + + Args: + guild_id (int): The ID of the Discord guild. + mode (str, optional): The game mode ('regular' or 'turbo'). Defaults to 'regular'. + + Returns: + dict: A dictionary of periodic events with event IDs as keys and their details as values. + """ try: events = self.session.query(PeriodicEvent).filter_by(guild_id=str(guild_id), mode=mode).all() event_dict = { @@ -100,8 +134,22 @@ def get_periodic_events(self, guild_id, mode='regular'): logger.error(f"Error retrieving periodic events for guild ID {guild_id}: {e}", exc_info=True) return {} - def add_static_event(self, guild_id, time, message, mode='regular'): - """Add a static event for a guild.""" + def add_static_event(self, guild_id: int, time: str, message: str, mode: str = 'regular') -> int: + """ + Add a static event for a guild. + + Args: + guild_id (int): The ID of the Discord guild. + time (str): The time at which the event occurs. + message (str): The message associated with the event. + mode (str, optional): The game mode ('regular' or 'turbo'). Defaults to 'regular'. + + Returns: + int: The ID of the newly added static event. + + Raises: + Exception: If there is an error adding the static event. + """ logger.info(f"Adding static event for guild ID {guild_id}: Time={time}, Message='{message}', Mode='{mode}'.") try: new_event = StaticEvent(guild_id=str(guild_id), mode=mode, time=time, message=message) @@ -114,8 +162,32 @@ def add_static_event(self, guild_id, time, message, mode='regular'): logger.error(f"Error adding static event for guild ID {guild_id}: {e}", exc_info=True) raise - def add_periodic_event(self, guild_id, start_time, interval, end_time, message, mode='regular'): - """Add a periodic event for a guild.""" + def add_periodic_event( + self, + guild_id: int, + start_time: str, + interval: int, + end_time: str, + message: str, + mode: str = 'regular' + ) -> int: + """ + Add a periodic event for a guild. + + Args: + guild_id (int): The ID of the Discord guild. + start_time (str): The start time of the periodic event. + interval (int): The interval in seconds between event occurrences. + end_time (str): The end time of the periodic event. + message (str): The message associated with the event. + mode (str, optional): The game mode ('regular' or 'turbo'). Defaults to 'regular'. + + Returns: + int: The ID of the newly added periodic event. + + Raises: + Exception: If there is an error adding the periodic event. + """ logger.info(f"Adding periodic event for guild ID {guild_id}: Start={start_time}, Interval={interval}, End={end_time}, Message='{message}', Mode='{mode}'.") try: new_event = PeriodicEvent( @@ -135,12 +207,23 @@ def add_periodic_event(self, guild_id, start_time, interval, end_time, message, logger.error(f"Error adding periodic event for guild ID {guild_id}: {e}", exc_info=True) raise - def remove_event(self, guild_id, event_id): - """Remove an event (static or periodic) by ID.""" + def remove_event(self, guild_id: int, event_id: int) -> bool: + """ + Remove an event (static or periodic) by its ID. + + Args: + guild_id (int): The ID of the Discord guild. + event_id (int): The ID of the event to remove. + + Returns: + bool: True if the event was successfully removed, False otherwise. + """ logger.info(f"Attempting to remove event ID {event_id} for guild ID {guild_id}.") try: + # Attempt to find the event in StaticEvent event = self.session.query(StaticEvent).filter_by(guild_id=str(guild_id), id=event_id).first() if not event: + # If not found, attempt to find it in PeriodicEvent event = self.session.query(PeriodicEvent).filter_by(guild_id=str(guild_id), id=event_id).first() if event: @@ -156,13 +239,20 @@ def remove_event(self, guild_id, event_id): logger.error(f"Error removing event ID {event_id} for guild ID {guild_id}: {e}", exc_info=True) return False - def close(self): - """Close the database session.""" + def close(self) -> None: + """ + Close the current database session. + """ self.session.close() logger.debug("EventsManager session closed.") - def delete_events_for_guild(self, guild_id): - """Delete all static and periodic events for a specific guild.""" + def delete_events_for_guild(self, guild_id: int) -> None: + """ + Delete all static and periodic events for a specific guild. + + Args: + guild_id (int): The ID of the Discord guild. + """ logger.info(f"Deleting all events for guild ID {guild_id}.") try: # Delete all static events for the guild @@ -179,8 +269,14 @@ def delete_events_for_guild(self, guild_id): self.session.rollback() logger.error(f"Error deleting events for guild ID {guild_id}: {e}", exc_info=True) - def set_mindful_messages(self, guild_id, enabled): - """Enable or disable mindful messages for a guild.""" + def set_mindful_messages(self, guild_id: int, enabled: bool) -> None: + """ + Enable or disable mindful messages for a guild. + + Args: + guild_id (int): The ID of the Discord guild. + enabled (bool): True to enable, False to disable mindful messages. + """ state = 'enabled' if enabled else 'disabled' logger.info(f"Setting mindful messages to {state} for guild ID {guild_id}.") try: @@ -189,6 +285,7 @@ def set_mindful_messages(self, guild_id, enabled): settings.mindful_messages_enabled = 1 if enabled else 0 logger.debug(f"Updated ServerSettings for guild ID {guild_id}.") else: + # Create new settings if none exist settings = ServerSettings(server_id=str(guild_id), mindful_messages_enabled=1 if enabled else 0) self.session.add(settings) logger.debug(f"Created new ServerSettings for guild ID {guild_id}.") @@ -199,8 +296,16 @@ def set_mindful_messages(self, guild_id, enabled): self.session.rollback() logger.error(f"Error setting mindful messages for guild ID {guild_id}: {e}", exc_info=True) - def mindful_messages_enabled(self, guild_id): - """Check if mindful messages are enabled for a guild.""" + def mindful_messages_enabled(self, guild_id: int) -> bool: + """ + Check if mindful messages are enabled for a guild. + + Args: + guild_id (int): The ID of the Discord guild. + + Returns: + bool: True if mindful messages are enabled, False otherwise. + """ try: settings = self.session.query(ServerSettings).filter_by(server_id=str(guild_id)).first() enabled = bool(settings.mindful_messages_enabled) if settings else False @@ -209,9 +314,3 @@ def mindful_messages_enabled(self, guild_id): except Exception as e: logger.error(f"Error checking mindful messages status for guild ID {guild_id}: {e}", exc_info=True) return False - - def get_random_mindful_message(self): - """Get a random mindful message.""" - message = random.choice(mindful_messages)["message"] - logger.debug(f"Selected random mindful message: '{message}'") - return message diff --git a/src/managers/tts_manager.py b/src/managers/tts_manager.py index 65ffd66..ff1d46b 100644 --- a/src/managers/tts_manager.py +++ b/src/managers/tts_manager.py @@ -10,31 +10,55 @@ class TTSManager: - """A class to manage TTS generation, caching, and playback in Discord voice channels.""" + """ + Manages Text-to-Speech (TTS) generation, caching, and playback in Discord voice channels. - # def __init__(self, voice="en-US-AriaNeural", volume=0.5): - def __init__(self, voice="en-GB-RyanNeural", rate="0%", volume=0.5): - self.voice = voice # Default to British male Ryan - self.rate = rate # Moderate rate for clarity, can adjust for slower or faster speech + This class handles the creation and caching of TTS audio files and facilitates their playback + in connected voice channels with volume control. + """ + + def __init__(self, voice: str = "en-GB-RyanNeural", rate: str = "0%", volume: float = 0.5): + """ + Initialize the TTSManager with default voice settings. + + Args: + voice (str, optional): The voice model to use for TTS. Defaults to "en-GB-RyanNeural". + rate (str, optional): The speech rate for TTS. Defaults to "0%". + volume (float, optional): The playback volume (0.0 to 1.0). Defaults to 0.5. + """ + self.voice = voice + self.rate = rate self.volume = volume os.makedirs(TTS_CACHE_DIR, exist_ok=True) + logger.debug(f"TTSManager initialized with voice={self.voice}, rate={self.rate}, volume={self.volume}.") - async def set_voice(self, new_voice): - """Set a new TTS voice.""" + async def set_voice(self, new_voice: str) -> None: + """ + Set a new voice model for TTS. + + Args: + new_voice (str): The identifier for the new voice model. + """ self.voice = new_voice logger.info(f"TTS voice set to {self.voice}") - async def get_tts_audio(self, message): - """Generates or retrieves TTS audio for a given message.""" - # Clean the message to make it safe for TTS - clean_message = re.sub(r'[^\w\s]', '', message) - clean_message = re.sub(r'\s+', ' ', clean_message).strip() + async def get_tts_audio(self, message: str) -> str: + """ + Generate or retrieve a cached TTS audio file for a given message. + + Args: + message (str): The text message to convert to speech. + Returns: + str: The file path to the TTS audio file, or None if generation failed. + """ + # Clean the message to ensure it's safe for TTS + clean_message = self._clean_message(message) if not clean_message: logger.warning("Cleaned message is empty. Skipping TTS generation.") return None - # Generate a unique filename based on the message + # Generate a unique filename based on the message content filename = os.path.join(TTS_CACHE_DIR, f"{hashlib.md5(clean_message.encode()).hexdigest()}.mp3") if os.path.exists(filename): @@ -51,8 +75,31 @@ async def get_tts_audio(self, message): return filename - async def play_tts(self, voice_client, message): - """Plays TTS audio in the specified voice client channel with volume control.""" + def _clean_message(self, message: str) -> str: + """ + Clean the input message by removing unwanted characters and extra spaces. + + Args: + message (str): The original message. + + Returns: + str: The cleaned message. + """ + # Remove non-alphanumeric characters except spaces + clean_message = re.sub(r'[^\w\s]', '', message) + # Replace multiple spaces with a single space and strip leading/trailing spaces + clean_message = re.sub(r'\s+', ' ', clean_message).strip() + logger.debug(f"Cleaned message: '{clean_message}'") + return clean_message + + async def play_tts(self, voice_client: discord.VoiceClient, message: str) -> None: + """ + Play TTS audio in the specified Discord voice channel with volume control. + + Args: + voice_client (discord.VoiceClient): The voice client connected to a voice channel. + message (str): The text message to convert to speech. + """ audio_file = await self.get_tts_audio(message) if audio_file and voice_client and voice_client.is_connected(): audio_source = discord.FFmpegPCMAudio(audio_file) @@ -72,10 +119,19 @@ async def play_tts(self, voice_client, message): else: logger.warning("Voice client is not connected or TTS audio could not be generated.") - async def set_volume(self, new_volume): - """Set a new playback volume (0.0 to 1.0).""" + async def set_volume(self, new_volume: float) -> None: + """ + Set a new playback volume for TTS audio. + + Args: + new_volume (float): The desired volume level (0.0 to 1.0). + + Raises: + ValueError: If the volume is outside the valid range. + """ if 0.0 <= new_volume <= 1.0: self.volume = new_volume logger.info(f"Volume set to {self.volume}") else: - logger.warning("Invalid volume level. Volume must be between 0.0 and 1.0.") \ No newline at end of file + logger.warning("Invalid volume level. Volume must be between 0.0 and 1.0.") + raise ValueError("Volume must be between 0.0 and 1.0.") diff --git a/src/timer.py b/src/timer.py index 8a1af4c..8b7e1d7 100644 --- a/src/timer.py +++ b/src/timer.py @@ -1,5 +1,3 @@ -# src/timer.py - import asyncio from discord.ext import tasks @@ -11,23 +9,50 @@ from src.timers.roshan import RoshanTimer from src.timers.tormentor import TormentorTimer from src.utils.config import logger -from src.utils.utils import parse_initial_countdown # <--- import the new function +from src.utils.utils import parse_initial_countdown class GameTimer: - """Class to manage the game timer and events.""" - - def __init__(self, guild_id, mode='regular'): + """ + Manages the overall game timer and coordinates various event timers. + + Attributes: + guild_id (int): Unique identifier for the Discord guild/server. + mode (str): Game mode ('regular' or 'turbo'). + time_elapsed (int): Total time elapsed since the timer started, in seconds. + channel (discord.TextChannel): Discord text channel for sending announcements. + paused (bool): Indicates if the timer is currently paused. + pause_event (asyncio.Event): Event to handle pausing and resuming. + announcement_manager (Announcement): Instance to manage announcements. + events_manager (EventsManager): Instance to manage event data. + roshan_timer (RoshanTimer): Timer for Roshan's respawn. + glyph_timer (GlyphTimer): Timer for Glyph cooldowns. + tormentor_timer (TormentorTimer): Timer for Tormentor's respawn. + mindful_timer (MindfulTimer): Timer for sending mindful messages. + static_events (dict): Static events loaded for the guild. + periodic_events (dict): Periodic events loaded for the guild. + """ + + def __init__(self, guild_id: int, mode: str = 'regular'): + """ + Initialize the GameTimer with guild-specific settings. + + Args: + guild_id (int): The Discord guild/server ID. + mode (str, optional): The game mode. Defaults to 'regular'. + """ self.guild_id = guild_id self.mode = mode self.time_elapsed = 0 self.channel = None self.paused = False self.pause_event = asyncio.Event() + self.pause_event.set() # Initially not paused + self.announcement_manager = Announcement() self.events_manager = EventsManager() - # Instantiate child timers (but do NOT auto-start them, except mindful) + # Instantiate child timers without starting them automatically, except for mindful_timer. self.roshan_timer = RoshanTimer(self) self.glyph_timer = GlyphTimer(self) self.tormentor_timer = TormentorTimer(self) @@ -36,49 +61,62 @@ def __init__(self, guild_id, mode='regular'): self.static_events = {} self.periodic_events = {} - async def start(self, channel, countdown): - """Start the game timer with either a countdown or already elapsed time.""" + async def start(self, channel: 'discord.TextChannel', countdown: str) -> None: + """ + Start the game timer with an initial countdown. + + Args: + channel (discord.TextChannel): The channel where announcements will be sent. + countdown (str): Initial countdown time (e.g., "45" or "-10:00 turbo"). + """ self.channel = channel self.paused = False self.pause_event.set() + # Parse the initial countdown to set the elapsed time. self.time_elapsed = parse_initial_countdown(countdown) - logger.info(f"Game timer parsed countdown '{countdown}' -> time_elapsed={self.time_elapsed} (seconds).") + logger.info(f"Game timer parsed countdown '{countdown}' -> time_elapsed={self.time_elapsed} seconds.") - # Load event definitions for the guild and mode + # Load event definitions for the guild and specified mode. self.static_events = self.events_manager.get_static_events(self.guild_id, self.mode) self.periodic_events = self.events_manager.get_periodic_events(self.guild_id, self.mode) logger.debug(f"Loaded static/periodic events for guild ID {self.guild_id} in mode '{self.mode}'.") - # Start the main timer loop + # Start the main timer loop if it's not already running. if not self.timer_task.is_running(): self.timer_task.start() logger.info("GameTimer main loop started.") else: logger.debug("GameTimer main loop is already running, skipping restart.") - # Start only the mindful timer automatically + # Start only the mindful timer automatically upon game start. await self.mindful_timer.start(channel) logger.debug("MindfulTimer started automatically upon game start.") - async def stop(self): - """Stop the game timer and all child timers.""" + async def stop(self) -> None: + """ + Stop the game timer and all associated child timers. + """ logger.info(f"Stopping GameTimer for guild ID {self.guild_id}.") self.timer_task.cancel() self.paused = False await self._stop_all_child_timers() logger.info(f"GameTimer and all child timers stopped for guild ID {self.guild_id}.") - async def pause(self): - """Pause the game timer and all child timers.""" + async def pause(self) -> None: + """ + Pause the game timer and all associated child timers. + """ logger.info(f"Pausing GameTimer for guild ID {self.guild_id}.") self.paused = True self.pause_event.clear() await self._pause_all_child_timers() logger.info(f"GameTimer and all child timers paused for guild ID {self.guild_id}.") - async def unpause(self): - """Unpause the game timer and all child timers.""" + async def unpause(self) -> None: + """ + Resume the game timer and all associated child timers if they were paused. + """ logger.info(f"Unpausing GameTimer for guild ID {self.guild_id}.") self.paused = False self.pause_event.set() @@ -86,16 +124,18 @@ async def unpause(self): logger.info(f"GameTimer and all child timers resumed for guild ID {self.guild_id}.") @tasks.loop(seconds=1) - async def timer_task(self): - """Main timer loop checks events every second.""" + async def timer_task(self) -> None: + """ + Main timer loop that increments the elapsed time every second and checks for event triggers. + """ try: if self.paused: await self.pause_event.wait() - self.time_elapsed += 1 # Advance the timer by 1 second + self.time_elapsed += 1 # Advance the timer by 1 second. logger.debug(f"Time elapsed: {self.time_elapsed} seconds (guild_id={self.guild_id})") - # Check both static and periodic events + # Check and trigger both static and periodic events. await self._check_static_events() await self._check_periodic_events() @@ -104,46 +144,70 @@ async def timer_task(self): except Exception as e: logger.error(f"Unexpected error in GameTimer loop for guild ID {self.guild_id}: {e}", exc_info=True) - async def _check_static_events(self): - """Check and trigger static events.""" + async def _check_static_events(self) -> None: + """ + Check and trigger static events based on the elapsed time. + """ for event_id, event in self.static_events.items(): if self.time_elapsed == event["time"]: message = event['message'] - logger.info(f"Triggering static event ID {event_id} for guild ID {self.guild_id}: '{message}' at {self.time_elapsed} seconds.") + logger.info( + f"Triggering static event ID {event_id} for guild ID {self.guild_id}: '{message}' at {self.time_elapsed} seconds.") await self.announcement_manager.announce(self, message) logger.info(f"Static event triggered: ID={event_id}, time={event['time']}, message='{message}'") - async def _check_periodic_events(self): - """Check and trigger predefined periodic events.""" + async def _check_periodic_events(self) -> None: + """ + Check and trigger periodic events based on the elapsed time. + """ for event_id, event in self.periodic_events.items(): if event["start_time"] <= self.time_elapsed <= event["end_time"]: if (self.time_elapsed - event["start_time"]) % event["interval"] == 0: message = event['message'] - logger.info(f"Triggering periodic event ID {event_id} for guild ID {self.guild_id}: '{message}' at {self.time_elapsed} seconds.") + logger.info( + f"Triggering periodic event ID {event_id} for guild ID {self.guild_id}: '{message}' at {self.time_elapsed} seconds.") await self.announcement_manager.announce(self, message) logger.info( f"Periodic event triggered: ID={event_id}, message='{message}', interval={event['interval']}") - async def _stop_all_child_timers(self): - """Stop roshan, glyph, tormentor, mindful timers.""" + async def _stop_all_child_timers(self) -> None: + """ + Stop all child timers (Roshan, Glyph, Tormentor, Mindful). + """ for timer in [self.roshan_timer, self.glyph_timer, self.tormentor_timer, self.mindful_timer]: if timer.is_running: await timer.stop() - async def _pause_all_child_timers(self): - """Pause roshan, glyph, tormentor, mindful timers.""" + async def _pause_all_child_timers(self) -> None: + """ + Pause all child timers (Roshan, Glyph, Tormentor, Mindful). + """ for timer in [self.roshan_timer, self.glyph_timer, self.tormentor_timer, self.mindful_timer]: if timer.is_running and not timer.is_paused: await timer.pause() - async def _resume_all_child_timers(self): - """Resume roshan, glyph, tormentor, mindful timers.""" + async def _resume_all_child_timers(self) -> None: + """ + Resume all child timers (Roshan, Glyph, Tormentor, Mindful) if they were paused. + """ for timer in [self.roshan_timer, self.glyph_timer, self.tormentor_timer, self.mindful_timer]: if timer.is_running and timer.is_paused: await timer.resume() - def is_running(self): + def is_running(self) -> bool: + """ + Check if the main timer task is currently running. + + Returns: + bool: True if the main timer loop is running, False otherwise. + """ return self.timer_task.is_running() - def is_paused(self): + def is_paused(self) -> bool: + """ + Check if the game timer is currently paused. + + Returns: + bool: True if the timer is paused, False otherwise. + """ return self.paused diff --git a/src/timers/base.py b/src/timers/base.py index 39c80e5..9a4a3f7 100644 --- a/src/timers/base.py +++ b/src/timers/base.py @@ -1,14 +1,23 @@ -# src/timers/base.py - import asyncio from src.utils.config import logger class BaseTimer: - """Base class for Dota timers with pause, resume, and stop functionality.""" + """ + Abstract base class for Dota timers with pause, resume, and stop functionality. + + This class provides the foundational structure for specific game timers, + handling common operations such as starting, pausing, resuming, and stopping the timer. + """ def __init__(self, game_timer): + """ + Initialize the BaseTimer with a reference to the game timer. + + Args: + game_timer: An instance containing game state and timer-related information. + """ self.game_timer = game_timer self.is_running = False self.is_paused = False @@ -17,8 +26,13 @@ def __init__(self, game_timer): self.pause_event.set() # Initially not paused logger.debug(f"{self.__class__.__name__} initialized for guild ID {self.game_timer.guild_id}.") - async def start(self, channel): - """Start the timer task asynchronously.""" + async def start(self, channel: any) -> None: + """ + Start the timer task asynchronously. + + Args: + channel: The Discord channel where announcements will be sent. + """ if not self.is_running: self.is_running = True self.is_paused = False @@ -28,26 +42,45 @@ async def start(self, channel): else: logger.warning(f"{self.__class__.__name__} is already running.") - async def _run_timer(self, channel): - """Internal method for timer countdown logic.""" + async def _run_timer(self, channel: any) -> None: + """ + Abstract method for timer countdown logic. + + Must be implemented by subclasses. + + Args: + channel: The Discord channel where announcements will be sent. + """ raise NotImplementedError("This method should be implemented by subclasses") - async def pause(self): - """Pause the timer.""" + async def pause(self) -> None: + """ + Pause the timer. + + If the timer is running and not already paused, it will be paused. + """ if self.is_running and not self.is_paused: self.is_paused = True self.pause_event.clear() logger.info(f"{self.__class__.__name__} paused for guild ID {self.game_timer.guild_id}.") - async def resume(self): - """Resume the timer if it is paused.""" + async def resume(self) -> None: + """ + Resume the timer if it is paused. + + If the timer is running and currently paused, it will resume. + """ if self.is_running and self.is_paused: self.is_paused = False self.pause_event.set() logger.info(f"{self.__class__.__name__} resumed for guild ID {self.game_timer.guild_id}.") - async def stop(self): - """Stop the timer and cancel the task if running.""" + async def stop(self) -> None: + """ + Stop the timer and cancel the task if running. + + Resets the timer's state and ensures that any paused operations are unblocked. + """ if self.is_running: self.is_running = False self.is_paused = False @@ -57,13 +90,19 @@ async def stop(self): try: await self.task except asyncio.CancelledError: - logger.info( - f"{self.__class__.__name__} task was cancelled for guild ID {self.game_timer.guild_id}.") + logger.info(f"{self.__class__.__name__} task was cancelled for guild ID {self.game_timer.guild_id}.") self.task = None logger.info(f"{self.__class__.__name__} stopped for guild ID {self.game_timer.guild_id}.") - async def sleep_with_pause(self, duration): - """Sleep for the specified duration, respecting pause.""" + async def sleep_with_pause(self, duration: float) -> None: + """ + Sleep for the specified duration, respecting pause state. + + This method allows the timer to be paused and resumed during the sleep period. + + Args: + duration (float): The total duration to sleep in seconds. + """ try: while duration > 0 and self.is_running: await self.pause_event.wait() @@ -73,15 +112,16 @@ async def sleep_with_pause(self, duration): except asyncio.CancelledError: pass - async def schedule_warnings(self, warnings_list, announcement): + async def schedule_warnings(self, warnings_list: list, announcement: any) -> None: """ - Helper method to handle repeated "sleep and announce" logic. + Handle repeated "sleep and announce" logic for scheduled warnings. - :param warnings_list: A list of tuples: [(delay_in_seconds, message), ...] - :param announcement: The Announcement() instance to use for sending messages. + Args: + warnings_list (list): A list of tuples in the format [(delay_in_seconds, message), ...]. + announcement: An instance of Announcement to send messages. """ for delay, message in warnings_list: await self.sleep_with_pause(delay) if not self.is_running: - return # If timer was stopped mid-way, exit the loop + return # Exit the loop if the timer was stopped await announcement.announce(self.game_timer, message) diff --git a/src/timers/glyph.py b/src/timers/glyph.py index ccd9454..d949176 100644 --- a/src/timers/glyph.py +++ b/src/timers/glyph.py @@ -1,5 +1,3 @@ -# src/timers/glyph.py - import asyncio from src.communication.announcement import Announcement @@ -8,30 +6,49 @@ class GlyphTimer(BaseTimer): - """Class to handle the Glyph cooldown timer.""" + """ + Handles the Glyph cooldown timer for a Discord guild. + + This timer announces the activation of an enemy glyph and manages cooldown warnings + leading up to the glyph's availability. + """ def __init__(self, game_timer): + """ + Initialize the GlyphTimer with the associated game timer. + + Args: + game_timer: An instance containing game state and timer-related information. + """ super().__init__(game_timer) self.announcement = Announcement() logger.debug(f"GlyphTimer initialized for guild ID {self.game_timer.guild_id}.") - async def _run_timer(self, channel): + async def _run_timer(self, channel: any) -> None: + """ + Execute the Glyph cooldown countdown and send appropriate announcements. + + Args: + channel: The Discord channel where announcements will be sent. + """ try: logger.info(f"GlyphTimer running for guild ID {self.game_timer.guild_id}.") - if self.game_timer.mode == 'regular': - cooldown_duration = 5 * 60 # 5 minutes - else: - cooldown_duration = 3 * 60 # 3 minutes + # Determine cooldown duration based on game mode + cooldown_duration = 5 * 60 if self.game_timer.mode == 'regular' else 3 * 60 + logger.debug(f"Cooldown duration set to {cooldown_duration} seconds.") + # Announce the start of the glyph cooldown await self.announcement.announce(self.game_timer, "Enemy glyph activated. Cooldown started.") logger.info(f"Glyph cooldown started for guild ID {self.game_timer.guild_id}.") - # Warnings for glyph availability + # Define warnings leading up to glyph availability warnings = [ (cooldown_duration - 60, "Enemy glyph available in 1 minute!"), - (60, "Enemy glyph is now available!") + (60, "Enemy glyph is now available!") ] + + # Schedule the warnings await self.schedule_warnings(warnings, self.announcement) except asyncio.CancelledError: diff --git a/src/timers/mindful.py b/src/timers/mindful.py index c691657..2e28038 100644 --- a/src/timers/mindful.py +++ b/src/timers/mindful.py @@ -1,5 +1,3 @@ -# src/timers/mindful.py - import asyncio import os import random @@ -13,16 +11,28 @@ class MindfulTimer(BaseTimer): - """Manages periodic mindful message announcements, with text and optional audio selection.""" - - def __init__(self, game_timer, min_interval=600, max_interval=900, audio_chance=0.07): + """ + Manages periodic mindful message announcements with text and optional audio in Discord guilds. + + This timer sends random mindful messages at random intervals and may include audio messages + based on a specified probability. + """ + + def __init__( + self, + game_timer, + min_interval: int = 600, + max_interval: int = 900, + audio_chance: float = 0.07 + ): """ - Initialize MindfulTimer with text and optional audio functionality. + Initialize the MindfulTimer with configurable intervals and audio probability. - :param game_timer: Reference to the main game timer - :param min_interval: Minimum interval (seconds) between messages - :param max_interval: Maximum interval (seconds) between messages - :param audio_chance: Probability (0 to 1) of sending an audio message + Args: + game_timer: An instance containing game state and timer-related information. + min_interval (int, optional): Minimum interval in seconds between messages. Defaults to 600. + max_interval (int, optional): Maximum interval in seconds between messages. Defaults to 900. + audio_chance (float, optional): Probability (0 to 1) of sending an audio message. Defaults to 0.07. """ super().__init__(game_timer) self.announcement = Announcement() @@ -30,24 +40,41 @@ def __init__(self, game_timer, min_interval=600, max_interval=900, audio_chance= self.max_interval = max_interval self.audio_chance = audio_chance self.audio_files = self._load_audio_files() - logger.debug(f"MindfulTimer initialized for guild ID {self.game_timer.guild_id} with intervals {self.min_interval}-{self.max_interval}s and audio chance {self.audio_chance}.") + logger.debug( + f"MindfulTimer initialized for guild ID {self.game_timer.guild_id} with intervals " + f"{self.min_interval}-{self.max_interval}s and audio chance {self.audio_chance}." + ) + + def _load_audio_files(self) -> list: + """ + Load available .mp3 files from the designated audio directory. - def _load_audio_files(self): - """Load available .mp3 files from the audio directory.""" + Returns: + list: A list of file paths to available audio files. + """ if not os.path.isdir(MINDFUL_AUDIO_DIR): logger.warning(f"Audio directory '{MINDFUL_AUDIO_DIR}' not found.") return [] - audio_files = [os.path.join(MINDFUL_AUDIO_DIR, f) for f in os.listdir(MINDFUL_AUDIO_DIR) if f.endswith('.mp3')] + audio_files = [ + os.path.join(MINDFUL_AUDIO_DIR, f) + for f in os.listdir(MINDFUL_AUDIO_DIR) + if f.endswith('.mp3') + ] logger.info(f"Loaded {len(audio_files)} audio files for mindful messages.") return audio_files - async def _play_audio_with_tts_intro(self, channel): - """Play a TTS message followed by an audio file in the voice channel.""" + async def _play_audio_with_tts_intro(self, channel: any) -> None: + """ + Play a TTS message followed by an audio file in the voice channel. + + Args: + channel: The Discord channel where announcements will be sent. + """ if not self.audio_files or not self.game_timer.voice_client or not self.game_timer.voice_client.is_connected(): logger.warning("Unable to play audio. Ensure audio files are available and the bot is connected to a voice channel.") return - # Send a TTS message before playing audio + # Select and announce a pre-message via TTS message = random.choice(mindful_pre_messages)["message"] await self.announcement.announce(self.game_timer, message) logger.info(f"Sent mindful TTS message in guild ID {self.game_timer.guild_id}: '{message}'") @@ -55,7 +82,7 @@ async def _play_audio_with_tts_intro(self, channel): # Short delay before playing audio await self.sleep_with_pause(2) - # Play a random audio file + # Play a randomly selected audio file audio_file = random.choice(self.audio_files) audio_source = discord.FFmpegPCMAudio(audio_file) self.game_timer.voice_client.play(audio_source) @@ -65,32 +92,38 @@ async def _play_audio_with_tts_intro(self, channel): while self.game_timer.voice_client.is_playing(): await self.sleep_with_pause(0.1) - async def _run_timer(self, channel): - """Send a random mindful message or audio with TTS intro at random intervals while enabled.""" + async def _run_timer(self, channel: any) -> None: + """ + Send random mindful messages or audio at random intervals while enabled. + + Args: + channel: The Discord channel where announcements will be sent. + """ logger.info(f"Starting MindfulTimer for guild ID {self.game_timer.guild_id}.") try: - # Check if mindful messages are enabled, stop if disabled + # Check if mindful messages are enabled; stop if disabled if not self.game_timer.events_manager.mindful_messages_enabled(self.game_timer.guild_id): - logger.info(f"Mindful: messages are disabled for guild ID {self.game_timer.guild_id}. Stopping MindfulTimer.") - return # Exit _run_timer if messages are disabled + logger.info(f"Mindful messages are disabled for guild ID {self.game_timer.guild_id}. Stopping MindfulTimer.") + return # Exit if messages are disabled - # Initial delay before the first message (10 to 15 minutes) + # Initial delay before the first message initial_delay = random.randint(self.min_interval, self.max_interval) - logger.debug(f"Mindful: Initial delay set to {initial_delay} seconds.") + logger.debug(f"MindfulTimer initial delay: {initial_delay} seconds.") await self.sleep_with_pause(initial_delay) while self.is_running: - # Randomly choose between a text or audio message + # Decide whether to send an audio message based on probability if random.random() < self.audio_chance and self.audio_files: await self._play_audio_with_tts_intro(channel) else: + # Send a random text mindful message message = random.choice(mindful_messages)["message"] await self.announcement.announce(self.game_timer, message) logger.info(f"Sent mindful text message in guild ID {self.game_timer.guild_id}: '{message}'") - # Set a random interval between min_interval and max_interval for the next message + # Set a random interval for the next message next_interval = random.randint(self.min_interval, self.max_interval) - logger.debug(f"Waiting {next_interval} seconds until the next mindful message.") + logger.debug(f"MindfulTimer waiting for {next_interval} seconds until the next message.") await self.sleep_with_pause(next_interval) except asyncio.CancelledError: diff --git a/src/timers/roshan.py b/src/timers/roshan.py index 2369418..804caf4 100644 --- a/src/timers/roshan.py +++ b/src/timers/roshan.py @@ -1,5 +1,3 @@ -# src/timers/roshan.py - import asyncio from src.communication.announcement import Announcement @@ -8,16 +6,34 @@ class RoshanTimer(BaseTimer): - """Class to manage Roshan's respawn timer.""" + """ + Manages Roshan's respawn timer in a Discord guild. + + This timer announces the start of Roshan's respawn and schedules warnings leading + up to Roshan's availability based on the game mode. + """ def __init__(self, game_timer): + """ + Initialize the RoshanTimer with the associated game timer. + + Args: + game_timer: An instance containing game state and timer-related information. + """ super().__init__(game_timer) self.announcement = Announcement() logger.debug(f"RoshanTimer initialized for guild ID {self.game_timer.guild_id}.") - async def _run_timer(self, channel): + async def _run_timer(self, channel: any) -> None: + """ + Execute the Roshan respawn countdown and send appropriate announcements. + + Args: + channel: The Discord channel where announcements will be sent. + """ logger.info(f"RoshanTimer running for guild ID {self.game_timer.guild_id}.") try: + # Determine respawn duration based on game mode if self.game_timer.mode == 'turbo': min_respawn = 4 * 60 # 4 minutes max_respawn = 5.5 * 60 # 5.5 minutes @@ -25,22 +41,24 @@ async def _run_timer(self, channel): min_respawn = 8 * 60 # 8 minutes max_respawn = 11 * 60 # 11 minutes - # Start message + logger.debug(f"RoshanTimer set with min_respawn={min_respawn} seconds and max_respawn={max_respawn} seconds.") + + # Announce the start of the Roshan timer await self.announcement.announce(self.game_timer, "Roshan timer started.") logger.info(f"Roshan timer started for guild ID {self.game_timer.guild_id}.") - # Slight delay before announcing the window + # Short delay before announcing the respawn window await self.sleep_with_pause(1) - # Calculate possible respawn window + # Calculate respawn window in minutes max_respawn_minutes, min_respawn_minutes = await self.calc_respawn_time(max_respawn, min_respawn) - - await self.announcement.announce( - self.game_timer, - f"Next roshan between minute {min_respawn_minutes} and {max_respawn_minutes}." + respawn_window_message = ( + f"Next Roshan between minute {min_respawn_minutes} and {max_respawn_minutes}." ) + await self.announcement.announce(self.game_timer, respawn_window_message) + logger.info(f"Announced Roshan respawn window: '{respawn_window_message}'") - # Combine all warnings into a single list + # Define warnings leading up to Roshan's respawn warnings = [ (min_respawn - 300, "Roshan may respawn in 5 minutes!"), (120, "Roshan may respawn in 3 minutes!"), @@ -49,7 +67,7 @@ async def _run_timer(self, channel): ((max_respawn - min_respawn), "Roshan is definitely up now!") ] - # Use the new helper method from BaseTimer + # Schedule the warnings await self.schedule_warnings(warnings, self.announcement) except asyncio.CancelledError: @@ -58,10 +76,20 @@ async def _run_timer(self, channel): self.is_running = False logger.debug(f"RoshanTimer concluded for guild ID {self.game_timer.guild_id}.") - async def calc_respawn_time(self, max_respawn, min_respawn): + async def calc_respawn_time(self, max_respawn: float, min_respawn: float) -> tuple: + """ + Calculate the respawn time window in minutes based on elapsed game time. + + Args: + max_respawn (float): The maximum respawn time in seconds. + min_respawn (float): The minimum respawn time in seconds. + + Returns: + tuple: A tuple containing (max_respawn_minutes, min_respawn_minutes). + """ current_game_time_seconds = self.game_timer.time_elapsed current_game_minutes = current_game_time_seconds // 60 - # Convert min_respawn and max_respawn to integer minutes min_respawn_minutes = int(current_game_minutes + (min_respawn / 60)) max_respawn_minutes = int(current_game_minutes + (max_respawn / 60)) + logger.debug(f"Calculated respawn time window: {min_respawn_minutes}-{max_respawn_minutes} minutes.") return max_respawn_minutes, min_respawn_minutes diff --git a/src/timers/tormentor.py b/src/timers/tormentor.py index cb9b51c..99857b4 100644 --- a/src/timers/tormentor.py +++ b/src/timers/tormentor.py @@ -1,5 +1,3 @@ -# src/timers/tormentor.py - import asyncio from src.communication.announcement import Announcement @@ -8,35 +6,53 @@ class TormentorTimer(BaseTimer): - """Class to handle the Tormentor respawn timer.""" + """ + Handles the Tormentor respawn timer for a Discord guild. + + This timer announces the start of Tormentor's respawn countdown and schedules warnings + leading up to Tormentor's availability based on the game mode. + """ def __init__(self, game_timer): + """ + Initialize the TormentorTimer with the associated game timer. + + Args: + game_timer: An instance containing game state and timer-related information. + """ super().__init__(game_timer) self.announcement = Announcement() logger.debug(f"TormentorTimer initialized for guild ID {self.game_timer.guild_id}.") - async def _run_timer(self, channel): + async def _run_timer(self, channel: any) -> None: + """ + Execute the Tormentor respawn countdown and send appropriate announcements. + + Args: + channel: The Discord channel where announcements will be sent. + """ logger.info(f"TormentorTimer running for guild ID {self.game_timer.guild_id}.") try: - # Determine the full respawn duration based on mode + # Determine respawn duration based on game mode if self.game_timer.mode == 'regular': respawn_duration = 10 * 60 # 10 minutes else: respawn_duration = 5 * 60 # 5 minutes + logger.debug(f"TormentorTimer set with respawn_duration={respawn_duration} seconds.") + + # Announce the start of the Tormentor timer await self.announcement.announce(self.game_timer, "Tormentor timer started.") logger.info(f"Tormentor timer started for guild ID {self.game_timer.guild_id}.") - # Schedule warnings: - # - 3-minute warning at (respawn_duration - 180) - # - 1-minute warning at 120 seconds later - # - final announcement after another 60 seconds + # Define warnings leading up to Tormentor's respawn warnings = [ (respawn_duration - 180, "Tormentor will respawn in 3 minutes!"), (120, "Tormentor will respawn in 1 minute!"), (60, "Tormentor has respawned!") ] + # Schedule the warnings await self.schedule_warnings(warnings, self.announcement) except asyncio.CancelledError: diff --git a/tests/communication/test_announcement.py b/tests/communication/test_announcement.py index 0de01eb..e80234e 100644 --- a/tests/communication/test_announcement.py +++ b/tests/communication/test_announcement.py @@ -1,9 +1,11 @@ import asyncio from unittest.mock import AsyncMock, Mock, patch + import pytest from src.communication.announcement import Announcement + @pytest.fixture def announcement(): with patch('src.communication.announcement.TTSManager') as MockTTSManager: