'''
twitchbot - A maubot plugin for sending Twitch stream notifications
Copyright (C) 2025 L. Bradley LaBoon
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License version 3 as
published by the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see
" ) if isinstance(evt, MessageEvent): await evt.respond(content) else: await self.client.send_message(self.config["notify_room"], content) return # Use game name as title if no title is provided stream = streams["data"][0] if (len(stream["title"]) < 1): stream["title"] = stream["game_name"] # Upload thumbnail mxc = await self.upload_img(stream["thumbnail_url"].format(width="400", height="225")) if (len(mxc) > 0): img_html = f"" else: img_html = "" # Mention room user if defined if stream["user_login"] in self.config["matrix_users"]: user_html = f"{stream['user_name']}" else: user_html = stream["user_name"] # Construct and send response content = TextMessageEventContent( msgtype=MessageType.NOTICE, format=Format.HTML, body=f"> {stream['user_name']} is now live on Twitch!\n> **[{stream['title']}](https://twitch.tv/{stream['user_login']})**\n> **Game:** {stream['game_name']}\n> **Viewers:** {stream['viewer_count']}", formatted_body=f"{user['display_name']}
{user['description']}
{img_html}
" ) if isinstance(evt, MessageEvent): await evt.respond(content) else: await self.client.send_message(self.config["notify_room"], content) # Register channel notifications and/or unregister old subscriptions async def register_notifications(self) -> None: self.log.debug("Registering notifications...") # Exit immediately if client_id and client_secret aren't set if ("client_id" not in self.config or "client_secret" not in self.config or len(self.config["client_id"]) < 1 or len(self.config["client_secret"]) < 1): return # Get sub list from config config_subs = self.config["notify_channels"].copy() # Get subs from DB db_subs = await self.database.fetch("SELECT * FROM twitchbot_subs") # Get current sub list from API register_error = TextMessageEventContent( msgtype=MessageType.NOTICE, body="I encountered an issue while registering Twitch notifications. Check the logs for more details." ) response = await self.twitch_api("GET", "/eventsub/subscriptions", None, None, None) if not response: await self.client.send_message(self.config["notify_room"], register_error) return api_subs = response["data"] # Handle paginated results while (response["pagination"] and response["pagination"]["cursor"]): params = { "after": response["pagination"]["cursor"] } response = await self.twitch_api("GET", "/eventsub/subscriptions", params, None, None) if not response: await self.client.send_message(self.config["notify_room"], register_error) return api_subs = api_subs + response["data"] # Check current subs for ones that need removal for sub in api_subs: # Ignore subs that aren't for our instance if (sub["transport"]["callback"] != str(self.webapp_url) + "/stream-notify"): continue # Locate corresponding sub from DB db_sub = None for i in db_subs: if (i["id"] == sub["id"]): db_sub = i break # Unsubscribe from the notification if we have no record of it, or if it's no longer in the config list if (db_sub is None or db_sub["login"] not in config_subs): params = { "id": sub["id"] } await self.twitch_api("DELETE", "/eventsub/subscriptions", params, None, None) # Remove record from the DB if db_sub: self.log.info(f"Unsubscribed from channel {db_sub['login']}") db_subs.remove(db_sub) qd = "DELETE FROM twitchbot_subs WHERE id = $1" await self.database.execute(qd, sub["id"]) else: self.log.info(f"Unsubscribed from user ID {sub['condition']['broadcaster_user_id']}") continue # Otherwise, this sub checks out so remove it from the list of subs that need processing config_subs.remove(db_sub["login"]) db_subs.remove(db_sub) webhook_secret = await self.get_webhook_secret() # Register remaining subs from the config for sub in config_subs: # Check if there is a corresponding DB entry db_sub = None for i in db_subs: if (i["login"] == sub): db_sub = i break # Lookup channel name params = { "login": sub } users = await self.twitch_api("GET", "/users", params, None, None) if not users or len(users["data"]) < 1: if db_sub: if (sub in self.config["matrix_users"]): user_html = f"{db_sub['name']}" else: user_html = db_sub["name"] content = TextMessageEventContent( msgtype=MessageType.NOTICE, format=Format.HTML, body=f"Lost Twitch channel notification subscription for {db_sub['name']}. Check the logs for more details.", formatted_body=f"{stream['title']}
Game: {stream['game_name']}
{img_html}
Viewers: {stream['viewer_count']}
Lost Twitch channel notification subscription for {user_html}. Check the logs for more details.
" ) await self.client.send_message(self.config["notify_room"], content) else: content = TextMessageEventContent( msgtype=MessageType.NOTICE, body=f"Could not subscribe to Twitch channel '{sub}'. Are you sure it exists?" ) await self.client.send_message(self.config["notify_room"], content) continue user = users["data"][0] # Register subscription req = { "type": "stream.online", "version": "1", "condition": { "broadcaster_user_id": user["id"] }, "transport": { "method": "webhook", "callback": str(self.webapp_url) + "/stream-notify", "secret": webhook_secret } } response = await self.twitch_api("POST", "/eventsub/subscriptions", None, req, None) if not response: content = TextMessageEventContent( msgtype=MessageType.NOTICE, body=f"I encountered an issue subscribing to channel '{sub}'. Check the logs for more details." ) await self.client.send_message(self.config["notify_room"], content) continue # Add new subscription to DB self.log.info(f"Storing new subscription for channel {sub}") q = "INSERT INTO twitchbot_subs (id, login, name) VALUES ($1, $2, $3) ON CONFLICT (id) DO UPDATE SET login=excluded.login, name=excluded.name" await self.database.execute(q, response["data"][0]["id"], sub, user["display_name"]) # Clean up remaining stale DB entries for sub in db_subs: self.log.info(f"Cleaning stale DB entry for channel {sub['login']}") qd = "DELETE FROM twitchbot_subs WHERE id = $1" await self.database.execute(qd, sub["id"]) # Webhook handler for live stream notifications @web.post("/stream-notify") async def stream_notify(self, req: Request) -> Response: self.log.debug("Handling webhook event...") # Check for necessary headers id = req.headers.get("Twitch-Eventsub-Message-Id") timestamp = req.headers.get("Twitch-Eventsub-Message-Timestamp") signature = req.headers.get("Twitch-Eventsub-Message-Signature") type = req.headers.get("Twitch-Eventsub-Message-Type") if (id is None or timestamp is None or signature is None or type is None): self.log.info("Webhook event is missing required headers") return Response(status=403, text="Missing required headers") # Compute HMAC and validate against the one supplied body = await req.text() data = await req.json() secret = await self.get_webhook_secret() my_hmac = hmac.new(secret.encode(), id.encode(), hashlib.sha256) my_hmac.update(timestamp.encode()) my_hmac.update(body.encode()) if (hmac.compare_digest("sha256=" + my_hmac.hexdigest(), signature) == False): self.log.info("Webhook event failed validation") return Response(status=403, text="Invalid HMAC signature") # Handle challenge requests if (type == "webhook_callback_verification"): self.log.info("Responded to webhook challenge") return Response(status=200, text=data["challenge"]) # Handle subscription revocations if (type == "revocation"): reasons = { "user_removed": "Channel no longer exists", "authorization_revoked": "API access has been revoked (try resetting the bot)", "notification_failures_exceeded": "Notification responses have been too slow (consider a hardware upgrade?)", "version_removed": "This type of subscription is no longer available - please file a bug report" } if (data["subscription"]["status"] in reasons): reason = reasons[data["subscription"]["status"]] else: reason = data["subscription"]["status"] # Get subscription data from DB and send channel notification q = "SELECT * FROM twitchbot_subs WHERE id = $1" row = await self.database.fetchrow(q, data["subscription"]["id"]) if row: self.log.warning(f"Subscription to channel '{row['login']}' revoked.") user_name = row["name"] if (row["login"] in self.config["matrix_users"]): user_html = f"{row['name']}" else: user_html = row["name"] # Delete subscription from DB qd = "DELETE FROM twitchbot_subs WHERE id = $1" await self.database.execute(qd, data["subscription"]["id"]) else: self.log.warning(f"Subscription to user ID {data['subscription']['condition']['broadcaster_user_id']} revoked.") user_name = f"ID {data['subscription']['condition']['broadcaster_user_id']}" user_html = user_name content = TextMessageEventContent( msgtype=MessageType.NOTICE, format=Format.HTML, body=f"Lost Twitch channel notification subscription for {user_name}: {reason}", formatted_body=f"Lost Twitch channel notification subscription for {user_html}: {reason}
" ) await self.client.send_message(self.config["notify_room"], content) # Handle notifications if (type == "notification"): self.log.info(f"Sending channel notification for {data['event']['broadcaster_user_login']}") await self.get_status(None, data["event"]["broadcaster_user_login"]) return Response(status=204)