# Files
# 2026-02-27 19:42:08 +01:00
#
# 904 lines
# 34 KiB
# Python

###
# Resilience - plugin.py
#
# Per-network / per-channel automatic IRC self-maintenance:
# - Join retry with configurable max attempts (ban / full / invite-only / bad-key)
# - Per-error command hook (onBanCommand, onFullCommand, etc.)
# - MODE -b self-unban fallback when bot retains ops
# - Auto-rejoin after kick with configurable max attempts
# - Auto-reop after deop (halfop self-op → onDeopCommand fallback)
# - Per-channel on-join command (e.g. ChanServ UP)
# - Per-network perform on connect (comma-separated, $substitutions)
# - Per-network nick-recovery commands + periodic polling
# - Nick password stored privately, available as $password everywhere
###
import re
import string
import time
import supybot.conf as conf
import supybot.world as world
import supybot.ircmsgs as ircmsgs
import supybot.ircutils as ircutils
import supybot.schedule as schedule
import supybot.callbacks as callbacks
from supybot.commands import wrap
from supybot.i18n import PluginInternationalization
_ = PluginInternationalization('Resilience')
# ---------------------------------------------------------------------------
# Module-level helpers (no state, safe to call from anywhere)
# ---------------------------------------------------------------------------
def _desired_nick(irc):
    """
    Return the nick the bot should be using on ``irc``'s network.

    The per-network nick setting is consulted first; when it is empty the
    global ``supybot.nick`` value is returned instead.
    """
    network_nick = conf.supybot.networks.get(irc.network).nick()
    if network_nick:
        return network_nick
    return conf.supybot.nick()
# Split on real commas, not escaped ones.
_REAL_COMMA = re.compile(r'(?<!\\),')
_ESCAPED_COMMA = re.compile(r'\\,')
def _split_commands(s):
"""
Split a comma-separated command string into a list of individual commands.
Backslash-escaped commas (\\,) are preserved as literal commas.
Leading/trailing whitespace is stripped from each command.
Empty commands are dropped.
"""
parts = _REAL_COMMA.split(s)
return [_ESCAPED_COMMA.sub(',', p).strip() for p in parts if p.strip()]
def _substitute(template, irc, desired, password, channel=''):
"""
Apply $-substitutions to a command template string.
$nick / $botnick — desired (configured) nick for this network
$currentnick — nick the bot is actually using right now
$network — network name
$password — value of nickPassword config
$channel — channel name (empty string when not applicable)
"""
t = string.Template(template)
t.idpattern = r'[a-zA-Z][a-zA-Z0-9]*'
return t.safe_substitute(
nick=desired,
botnick=desired,
currentnick=irc.nick,
network=irc.network,
password=password,
channel=channel,
)
def _parse_irc_command(cmd_str):
"""
Parse a raw IRC command string such as
'PRIVMSG ChanServ :UNBAN #channel'
into an IrcMsg ready to send. Returns None for empty/blank input.
Handles the standard IRC colon-prefixed trailing parameter.
"""
cmd_str = cmd_str.strip()
if not cmd_str:
return None
parts = cmd_str.split(None, 1)
command = parts[0].upper()
rest = parts[1] if len(parts) > 1 else ''
if ':' in rest:
idx = rest.index(':')
positional = rest[:idx].split()
trailing = rest[idx + 1:]
args = tuple(positional) + (trailing,)
else:
args = tuple(rest.split()) if rest else ()
return ircmsgs.IrcMsg(command=command, args=args)
# ---------------------------------------------------------------------------
# Main plugin class
# ---------------------------------------------------------------------------
class Resilience(callbacks.Plugin):
    """
    Automatic channel and nick resilience for Limnoria.

    All settings live under supybot.plugins.Resilience.* and support
    Limnoria's three-level hierarchy:

      global default
        └─ per-network override  (config network <net> supybot.plugins.Resilience.X value)
             └─ per-channel override (config channel #chan supybot.plugins.Resilience.X value)

    Network-only settings (perform, nickPassword, recoverNick, etc.) are set
    with 'config network <net> ...' only.

    See 'help resilience' and 'config list supybot.plugins.Resilience' for
    the full list of options.
    """

    def __init__(self, irc):
        self.__parent = super(Resilience, self)
        self.__parent.__init__(irc)
        # (network, channel) -> schedule event name
        self._joinRetryEvents = {}
        self._reopEvents = {}
        self._kickRejoinEvents = {}
        # (network, channel) -> int — attempt counters
        self._joinAttempts = {}   # reset on successful join
        self._kickAttempts = {}   # reset on successful join
        # network -> schedule event name
        self._nickPollEvents = {}
        self._performEvents = {}
        # network -> schedule event name for the delayed NICK after recover cmds
        self._nickClaimEvents = {}

    # -----------------------------------------------------------------------
    # Cleanup
    # -----------------------------------------------------------------------

    def die(self):
        """Cancel every event this plugin scheduled, then run the parent's
        die()."""
        for mapping in (self._joinRetryEvents, self._reopEvents,
                        self._kickRejoinEvents, self._nickPollEvents,
                        self._performEvents, self._nickClaimEvents):
            for name in list(mapping.values()):
                self._remove_event(name)
            mapping.clear()
        self.__parent.die()

    # -----------------------------------------------------------------------
    # Scheduling helpers
    # -----------------------------------------------------------------------

    @staticmethod
    def _remove_event(name):
        """Remove the scheduled event called ``name``; a missing event is
        not an error."""
        try:
            schedule.removeEvent(name)
        except KeyError:
            pass

    def _safe_add_event(self, func, when, name):
        """Add a one-shot event, silently replacing any existing event with
        the same name.  Returns whatever schedule.addEvent returns."""
        self._remove_event(name)
        return schedule.addEvent(func, when, name=name)

    def _safe_add_periodic(self, func, interval, name):
        """Add a periodic event (first run after ``interval``, not
        immediately), silently replacing any existing one."""
        self._remove_event(name)
        return schedule.addPeriodicEvent(func, interval, name=name, now=False)

    def _cancel_event(self, mapping, key):
        """Pop ``key`` from ``mapping`` and cancel its scheduled event, if
        there is one."""
        name = mapping.pop(key, None)
        if name is not None:
            self._remove_event(name)

    # -----------------------------------------------------------------------
    # Substitution / command sending helpers
    # -----------------------------------------------------------------------

    def _password(self, network):
        """Return the nickPassword config value for ``network``."""
        return self.registryValue('nickPassword', network=network)

    def _sub(self, template, irc, channel=''):
        """Substitute $-variables in template using the current irc state."""
        return _substitute(
            template, irc,
            desired=_desired_nick(irc),
            password=self._password(irc.network),
            channel=channel,
        )

    def _send_raw(self, irc, template, channel='', label='cmd'):
        """
        Substitute, parse, and send a single command template.

        Does nothing if the template is empty.  The command is logged under
        ``label`` with $password masked so the secret never reaches the log.
        """
        raw = template.strip()
        if not raw:
            return
        substituted = self._sub(raw, irc, channel=channel)
        msg = _parse_irc_command(substituted)
        if msg is None:
            return
        # Build a second expansion purely for logging, with the password
        # replaced by a fixed mask.
        loggable = _substitute(
            raw, irc,
            desired=_desired_nick(irc),
            password='********',
            channel=channel,
        )
        self.log.info('Resilience[%s]: %s: %s', irc.network, label, loggable)
        irc.sendMsg(msg)

    def _send_command_list(self, irc, raw_string, channel='', label='perform'):
        """
        Parse, substitute, and send a comma-separated list of command
        templates.  Does nothing if raw_string is empty.
        """
        for cmd in _split_commands(raw_string):
            self._send_raw(irc, cmd, channel=channel, label=label)

    # -----------------------------------------------------------------------
    # Perform on connect
    # -----------------------------------------------------------------------

    def _schedule_perform(self, irc):
        """Send the network's perform list, immediately when performDelay is
        0 and otherwise after performDelay seconds."""
        raw = self.registryValue('perform', network=irc.network).strip()
        if not raw:
            return
        delay = self.registryValue('performDelay', network=irc.network)
        net = irc.network
        name = 'Resilience_perform_%s' % net

        def _fire():
            self._performEvents.pop(net, None)
            self._send_command_list(irc, raw, label='perform')

        if delay == 0:
            _fire()
        else:
            self._performEvents[net] = self._safe_add_event(
                _fire, time.time() + delay, name)

    # -----------------------------------------------------------------------
    # Nick recovery
    # -----------------------------------------------------------------------

    def _start_nick_poll(self, irc):
        """Start periodic polling to reclaim the configured nick.

        No-op when recoverNickDelay is 0 or a poll is already running for
        this network."""
        net = irc.network
        delay = self.registryValue('recoverNickDelay', network=net)
        if delay == 0 or net in self._nickPollEvents:
            return
        name = 'Resilience_nickpoll_%s' % net

        def _poll():
            desired = _desired_nick(irc)
            if ircutils.strEqual(irc.nick, desired):
                # Goal reached: the poll removes itself.
                self._stop_nick_poll(irc)
                return
            # Only attempt if the nick appears free
            if desired not in irc.state.nicksToHostmasks:
                self.log.info('Resilience[%s]: polling — attempting NICK %s',
                              net, desired)
                irc.queueMsg(ircmsgs.nick(desired))

        self._nickPollEvents[net] = self._safe_add_periodic(_poll, delay, name)
        self.log.info('Resilience[%s]: started nick poll every %ds', net, delay)

    def _stop_nick_poll(self, irc):
        """Cancel the nick poll for ``irc``'s network, if any."""
        self._cancel_event(self._nickPollEvents, irc.network)

    def _send_nick_recover_commands(self, irc):
        """Send nickRecoverCommands then schedule a delayed NICK attempt."""
        net = irc.network
        raw = self.registryValue('nickRecoverCommands', network=net).strip()
        if raw:
            self._send_command_list(irc, raw, label='nickRecover')
        # Wait recoverNickDelay seconds then try NICK <desired>.
        # This gives services (RECOVER/RELEASE on DALnet etc.) time to process.
        delay = self.registryValue('recoverNickDelay', network=net)
        desired = _desired_nick(irc)
        name = 'Resilience_nickclaim_%s' % net

        def _claim():
            self._nickClaimEvents.pop(net, None)
            if not ircutils.strEqual(irc.nick, desired):
                self.log.info('Resilience[%s]: sending NICK %s', net, desired)
                irc.queueMsg(ircmsgs.nick(desired))

        if delay == 0:
            _claim()
        else:
            self._nickClaimEvents[net] = self._safe_add_event(
                _claim, time.time() + delay, name)

    # -----------------------------------------------------------------------
    # Join retry internals
    # -----------------------------------------------------------------------

    def _schedule_join_retry(self, irc, channel):
        """Schedule a JOIN of ``channel`` after retryJoinDelay seconds; the
        JOIN is skipped if the bot is in the channel by then."""
        key = (irc.network, channel)
        delay = self.registryValue('retryJoinDelay', channel, irc.network)
        name = 'Resilience_join_%s_%s' % (irc.network, channel)

        def _retry():
            self._joinRetryEvents.pop(key, None)
            if channel not in irc.state.channels:
                self.log.info('Resilience[%s]: retrying JOIN %s (attempt %d)',
                              irc.network, channel,
                              self._joinAttempts.get(key, 0))
                ng = conf.supybot.networks.get(irc.network)
                irc.queueMsg(ng.channels.join(channel))

        self._joinRetryEvents[key] = self._safe_add_event(
            _retry, time.time() + delay, name)

    def _cancel_join_retry(self, irc, channel):
        """Cancel the pending join retry for ``channel``, if any."""
        self._cancel_event(self._joinRetryEvents, (irc.network, channel))

    def _handle_join_error(self, irc, msg, error_label, cmd_config_key):
        """
        Shared handler for 471 / 473 / 474 / 475.

        Flow:
          1. Check retryJoin master switch and per-channel config.
          2. Increment attempt counter; bail if over retryJoinMax.
          3. If selfUnban and it's a ban: attempt MODE -b.
          4. Send the on*Command template if configured.
          5. Schedule the next join attempt.
        """
        channel = msg.args[1]
        net = irc.network
        if not self.registryValue('retryJoin', channel, net):
            return
        key = (net, channel)
        max_ = self.registryValue('retryJoinMax', channel, net)
        attempt = self._joinAttempts.get(key, 0) + 1
        self._joinAttempts[key] = attempt
        # A retryJoinMax of 0 (or less) means "no limit".
        if max_ > 0 and attempt > max_:
            self.log.warning(
                'Resilience[%s]: giving up on %s after %d attempt(s) (%s).',
                net, channel, max_, error_label)
            self._joinAttempts.pop(key, None)
            return
        delay = self.registryValue('retryJoinDelay', channel, net)
        limit = str(max_) if max_ > 0 else 'unlimited'
        self.log.info(
            'Resilience[%s]: cannot join %s (%s), attempt %d/%s, retry in %ds.',
            net, channel, error_label, attempt, limit, delay)
        # MODE -b self-unban (only useful if we somehow still have ops)
        if error_label == 'banned':
            if self.registryValue('selfUnban', channel, net):
                self._try_self_unban(irc, channel)
        # Send the per-error command hook
        cmd_raw = self.registryValue(cmd_config_key, channel, net).strip()
        if cmd_raw:
            self._send_raw(irc, cmd_raw, channel=channel,
                           label=cmd_config_key)
        self._schedule_join_retry(irc, channel)

    # -----------------------------------------------------------------------
    # MODE -b self-unban
    # -----------------------------------------------------------------------

    def _try_self_unban(self, irc, channel):
        """
        If the bot still has ops in the channel, find every ban mask that
        matches its current hostmask and remove them with MODE -b.

        Returns True when at least one MODE -b was queued, False when the
        bot is not in the channel, is not opped, or no ban matches.
        """
        if channel not in irc.state.channels:
            return False
        chanstate = irc.state.channels[channel]
        if irc.nick not in chanstate.ops:
            return False
        try:
            hostmask = irc.state.nickToHostmask(irc.nick)
        except KeyError:
            hostmask = irc.prefix
        matching = [bm for bm in chanstate.bans
                    if ircutils.hostmaskPatternEqual(bm, hostmask)]
        if not matching:
            return False
        self.log.info('Resilience[%s]: MODE -b self-unban in %s: %s',
                      irc.network, channel, ', '.join(matching))
        # Send at most "modes" masks per MODE line (1 when not advertised).
        chunk_size = irc.state.supported.get('modes', 1) or 1
        for i in range(0, len(matching), chunk_size):
            chunk = matching[i:i + chunk_size]
            irc.queueMsg(ircmsgs.mode(
                channel, ['-' + 'b' * len(chunk)] + chunk))
        return True

    # -----------------------------------------------------------------------
    # Kick rejoin internals
    # -----------------------------------------------------------------------

    def _schedule_kick_rejoin(self, irc, channel):
        """Schedule a JOIN of ``channel`` after rejoinKickDelay seconds; the
        JOIN is skipped if the bot is in the channel by then."""
        key = (irc.network, channel)
        delay = self.registryValue('rejoinKickDelay', channel, irc.network)
        name = 'Resilience_kick_%s_%s' % (irc.network, channel)

        def _rejoin():
            self._kickRejoinEvents.pop(key, None)
            if channel not in irc.state.channels:
                self.log.info('Resilience[%s]: rejoining %s after kick '
                              '(attempt %d)', irc.network, channel,
                              self._kickAttempts.get(key, 0))
                ng = conf.supybot.networks.get(irc.network)
                irc.queueMsg(ng.channels.join(channel))

        self._kickRejoinEvents[key] = self._safe_add_event(
            _rejoin, time.time() + delay, name)

    # -----------------------------------------------------------------------
    # Reop internals
    # -----------------------------------------------------------------------

    def _schedule_reop(self, irc, channel):
        """Schedule an attempt to regain ops in ``channel`` after
        autoReopDelay seconds."""
        key = (irc.network, channel)
        delay = self.registryValue('autoReopDelay', channel, irc.network)
        name = 'Resilience_reop_%s_%s' % (irc.network, channel)

        def _reop():
            self._reopEvents.pop(key, None)
            if channel not in irc.state.channels:
                return
            chanstate = irc.state.channels[channel]
            if irc.nick in chanstate.ops:
                return  # already opped, nothing to do
            # Try halfop self-op first (no external command needed)
            if irc.nick in chanstate.halfops:
                self.log.info('Resilience[%s]: reop via halfop in %s',
                              irc.network, channel)
                irc.queueMsg(ircmsgs.op(channel, irc.nick))
                return
            # Fall back to the configured command (e.g. ChanServ OP)
            cmd_raw = self.registryValue(
                'onDeopCommand', channel, irc.network).strip()
            if cmd_raw:
                self._send_raw(irc, cmd_raw, channel=channel,
                               label='onDeopCommand')
            else:
                self.log.warning(
                    'Resilience[%s]: deopped in %s, no halfop and '
                    'onDeopCommand not set — cannot self-reop.',
                    irc.network, channel)

        self._reopEvents[key] = self._safe_add_event(
            _reop, time.time() + delay, name)

    # -----------------------------------------------------------------------
    # IRC event handlers
    # -----------------------------------------------------------------------

    def do001(self, irc, msg):
        """New connection: clear all stale state for this network."""
        net = irc.network
        # Per-channel events left over from the previous connection,
        # including pending reops.
        for mapping in (self._joinRetryEvents, self._kickRejoinEvents,
                        self._reopEvents):
            for key in [k for k in mapping if k[0] == net]:
                self._cancel_event(mapping, key)
        for counters in (self._joinAttempts, self._kickAttempts):
            for key in [k for k in counters if k[0] == net]:
                counters.pop(key, None)
        self._stop_nick_poll(irc)
        self._cancel_event(self._nickClaimEvents, net)
        # A perform still pending from the old connection must not fire on
        # the new one; do376 schedules a fresh one.
        self._cancel_event(self._performEvents, net)

    def do376(self, irc, msg):
        """End of MOTD: fire perform, start nick recovery if needed."""
        self._schedule_perform(irc)
        desired = _desired_nick(irc)
        if not ircutils.strEqual(irc.nick, desired):
            if self.registryValue('recoverNick', network=irc.network):
                self.log.info(
                    'Resilience[%s]: connected as %s, want %s; '
                    'starting nick recovery.', irc.network, irc.nick, desired)
                self._send_nick_recover_commands(irc)
                self._start_nick_poll(irc)

    do377 = do422 = do376   # RPL_MOTD (alt) and ERR_NOMOTD

    # --- Join errors ---

    def do471(self, irc, msg):
        """Channel is full (+l)."""
        self._handle_join_error(irc, msg, 'full', 'onFullCommand')

    def do473(self, irc, msg):
        """Channel is invite-only (+i)."""
        self._handle_join_error(irc, msg, 'invite-only', 'onInviteOnlyCommand')

    def do474(self, irc, msg):
        """Banned from channel."""
        self._handle_join_error(irc, msg, 'banned', 'onBanCommand')

    def do475(self, irc, msg):
        """Bad channel key."""
        self._handle_join_error(irc, msg, 'bad-key', 'onBadKeyCommand')

    def doJoin(self, irc, msg):
        """Successful join: reset attempt counters, cancel pending retries,
        fire onJoinCommand if configured."""
        if not ircutils.strEqual(msg.nick, irc.nick):
            return
        channel = msg.channel
        net = irc.network
        key = (net, channel)
        self._cancel_join_retry(irc, channel)
        self._cancel_event(self._kickRejoinEvents, key)
        self._joinAttempts.pop(key, None)
        # NOTE(review): the kick counter is cleared on every successful
        # rejoin, so doKick always sees attempt 1 and rejoinKickMax looks
        # unreachable in a kick/rejoin loop — confirm intended behaviour.
        self._kickAttempts.pop(key, None)
        cmd_raw = self.registryValue('onJoinCommand', channel, net).strip()
        if cmd_raw:
            self._send_raw(irc, cmd_raw, channel=channel,
                           label='onJoinCommand')

    # --- Kick ---

    def doKick(self, irc, msg):
        """Bot was kicked: schedule a rejoin, subject to rejoinOnKick and
        rejoinKickMax."""
        channel = msg.channel
        if not ircutils.strEqual(msg.args[1], irc.nick):
            return
        if not self.registryValue('rejoinOnKick', channel, irc.network):
            self.log.info('Resilience[%s]: kicked from %s, rejoin disabled.',
                          irc.network, channel)
            return
        key = (irc.network, channel)
        max_ = self.registryValue('rejoinKickMax', channel, irc.network)
        attempt = self._kickAttempts.get(key, 0) + 1
        self._kickAttempts[key] = attempt
        # A rejoinKickMax of 0 (or less) means "no limit".
        if max_ > 0 and attempt > max_:
            self.log.warning(
                'Resilience[%s]: giving up rejoining %s after %d kick(s).',
                irc.network, channel, max_)
            self._kickAttempts.pop(key, None)
            return
        limit = str(max_) if max_ > 0 else 'unlimited'
        self.log.info('Resilience[%s]: kicked from %s by %s, attempt %d/%s.',
                      irc.network, channel, msg.nick, attempt, limit)
        self._schedule_kick_rejoin(irc, channel)

    # --- Deop ---

    def doMode(self, irc, msg):
        """Channel MODE: schedule a reop when the bot itself receives -o and
        autoReop is enabled."""
        channel = msg.args[0]
        if not irc.isChannel(channel):
            return
        if not self.registryValue('autoReop', channel, irc.network):
            return
        for (mode, value) in ircutils.separateModes(msg.args[1:]):
            if mode == '-o' and value and ircutils.strEqual(value, irc.nick):
                self.log.info('Resilience[%s]: deopped in %s by %s.',
                              irc.network, channel, msg.nick)
                self._schedule_reop(irc, channel)

    # --- Nick events ---

    def doNick(self, irc, msg):
        """NICK: stop recovery once we hold the desired nick; claim the
        desired nick when somebody else gives it up."""
        net = irc.network
        desired = _desired_nick(irc)
        new_nick = msg.args[0]
        # Did WE just change nick?  Compare against both the old and the new
        # nick.  NOTE(review): irc.nick is believed to already hold the new
        # nick when plugin callbacks run — confirm against Limnoria's
        # Irc.doNick ordering.
        if (ircutils.strEqual(msg.nick, irc.nick)
                or ircutils.strEqual(new_nick, irc.nick)):
            if ircutils.strEqual(new_nick, desired):
                self._stop_nick_poll(irc)
                self._cancel_event(self._nickClaimEvents, net)
            return
        # Did someone ELSE just vacate our desired nick?
        if ircutils.strEqual(msg.nick, desired) and \
                not ircutils.strEqual(irc.nick, desired):
            if self.registryValue('recoverNick', network=net):
                self.log.info(
                    'Resilience[%s]: %s freed %s via NICK, claiming.',
                    net, msg.nick, desired)
                irc.queueMsg(ircmsgs.nick(desired))

    def doQuit(self, irc, msg):
        """QUIT: claim the desired nick when its current holder quits."""
        net = irc.network
        desired = _desired_nick(irc)
        if ircutils.strEqual(msg.nick, desired) and \
                not ircutils.strEqual(irc.nick, desired):
            if self.registryValue('recoverNick', network=net):
                self.log.info(
                    'Resilience[%s]: %s quit, claiming freed nick.',
                    net, desired)
                irc.queueMsg(ircmsgs.nick(desired))

    def do433(self, irc, msg):
        """Nick in use: make sure the poll loop is running."""
        if not ircutils.strEqual(irc.nick, _desired_nick(irc)):
            if self.registryValue('recoverNick', network=irc.network):
                self._start_nick_poll(irc)

    # -----------------------------------------------------------------------
    # User-facing commands
    # -----------------------------------------------------------------------

    # --- perform -----------------------------------------------------------

    class perform(callbacks.Commands):
        """
        Manage the per-network perform list (commands sent on connect).

        Commands are comma-separated raw IRC strings.  Use \\, for a literal
        comma inside a single command.  Substitutions: $nick, $botnick,
        $currentnick, $network, $password.

        Subcommands: set, show, run, clear
        """

        def set(self, irc, msg, args, network, commands):
            """<network> <cmd1>, <cmd2>, ...

            Set the perform list for <network>.  Separate commands with
            commas; use \\, for a literal comma inside a command.
            Example:
              perform set libera PRIVMSG NickServ :IDENTIFY $password, MODE $nick +ix
            """
            plugin = irc.getCallback('Resilience')
            plugin.setRegistryValue('perform', commands, network=network)
            n = len(_split_commands(commands))
            # Translate the template first, then interpolate.
            irc.reply(_('Perform list for %s set (%d command(s)).') % (
                network, n))
        set = wrap(set, ['admin', 'somethingWithoutSpaces', 'text'])

        def show(self, irc, msg, args, network):
            """<network>

            Show the current perform list for <network>.
            """
            plugin = irc.getCallback('Resilience')
            raw = plugin.registryValue('perform', network=network).strip()
            if not raw:
                irc.reply(_('No perform commands set for %s.') % network)
                return
            cmds = _split_commands(raw)
            irc.reply(' '.join('%d: %s' % (i + 1, c)
                               for i, c in enumerate(cmds)))
        show = wrap(show, ['admin', 'somethingWithoutSpaces'])

        def run(self, irc, msg, args, network):
            """<network>

            Immediately send the perform list for <network> without
            reconnecting.  Useful for testing.
            """
            plugin = irc.getCallback('Resilience')
            target = next((o for o in world.ircs if o.network == network),
                          None)
            if target is None:
                irc.error(_('Not connected to %s.') % network, Raise=True)
            raw = plugin.registryValue('perform', network=network).strip()
            if not raw:
                irc.error(_('No perform commands set for %s.') % network,
                          Raise=True)
            plugin._send_command_list(target, raw, label='perform(manual)')
            irc.replySuccess()
        run = wrap(run, ['admin', 'somethingWithoutSpaces'])

        def clear(self, irc, msg, args, network):
            """<network>

            Clear the perform list for <network>.
            """
            irc.getCallback('Resilience').setRegistryValue(
                'perform', '', network=network)
            irc.replySuccess()
        clear = wrap(clear, ['admin', 'somethingWithoutSpaces'])

    # --- nickrecover -------------------------------------------------------

    class nickrecover(callbacks.Commands):
        """
        Manage per-network nick-recovery commands.

        Sent when the bot is not using its configured nick.  Same
        comma-separated format as perform.  Substitutions include
        $nick (desired), $currentnick (actual), $password.

        Subcommands: set, show, run, clear
        """

        def set(self, irc, msg, args, network, commands):
            """<network> <cmd1>, <cmd2>, ...

            Set the nick-recovery command list for <network>.
            Example for DALnet:
              nickrecover set dalnet PRIVMSG NickServ :RECOVER $nick $password, PRIVMSG NickServ :RELEASE $nick $password
            """
            plugin = irc.getCallback('Resilience')
            plugin.setRegistryValue('nickRecoverCommands', commands,
                                    network=network)
            n = len(_split_commands(commands))
            irc.reply(_('Nick-recovery list for %s set (%d command(s)).') % (
                network, n))
        set = wrap(set, ['admin', 'somethingWithoutSpaces', 'text'])

        def show(self, irc, msg, args, network):
            """<network>

            Show the current nick-recovery command list for <network>.
            """
            plugin = irc.getCallback('Resilience')
            raw = plugin.registryValue('nickRecoverCommands',
                                       network=network).strip()
            if not raw:
                irc.reply(_('No nick-recovery commands set for %s.') % network)
                return
            cmds = _split_commands(raw)
            irc.reply(' '.join('%d: %s' % (i + 1, c)
                               for i, c in enumerate(cmds)))
        show = wrap(show, ['admin', 'somethingWithoutSpaces'])

        def run(self, irc, msg, args, network):
            """<network>

            Immediately run the nick-recovery commands for <network>.
            """
            plugin = irc.getCallback('Resilience')
            target = next((o for o in world.ircs if o.network == network),
                          None)
            if target is None:
                irc.error(_('Not connected to %s.') % network, Raise=True)
            raw = plugin.registryValue('nickRecoverCommands',
                                       network=network).strip()
            if not raw:
                irc.error(
                    _('No nick-recovery commands set for %s.') % network,
                    Raise=True)
            plugin._send_command_list(target, raw,
                                      label='nickRecover(manual)')
            irc.replySuccess()
        run = wrap(run, ['admin', 'somethingWithoutSpaces'])

        def clear(self, irc, msg, args, network):
            """<network>

            Clear the nick-recovery command list for <network>.
            """
            irc.getCallback('Resilience').setRegistryValue(
                'nickRecoverCommands', '', network=network)
            irc.replySuccess()
        clear = wrap(clear, ['admin', 'somethingWithoutSpaces'])

    # --- nickpassword ------------------------------------------------------

    def nickpassword(self, irc, msg, args, network, password):
        """<network> <password>

        Set the nick password for <network>.  Stored privately and available
        as $password in all command templates.
        """
        self.setRegistryValue('nickPassword', password, network=network)
        irc.replySuccess()
    nickpassword = wrap(nickpassword,
                        ['admin', 'somethingWithoutSpaces', 'something'])

    # --- Join retry status / control ---------------------------------------

    def retrylist(self, irc, msg, args):
        """takes no arguments

        Show all channels currently pending a join retry, across all networks.
        """
        if not self._joinRetryEvents:
            irc.reply(_('No pending join retries.'))
            return
        entries = sorted('%s @ %s' % (ch, net)
                         for (net, ch) in self._joinRetryEvents)
        irc.reply(', '.join(entries))
    retrylist = wrap(retrylist, ['admin'])

    def retrycancel(self, irc, msg, args, channel):
        """[<channel>]

        Cancel the pending join retry for <channel> on the current network.
        <channel> defaults to the current channel.
        """
        key = (irc.network, channel)
        if key in self._joinRetryEvents:
            self._cancel_join_retry(irc, channel)
            self._joinAttempts.pop(key, None)
            irc.replySuccess()
        else:
            irc.reply(_('No pending retry for %s on %s.') % (
                channel, irc.network))
    # 'channel' rather than 'inChannel': a channel with a pending join retry
    # is by definition one the bot is not in.
    retrycancel = wrap(retrycancel, ['admin', 'channel'])

    def retrynow(self, irc, msg, args, channel):
        """[<channel>]

        Cancel any scheduled retry and immediately attempt to join <channel>
        on the current network.
        """
        key = (irc.network, channel)
        self._cancel_join_retry(irc, channel)
        self._joinAttempts.pop(key, None)
        ng = conf.supybot.networks.get(irc.network)
        irc.queueMsg(ng.channels.join(channel))
        irc.noReply()
    # 'channel' rather than 'inChannel': the target is a channel the bot has
    # not managed to join yet.
    retrynow = wrap(retrynow, ['admin', 'channel'])

    # --- Nick recovery control --------------------------------------------

    def claimnick(self, irc, msg, args):
        """takes no arguments

        Immediately run nick-recovery commands for the current network and
        attempt to reclaim the configured nick.
        """
        desired = _desired_nick(irc)
        if ircutils.strEqual(irc.nick, desired):
            irc.reply(_('Already using the configured nick (%s).') % desired)
            return
        self._send_nick_recover_commands(irc)
        if self.registryValue('recoverNick', network=irc.network):
            self._start_nick_poll(irc)
        irc.noReply()
    claimnick = wrap(claimnick, ['admin'])

    # --- Manual reop -------------------------------------------------------

    def reop(self, irc, msg, args, channel):
        """[<channel>]

        Attempt to recover ops in <channel>: tries halfop self-op first,
        then falls back to onDeopCommand if configured.
        <channel> defaults to the current channel.
        """
        if channel not in irc.state.channels:
            irc.error(_('I am not in %s.') % channel, Raise=True)
        chanstate = irc.state.channels[channel]
        if irc.nick in chanstate.ops:
            irc.reply(_('I already have ops in %s.') % channel)
            return
        if irc.nick in chanstate.halfops:
            irc.queueMsg(ircmsgs.op(channel, irc.nick))
            irc.replySuccess()
            return
        cmd_raw = self.registryValue('onDeopCommand', channel,
                                     irc.network).strip()
        if cmd_raw:
            self._send_raw(irc, cmd_raw, channel=channel,
                           label='onDeopCommand(manual)')
            irc.replySuccess()
        else:
            irc.error(
                _('No halfop and onDeopCommand not set for %s.') % channel,
                Raise=True)
    reop = wrap(reop, ['admin', 'inChannel'])

    # --- Manual self-unban -------------------------------------------------

    def selfunban(self, irc, msg, args, channel):
        """[<channel>]

        Attempt to remove own ban masks from <channel> via MODE -b.
        Requires the bot to currently have ops in the channel.
        <channel> defaults to the current channel.
        """
        if self._try_self_unban(irc, channel):
            irc.replySuccess()
        else:
            irc.error(
                _('Cannot unban self in %s '
                  '(not opped there, or no matching bans).') % channel,
                Raise=True)
    selfunban = wrap(selfunban, ['admin', 'inChannel'])
# Module-level alias for the plugin class.  Presumably the name Limnoria's
# plugin loader looks up when loading this module — confirm against the loader.
Class = Resilience
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79: