117 lines
4.3 KiB
Python
117 lines
4.3 KiB
Python
|
"""
|
||
|
youtube - A maubot plugin to display YouTube video information
|
||
|
Copyright (C) 2025 L. Bradley LaBoon
|
||
|
|
||
|
This program is free software: you can redistribute it and/or modify
|
||
|
it under the terms of the GNU Affero General Public License version 3 as
|
||
|
published by the Free Software Foundation.
|
||
|
|
||
|
This program is distributed in the hope that it will be useful,
|
||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
GNU Affero General Public License for more details.
|
||
|
|
||
|
You should have received a copy of the GNU Affero General Public License
|
||
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||
|
"""
|
||
|
import html
from typing import Type

from maubot import Plugin, MessageEvent
from maubot.handlers import command
from mautrix.types import TextMessageEventContent, MessageType, Format
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from yarl import URL
|
||
|
|
||
|
class Config(BaseProxyConfig):
    """Configuration for the YouTube plugin.

    The only setting handled here is ``api_key``.
    """

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Carry the ``api_key`` setting over through the update helper."""
        helper.copy("api_key")
|
||
|
|
||
|
class YouTube(Plugin):
    """Maubot plugin that displays YouTube video information.

    The ``!youtube`` command (alias ``!yt``) takes a video ID or URL, looks
    the video up through the YouTube Data API and responds with the channel
    name, the video title and, when one can be fetched, a thumbnail.
    """

    # YouTube Data API endpoint queried for video details.
    _API_URL = "https://www.googleapis.com/youtube/v3/videos"
    # Hosts whose URLs carry the video ID in the ``v`` query parameter or in
    # a ``/<prefix>/<id>`` path.
    _YOUTUBE_HOSTS = frozenset(
        {"youtube.com", "www.youtube.com", "m.youtube.com", "music.youtube.com"}
    )
    # First path segments that are directly followed by the video ID.
    _ID_PATH_PREFIXES = frozenset({"shorts", "live", "embed"})
    # Thumbnail keys to try, largest first; "maxres" is not always present.
    _THUMBNAIL_SIZES = ("maxres", "standard", "high", "medium", "default")

    async def start(self) -> None:
        """Load the plugin configuration and apply any pending updates."""
        self.config.load_and_update()

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config

    async def upload_img(self, link: str) -> str:
        """Fetch an image URL, upload it to the Matrix media store, and
        return the resulting mxc:// address.

        Returns an empty string when the image request answers with an HTTP
        status of 400 or above.
        """
        # The context manager releases the HTTP response on every path.
        async with self.http.get(link) as image_req:
            if image_req.status >= 400:
                return ""
            image = await image_req.read()
            filename = image_req.url.name
        return await self.client.upload_media(image, filename=filename)

    @classmethod
    def _pick_thumbnail(cls, snippet: dict) -> str:
        """Return the URL of the largest thumbnail listed in *snippet*, or ""."""
        thumbnails = snippet.get("thumbnails") or {}
        for size in cls._THUMBNAIL_SIZES:
            url = (thumbnails.get(size) or {}).get("url")
            if url:
                return url
        return ""

    @classmethod
    def _parse_video_arg(cls, video: str) -> tuple:
        """Split the command argument into ``(video_id, time_offset)``.

        Anything that does not contain ``://`` is taken to be a bare video
        ID. ``time_offset`` is the URL's ``t`` query value, or "" when absent.

        Raises:
            ValueError: the argument is a URL no video ID can be extracted
                from; the exception message is the reply for the user.
        """
        if "://" not in video:
            return video, ""

        try:
            url = URL(video)
        except ValueError:
            # Malformed URLs get the same reply as unsupported ones.
            raise ValueError("Sorry, I don't know how to parse that URL.") from None

        host = (url.host or "").lower()
        parts = url.parts
        query = url.query

        if host in cls._YOUTUBE_HOSTS:
            if len(parts) >= 3 and parts[1] in cls._ID_PATH_PREFIXES and parts[2]:
                video_id = parts[2]
            elif "v" in query:
                video_id = query["v"]
            else:
                video_id = ""
        elif host == "youtu.be":
            video_id = url.name
        else:
            raise ValueError("Sorry, I don't know how to parse that URL.")

        if not video_id:
            raise ValueError("Sorry, I couldn't find the video ID in the URL you gave me.")
        return video_id, query.get("t", "")

    # Main !youtube command handler
    @command.new(name="youtube", aliases=["yt"], help="Display the details of a YouTube video")
    @command.argument("video", required=True, pass_raw=True)
    async def youtube(self, evt: MessageEvent, video: str) -> None:
        """Respond to ``!youtube <video>`` with the video's details.

        Args:
            evt: The message event that triggered the command; all replies
                are sent with ``evt.respond``.
            video: A video ID or a YouTube URL as typed by the user.
        """
        video = video.strip()
        if not video:
            await evt.respond("You forgot to give me a video ID or URL")
            return

        try:
            video_id, time_offset = self._parse_video_arg(video)
        except ValueError as err:
            await evt.respond(str(err))
            return

        # Fetch video info
        params = {
            "key": self.config["api_key"],
            "part": "snippet",
            "id": video_id,
        }
        async with self.http.get(self._API_URL, params=params) as response:
            status = response.status
            videos = await response.json()

        # The request URL is deliberately kept out of the log: its query
        # string contains the API key.
        if "error" in videos:
            message = videos["error"]["message"]
            self.log.error(f"GET {self._API_URL} (id={video_id}): {status} {message}")
            await evt.respond(f"Error: {message}")
            return
        if status >= 400:
            self.log.error(f"GET {self._API_URL} (id={video_id}): {status}")
            await evt.respond("I ran into an issue fetching that video. Check the logs for more details.")
            return

        items = videos.get("items") or []
        if not items:
            await evt.respond("Sorry, I couldn't find a video with that ID.")
            return

        info = items[0]
        snippet = info["snippet"]
        channel = snippet["channelTitle"]
        title = snippet["title"]

        # Upload thumbnail; skipped when the API lists none.
        thumbnail_url = self._pick_thumbnail(snippet)
        mxc = await self.upload_img(thumbnail_url) if thumbnail_url else ""
        img_html = f"<img width=400 src='{html.escape(mxc)}' />" if mxc else ""

        channel_url = f"https://www.youtube.com/channel/{snippet['channelId']}"
        watch_query = {"v": info["id"]}
        if time_offset:
            watch_query["t"] = time_offset
        # with_query() percent-encodes the user-supplied offset.
        watch_url = str(URL("https://www.youtube.com/watch").with_query(watch_query))

        # Construct and send response. Everything interpolated into the HTML
        # body is escaped so titles containing markup cannot alter the message.
        content = TextMessageEventContent(
            msgtype=MessageType.NOTICE,
            format=Format.HTML,
            body=(
                f"> YouTube\n"
                f"> [{channel}]({channel_url})\n"
                f"> **[{title}]({watch_url})**"
            ),
            formatted_body=(
                "<blockquote>"
                f"<h5>YouTube<br><br><a href='{html.escape(channel_url)}'>"
                f"<span data-mx-color='#FFFFFF'>{html.escape(channel)}</span></a></h5>"
                f"<h4><a href='{html.escape(watch_url)}'>{html.escape(title)}</a></h4>"
                f"{img_html}</blockquote>"
            ),
        )
        await evt.respond(content)
|