1
0
mirror of https://github.com/TehPeGaSuS/GitBot.git synced 2026-06-25 23:05:46 +02:00
Files
GitBot/rss.py
T
2026-02-26 15:06:55 +01:00

218 lines
7.6 KiB
Python

"""RSS/Atom feed poller."""
import asyncio
import hashlib
import html
import logging
import re
from string import Template
from typing import Callable
import aiohttp
import feedparser
import db
log = logging.getLogger("rss")
DEFAULT_INTERVAL = 300 # 5 minutes
DEFAULT_FORMAT = "$feed_name: $title <$link>"
# Template variables available in format strings:
#
# From the feed entry (feedparser field names):
# $title entry title, HTML stripped
# $link entry URL
# $description entry summary/description, HTML stripped
# $author entry author
# $published publication date string (as-is from feedparser)
# $id entry unique ID
# ... any other feedparser entry field
#
# From the feed itself (prefixed with feed_):
# $feed_name feed title (e.g. "pegasus/testrepo")
# $feed_link feed URL
# $feed_author feed author
# $feed_subtitle feed subtitle/description
# ... any other feedparser feed.* field, prefixed with feed_
_TAG_RE = re.compile(r"<[^>]+>")
def _strip_html(text: str) -> str:
"""Remove HTML tags and decode entities, collapse whitespace."""
text = _TAG_RE.sub("", text)
text = html.unescape(text)
return " ".join(text.split())
def _flatten_value(value) -> str:
"""
feedparser returns some fields as lists of dicts with type/value/href.
Resolve them to a plain string the same way Limnoria does:
prefer text/plain, then strip HTML from text/html, then fall back to href.
"""
if not isinstance(value, list):
if isinstance(value, str):
return value
return str(value) if value is not None else ""
for item in value:
if isinstance(item, dict) and item.get("type") == "text/plain":
return item.get("value", "")
for item in value:
if isinstance(item, dict) and item.get("type") in \
("text/html", "application/xhtml+xml"):
if "value" in item:
return _strip_html(item["value"])
if "href" in item:
return item["href"]
for item in value:
if isinstance(item, dict) and "href" in item:
return item["href"]
for item in value:
if isinstance(item, dict) and "value" in item:
return str(item["value"])
return str(value)
def _build_vars(feed_meta: dict, entry) -> dict:
"""
Build the template substitution dict from a feedparser entry
and feed metadata dict.
"""
# Feed-level vars ($feed_*)
kwargs = {}
for k, v in feed_meta.items():
if isinstance(v, str):
kwargs[f"feed_{k}"] = _strip_html(v)
elif isinstance(v, list):
kwargs[f"feed_{k}"] = _flatten_value(v)
# Convenience alias — strip Gitea's "Feed of X" wrapper if present
raw_name = kwargs.get("feed_title", "")
m = re.match(r'^[Ff]eed\s+of\s+"?(.+?)"?\s*$', raw_name)
kwargs.setdefault("feed_name", m.group(1) if m else raw_name)
# Entry-level vars — iterate feedparser entry attributes
entry_dict = dict(entry) if hasattr(entry, "items") else {}
for k, v in entry_dict.items():
kwargs[k] = _flatten_value(v) if isinstance(v, list) \
else (_strip_html(v) if isinstance(v, str) else str(v) if v is not None else "")
# Ensure the most common keys always exist (empty string if absent)
for key in ("title", "link", "description", "author", "published", "id"):
kwargs.setdefault(key, "")
# HTML-strip summary/title from their _detail counterparts if available
for key in ("title", "summary"):
detail = entry_dict.get(f"{key}_detail")
if isinstance(detail, dict) and detail.get("type") in \
("text/html", "application/xhtml+xml"):
kwargs[key] = _strip_html(detail.get("value", kwargs.get(key, "")))
# Map feedparser's "summary" to "description" if not already set
if not kwargs.get("description") and kwargs.get("summary"):
kwargs["description"] = kwargs["summary"]
return kwargs
def _format_entry(template_str: str, feed_meta: dict, entry) -> str:
variables = _build_vars(feed_meta, entry)
try:
return Template(template_str).safe_substitute(variables)
except (KeyError, ValueError) as e:
log.warning("Bad RSS format template %r: %s — falling back", template_str, e)
title = variables.get("title", "")
link = variables.get("link", "")
name = variables.get("feed_name", "")
return f"{name}: {title} <{link}>"
def _entry_id(entry) -> str:
"""Stable unique ID for a feedparser entry."""
raw = entry.get("id") or entry.get("link") or entry.get("title") or ""
return "sha1:" + hashlib.sha1(raw.encode()).hexdigest()
class RSSPoller:
def __init__(self, database, deliver: Callable,
interval: int = DEFAULT_INTERVAL):
"""deliver: async callable(network, channel, message)"""
self._db = database
self._deliver = deliver
self._interval = interval
async def run(self):
while True:
await self._poll()
await asyncio.sleep(self._interval)
async def _poll(self):
feeds = db.rss_all_feeds(self._db)
if not feeds:
return
log.debug("Polling %d RSS feed(s)", len(feeds))
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30)
) as session:
for feed in feeds:
try:
await self._fetch_and_deliver(session, feed)
except Exception as e:
log.exception("Unhandled error polling %s: %s", feed["url"], e)
async def _fetch_and_deliver(self, session, feed: dict):
url = feed["url"]
feed_id = feed["id"]
network = feed["network"]
channel = feed["channel"]
fmt = feed.get("format", DEFAULT_FORMAT)
try:
async with session.get(url) as resp:
content = await resp.text()
except Exception as e:
log.warning("Failed to fetch %s: %s", url, e)
return
parsed = feedparser.parse(content)
entries = parsed.get("entries", [])
feed_meta = dict(parsed.get("feed", {}))
if not entries:
log.debug("No entries in feed %s", url)
return
seen = db.rss_get_seen(self._db, feed_id)
new_entries = [(eid, e) for e in reversed(entries) # oldest first
if (eid := _entry_id(e)) not in seen]
if not new_entries:
log.debug("No new entries in %s", url)
return
# First poll: show only the single most recent entry as a "hello",
# then mark everything seen to avoid a flood of history.
if not seen:
eid, entry = new_entries[-1]
await self._deliver(network, channel,
_format_entry(fmt, feed_meta, entry))
db.rss_mark_seen(self._db, feed_id, [e[0] for e in new_entries])
log.info("Initialised feed %s (%d entries), showed latest",
url, len(new_entries))
return
# Subsequent polls: deliver up to 3 new entries.
to_deliver = new_entries[:3]
for eid, entry in to_deliver:
await self._deliver(network, channel,
_format_entry(fmt, feed_meta, entry))
log.debug("Delivered entry from %s: %s",
url, entry.get("title", ""))
db.rss_mark_seen(self._db, feed_id, [e[0] for e in to_deliver])
if len(new_entries) > 3:
log.info("%s had %d new entries, delivered 3", url, len(new_entries))