1
0
mirror of https://github.com/TehPeGaSuS/GitBot.git synced 2026-06-24 10:05:45 +02:00
Files
GitBot/webhook_server.py
2026-02-26 15:06:55 +01:00

195 lines
7.2 KiB
Python

"""Tiny async HTTP server for receiving git forge webhooks."""
import asyncio
import hashlib
import hmac
import json
import logging
import urllib.parse
from typing import Callable, Optional
# Module-level logger used by the webhook server code in this file.
log = logging.getLogger("webhook")

# Largest request body accepted, in bytes (10 MiB).
MAX_BODY = 10 * 1024 ** 2
class WebhookServer:
    """Minimal asyncio HTTP server that receives git forge webhooks.

    Accepts ``POST /github``, ``POST /gitea`` and ``POST /gitlab``, optionally
    verifies the request against a per-forge secret, parses the JSON payload
    and schedules ``deliver(forge, headers, data)`` as a background task.

    Verification supports two modes (both optional, both can coexist):

    1. URL token   -- append ?secret=<value> to the webhook URL you
                      configure in the forge. Simplest to set up.
                      e.g. https://yourhost/github?secret=abc
    2. HMAC header -- configure the same secret in the forge's webhook
                      settings. GitHub/Gitea send HMAC-SHA256; GitLab
                      sends the token directly as X-Gitlab-Token.

    If a secret is configured for a forge and the URL carries ?secret=,
    only the URL token is checked: a match accepts the request, a mismatch
    rejects it with 403 (the HMAC header is not consulted).
    If no URL token is present, HMAC header verification is attempted and
    a failure is rejected with 403.
    If no secret is configured for a forge, all requests are accepted.
    """

    def __init__(self, host: str, port: int, deliver: Callable,
                 secrets: Optional[dict] = None):
        """Store the listen address, delivery callback and secrets.

        host, port: address handed to ``asyncio.start_server``.
        deliver: async callable(forge, headers, data)
                 forge is 'github' | 'gitea' | 'gitlab'
        secrets: per-forge secrets, e.g.:
                 {'github': 'abc', 'gitea': 'xyz', 'gitlab': 'def'}
                 Empty/None values are ignored (forge left unprotected).
        """
        self._host = host
        self._port = port
        self._deliver = deliver
        # Secrets are kept as bytes so they can be fed straight to hmac.
        self._secrets = {
            forge: s.encode()
            for forge, s in (secrets or {}).items()
            if s
        }
        # Strong references to in-flight delivery tasks; the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._tasks = set()

    async def run(self):
        """Start listening and serve requests until cancelled."""
        server = await asyncio.start_server(
            self._handle, self._host, self._port)
        # getsockname() returns (host, port) for IPv4 but a 4-tuple for IPv6;
        # only the first two fields fit the log format.
        addr = server.sockets[0].getsockname()
        log.info("Webhook server listening on %s:%d", addr[0], addr[1])
        async with server:
            await server.serve_forever()

    # ── HTTP parsing ──────────────────────────────────────────────────────────

    async def _handle(self, reader: asyncio.StreamReader,
                      writer: asyncio.StreamWriter):
        """Per-connection callback: dispatch one request, then close.

        Any unexpected exception is logged and answered with a 500.
        """
        try:
            await self._dispatch(reader, writer)
        except Exception as e:
            log.exception("Error handling webhook request: %s", e)
            self._respond(writer, 500, "Internal Server Error")
        finally:
            try:
                writer.close()
            except Exception as e:
                # Best effort: the peer may already have gone away.
                log.debug("Error closing webhook connection: %s", e)

    async def _dispatch(self, reader, writer):
        """Read one HTTP request, validate it and schedule delivery.

        Responses written:
          400  malformed request line, oversized header line, invalid or
               negative Content-Length, body shorter than Content-Length
          400  ("Bad JSON") undecodable or non-JSON payload
          403  secret / signature mismatch
          404  path is not /github, /gitea or /gitlab
          405  method is not POST
          408  client too slow sending the request line, headers or body
          413  Content-Length larger than MAX_BODY
          200  payload accepted; deliver() is scheduled as a task
        """
        try:
            raw_line = await asyncio.wait_for(reader.readline(), timeout=10)
        except asyncio.TimeoutError:
            self._respond(writer, 408, "Request Timeout")
            return
        except ValueError:
            # readline() raises ValueError when a line exceeds the stream's
            # buffer limit.
            self._respond(writer, 400, "Bad Request")
            return
        # errors="replace" keeps valid input unchanged but never raises on
        # garbage bytes, which then fail the structural checks below.
        request_line = raw_line.decode("utf-8", errors="replace")
        parts = request_line.strip().split(" ")
        if len(parts) < 2:
            self._respond(writer, 400, "Bad Request")
            return
        method, path = parts[0], parts[1]

        # Read headers
        headers = {}
        while True:
            try:
                raw_line = await asyncio.wait_for(
                    reader.readline(), timeout=10)
            except asyncio.TimeoutError:
                self._respond(writer, 408, "Request Timeout")
                return
            except ValueError:
                self._respond(writer, 400, "Bad Request")
                return
            line = raw_line.decode("utf-8", errors="replace").strip()
            if not line:
                break
            if ":" in line:
                k, _, v = line.partition(":")
                headers[k.strip()] = v.strip()
        # HTTP header names are case-insensitive. ``headers`` keeps the
        # original casing for deliver(); internal lookups use this view.
        lowered = {k.lower(): v for k, v in headers.items()}

        if method != "POST":
            self._respond(writer, 405, "Method Not Allowed")
            return

        # Parse path and query string
        # Supports /github?secret=abc, /gitea?secret=xyz, etc.
        parsed = urllib.parse.urlparse(path)
        clean_path = parsed.path.strip("/").lower()
        if clean_path not in ("github", "gitea", "gitlab"):
            self._respond(writer, 404, "Not Found")
            return
        forge = clean_path
        qs_params = urllib.parse.parse_qs(parsed.query)
        url_secret = qs_params.get("secret", [None])[0]

        # Read body
        try:
            content_length = int(lowered.get("content-length", 0))
        except ValueError:
            content_length = -1
        if content_length < 0:
            # A negative length would turn the read below into
            # "read until EOF", bypassing the MAX_BODY limit.
            self._respond(writer, 400, "Bad Request")
            return
        if content_length > MAX_BODY:
            self._respond(writer, 413, "Payload Too Large")
            return
        try:
            # readexactly(): read() may return fewer bytes than requested
            # when the payload arrives in several chunks.
            body = await asyncio.wait_for(
                reader.readexactly(content_length), timeout=30)
        except asyncio.TimeoutError:
            self._respond(writer, 408, "Request Timeout")
            return
        except asyncio.IncompleteReadError:
            # Connection ended before Content-Length bytes were received.
            self._respond(writer, 400, "Bad Request")
            return

        # Verification — only enforced if a secret is configured for this forge
        expected_secret = self._secrets.get(forge)
        if expected_secret:
            if url_secret is not None:
                # Mode 1: ?secret= URL token — simple constant-time compare
                if not hmac.compare_digest(url_secret.encode(),
                                           expected_secret):
                    log.warning("[%s] URL secret mismatch", forge)
                    self._respond(writer, 403, "Forbidden")
                    return
            else:
                # Mode 2: HMAC signature header
                if not self._verify_hmac(forge, headers, body,
                                         expected_secret):
                    log.warning(
                        "[%s] HMAC signature verification failed", forge)
                    self._respond(writer, 403, "Forbidden")
                    return

        # Parse body
        content_type = lowered.get("content-type", "").lower()
        try:
            text = body.decode()
            if "x-www-form-urlencoded" in content_type:
                # Form-encoded hooks carry the JSON in a "payload" field.
                raw = urllib.parse.parse_qs(text).get("payload", ["{}"])[0]
            else:
                raw = text
            data = json.loads(raw)
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            log.warning("[%s] JSON parse error: %s", forge, e)
            self._respond(writer, 400, "Bad JSON")
            return

        # Acknowledge first, then process in the background.
        self._respond(writer, 200, "OK")
        task = asyncio.create_task(self._deliver(forge, headers, data))
        self._tasks.add(task)
        task.add_done_callback(self._delivery_done)

    def _delivery_done(self, task):
        """Drop a finished delivery task and log its failure, if any."""
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            log.error("Webhook delivery failed: %s", exc, exc_info=exc)

    @staticmethod
    def _verify_hmac(forge: str, headers: dict, body: bytes,
                     secret: bytes) -> bool:
        """Return True if the forge's signature/token header matches *secret*.

        github: X-Hub-Signature-256 must be "sha256=" + hex HMAC-SHA256(body)
        gitea:  X-Gitea-Signature must be hex HMAC-SHA256(body)
        gitlab: X-Gitlab-Token must equal the secret itself

        Header names are matched case-insensitively. Comparisons are done on
        bytes because hmac.compare_digest() rejects non-ASCII str arguments.
        """
        lowered = {name.lower(): value for name, value in headers.items()}
        if forge == "github":
            sig_header = lowered.get("x-hub-signature-256", "")
            if not sig_header.startswith("sha256="):
                return False
            expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
            return hmac.compare_digest(
                sig_header[7:].encode(), expected.encode())
        elif forge == "gitea":
            sig_header = lowered.get("x-gitea-signature", "")
            expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
            return hmac.compare_digest(
                sig_header.encode(), expected.encode())
        elif forge == "gitlab":
            token = lowered.get("x-gitlab-token", "")
            return hmac.compare_digest(token.encode(), secret)
        # NOTE(review): unknown forges are accepted (fail-open). _dispatch
        # only routes the three forges above, so this is not reached from
        # there; kept as-is for compatibility.
        return True

    @staticmethod
    def _respond(writer, code: int, message: str):
        """Write a complete plain-text HTTP/1.1 response to *writer*.

        *message* is used both as the reason phrase and as the body.
        """
        body = message.encode()
        response = (
            f"HTTP/1.1 {code} {message}\r\n"
            f"Content-Length: {len(body)}\r\n"
            f"Content-Type: text/plain\r\n"
            f"Connection: close\r\n"
            f"\r\n"
        ).encode() + body
        writer.write(response)