2025-01-14 00:55:26 -05:00
'''
twitchbot - A maubot plugin for sending Twitch stream notifications
Copyright ( C ) 2025 L . Bradley LaBoon
This program is free software : you can redistribute it and / or modify
it under the terms of the GNU Affero General Public License version 3 as
published by the Free Software Foundation .
This program is distributed in the hope that it will be useful ,
but WITHOUT ANY WARRANTY ; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
GNU Affero General Public License for more details .
You should have received a copy of the GNU Affero General Public License
along with this program . If not , see < https : / / www . gnu . org / licenses / > .
'''
from aiohttp . web import Request , Response
from asyncio import Task , CancelledError , sleep
from maubot import Plugin , MessageEvent
from maubot . handlers import command , web
from mautrix . types import TextMessageEventContent , MessageType , Format
from mautrix . util import background_task
from mautrix . util . async_db import UpgradeTable , Connection
from mautrix . util . config import BaseProxyConfig , ConfigUpdateHelper
from secrets import token_hex
from typing import Type
import hashlib
import hmac
upgrade_table = UpgradeTable ( )
@upgrade_table.register ( description = " Initial version " )
async def upgrade_v1 ( conn : Connection ) - > None :
await conn . execute ( " CREATE TABLE twitchbot_data (key TEXT PRIMARY KEY, value TEXT NOT NULL) " )
await conn . execute ( " CREATE TABLE twitchbot_subs (id TEXT PRIMARY KEY, login TEXT NOT NULL, name TEXT NOT NULL) " )
class Config ( BaseProxyConfig ) :
def do_update ( self , helper : ConfigUpdateHelper ) - > None :
helper . copy ( " client_id " )
helper . copy ( " client_secret " )
helper . copy ( " notify_channels " )
helper . copy ( " matrix_users " )
helper . copy ( " notify_room " )
class TwitchBot ( Plugin ) :
access_check_loop : Task
# On startup: refresh the config, start the access token check loop, and register stream notifications
async def start ( self ) - > None :
self . config . load_and_update ( )
self . access_check_loop = background_task . create ( self . check_access_token ( ) )
background_task . create ( self . register_notifications ( ) )
# On config update: refresh the config and re-register stream notifications
def on_external_config_update ( self ) - > None :
self . config . load_and_update ( )
background_task . create ( self . register_notifications ( ) )
# On shutdown: stop the access check loop
async def stop ( self ) - > None :
self . access_check_loop . cancel ( )
@classmethod
def get_config_class ( cls ) - > Type [ BaseProxyConfig ] :
return Config
@classmethod
def get_db_upgrade_table ( cls ) - > UpgradeTable | None :
return upgrade_table
# Check that we have a valid access token and get a new one if we don't
async def check_access_token ( self ) - > None :
try :
self . log . debug ( " Access check loop started. " )
while True :
self . log . debug ( " Performing access token check... " )
q = " SELECT value FROM twitchbot_data WHERE key = $1 "
row = await self . database . fetchrow ( q , " access_token " )
if row :
headers = { " Authorization " : f " Bearer { row [ ' value ' ] } " }
async with self . http . get ( " https://id.twitch.tv/oauth2/validate " , headers = headers ) as response :
result = await response . json ( )
if ( response . status == 401 ) :
self . log . info ( " Access token is invalid, fetching a new one... " )
await self . get_access_token ( )
else :
self . log . debug ( " Current access token is valid! " )
else :
self . log . info ( " Access token is missing, fetching a new one... " )
await self . get_access_token ( )
# Sleep for an hour
await sleep ( 3600 )
except CancelledError :
self . log . debug ( " Access check loop stopped. " )
# Get a new access token
async def get_access_token ( self ) - > str :
data = {
" client_id " : self . config [ " client_id " ] ,
" client_secret " : self . config [ " client_secret " ] ,
" grant_type " : " client_credentials "
}
async with self . http . post ( " https://id.twitch.tv/oauth2/token " , data = data ) as response :
result = await response . json ( )
if ( response . status > = 400 ) :
self . log . error ( f " Error getting access token: { response . status } - { result [ ' message ' ] } " )
return " "
# Store new token in the DB
q = " INSERT INTO twitchbot_data (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=excluded.value "
await self . database . execute ( q , " access_token " , result [ " access_token " ] )
self . log . info ( " Stored new access token " )
return result [ " access_token " ]
# Get or generate webhook secret
async def get_webhook_secret ( self ) - > str :
q = " SELECT value FROM twitchbot_data WHERE key = $1 "
row = await self . database . fetchrow ( q , " webhook_secret " )
if row :
return row [ " value " ]
else :
self . log . info ( " No webhook secret found. Generating a new one... " )
secret = token_hex ( 32 )
qi = " INSERT INTO twitchbot_data (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=excluded.value "
await self . database . execute ( qi , " webhook_secret " , secret )
self . log . info ( " Stored new webhook secret " )
return secret
# Fetch an image URL, upload it to the matrix media store, and return the mcx:// address
async def upload_img ( self , link : str ) - > str :
image_req = await self . http . get ( link )
if ( image_req . status < 400 ) :
image = await image_req . read ( )
mxc = await self . client . upload_media ( image , filename = image_req . url . name )
return mxc
return " "
# Make a Twitch API call and return the data
async def twitch_api ( self , method : str , endpoint : str , params : dict | None , data : dict | None , evt : MessageEvent | None ) - > dict | None :
# Get access token from DB
q = " SELECT value FROM twitchbot_data WHERE key = $1 "
row = await self . database . fetchrow ( q , " access_token " )
if row :
access_token = row [ " value " ]
else :
self . log . info ( " Access token is missing, fetching a new one... " )
access_token = await self . get_access_token ( )
if ( len ( access_token ) < 1 ) :
if isinstance ( evt , MessageEvent ) :
await evt . respond ( " I seem to be having trouble authenticating to the Twitch API. Check the logs for more details. " )
return None
if ( method == " GET " ) :
httpfunc = self . http . get
elif ( method == " POST " ) :
httpfunc = self . http . post
elif ( method == " DELETE " ) :
httpfunc = self . http . delete
else :
self . log . error ( f " Unknown method: { method } " )
return None
headers = {
" Authorization " : f " Bearer { access_token } " ,
" Client-Id " : self . config [ " client_id " ]
}
retry = 2
while ( retry > 0 ) :
async with httpfunc ( " https://api.twitch.tv/helix " + endpoint , headers = headers , params = params , json = data ) as response :
if ( response . status == 204 ) :
rdata = { }
else :
rdata = await response . json ( )
if ( response . status == 401 ) :
self . log . info ( " Access token is invalid, fetching a new one... " )
access_token = await self . get_access_token ( )
if ( len ( access_token ) < 1 ) :
if isinstance ( evt , MessageEvent ) :
await evt . respond ( " I seem to be having trouble authenticating to the Twitch API. Check the logs for more details. " )
return None
headers [ " Authorization " ] = f " Bearer { access_token } "
retry = retry - 1
continue
elif ( response . status > = 400 ) :
# For some reason the /streams endpoint returns 400 for nonexistent or deactivated channels
if ( endpoint == " /streams " and rdata [ " message " ] == " Malformed query params. " ) :
if isinstance ( evt , MessageEvent ) :
await evt . respond ( " Sorry, I can ' t seem to find that channel. " )
else :
self . log . warning ( f " Channel ' { params [ ' user_login ' ] } ' does not exist. " )
return None
self . log . error ( f " { method } { str ( response . url ) } : { response . status } { rdata [ ' error ' ] } - { rdata [ ' message ' ] } " )
if isinstance ( evt , MessageEvent ) :
await evt . respond ( f " { rdata [ ' error ' ] } - { rdata [ ' message ' ] } " )
return None
return rdata
return None
# Main !twitch command handler
@command.new ( name = " twitch " , help = " Get the status of a Twitch channel " )
@command.argument ( " channel " , required = True , pass_raw = True )
async def twitch ( self , evt : MessageEvent , channel : str ) - > None :
await self . get_status ( evt , channel )
# Get the status of a Twitch channel
async def get_status ( self , evt : MessageEvent | None , channel : str ) - > None :
if ( len ( channel ) < 1 ) :
if isinstance ( evt , MessageEvent ) :
await evt . respond ( " You forgot to give me a channel name " )
return
# Get stream info
params = { " user_login " : channel , " type " : " live " }
streams = await self . twitch_api ( " GET " , " /streams " , params , None , evt )
if not streams :
return
# If there are no live streams, return channel info instead
if ( len ( streams [ " data " ] ) < 1 ) :
params = { " login " : channel }
users = await self . twitch_api ( " GET " , " /users " , params , None , evt )
if not users :
return
if ( len ( users [ " data " ] ) < 1 ) :
if isinstance ( evt , MessageEvent ) :
await evt . respond ( f " Could not find user: { channel } " )
return
user = users [ " data " ] [ 0 ]
# Upload channel image
if ( len ( user [ " offline_image_url " ] ) > 0 ) :
mxc = await self . upload_img ( user [ " offline_image_url " ] )
elif ( len ( user [ " profile_image_url " ] ) > 0 ) :
mxc = await self . upload_img ( user [ " profile_image_url " ] )
else :
mxc = " "
if ( len ( mxc ) > 0 ) :
img_html = f " <img width=400 src= ' { mxc } ' /> "
else :
img_html = " "
# Mention room user if defined
if user [ " login " ] in self . config [ " matrix_users " ] :
user_html = f " <a href= ' https://matrix.to/#/ { self . config [ ' matrix_users ' ] [ user [ ' login ' ] ] } ' > { user [ ' display_name ' ] } </a> "
else :
user_html = user [ " display_name " ]
# Construct and send response
content = TextMessageEventContent (
msgtype = MessageType . NOTICE ,
format = Format . HTML ,
body = f " > { user [ ' display_name ' ] } is offline. \n > **[ { user [ ' display_name ' ] } ](https://twitch.tv/ { user [ ' login ' ] } )** \n > { user [ ' description ' ] } \n > **Total Views:** { user [ ' view_count ' ] } " ,
formatted_body = f " <h5><span data-mx-color= ' #FFFFFF ' > { user_html } is offline.</span></h5><blockquote><h4><a href= ' https://twitch.tv/ { user [ ' login ' ] } ' > { user [ ' display_name ' ] } </a></h4><p> { user [ ' description ' ] } </p> { img_html } </blockquote> "
)
if isinstance ( evt , MessageEvent ) :
await evt . respond ( content )
else :
await self . client . send_message ( self . config [ " notify_room " ] , content )
return
# Use game name as title if no title is provided
stream = streams [ " data " ] [ 0 ]
if ( len ( stream [ " title " ] ) < 1 ) :
stream [ " title " ] = stream [ " game_name " ]
# Upload thumbnail
mxc = await self . upload_img ( stream [ " thumbnail_url " ] . format ( width = " 400 " , height = " 225 " ) )
if ( len ( mxc ) > 0 ) :
img_html = f " <img src= ' { mxc } ' /> "
else :
img_html = " "
# Mention room user if defined
if stream [ " user_login " ] in self . config [ " matrix_users " ] :
user_html = f " <a href= ' https://matrix.to/#/ { self . config [ ' matrix_users ' ] [ stream [ ' user_login ' ] ] } ' > { stream [ ' user_name ' ] } </a> "
else :
user_html = stream [ " user_name " ]
# Construct and send response
content = TextMessageEventContent (
msgtype = MessageType . NOTICE ,
format = Format . HTML ,
body = f " > { stream [ ' user_name ' ] } is now live on Twitch! \n > **[ { stream [ ' title ' ] } ](https://twitch.tv/ { stream [ ' user_login ' ] } )** \n > **Game:** { stream [ ' game_name ' ] } \n > **Viewers:** { stream [ ' viewer_count ' ] } " ,
formatted_body = f " <h5><span data-mx-color= ' #FFFFFF ' > { user_html } is now live on Twitch!</span></h5><blockquote><h4><a href= ' https://twitch.tv/ { stream [ ' user_login ' ] } ' > { stream [ ' title ' ] } </a></h4><p><b><span data-mx-color= ' #FFFFFF ' >Game:</span></b> { stream [ ' game_name ' ] } <br><b><span data-mx-color= ' #FFFFFF ' >Viewers:</span></b> { stream [ ' viewer_count ' ] } </p> { img_html } </blockquote> "
)
if isinstance ( evt , MessageEvent ) :
await evt . respond ( content )
else :
await self . client . send_message ( self . config [ " notify_room " ] , content )
# Register channel notifications and/or unregister old subscriptions
async def register_notifications ( self ) - > None :
self . log . debug ( " Registering notifications... " )
# Exit immediately if client_id and client_secret aren't set
if ( " client_id " not in self . config or " client_secret " not in self . config or len ( self . config [ " client_id " ] ) < 1 or len ( self . config [ " client_secret " ] ) < 1 ) :
return
# Get sub list from config
config_subs = self . config [ " notify_channels " ] . copy ( )
# Get subs from DB
db_subs = await self . database . fetch ( " SELECT * FROM twitchbot_subs " )
# Get current sub list from API
register_error = TextMessageEventContent (
msgtype = MessageType . NOTICE ,
body = " I encountered an issue while registering Twitch notifications. Check the logs for more details. "
)
response = await self . twitch_api ( " GET " , " /eventsub/subscriptions " , None , None , None )
if not response :
await self . client . send_message ( self . config [ " notify_room " ] , register_error )
return
api_subs = response [ " data " ]
# Handle paginated results
while ( response [ " pagination " ] and response [ " pagination " ] [ " cursor " ] ) :
params = { " after " : response [ " pagination " ] [ " cursor " ] }
response = await self . twitch_api ( " GET " , " /eventsub/subscriptions " , params , None , None )
if not response :
await self . client . send_message ( self . config [ " notify_room " ] , register_error )
return
api_subs = api_subs + response [ " data " ]
# Check current subs for ones that need removal
for sub in api_subs :
# Ignore subs that aren't for our instance
2025-01-16 20:08:05 -05:00
if ( sub [ " transport " ] [ " callback " ] != str ( self . webapp_url ) + " stream-notify " ) :
2025-01-14 00:55:26 -05:00
continue
# Locate corresponding sub from DB
db_sub = None
for i in db_subs :
if ( i [ " id " ] == sub [ " id " ] ) :
db_sub = i
break
# Unsubscribe from the notification if we have no record of it, or if it's no longer in the config list
if ( db_sub is None or db_sub [ " login " ] not in config_subs ) :
params = { " id " : sub [ " id " ] }
await self . twitch_api ( " DELETE " , " /eventsub/subscriptions " , params , None , None )
# Remove record from the DB
if db_sub :
self . log . info ( f " Unsubscribed from channel { db_sub [ ' login ' ] } " )
db_subs . remove ( db_sub )
qd = " DELETE FROM twitchbot_subs WHERE id = $1 "
await self . database . execute ( qd , sub [ " id " ] )
else :
self . log . info ( f " Unsubscribed from user ID { sub [ ' condition ' ] [ ' broadcaster_user_id ' ] } " )
continue
# Otherwise, this sub checks out so remove it from the list of subs that need processing
config_subs . remove ( db_sub [ " login " ] )
db_subs . remove ( db_sub )
webhook_secret = await self . get_webhook_secret ( )
# Register remaining subs from the config
for sub in config_subs :
# Check if there is a corresponding DB entry
db_sub = None
for i in db_subs :
if ( i [ " login " ] == sub ) :
db_sub = i
break
# Lookup channel name
params = { " login " : sub }
users = await self . twitch_api ( " GET " , " /users " , params , None , None )
if not users or len ( users [ " data " ] ) < 1 :
if db_sub :
if ( sub in self . config [ " matrix_users " ] ) :
user_html = f " <a href= ' https://matrix.to/#/ { self . config [ ' matrix_users ' ] [ sub ] } ' > { db_sub [ ' name ' ] } </a> "
else :
user_html = db_sub [ " name " ]
content = TextMessageEventContent (
msgtype = MessageType . NOTICE ,
format = Format . HTML ,
body = f " Lost Twitch channel notification subscription for { db_sub [ ' name ' ] } . Check the logs for more details. " ,
formatted_body = f " <p>Lost Twitch channel notification subscription for { user_html } . Check the logs for more details.</p> "
)
await self . client . send_message ( self . config [ " notify_room " ] , content )
else :
content = TextMessageEventContent (
msgtype = MessageType . NOTICE ,
body = f " Could not subscribe to Twitch channel ' { sub } ' . Are you sure it exists? "
)
await self . client . send_message ( self . config [ " notify_room " ] , content )
continue
user = users [ " data " ] [ 0 ]
# Register subscription
req = {
" type " : " stream.online " ,
" version " : " 1 " ,
" condition " : {
" broadcaster_user_id " : user [ " id " ]
} ,
" transport " : {
" method " : " webhook " ,
2025-01-16 20:08:05 -05:00
" callback " : str ( self . webapp_url ) + " stream-notify " ,
2025-01-14 00:55:26 -05:00
" secret " : webhook_secret
}
}
response = await self . twitch_api ( " POST " , " /eventsub/subscriptions " , None , req , None )
if not response :
content = TextMessageEventContent (
msgtype = MessageType . NOTICE ,
body = f " I encountered an issue subscribing to channel ' { sub } ' . Check the logs for more details. "
)
await self . client . send_message ( self . config [ " notify_room " ] , content )
continue
# Add new subscription to DB
self . log . info ( f " Storing new subscription for channel { sub } " )
q = " INSERT INTO twitchbot_subs (id, login, name) VALUES ($1, $2, $3) ON CONFLICT (id) DO UPDATE SET login=excluded.login, name=excluded.name "
await self . database . execute ( q , response [ " data " ] [ 0 ] [ " id " ] , sub , user [ " display_name " ] )
# Clean up remaining stale DB entries
for sub in db_subs :
self . log . info ( f " Cleaning stale DB entry for channel { sub [ ' login ' ] } " )
qd = " DELETE FROM twitchbot_subs WHERE id = $1 "
await self . database . execute ( qd , sub [ " id " ] )
# Webhook handler for live stream notifications
@web.post ( " /stream-notify " )
async def stream_notify ( self , req : Request ) - > Response :
self . log . debug ( " Handling webhook event... " )
# Check for necessary headers
id = req . headers . get ( " Twitch-Eventsub-Message-Id " )
timestamp = req . headers . get ( " Twitch-Eventsub-Message-Timestamp " )
signature = req . headers . get ( " Twitch-Eventsub-Message-Signature " )
type = req . headers . get ( " Twitch-Eventsub-Message-Type " )
if ( id is None or timestamp is None or signature is None or type is None ) :
self . log . info ( " Webhook event is missing required headers " )
return Response ( status = 403 , text = " Missing required headers " )
# Compute HMAC and validate against the one supplied
body = await req . text ( )
data = await req . json ( )
secret = await self . get_webhook_secret ( )
my_hmac = hmac . new ( secret . encode ( ) , id . encode ( ) , hashlib . sha256 )
my_hmac . update ( timestamp . encode ( ) )
my_hmac . update ( body . encode ( ) )
if ( hmac . compare_digest ( " sha256= " + my_hmac . hexdigest ( ) , signature ) == False ) :
self . log . info ( " Webhook event failed validation " )
return Response ( status = 403 , text = " Invalid HMAC signature " )
# Handle challenge requests
if ( type == " webhook_callback_verification " ) :
self . log . info ( " Responded to webhook challenge " )
return Response ( status = 200 , text = data [ " challenge " ] )
# Handle subscription revocations
if ( type == " revocation " ) :
reasons = {
" user_removed " : " Channel no longer exists " ,
" authorization_revoked " : " API access has been revoked (try resetting the bot) " ,
" notification_failures_exceeded " : " Notification responses have been too slow (consider a hardware upgrade?) " ,
" version_removed " : " This type of subscription is no longer available - please file a bug report "
}
if ( data [ " subscription " ] [ " status " ] in reasons ) :
reason = reasons [ data [ " subscription " ] [ " status " ] ]
else :
reason = data [ " subscription " ] [ " status " ]
# Get subscription data from DB and send channel notification
q = " SELECT * FROM twitchbot_subs WHERE id = $1 "
row = await self . database . fetchrow ( q , data [ " subscription " ] [ " id " ] )
if row :
self . log . warning ( f " Subscription to channel ' { row [ ' login ' ] } ' revoked. " )
user_name = row [ " name " ]
if ( row [ " login " ] in self . config [ " matrix_users " ] ) :
user_html = f " <a href= ' https://matrix.to/#/ { self . config [ ' matrix_users ' ] [ row [ ' login ' ] ] } ' > { row [ ' name ' ] } </a> "
else :
user_html = row [ " name " ]
# Delete subscription from DB
qd = " DELETE FROM twitchbot_subs WHERE id = $1 "
await self . database . execute ( qd , data [ " subscription " ] [ " id " ] )
else :
self . log . warning ( f " Subscription to user ID { data [ ' subscription ' ] [ ' condition ' ] [ ' broadcaster_user_id ' ] } revoked. " )
user_name = f " ID { data [ ' subscription ' ] [ ' condition ' ] [ ' broadcaster_user_id ' ] } "
user_html = user_name
content = TextMessageEventContent (
msgtype = MessageType . NOTICE ,
format = Format . HTML ,
body = f " Lost Twitch channel notification subscription for { user_name } : { reason } " ,
formatted_body = f " <p>Lost Twitch channel notification subscription for { user_html } : { reason } </p> "
)
await self . client . send_message ( self . config [ " notify_room " ] , content )
# Handle notifications
if ( type == " notification " ) :
self . log . info ( f " Sending channel notification for { data [ ' event ' ] [ ' broadcaster_user_login ' ] } " )
await self . get_status ( None , data [ " event " ] [ " broadcaster_user_login " ] )
return Response ( status = 204 )