Skip to content
This repository was archived by the owner on Jun 9, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 313 additions & 3 deletions lightning/cogs/automod/cog.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
AutoModDurationResponse,
IgnorableEntities)
from lightning.cogs.automod.models import (AutomodConfig, GateKeeperConfig,
SpamConfig)
HeatConfig, SpamConfig)
from lightning.constants import (AUTOMOD_ADVANCED_EVENT_NAMES_MAPPING,
AUTOMOD_ALL_EVENT_NAMES_LITERAL,
AUTOMOD_BASIC_EVENT_NAMES_MAPPING,
Expand Down Expand Up @@ -67,6 +67,7 @@ class AutoMod(LightningCog, required=["Moderation"]):
def __init__(self, bot: LightningBot):
super().__init__(bot)
self.gatekeepers: dict[int, GateKeeperConfig] = {}
self.heat_configs: dict[int, HeatConfig] = {}
self.bot.loop.create_task(self.load_all_gatekeepers())
self.bot.add_dynamic_items(ui.GatekeeperVerificationButton,
ui.GatekeeperVerificationHoneyPotButton)
Expand Down Expand Up @@ -96,6 +97,37 @@ def invalidate_gatekeeper(self, guild_id: int):
if gtkp := self.gatekeepers.pop(guild_id, None):
gtkp.gtkp_loop.cancel()

async def get_heat_config(self, guild_id: int) -> Optional[HeatConfig]:
"""Gets the heat configuration for a guild.

Parameters
----------
guild_id : int
The guild ID

Returns
-------
Optional[HeatConfig]
The heat configuration, or None if not configured
"""
if guild_id in self.heat_configs:
return self.heat_configs[guild_id]

query = "SELECT * FROM guild_automod_heat_config WHERE guild_id=$1;"
config = await self.bot.pool.fetchrow(query, guild_id)
if not config:
return None

query = "SELECT * FROM guild_automod_heat_thresholds WHERE guild_id=$1 ORDER BY heat_threshold ASC;"
thresholds = await self.bot.pool.fetch(query, guild_id)

self.heat_configs[guild_id] = heat_config = HeatConfig(self.bot, config, thresholds)
return heat_config

def invalidate_heat_config(self, guild_id: int):
"""Invalidates the cached heat config for a guild."""
self.heat_configs.pop(guild_id, None)

async def cog_check(self, ctx: LightningContext) -> bool:
if ctx.guild is None:
raise commands.NoPrivateMessage()
Expand Down Expand Up @@ -419,6 +451,229 @@ async def automod_warn_threshold_remove(self, ctx: GuildContext):
await ctx.send("Removed warn threshold!")
await self.get_automod_config.invalidate(ctx.guild.id)

# Heat-based automod commands
@automod.group(name='heat', level=CommandLevel.Admin)
async def automod_heat(self, ctx: GuildContext):
"""Manage heat-based automod system"""
if ctx.invoked_subcommand is None:
await ctx.send_help('automod heat')

@automod_heat.command(name='enable', level=CommandLevel.Admin)
@is_server_manager()
async def automod_heat_enable(self, ctx: GuildContext, decay_seconds: commands.Range[int, 60, 86400] = 3600):
"""Enables the heat-based automod system

Parameters
----------
decay_seconds: int
How long (in seconds) it takes for heat to fully decay. Default is 1 hour (3600 seconds).
Minimum 60 seconds, maximum 24 hours (86400 seconds).
"""
query = """INSERT INTO guild_automod_heat_config (guild_id, enabled, decay_seconds)
VALUES ($1, true, $2)
ON CONFLICT (guild_id)
DO UPDATE SET enabled=true, decay_seconds=EXCLUDED.decay_seconds;"""
await self.bot.pool.execute(query, ctx.guild.id, decay_seconds)
self.invalidate_heat_config(ctx.guild.id)
await ctx.send(f"✅ Heat-based automod enabled! Heat will decay over {decay_seconds} seconds.")

@automod_heat.command(name='disable', level=CommandLevel.Admin)
@is_server_manager()
async def automod_heat_disable(self, ctx: GuildContext):
"""Disables the heat-based automod system"""
query = "UPDATE guild_automod_heat_config SET enabled=false WHERE guild_id=$1;"
resp = await self.bot.pool.execute(query, ctx.guild.id)

if resp == "UPDATE 0":
await ctx.send("Heat-based automod was not enabled!")
return

self.invalidate_heat_config(ctx.guild.id)
await ctx.send("✅ Heat-based automod disabled!")

@automod_heat.command(name='setheat', level=CommandLevel.Admin)
@is_server_manager()
async def automod_heat_set_value(self, ctx: GuildContext, violation_type: str, heat_value: float):
"""Sets the heat value for a specific violation type

Parameters
----------
violation_type: str
The violation type (e.g., 'message-spam', 'invite-spam', 'url-spam', 'mass-mentions', 'message-content-spam')
heat_value: float
The amount of heat to add for this violation (must be positive)
"""
# Validate heat value is positive
if heat_value <= 0:
await ctx.send("❌ Heat value must be a positive number!")
return

# Validate violation type
valid_types = ['message-spam', 'invite-spam', 'url-spam', 'mass-mentions', 'message-content-spam',
'message_spam', 'invite_spam', 'url_spam', 'mass_mentions', 'message_content_spam']
if violation_type not in valid_types:
await ctx.send(f"❌ Invalid violation type! Valid types are: {', '.join(set(valid_types))}")
return

# Normalize to hyphenated format for storage
violation_type = violation_type.replace('_', '-')

# Ensure heat config exists
query = """INSERT INTO guild_automod_heat_config (guild_id)
VALUES ($1)
ON CONFLICT (guild_id) DO NOTHING;"""
await self.bot.pool.execute(query, ctx.guild.id)

# Update heat values
query = """UPDATE guild_automod_heat_config
SET heat_per_violation = jsonb_set(
COALESCE(heat_per_violation, '{}'::jsonb),
ARRAY[$2],
to_jsonb($3::text)
)
WHERE guild_id=$1;"""
await self.bot.pool.execute(query, ctx.guild.id, violation_type, str(heat_value))
self.invalidate_heat_config(ctx.guild.id)
await ctx.send(f"✅ Set heat value for `{violation_type}` to `{heat_value}`")

@automod_heat.command(name='addthreshold', level=CommandLevel.Admin)
@is_server_manager()
async def automod_heat_add_threshold(self, ctx: GuildContext, heat_threshold: int,
punishment: Literal['WARN', 'MUTE', 'KICK', 'BAN'],
duration: Optional[AutoModDuration] = None):
"""Adds a heat threshold with a punishment

Parameters
----------
heat_threshold: int
The heat level that triggers this punishment (must be positive)
punishment: Literal['WARN', 'MUTE', 'KICK', 'BAN']
The punishment to apply
duration: Optional[AutoModDuration]
Duration for temporary punishments (MUTE/BAN only)
"""
# Validate heat threshold is positive
if heat_threshold <= 0:
await ctx.send("❌ Heat threshold must be a positive number!")
return

# Validate duration is only provided for MUTE/BAN
if duration and punishment not in ('MUTE', 'BAN'):
await ctx.send(f"❌ Duration can only be specified for MUTE or BAN punishments, not {punishment}!")
return

# Ensure heat config exists
query = """INSERT INTO guild_automod_heat_config (guild_id)
VALUES ($1)
ON CONFLICT (guild_id) DO NOTHING;"""
await self.bot.pool.execute(query, ctx.guild.id)

# Check for duplicate threshold
query = """SELECT id FROM guild_automod_heat_thresholds
WHERE guild_id=$1 AND heat_threshold=$2;"""
existing = await self.bot.pool.fetchrow(query, ctx.guild.id, heat_threshold)
if existing:
await ctx.send(f"❌ A threshold already exists for {heat_threshold} heat (ID: {existing['id']}). Remove it first if you want to change it.")
return

punishment_duration = None
if duration:
punishment_duration = duration.seconds

query = """INSERT INTO guild_automod_heat_thresholds (guild_id, heat_threshold, punishment_type, punishment_duration)
VALUES ($1, $2, $3, $4);"""
await self.bot.pool.execute(query, ctx.guild.id, heat_threshold, punishment, punishment_duration)
self.invalidate_heat_config(ctx.guild.id)

duration_str = f" for {duration.human_readable}" if duration else ""
await ctx.send(f"✅ Added threshold: {heat_threshold} heat → {punishment}{duration_str}")

@automod_heat.command(name='removethreshold', level=CommandLevel.Admin)
@is_server_manager()
async def automod_heat_remove_threshold(self, ctx: GuildContext, threshold_id: int):
"""Removes a heat threshold

Parameters
----------
threshold_id: int
The ID of the threshold to remove
"""
query = "DELETE FROM guild_automod_heat_thresholds WHERE id=$1 AND guild_id=$2;"
resp = await self.bot.pool.execute(query, threshold_id, ctx.guild.id)

if resp == "DELETE 0":
await ctx.send("Threshold not found!")
return

self.invalidate_heat_config(ctx.guild.id)
await ctx.send("✅ Threshold removed!")

@automod_heat.command(name='view', level=CommandLevel.Admin)
async def automod_heat_view(self, ctx: GuildContext):
"""Views the current heat configuration"""
heat_config = await self.get_heat_config(ctx.guild.id)

if not heat_config:
await ctx.send("Heat-based automod is not configured for this server!")
return

embed = discord.Embed(title="Heat-Based Automod Configuration", color=discord.Color.blue())
embed.add_field(name="Enabled", value="✅ Yes" if heat_config.enabled else "❌ No", inline=False)
embed.add_field(name="Decay Time", value=f"{heat_config.decay_seconds} seconds", inline=False)

if heat_config.heat_per_violation:
violations = "\n".join([f"`{k}`: {v}" for k, v in heat_config.heat_per_violation.items()])
embed.add_field(name="Heat Values", value=violations or "None set", inline=False)

if heat_config.thresholds:
thresholds = "\n".join([
f"ID {t.id}: {t.threshold} heat → {t.punishment}" +
(f" ({t.duration}s)" if t.duration else "")
for t in heat_config.thresholds
])
embed.add_field(name="Thresholds", value=thresholds, inline=False)
else:
embed.add_field(name="Thresholds", value="None configured", inline=False)

await ctx.send(embed=embed)

@automod_heat.command(name='check', level=CommandLevel.Mod)
async def automod_heat_check(self, ctx: GuildContext, member: discord.Member):
"""Checks a member's current heat level

Parameters
----------
member: discord.Member
The member to check
"""
heat_config = await self.get_heat_config(ctx.guild.id)

if not heat_config or not heat_config.enabled:
await ctx.send("Heat-based automod is not enabled!")
return

heat = await heat_config.get_user_heat(member.id)
await ctx.send(f"{member.mention} currently has **{heat:.1f}** heat.")

@automod_heat.command(name='reset', level=CommandLevel.Admin)
@is_server_manager()
async def automod_heat_reset(self, ctx: GuildContext, member: discord.Member):
"""Resets a member's heat to 0

Parameters
----------
member: discord.Member
The member whose heat to reset
"""
heat_config = await self.get_heat_config(ctx.guild.id)

if not heat_config:
await ctx.send("Heat-based automod is not configured!")
return

await heat_config.reset_heat(member.id)
await ctx.send(f"✅ Reset heat for {member.mention}")

async def create_automod_config(self, guild: discord.Guild):
query = """INSERT INTO guild_automod_config (guild_id)
VALUES ($1)
Expand Down Expand Up @@ -617,6 +872,49 @@ async def _handle_punishment(self, options: GuildAutoModRulePunishment, message:

await meth(self, message, options.duration, reason=reason)

async def _handle_heat_punishment(self, message: AutoModMessage, violation_type: str) -> Optional[str]:
"""Handles heat-based punishment for a violation.

Parameters
----------
message : AutoModMessage
The message that triggered the violation
violation_type : str
The type of violation (e.g., 'message_spam', 'invite_spam', 'url_spam', 'mass_mentions', 'message_content_spam')
These are the internal attribute names used by the automod system.

Returns
-------
Optional[str]
The punishment type that was applied (e.g., 'WARN', 'MUTE', 'KICK', 'BAN'), or None if no punishment was applied
"""
heat_config = await self.get_heat_config(message.guild.id)
if not heat_config or not heat_config.enabled:
return None

# Add heat and get new level
new_heat = await heat_config.add_heat(message.author.id, violation_type)

# Check if we should apply a punishment
punishment = heat_config.get_punishment_for_heat(new_heat)
if not punishment:
return None

# Apply the punishment
automod_rule_name = AUTOMOD_EVENT_NAMES_MAPPING.get(violation_type.replace('_', '-'), "AutoMod rule")
reason = f"{automod_rule_name} triggered (Heat: {new_heat:.1f})"

meth = self.punishments[punishment.punishment]

if punishment.punishment not in ("MUTE", "BAN"):
await meth(self, message, reason=reason)
else:
await meth(self, message, punishment.duration, reason=reason)

# Reset heat after punishment
await heat_config.reset_heat(message.author.id)
return punishment.punishment

async def _delete_tracked_messages(self, messages: set[str], guild: discord.Guild):
# Deletes message IDs tracked in AutoMod
tmp: Dict[str, List[discord.Object]] = {}
Expand Down Expand Up @@ -652,8 +950,20 @@ async def handle_bucket(attr_name: str, increment: Optional[Callable[[discord.Me
self.bot.dispatch("lightning_guild_automod_rule_triggered", attr_name, message.guild.id)
messages = await obj.fetch_responsible_messages(message)
await obj.reset_bucket(message)
await self._handle_punishment(obj.punishment, message, attr_name)
if obj.punishment.type != "BAN":

# Try heat-based punishment first
heat_punishment_type = await self._handle_heat_punishment(message, attr_name)

# Determine which punishment was actually applied
if heat_punishment_type:
applied_punishment_type = heat_punishment_type
else:
# If no heat punishment was applied, use the configured rule punishment
await self._handle_punishment(obj.punishment, message, attr_name)
applied_punishment_type = str(obj.punishment.type)

# Only delete messages if the punishment wasn't a BAN
if applied_punishment_type != "BAN":
await self._delete_tracked_messages(messages, message.guild)

await handle_bucket('mass_mentions', lambda m: len(m.mentions) + len(m.role_mentions))
Expand Down
Loading