mirror of
https://github.com/TehPeGaSuS/GitBot.git
synced 2026-06-25 08:25:44 +02:00
218 lines
7.6 KiB
Python
218 lines
7.6 KiB
Python
"""RSS/Atom feed poller."""
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import html
|
|
import logging
|
|
import re
|
|
from string import Template
|
|
from typing import Callable
|
|
|
|
import aiohttp
|
|
import feedparser
|
|
|
|
import db
|
|
|
|
# Module-level logger; every message emitted by this module goes to "rss".
log = logging.getLogger("rss")

# Default pause between poll cycles, in seconds (RSSPoller passes it to
# asyncio.sleep when no explicit interval is given).
DEFAULT_INTERVAL = 300  # 5 minutes
# Default announcement template, in string.Template "$name" syntax; used when
# a feed row carries no "format" of its own.
DEFAULT_FORMAT = "$feed_name: $title <$link>"
|
|
|
|
# Template variables available in format strings:
#
# From the feed entry (feedparser field names):
#   $title          entry title, HTML stripped
#   $link           entry URL
#   $description    entry summary/description, HTML stripped
#   $author         entry author
#   $published      publication date string (as-is from feedparser)
#   $id             entry unique ID
#   ...             any other feedparser entry field
#
# From the feed itself (prefixed with feed_):
#   $feed_name      feed title (e.g. "pegasus/testrepo")
#   $feed_link      feed URL
#   $feed_author    feed author
#   $feed_subtitle  feed subtitle/description
#   ...             any other feedparser feed.* field, prefixed with feed_
|
|
|
|
|
|
# Matches any single HTML/XML tag.
_TAG_RE = re.compile(r"<[^>]+>")

# Tags that imply a visual break (line break, paragraph, list item, table
# cell, ...).  They are replaced by a space rather than removed outright so
# that "<p>one</p><p>two</p>" becomes "one two" instead of "onetwo".
_BREAK_TAG_RE = re.compile(
    r"<\s*/?\s*(?:br|p|div|li|ul|ol|dl|dt|dd|tr|td|th|table|h[1-6]"
    r"|blockquote|pre|hr)\b[^>]*>",
    re.IGNORECASE,
)


def _strip_html(text: str) -> str:
    """Return *text* as a single line of plain text.

    Processing order:

    1. break-type tags (``<br>``, ``<p>``, ``<li>``, ...) become a space so
       the words on either side stay separated;
    2. every remaining tag is removed without leaving a gap (inline markup
       such as ``<b>`` must not split a word);
    3. HTML entities are decoded;
    4. all runs of whitespace collapse to one space and the ends are trimmed.

    An empty or ``None`` input yields ``""``.
    """
    if not text:
        return ""
    text = _BREAK_TAG_RE.sub(" ", text)
    text = _TAG_RE.sub("", text)
    text = html.unescape(text)
    # str.split() with no argument also drops leading/trailing whitespace.
    return " ".join(text.split())
|
|
|
|
|
|
def _flatten_value(value) -> str:
    """Resolve a feedparser field value to a plain string.

    feedparser returns some fields as lists of dicts with type/value/href.
    They are resolved the same way Limnoria does, in this order of
    preference:

    1. the ``value`` of the first ``text/plain`` item;
    2. the first ``text/html`` / ``application/xhtml+xml`` item: its
       ``value`` with HTML stripped, else its ``href``;
    3. the ``href`` of the first item that has one;
    4. the ``value`` of the first item that has one;
    5. ``str(value)`` as a last resort.

    Non-list input is returned as-is when it is a string, as ``""`` when it
    is ``None``, and as ``str(value)`` otherwise.  An empty list yields
    ``""`` (not the literal ``"[]"``).  The result is always a ``str``.
    """
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if not isinstance(value, list):
        return str(value)
    if not value:
        return ""

    # Only dict items carry type/value/href; anything else is ignored until
    # the final str() fallback.
    items = [item for item in value if isinstance(item, dict)]

    for item in items:
        # A text/plain item without a usable value must not short-circuit
        # the search: fall through so href can still be picked up.
        if item.get("type") == "text/plain" and item.get("value") is not None:
            return str(item["value"])

    for item in items:
        if item.get("type") in ("text/html", "application/xhtml+xml"):
            if item.get("value") is not None:
                return _strip_html(str(item["value"]))
            if item.get("href") is not None:
                return str(item["href"])

    for item in items:
        if item.get("href") is not None:
            return str(item["href"])

    for item in items:
        if item.get("value") is not None:
            return str(item["value"])

    return str(value)
|
|
|
|
|
|
# Keys that feedparser documents as URLs/identifiers rather than markup.
# They must NOT go through _strip_html(): html.unescape() also expands legacy
# entities written without a trailing semicolon, so a query string such as
# "?a=1&region=eu" would be corrupted into "?a=1®ion=eu".
_URL_KEYS = frozenset({"link", "id", "comments", "license"})


def _build_vars(feed_meta: dict, entry) -> dict:
    """Build the template substitution dict for one feed entry.

    Args:
        feed_meta: feed-level metadata (feedparser's ``feed`` mapping as a
            plain dict).  String and list values are exposed as
            ``feed_<key>``; other value types are skipped.
        entry: a feedparser entry.  Anything without an ``items`` attribute
            is treated as an empty entry.

    Returns:
        A dict mapping template variable names to strings.  The keys
        ``title``, ``link``, ``description``, ``author``, ``published``,
        ``id`` and ``feed_name`` are always present (``""`` when the feed
        does not supply them).
    """
    # Feed-level vars ($feed_*)
    kwargs = {}
    for k, v in feed_meta.items():
        if isinstance(v, str):
            kwargs[f"feed_{k}"] = v.strip() if k in _URL_KEYS else _strip_html(v)
        elif isinstance(v, list):
            kwargs[f"feed_{k}"] = _flatten_value(v)

    # Convenience alias — strip Gitea's "Feed of X" wrapper if present
    raw_name = kwargs.get("feed_title", "")
    m = re.match(r'^[Ff]eed\s+of\s+"?(.+?)"?\s*$', raw_name)
    # setdefault: an explicit "name" key in feed_meta wins over the alias.
    kwargs.setdefault("feed_name", m.group(1) if m else raw_name)

    # Entry-level vars — iterate feedparser entry attributes
    entry_dict = dict(entry) if hasattr(entry, "items") else {}
    for k, v in entry_dict.items():
        if isinstance(v, list):
            kwargs[k] = _flatten_value(v)
        elif isinstance(v, str):
            kwargs[k] = v.strip() if k in _URL_KEYS else _strip_html(v)
        elif v is None:
            kwargs[k] = ""
        else:
            kwargs[k] = str(v)

    # Ensure the most common keys always exist (empty string if absent)
    for key in ("title", "link", "description", "author", "published", "id"):
        kwargs.setdefault(key, "")

    # Prefer the raw value from the *_detail record when it is declared as
    # HTML, so markup is stripped exactly once from the original text.
    for key in ("title", "summary"):
        detail = entry_dict.get(f"{key}_detail")
        if isinstance(detail, dict) and detail.get("type") in \
                ("text/html", "application/xhtml+xml"):
            detail_value = detail.get("value")
            if isinstance(detail_value, str):
                kwargs[key] = _strip_html(detail_value)
            else:
                # No usable value in the detail record: keep what the entry
                # loop produced, but make sure the key exists.
                kwargs.setdefault(key, "")

    # Map feedparser's "summary" to "description" if not already set
    if not kwargs.get("description") and kwargs.get("summary"):
        kwargs["description"] = kwargs["summary"]

    return kwargs
|
|
|
|
|
|
def _format_entry(template_str: str, feed_meta: dict, entry) -> str:
    """Render one feed entry through a ``string.Template`` format string.

    Args:
        template_str: format string using ``$name`` placeholders.
        feed_meta: feed-level metadata dict, forwarded to ``_build_vars``.
        entry: the feedparser entry to render.

    Returns:
        The rendered message.  Unknown placeholders are left untouched
        (``safe_substitute``).  If the template cannot be processed at all
        the built-in ``"<feed_name>: <title> <<link>>"`` layout is used and a
        warning is logged.
    """
    variables = _build_vars(feed_meta, entry)
    try:
        return Template(template_str).safe_substitute(variables)
    # TypeError covers a None / non-string template (for example an unset
    # per-feed format), which would otherwise abort delivery of the entry.
    except (KeyError, ValueError, TypeError) as e:
        log.warning("Bad RSS format template %r: %s — falling back", template_str, e)
        title = variables.get("title", "")
        link = variables.get("link", "")
        name = variables.get("feed_name", "")
        return f"{name}: {title} <{link}>"
|
|
|
|
|
|
def _entry_id(entry) -> str:
    """Return a stable unique ID for a feedparser entry.

    The ID is ``"sha1:"`` followed by the SHA-1 hex digest of the first
    non-empty value among the entry's ``id``, ``link`` and ``title``.

    Entries that have none of those would all hash to the same ID, so only
    the first one would ever be announced.  For them the digest is taken over
    whichever of ``published``, ``updated`` and ``summary`` are present
    instead.  IDs of entries that do have an id, link or title are
    unaffected.
    """
    raw = entry.get("id") or entry.get("link") or entry.get("title")
    if not raw:
        fallback = [str(entry.get(key)) for key in ("published", "updated", "summary")
                    if entry.get(key)]
        raw = "\n".join(fallback)
    # SHA-1 is used purely as a compact fingerprint, not for security.
    return "sha1:" + hashlib.sha1(str(raw).encode()).hexdigest()
|
|
|
|
|
|
class RSSPoller:
    """Poll every configured RSS/Atom feed and announce new entries.

    Feeds and the per-feed set of already-seen entry IDs are read and written
    through the ``db`` module's ``rss_*`` helpers; messages are sent through
    the ``deliver`` callback.
    """

    def __init__(self, database, deliver: Callable,
                 interval: int = DEFAULT_INTERVAL,
                 max_per_poll: int = 3):
        """
        Args:
            database: handle passed as first argument to the ``db.rss_*``
                helpers.
            deliver: async callable(network, channel, message)
            interval: seconds to sleep between poll cycles.
            max_per_poll: maximum number of new entries announced per feed
                in one cycle (values below 1 are treated as 1).  Entries
                beyond the limit stay unseen and are picked up by later
                cycles.
        """
        self._db = database
        self._deliver = deliver
        self._interval = interval
        self._max_per_poll = max(1, max_per_poll)

    async def run(self):
        """Poll forever, sleeping ``interval`` seconds between cycles."""
        while True:
            try:
                await self._poll()
            except asyncio.CancelledError:
                # Task cancellation must always propagate.
                raise
            except Exception:
                # A failed cycle (e.g. a database or session error) must not
                # stop the poller for good; log it and try again next cycle.
                log.exception("RSS poll cycle failed; retrying in %ss",
                              self._interval)
            await asyncio.sleep(self._interval)

    async def _poll(self):
        """Run one cycle: fetch every feed in turn over a shared session."""
        feeds = db.rss_all_feeds(self._db)
        if not feeds:
            return
        log.debug("Polling %d RSS feed(s)", len(feeds))
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30)
        ) as session:
            for feed in feeds:
                # One broken feed must not prevent the others from polling.
                try:
                    await self._fetch_and_deliver(session, feed)
                except Exception as e:
                    log.exception("Unhandled error polling %s: %s", feed["url"], e)

    async def _fetch_and_deliver(self, session, feed: dict):
        """Fetch one feed and announce entries not seen before.

        Args:
            session: the ``aiohttp.ClientSession`` used for the request.
            feed: feed record with keys ``url``, ``id``, ``network``,
                ``channel`` and optionally ``format``.
        """
        url = feed["url"]
        feed_id = feed["id"]
        network = feed["network"]
        channel = feed["channel"]
        # "or" rather than a .get() default: the key may be present but
        # hold None/"" when no custom format was configured.
        fmt = feed.get("format") or DEFAULT_FORMAT

        try:
            async with session.get(url) as resp:
                # Do not try to parse 4xx/5xx error pages as a feed.
                resp.raise_for_status()
                content = await resp.text()
        except Exception as e:
            log.warning("Failed to fetch %s: %s", url, e)
            return

        parsed = feedparser.parse(content)
        entries = parsed.get("entries", [])
        feed_meta = dict(parsed.get("feed", {}))

        if not entries:
            log.debug("No entries in feed %s", url)
            return

        seen = db.rss_get_seen(self._db, feed_id)
        new_entries = [(eid, e) for e in reversed(entries)  # oldest first
                       if (eid := _entry_id(e)) not in seen]

        if not new_entries:
            log.debug("No new entries in %s", url)
            return

        # First poll: show only the single most recent entry as a "hello",
        # then mark everything seen to avoid a flood of history.
        if not seen:
            eid, entry = new_entries[-1]
            await self._deliver(network, channel,
                                _format_entry(fmt, feed_meta, entry))
            db.rss_mark_seen(self._db, feed_id, [e[0] for e in new_entries])
            log.info("Initialised feed %s (%d entries), showed latest",
                     url, len(new_entries))
            return

        # Subsequent polls: deliver a bounded number of new entries,
        # oldest first.
        to_deliver = new_entries[:self._max_per_poll]
        delivered = []
        try:
            for eid, entry in to_deliver:
                await self._deliver(network, channel,
                                    _format_entry(fmt, feed_meta, entry))
                delivered.append(eid)
                log.debug("Delivered entry from %s: %s",
                          url, entry.get("title", ""))
        finally:
            # Record what actually went out even if a later delivery raised,
            # so already-announced entries are not repeated next cycle.
            if delivered:
                db.rss_mark_seen(self._db, feed_id, delivered)

        if len(new_entries) > len(to_deliver):
            log.info("%s had %d new entries, delivered %d",
                     url, len(new_entries), len(to_deliver))
|