twitchbot/twitchbot.py

504 lines
20 KiB
Python
Raw Permalink Normal View History

2025-01-14 00:55:26 -05:00
'''
twitchbot - A maubot plugin for sending Twitch stream notifications
Copyright (C) 2025 L. Bradley LaBoon
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License version 3 as
published by the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import hashlib
import hmac
from asyncio import Task, CancelledError, sleep
from html import escape
from secrets import token_hex
from typing import Type

from aiohttp.web import Request, Response
from maubot import Plugin, MessageEvent
from maubot.handlers import command, web
from mautrix.types import TextMessageEventContent, MessageType, Format
from mautrix.util import background_task
from mautrix.util.async_db import UpgradeTable, Connection
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
upgrade_table = UpgradeTable()


@upgrade_table.register(description="Initial version")
async def upgrade_v1(conn: Connection) -> None:
    """Create the plugin's two tables.

    ``twitchbot_data`` is a key/value store (the plugin keeps its access token
    and webhook secret there); ``twitchbot_subs`` records one row per
    registered subscription (id, channel login, display name).
    """
    schema = (
        "CREATE TABLE twitchbot_data (key TEXT PRIMARY KEY, value TEXT NOT NULL)",
        "CREATE TABLE twitchbot_subs (id TEXT PRIMARY KEY, login TEXT NOT NULL, name TEXT NOT NULL)",
    )
    for statement in schema:
        await conn.execute(statement)
class Config(BaseProxyConfig):
    """Plugin configuration wrapper that carries the known keys through updates."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Copy every setting this plugin reads, in a fixed order, via ``helper.copy``."""
        known_keys = (
            "client_id",
            "client_secret",
            "notify_channels",
            "matrix_users",
            "notify_room",
        )
        for key in known_keys:
            helper.copy(key)
class TwitchBot(Plugin):
    """Maubot plugin that answers ``!twitch`` and relays Twitch "stream online" webhooks.

    State is kept in two tables: ``twitchbot_data`` (access token, webhook
    secret) and ``twitchbot_subs`` (one row per registered subscription).
    """

    # Task running check_access_token(); created in start(), cancelled in stop().
    access_check_loop: Task

    async def start(self) -> None:
        """Refresh the config, start the access token check loop, and register stream notifications."""
        self.config.load_and_update()
        self.access_check_loop = background_task.create(self.check_access_token())
        background_task.create(self.register_notifications())

    def on_external_config_update(self) -> None:
        """Refresh the config and re-register stream notifications."""
        self.config.load_and_update()
        background_task.create(self.register_notifications())

    async def stop(self) -> None:
        """Stop the access check loop, if it was ever started."""
        # start() may have failed before the task was assigned.
        loop = getattr(self, "access_check_loop", None)
        if loop is not None:
            loop.cancel()

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the config class used by this plugin."""
        return Config

    @classmethod
    def get_db_upgrade_table(cls) -> UpgradeTable | None:
        """Return the module's database upgrade table."""
        return upgrade_table

    async def check_access_token(self) -> None:
        """Validate the stored access token once an hour until cancelled.

        A failure in one cycle (network error, database error, unexpected
        reply) is logged and the loop keeps running, so a transient outage
        does not permanently disable token checks.
        """
        try:
            self.log.debug("Access check loop started.")
            while True:
                try:
                    await self._validate_access_token()
                except Exception:
                    # CancelledError derives from BaseException on the Python
                    # versions this file supports, so cancellation still
                    # reaches the outer handler.
                    self.log.exception("Access token check failed; retrying on the next cycle")
                # Sleep for an hour
                await sleep(3600)
        except CancelledError:
            self.log.debug("Access check loop stopped.")

    async def _validate_access_token(self) -> None:
        """Run one validation pass: fetch a new token if none is stored or Twitch answers 401."""
        self.log.debug("Performing access token check...")
        q = "SELECT value FROM twitchbot_data WHERE key = $1"
        row = await self.database.fetchrow(q, "access_token")
        if not row:
            self.log.info("Access token is missing, fetching a new one...")
            await self.get_access_token()
            return

        headers = {"Authorization": f"Bearer {row['value']}"}
        async with self.http.get("https://id.twitch.tv/oauth2/validate", headers=headers) as response:
            # Only the status matters; the body is intentionally not parsed.
            status = response.status

        if status == 401:
            self.log.info("Access token is invalid, fetching a new one...")
            await self.get_access_token()
        elif status >= 400:
            # Neither confirmed valid nor rejected; leave the token alone.
            self.log.warning(f"Unexpected status {status} while validating the access token")
        else:
            self.log.debug("Current access token is valid!")

    async def get_access_token(self) -> str:
        """Request a new token with the client-credentials grant and store it.

        Returns the new token, or an empty string if Twitch answered with an
        error status (the error is logged).
        """
        data = {
            "client_id": self.config["client_id"],
            "client_secret": self.config["client_secret"],
            "grant_type": "client_credentials"
        }
        async with self.http.post("https://id.twitch.tv/oauth2/token", data=data) as response:
            status = response.status
            result = await response.json()
        if status >= 400:
            self.log.error(f"Error getting access token: {status} - {result.get('message')}")
            return ""

        # Store new token in the DB
        token = result["access_token"]
        q = "INSERT INTO twitchbot_data (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=excluded.value"
        await self.database.execute(q, "access_token", token)
        self.log.info("Stored new access token")
        return token

    async def get_webhook_secret(self) -> str:
        """Return the stored webhook secret, generating and storing one if absent."""
        q = "SELECT value FROM twitchbot_data WHERE key = $1"
        row = await self.database.fetchrow(q, "webhook_secret")
        if row:
            return row["value"]

        self.log.info("No webhook secret found. Generating a new one...")
        secret = token_hex(32)
        qi = "INSERT INTO twitchbot_data (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=excluded.value"
        await self.database.execute(qi, "webhook_secret", secret)
        self.log.info("Stored new webhook secret")
        return secret

    async def upload_img(self, link: str) -> str:
        """Fetch an image URL, upload it to the matrix media store, and return the mxc:// address.

        Returns an empty string if the download answers with status >= 400.
        """
        # The context manager releases the HTTP response on every path,
        # including the error path where the body is never read.
        async with self.http.get(link) as image_req:
            if image_req.status >= 400:
                return ""
            image = await image_req.read()
            filename = image_req.url.name
        return await self.client.upload_media(image, filename=filename)

    async def _report_auth_failure(self, evt: MessageEvent | None) -> None:
        """Tell the requesting user (if any) that Twitch authentication failed."""
        if isinstance(evt, MessageEvent):
            await evt.respond("I seem to be having trouble authenticating to the Twitch API. Check the logs for more details.")

    async def twitch_api(self, method: str, endpoint: str, params: dict | None, data: dict | None, evt: MessageEvent | None) -> dict | None:
        """Make a Twitch Helix API call and return the decoded response.

        Args:
            method: "GET", "POST" or "DELETE".
            endpoint: Path appended to ``https://api.twitch.tv/helix``.
            params: Query parameters, or None.
            data: JSON request body, or None.
            evt: Triggering message; when given, failures are also reported
                to the user by replying to it.

        Returns:
            The decoded JSON body (``{}`` for a 204 reply), or None on any
            failure. A 401 triggers one token refresh and one retry.
        """
        # Get access token from DB
        q = "SELECT value FROM twitchbot_data WHERE key = $1"
        row = await self.database.fetchrow(q, "access_token")
        if row:
            access_token = row["value"]
        else:
            self.log.info("Access token is missing, fetching a new one...")
            access_token = await self.get_access_token()
            if not access_token:
                await self._report_auth_failure(evt)
                return None

        http_methods = {
            "GET": self.http.get,
            "POST": self.http.post,
            "DELETE": self.http.delete,
        }
        httpfunc = http_methods.get(method)
        if httpfunc is None:
            self.log.error(f"Unknown method: {method}")
            return None

        headers = {
            "Authorization": f"Bearer {access_token}",
            "Client-Id": self.config["client_id"]
        }
        url = "https://api.twitch.tv/helix" + endpoint

        for _attempt in range(2):
            async with httpfunc(url, headers=headers, params=params, json=data) as response:
                status = response.status
                request_url = str(response.url)
                rdata = {} if status == 204 else await response.json()

            if status == 401:
                self.log.info("Access token is invalid, fetching a new one...")
                access_token = await self.get_access_token()
                if not access_token:
                    await self._report_auth_failure(evt)
                    return None
                headers["Authorization"] = f"Bearer {access_token}"
                continue

            if status >= 400:
                # Error payloads are read defensively so an unexpected shape
                # cannot raise KeyError while reporting the failure.
                error = rdata.get("error", "Error")
                message = rdata.get("message", "")
                # For some reason the /streams endpoint returns 400 for nonexistent or deactivated channels
                if endpoint == "/streams" and message == "Malformed query params.":
                    if isinstance(evt, MessageEvent):
                        await evt.respond("Sorry, I can't seem to find that channel.")
                    else:
                        login = (params or {}).get("user_login")
                        self.log.warning(f"Channel '{login}' does not exist.")
                    return None
                self.log.error(f"{method} {request_url}: {status} {error} - {message}")
                if isinstance(evt, MessageEvent):
                    await evt.respond(f"{error} - {message}")
                return None

            return rdata

        # Both attempts were answered with 401.
        self.log.error(f"{method} {endpoint}: still unauthorized after refreshing the access token")
        return None

    @command.new(name="twitch", help="Get the status of a Twitch channel")
    @command.argument("channel", required=True, pass_raw=True)
    async def twitch(self, evt: MessageEvent, channel: str) -> None:
        """Main !twitch command handler; replies with the channel's status."""
        await self.get_status(evt, channel)

    def _mention_html(self, login: str, display_name: str) -> str:
        """Return HTML for a channel name, linked to a Matrix user if the login is in ``matrix_users``."""
        label = escape(display_name)
        matrix_users = self.config["matrix_users"]
        if matrix_users and login in matrix_users:
            mxid = escape(matrix_users[login])
            return f"<a href='https://matrix.to/#/{mxid}'>{label}</a>"
        return label

    async def _notify_room(self, content: TextMessageEventContent) -> None:
        """Send a message to the configured notification room."""
        await self.client.send_message(self.config["notify_room"], content)

    async def _offline_content(self, evt: MessageEvent | None, channel: str) -> TextMessageEventContent | None:
        """Build the "is offline" message for a channel, or return None if the lookup failed."""
        users = await self.twitch_api("GET", "/users", {"login": channel}, None, evt)
        if not users:
            return None
        if not users["data"]:
            if isinstance(evt, MessageEvent):
                await evt.respond(f"Could not find user: {channel}")
            return None
        user = users["data"][0]

        # Upload channel image, preferring the offline banner over the avatar
        image_url = user["offline_image_url"] or user["profile_image_url"]
        mxc = await self.upload_img(image_url) if image_url else ""
        img_html = f"<img width=400 src='{escape(mxc)}' />" if mxc else ""

        # Twitch-supplied text is escaped before it goes into the HTML body.
        login = user["login"]
        name = user["display_name"]
        user_html = self._mention_html(login, name)
        login_attr = escape(login)
        name_html = escape(name)
        description_html = escape(user["description"])

        return TextMessageEventContent(
            msgtype=MessageType.NOTICE,
            format=Format.HTML,
            body=f"> {name} is offline.\n> **[{name}](https://twitch.tv/{login})**\n> {user['description']}\n> **Total Views:** {user['view_count']}",
            formatted_body=f"<h5><span data-mx-color='#FFFFFF'>{user_html} is offline.</span></h5><blockquote><h4><a href='https://twitch.tv/{login_attr}'>{name_html}</a></h4><p>{description_html}</p>{img_html}</blockquote>"
        )

    async def _live_content(self, stream: dict) -> TextMessageEventContent:
        """Build the "is now live" message from one entry of the /streams response."""
        # Use game name as title if no title is provided
        title = stream["title"] or stream["game_name"]

        # Upload thumbnail
        mxc = await self.upload_img(stream["thumbnail_url"].format(width="400", height="225"))
        img_html = f"<img src='{escape(mxc)}' />" if mxc else ""

        # Twitch-supplied text is escaped before it goes into the HTML body.
        login = stream["user_login"]
        name = stream["user_name"]
        game = stream["game_name"]
        viewers = stream["viewer_count"]
        user_html = self._mention_html(login, name)
        login_attr = escape(login)
        title_html = escape(title)
        game_html = escape(game)
        viewers_html = escape(str(viewers))

        return TextMessageEventContent(
            msgtype=MessageType.NOTICE,
            format=Format.HTML,
            body=f"> {name} is now live on Twitch!\n> **[{title}](https://twitch.tv/{login})**\n> **Game:** {game}\n> **Viewers:** {viewers}",
            formatted_body=f"<h5><span data-mx-color='#FFFFFF'>{user_html} is now live on Twitch!</span></h5><blockquote><h4><a href='https://twitch.tv/{login_attr}'>{title_html}</a></h4><p><b><span data-mx-color='#FFFFFF'>Game:</span></b> {game_html}<br><b><span data-mx-color='#FFFFFF'>Viewers:</span></b> {viewers_html}</p>{img_html}</blockquote>"
        )

    async def get_status(self, evt: MessageEvent | None, channel: str) -> None:
        """Report the status of a Twitch channel.

        Args:
            evt: The requesting message. When given, the result (or error) is
                sent as a reply; when None, the result goes to the configured
                notification room.
            channel: Twitch login name of the channel.
        """
        if not channel:
            if isinstance(evt, MessageEvent):
                await evt.respond("You forgot to give me a channel name")
            return

        # Get stream info
        params = {"user_login": channel, "type": "live"}
        streams = await self.twitch_api("GET", "/streams", params, None, evt)
        if not streams:
            return

        if streams["data"]:
            content = await self._live_content(streams["data"][0])
        else:
            # If there are no live streams, return channel info instead
            content = await self._offline_content(evt, channel)
            if content is None:
                return

        if isinstance(evt, MessageEvent):
            await evt.respond(content)
        else:
            await self._notify_room(content)

    async def _fetch_api_subs(self) -> list | None:
        """Return every EventSub subscription from the API, following pagination; None on failure."""
        api_subs = []
        params = None
        while True:
            response = await self.twitch_api("GET", "/eventsub/subscriptions", params, None, None)
            if not response:
                return None
            api_subs.extend(response["data"])
            # A missing or empty pagination object means this was the last page.
            cursor = (response.get("pagination") or {}).get("cursor")
            if not cursor:
                return api_subs
            params = {"after": cursor}

    async def register_notifications(self) -> None:
        """Reconcile subscriptions between the config, the database and the Twitch API.

        Subscriptions pointing at this instance's callback that are unknown to
        the database or no longer configured are deleted; configured channels
        without a live subscription are registered; leftover database rows are
        removed. Problems are reported to the notification room.
        """
        self.log.debug("Registering notifications...")
        # Exit immediately if client_id and client_secret aren't set
        if ("client_id" not in self.config or "client_secret" not in self.config
                or len(self.config["client_id"]) < 1 or len(self.config["client_secret"]) < 1):
            return

        # Working copies: entries are removed from both lists as they are matched up.
        config_subs = list(self.config["notify_channels"] or [])
        db_subs = list(await self.database.fetch("SELECT * FROM twitchbot_subs"))

        # Get current sub list from API
        api_subs = await self._fetch_api_subs()
        if api_subs is None:
            await self._notify_room(TextMessageEventContent(
                msgtype=MessageType.NOTICE,
                body="I encountered an issue while registering Twitch notifications. Check the logs for more details."
            ))
            return

        callback_url = str(self.webapp_url) + "/stream-notify"

        # Check current subs for ones that need removal
        for sub in api_subs:
            # Ignore subs that aren't for our instance
            if sub["transport"].get("callback") != callback_url:
                continue

            # Locate corresponding sub from DB
            db_sub = next((row for row in db_subs if row["id"] == sub["id"]), None)

            # This sub checks out, so remove it from the lists that still need processing
            if db_sub is not None and db_sub["login"] in config_subs:
                config_subs.remove(db_sub["login"])
                db_subs.remove(db_sub)
                continue

            # Unsubscribe: we have no record of it, or it's no longer in the config list
            await self.twitch_api("DELETE", "/eventsub/subscriptions", {"id": sub["id"]}, None, None)
            if db_sub is not None:
                self.log.info(f"Unsubscribed from channel {db_sub['login']}")
                db_subs.remove(db_sub)
                qd = "DELETE FROM twitchbot_subs WHERE id = $1"
                await self.database.execute(qd, sub["id"])
            else:
                self.log.info(f"Unsubscribed from user ID {sub['condition']['broadcaster_user_id']}")

        webhook_secret = await self.get_webhook_secret()

        # Register remaining subs from the config
        for login in config_subs:
            # Check if there is a corresponding (now stale) DB entry
            db_sub = next((row for row in db_subs if row["login"] == login), None)

            # Lookup channel name
            users = await self.twitch_api("GET", "/users", {"login": login}, None, None)
            if not users or len(users["data"]) < 1:
                if db_sub is not None:
                    name = db_sub["name"]
                    user_html = self._mention_html(login, name)
                    content = TextMessageEventContent(
                        msgtype=MessageType.NOTICE,
                        format=Format.HTML,
                        body=f"Lost Twitch channel notification subscription for {name}. Check the logs for more details.",
                        formatted_body=f"<p>Lost Twitch channel notification subscription for {user_html}. Check the logs for more details.</p>"
                    )
                else:
                    content = TextMessageEventContent(
                        msgtype=MessageType.NOTICE,
                        body=f"Could not subscribe to Twitch channel '{login}'. Are you sure it exists?"
                    )
                await self._notify_room(content)
                continue
            user = users["data"][0]

            # Register subscription
            req = {
                "type": "stream.online",
                "version": "1",
                "condition": {
                    "broadcaster_user_id": user["id"]
                },
                "transport": {
                    "method": "webhook",
                    "callback": callback_url,
                    "secret": webhook_secret
                }
            }
            response = await self.twitch_api("POST", "/eventsub/subscriptions", None, req, None)
            if not response:
                await self._notify_room(TextMessageEventContent(
                    msgtype=MessageType.NOTICE,
                    body=f"I encountered an issue subscribing to channel '{login}'. Check the logs for more details."
                ))
                continue

            # Add new subscription to DB
            self.log.info(f"Storing new subscription for channel {login}")
            q = "INSERT INTO twitchbot_subs (id, login, name) VALUES ($1, $2, $3) ON CONFLICT (id) DO UPDATE SET login=excluded.login, name=excluded.name"
            await self.database.execute(q, response["data"][0]["id"], login, user["display_name"])

        # Clean up remaining stale DB entries
        for stale in db_subs:
            self.log.info(f"Cleaning stale DB entry for channel {stale['login']}")
            qd = "DELETE FROM twitchbot_subs WHERE id = $1"
            await self.database.execute(qd, stale["id"])

    @web.post("/stream-notify")
    async def stream_notify(self, req: Request) -> Response:
        """Webhook handler for Twitch EventSub messages.

        Rejects requests that lack the EventSub headers or whose HMAC-SHA256
        signature (over message id + timestamp + raw body, keyed with the
        webhook secret) does not match. Then answers challenge requests,
        reports revocations to the notification room, and posts a channel
        status for notifications.
        """
        self.log.debug("Handling webhook event...")

        # Check for necessary headers
        msg_id = req.headers.get("Twitch-Eventsub-Message-Id")
        timestamp = req.headers.get("Twitch-Eventsub-Message-Timestamp")
        signature = req.headers.get("Twitch-Eventsub-Message-Signature")
        msg_type = req.headers.get("Twitch-Eventsub-Message-Type")
        if msg_id is None or timestamp is None or signature is None or msg_type is None:
            self.log.info("Webhook event is missing required headers")
            return Response(status=403, text="Missing required headers")

        # Compute the HMAC over the raw request bytes and compare as bytes:
        # compare_digest() raises TypeError for non-ASCII str arguments, which
        # an attacker-controlled signature header could otherwise trigger.
        # surrogateescape lets header values that are not valid UTF-8 encode
        # without raising (aiohttp decodes headers that way).
        body = await req.read()
        secret = await self.get_webhook_secret()
        my_hmac = hmac.new(secret.encode(), digestmod=hashlib.sha256)
        my_hmac.update(msg_id.encode("utf-8", "surrogateescape"))
        my_hmac.update(timestamp.encode("utf-8", "surrogateescape"))
        my_hmac.update(body)
        expected = ("sha256=" + my_hmac.hexdigest()).encode()
        if not hmac.compare_digest(expected, signature.encode("utf-8", "surrogateescape")):
            self.log.info("Webhook event failed validation")
            return Response(status=403, text="Invalid HMAC signature")

        # Only parse the payload once it is known to come from Twitch.
        # req.json() reuses the body bytes already read above.
        try:
            data = await req.json()
        except ValueError:
            self.log.info("Webhook event body is not valid JSON")
            return Response(status=400, text="Malformed JSON body")

        # Handle challenge requests
        if msg_type == "webhook_callback_verification":
            self.log.info("Responded to webhook challenge")
            return Response(status=200, text=data["challenge"])

        # Handle subscription revocations
        if msg_type == "revocation":
            reasons = {
                "user_removed": "Channel no longer exists",
                "authorization_revoked": "API access has been revoked (try resetting the bot)",
                "notification_failures_exceeded": "Notification responses have been too slow (consider a hardware upgrade?)",
                "version_removed": "This type of subscription is no longer available - please file a bug report"
            }
            subscription = data["subscription"]
            status = subscription["status"]
            reason = reasons.get(status, status)

            # Get subscription data from DB and send channel notification
            q = "SELECT * FROM twitchbot_subs WHERE id = $1"
            row = await self.database.fetchrow(q, subscription["id"])
            if row:
                self.log.warning(f"Subscription to channel '{row['login']}' revoked.")
                user_name = row["name"]
                user_html = self._mention_html(row["login"], row["name"])
                # Delete subscription from DB
                qd = "DELETE FROM twitchbot_subs WHERE id = $1"
                await self.database.execute(qd, subscription["id"])
            else:
                broadcaster_id = subscription["condition"]["broadcaster_user_id"]
                self.log.warning(f"Subscription to user ID {broadcaster_id} revoked.")
                user_name = f"ID {broadcaster_id}"
                user_html = escape(user_name)

            reason_html = escape(reason)
            content = TextMessageEventContent(
                msgtype=MessageType.NOTICE,
                format=Format.HTML,
                body=f"Lost Twitch channel notification subscription for {user_name}: {reason}",
                formatted_body=f"<p>Lost Twitch channel notification subscription for {user_html}: {reason_html}</p>"
            )
            await self._notify_room(content)

        # Handle notifications
        elif msg_type == "notification":
            login = data["event"]["broadcaster_user_login"]
            self.log.info(f"Sending channel notification for {login}")
            await self.get_status(None, login)

        return Response(status=204)