feat: implement an RSS Discord bot with slash commands for feed management and Docker support.

This commit is contained in:
2026-03-21 20:54:46 +01:00
commit 6ec1df4f78
14 changed files with 1172 additions and 0 deletions

0
src/__init__.py Normal file
View File

190
src/bot.py Normal file
View File

@@ -0,0 +1,190 @@
"""Discord bot with slash commands and background RSS checking."""
import asyncio
import logging
import discord
from discord import app_commands
from discord.ext import tasks
from . import feed_store
from .feed_checker import check_feeds, initialize_feed
logger = logging.getLogger(__name__)
class RSSBot(discord.Client):
"""Discord client with RSS feed monitoring."""
def __init__(self, channel_id: int, check_interval: int, role_id: int | None = None) -> None:
intents = discord.Intents.default()
super().__init__(intents=intents)
self.tree = app_commands.CommandTree(self)
self.target_channel_id = channel_id
self.check_interval = check_interval
self.required_role_id = role_id
self._register_commands()
def _has_role(self, interaction: discord.Interaction) -> bool:
"""Check if the user has the required role. Returns True if no role is configured."""
if self.required_role_id is None:
return True
if not isinstance(interaction.user, discord.Member):
return False
return any(role.id == self.required_role_id for role in interaction.user.roles)
def _register_commands(self) -> None:
"""Register all slash commands."""
group = app_commands.Group(name="feed", description="RSS-Feed Verwaltung")
@group.command(name="add", description="Einen neuen RSS-Feed hinzufügen")
@app_commands.describe(url="Die URL des RSS-Feeds")
async def feed_add(interaction: discord.Interaction, url: str) -> None:
if not self._has_role(interaction):
await interaction.response.send_message(
"🚫 Du hast keine Berechtigung für diesen Befehl.", ephemeral=True
)
return
if feed_store.add_feed(url):
await interaction.response.send_message(
f"✅ Feed hinzugefügt: `{url}`"
)
logger.info("Feed added: %s (by %s)", url, interaction.user)
# Initialize feed in background: mark all as seen, post only newest
asyncio.create_task(self._initialize_and_post(url))
else:
await interaction.response.send_message(
f"⚠️ Feed existiert bereits: `{url}`"
)
@group.command(name="remove", description="Einen RSS-Feed entfernen")
@app_commands.describe(url="Die URL des RSS-Feeds")
async def feed_remove(interaction: discord.Interaction, url: str) -> None:
if not self._has_role(interaction):
await interaction.response.send_message(
"🚫 Du hast keine Berechtigung für diesen Befehl.", ephemeral=True
)
return
if feed_store.remove_feed(url):
await interaction.response.send_message(
f"🗑️ Feed entfernt: `{url}`"
)
logger.info("Feed removed: %s (by %s)", url, interaction.user)
else:
await interaction.response.send_message(
f"⚠️ Feed nicht gefunden: `{url}`"
)
@group.command(name="list", description="Alle gespeicherten RSS-Feeds anzeigen")
async def feed_list(interaction: discord.Interaction) -> None:
if not self._has_role(interaction):
await interaction.response.send_message(
"🚫 Du hast keine Berechtigung für diesen Befehl.", ephemeral=True
)
return
feeds = feed_store.list_feeds()
if not feeds:
await interaction.response.send_message("📭 Keine Feeds gespeichert.")
return
lines = [f"📰 **Gespeicherte Feeds ({len(feeds)}):**"]
for i, url in enumerate(feeds, 1):
lines.append(f"{i}. `{url}`")
await interaction.response.send_message("\n".join(lines))
self.tree.add_command(group)
async def setup_hook(self) -> None:
"""Called when the bot is starting up sync slash commands."""
await self.tree.sync()
logger.info("Slash commands synced globally.")
async def on_ready(self) -> None:
"""Called when bot is connected and ready."""
logger.info("Bot is ready as %s (ID: %s)", self.user, self.user.id)
channel = self.get_channel(self.target_channel_id)
if channel is None:
channel = await self.fetch_channel(self.target_channel_id)
if channel:
logger.info("Target channel: #%s", channel.name)
else:
logger.error("Could not find channel with ID %s!", self.target_channel_id)
# Start the background loop
if not self.check_loop.is_running():
self.check_loop.change_interval(seconds=self.check_interval)
self.check_loop.start()
async def _initialize_and_post(self, url: str) -> None:
"""Initialize a new feed: mark all seen, post only the newest entry."""
entry = initialize_feed(url)
if entry is None:
return
channel = self.get_channel(self.target_channel_id)
if channel is None:
try:
channel = await self.fetch_channel(self.target_channel_id)
except Exception:
logger.exception("Could not fetch target channel")
return
title = entry["title"]
link = entry["link"]
if link:
message = f"📢 **{title}**\n{link}"
else:
message = f"📢 **{title}**"
try:
await channel.send(message)
logger.info("Posted newest entry from new feed: %s", title)
except Exception:
logger.exception("Failed to post entry: %s", title)
async def _check_and_post(self) -> None:
"""Check feeds and post new entries to the target channel."""
channel = self.get_channel(self.target_channel_id)
if channel is None:
try:
channel = await self.fetch_channel(self.target_channel_id)
except Exception:
logger.exception("Could not fetch target channel")
return
new_entries = check_feeds()
for entry in new_entries:
title = entry["title"]
link = entry["link"]
if link:
message = f"📢 **{title}**\n{link}"
else:
message = f"📢 **{title}**"
try:
await channel.send(message)
logger.info("Posted: %s", title)
except Exception:
logger.exception("Failed to post entry: %s", title)
@tasks.loop(seconds=300) # default, overridden in on_ready
async def check_loop(self) -> None:
"""Background task that periodically checks RSS feeds."""
logger.debug("Running scheduled feed check...")
await self._check_and_post()
@check_loop.before_loop
async def before_check_loop(self) -> None:
"""Wait until the bot is ready before starting the loop."""
await self.wait_until_ready()

32
src/config.py Normal file
View File

@@ -0,0 +1,32 @@
"""Configuration loading from environment variables."""
import os
import sys
from dotenv import load_dotenv
def load_config() -> dict:
"""Load and validate configuration from .env file and environment."""
load_dotenv()
token = os.getenv("DISCORD_TOKEN")
channel_id = os.getenv("DISCORD_CHANNEL_ID")
check_interval = int(os.getenv("CHECK_INTERVAL", "300"))
if not token:
print("ERROR: DISCORD_TOKEN is not set.")
sys.exit(1)
if not channel_id:
print("ERROR: DISCORD_CHANNEL_ID is not set.")
sys.exit(1)
role_id = os.getenv("DISCORD_ROLE_ID")
return {
"token": token,
"channel_id": int(channel_id),
"check_interval": check_interval,
"role_id": int(role_id) if role_id else None,
}

91
src/feed_checker.py Normal file
View File

@@ -0,0 +1,91 @@
"""RSS feed checker fetches feeds and returns new (unseen) entries."""
import logging
import feedparser
from . import feed_store
logger = logging.getLogger(__name__)
def _entry_id(entry) -> str:
"""Get a unique identifier for a feed entry."""
return entry.get("id") or entry.get("link") or entry.get("title", "")
def initialize_feed(url: str) -> dict | None:
"""
Fetch a newly added feed: mark all entries as seen EXCEPT the latest one.
Returns the latest entry as a dict (title, link) or None if feed is empty.
"""
try:
parsed = feedparser.parse(url)
if parsed.bozo and not parsed.entries:
logger.warning("Failed to parse feed %s: %s", url, parsed.bozo_exception)
return None
if not parsed.entries:
return None
# Mark ALL entries as seen first
all_ids = [_entry_id(e) for e in parsed.entries if _entry_id(e)]
feed_store.mark_seen(url, all_ids)
# Return only the newest entry (first in the list)
newest = parsed.entries[0]
return {
"feed_url": url,
"title": newest.get("title", "Ohne Titel"),
"link": newest.get("link", ""),
}
except Exception:
logger.exception("Error initializing feed %s", url)
return None
def check_feeds() -> list[dict]:
"""
Check all stored feeds for new entries.
Returns a list of dicts with keys: feed_url, title, link
"""
new_entries = []
feeds = feed_store.list_feeds()
for url in feeds:
try:
parsed = feedparser.parse(url)
if parsed.bozo and not parsed.entries:
logger.warning("Failed to parse feed %s: %s", url, parsed.bozo_exception)
continue
seen = feed_store.get_seen_ids(url)
newly_seen = []
for entry in parsed.entries:
eid = _entry_id(entry)
if not eid or eid in seen:
continue
title = entry.get("title", "Ohne Titel")
link = entry.get("link", "")
new_entries.append({
"feed_url": url,
"title": title,
"link": link,
})
newly_seen.append(eid)
if newly_seen:
feed_store.mark_seen(url, newly_seen)
logger.info("Found %d new entries in %s", len(newly_seen), url)
except Exception:
logger.exception("Error checking feed %s", url)
return new_entries

68
src/feed_store.py Normal file
View File

@@ -0,0 +1,68 @@
"""Persistent storage for RSS feed URLs and seen entry IDs."""
import json
import os
from pathlib import Path
DATA_DIR = Path(os.getenv("DATA_DIR", "data"))
FEEDS_FILE = DATA_DIR / "feeds.json"
def _load_data() -> dict:
"""Load the feeds data from disk."""
if not FEEDS_FILE.exists():
return {"feeds": {}}
with open(FEEDS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
def _save_data(data: dict) -> None:
"""Save the feeds data to disk."""
DATA_DIR.mkdir(parents=True, exist_ok=True)
with open(FEEDS_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def add_feed(url: str) -> bool:
"""Add a new RSS feed URL. Returns False if it already exists."""
data = _load_data()
if url in data["feeds"]:
return False
data["feeds"][url] = {"seen": []}
_save_data(data)
return True
def remove_feed(url: str) -> bool:
"""Remove an RSS feed URL. Returns False if it doesn't exist."""
data = _load_data()
if url not in data["feeds"]:
return False
del data["feeds"][url]
_save_data(data)
return True
def list_feeds() -> list[str]:
"""Return a list of all stored feed URLs."""
data = _load_data()
return list(data["feeds"].keys())
def get_seen_ids(url: str) -> set[str]:
"""Return the set of seen entry IDs for a given feed URL."""
data = _load_data()
feed = data["feeds"].get(url, {})
return set(feed.get("seen", []))
def mark_seen(url: str, entry_ids: list[str]) -> None:
"""Mark a list of entry IDs as seen for a given feed URL."""
data = _load_data()
if url not in data["feeds"]:
return
seen = set(data["feeds"][url].get("seen", []))
seen.update(entry_ids)
# Keep only the last 500 IDs per feed to avoid unbounded growth
data["feeds"][url]["seen"] = list(seen)[-500:]
_save_data(data)

29
src/main.py Normal file
View File

@@ -0,0 +1,29 @@
"""Entry point for the RSS Discord Bot."""
import logging
from .config import load_config
from .bot import RSSBot
def main() -> None:
"""Load config and start the bot."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
config = load_config()
bot = RSSBot(
channel_id=config["channel_id"],
check_interval=config["check_interval"],
role_id=config["role_id"],
)
bot.run(config["token"], log_handler=None)
if __name__ == "__main__":
main()