|
| 1 | +pub mod websocket; |
| 2 | + |
| 3 | +use std::sync::Arc; |
| 4 | + |
| 5 | +use clap::Parser; |
| 6 | +use eyre::WrapErr as _; |
| 7 | +use tokio::sync::Mutex; |
| 8 | +use twitch_api::{ |
| 9 | + client::ClientDefault, |
| 10 | + eventsub::{self, Event, Message, Payload}, |
| 11 | + HelixClient, |
| 12 | +}; |
| 13 | +use twitch_oauth2::{Scope, TwitchToken as _, UserToken}; |
| 14 | + |
| 15 | +#[derive(Parser, Debug, Clone)] |
| 16 | +#[clap(about, version)] |
| 17 | +pub struct Cli { |
| 18 | + /// Client ID of twitch application |
| 19 | + #[clap(long, env, hide_env = true)] |
| 20 | + pub client_id: twitch_oauth2::ClientId, |
| 21 | + #[clap(long, env, hide_env = true)] |
| 22 | + pub broadcaster_login: twitch_api::types::UserName, |
| 23 | + /// Path to config file |
| 24 | + #[clap(long, default_value = concat!(env!("CARGO_MANIFEST_DIR"), "/config.toml"))] |
| 25 | + pub config: std::path::PathBuf, |
| 26 | +} |
| 27 | + |
| 28 | +#[derive(serde_derive::Serialize, serde_derive::Deserialize, Debug)] |
| 29 | +pub struct Config { |
| 30 | + command: Vec<Command>, |
| 31 | +} |
| 32 | + |
| 33 | +#[derive(serde_derive::Serialize, serde_derive::Deserialize, Debug)] |
| 34 | +pub struct Command { |
| 35 | + pub trigger: String, |
| 36 | + pub response: String, |
| 37 | +} |
| 38 | + |
| 39 | +impl Config { |
| 40 | + pub fn load(path: &std::path::Path) -> Result<Self, eyre::Report> { |
| 41 | + let config = std::fs::read_to_string(path)?; |
| 42 | + toml::from_str(&config).wrap_err("Failed to parse config") |
| 43 | + } |
| 44 | +} |
| 45 | + |
| 46 | +#[tokio::main] |
| 47 | +async fn main() -> Result<(), eyre::Report> { |
| 48 | + color_eyre::install()?; |
| 49 | + tracing_subscriber::fmt::fmt() |
| 50 | + .with_writer(std::io::stderr) |
| 51 | + .init(); |
| 52 | + _ = dotenvy::dotenv(); |
| 53 | + let opts = Cli::parse(); |
| 54 | + let config = Config::load(&opts.config)?; |
| 55 | + |
| 56 | + let client: HelixClient<reqwest::Client> = twitch_api::HelixClient::with_client( |
| 57 | + ClientDefault::default_client_with_name(Some("my_chatbot".parse()?))?, |
| 58 | + ); |
| 59 | + |
| 60 | + // First we need to get a token, preferably you'd also store this information somewhere safe to reuse when restarting the application. |
| 61 | + // For now we'll just get a new token every time the application starts. |
| 62 | + // One way to store the token is to store the access_token and refresh_token in a file and load it when the application starts with |
| 63 | + // `twitch_oauth2::UserToken::from_existing` |
| 64 | + let mut builder = twitch_oauth2::tokens::DeviceUserTokenBuilder::new( |
| 65 | + opts.client_id.clone(), |
| 66 | + vec![Scope::UserReadChat, Scope::UserWriteChat], |
| 67 | + ); |
| 68 | + let code = builder.start(&client).await?; |
| 69 | + println!("Please go to: {}", code.verification_uri); |
| 70 | + let token = builder.wait_for_code(&client, tokio::time::sleep).await?; |
| 71 | + |
| 72 | + let Some(twitch_api::helix::users::User { |
| 73 | + id: broadcaster, .. |
| 74 | + }) = client |
| 75 | + .get_user_from_login(&opts.broadcaster_login, &token) |
| 76 | + .await? |
| 77 | + else { |
| 78 | + eyre::bail!( |
| 79 | + "No broadcaster found with login: {}", |
| 80 | + opts.broadcaster_login |
| 81 | + ); |
| 82 | + }; |
| 83 | + let token = Arc::new(Mutex::new(token)); |
| 84 | + |
| 85 | + let bot = Bot { |
| 86 | + opts, |
| 87 | + client, |
| 88 | + token, |
| 89 | + config, |
| 90 | + broadcaster, |
| 91 | + }; |
| 92 | + bot.start().await?; |
| 93 | + Ok(()) |
| 94 | +} |
| 95 | + |
| 96 | +pub struct Bot { |
| 97 | + pub opts: Cli, |
| 98 | + pub client: HelixClient<'static, reqwest::Client>, |
| 99 | + pub token: Arc<Mutex<twitch_oauth2::UserToken>>, |
| 100 | + pub config: Config, |
| 101 | + pub broadcaster: twitch_api::types::UserId, |
| 102 | +} |
| 103 | + |
| 104 | +impl Bot { |
| 105 | + pub async fn start(&self) -> Result<(), eyre::Report> { |
| 106 | + // To make a connection to the chat we need to use a websocket connection. |
| 107 | + // This is a wrapper for the websocket connection that handles the reconnects and handles all messages from eventsub. |
| 108 | + let websocket = websocket::ChatWebsocketClient { |
| 109 | + session_id: None, |
| 110 | + token: self.token.clone(), |
| 111 | + client: self.client.clone(), |
| 112 | + connect_url: twitch_api::TWITCH_EVENTSUB_WEBSOCKET_URL.clone(), |
| 113 | + chats: vec![self.broadcaster.clone()], |
| 114 | + }; |
| 115 | + let refresh_token = async move { |
| 116 | + let token = self.token.clone(); |
| 117 | + let client = self.client.clone(); |
| 118 | + // We check constantly if the token is valid. |
| 119 | + // We also need to refresh the token if it's about to be expired. |
| 120 | + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); |
| 121 | + loop { |
| 122 | + interval.tick().await; |
| 123 | + let mut token = token.lock().await; |
| 124 | + if token.expires_in() < std::time::Duration::from_secs(60) { |
| 125 | + token |
| 126 | + .refresh_token(&self.client) |
| 127 | + .await |
| 128 | + .wrap_err("couldn't refresh token")?; |
| 129 | + } |
| 130 | + token |
| 131 | + .validate_token(&client) |
| 132 | + .await |
| 133 | + .wrap_err("couldn't validate token")?; |
| 134 | + } |
| 135 | + #[allow(unreachable_code)] |
| 136 | + Ok(()) |
| 137 | + }; |
| 138 | + let ws = websocket.run(|e, ts| async { self.handle_event(e, ts).await }); |
| 139 | + futures::future::try_join(ws, refresh_token).await?; |
| 140 | + Ok(()) |
| 141 | + } |
| 142 | + |
| 143 | + async fn handle_event( |
| 144 | + &self, |
| 145 | + event: Event, |
| 146 | + timestamp: twitch_api::types::Timestamp, |
| 147 | + ) -> Result<(), eyre::Report> { |
| 148 | + let token = self.token.lock().await; |
| 149 | + match event { |
| 150 | + Event::ChannelChatMessageV1(Payload { |
| 151 | + message: Message::Notification(payload), |
| 152 | + subscription, |
| 153 | + .. |
| 154 | + }) => { |
| 155 | + println!( |
| 156 | + "[{}] {}: {}", |
| 157 | + timestamp, payload.chatter_user_name, payload.message.text |
| 158 | + ); |
| 159 | + if let Some(command) = payload.message.text.strip_prefix("!") { |
| 160 | + let mut split_whitespace = command.split_whitespace(); |
| 161 | + let command = split_whitespace.next().unwrap(); |
| 162 | + let rest = split_whitespace.next(); |
| 163 | + |
| 164 | + self.command(&payload, &subscription, command, rest, &token) |
| 165 | + .await?; |
| 166 | + } |
| 167 | + } |
| 168 | + Event::ChannelChatNotificationV1(Payload { |
| 169 | + message: Message::Notification(payload), |
| 170 | + .. |
| 171 | + }) => { |
| 172 | + println!( |
| 173 | + "[{}] {}: {}", |
| 174 | + timestamp, |
| 175 | + match &payload.chatter { |
| 176 | + eventsub::channel::chat::notification::Chatter::Chatter { |
| 177 | + chatter_user_name: user, |
| 178 | + .. |
| 179 | + } => user.as_str(), |
| 180 | + _ => "anonymous", |
| 181 | + }, |
| 182 | + payload.message.text |
| 183 | + ); |
| 184 | + } |
| 185 | + _ => {} |
| 186 | + } |
| 187 | + Ok(()) |
| 188 | + } |
| 189 | + |
| 190 | + async fn command( |
| 191 | + &self, |
| 192 | + payload: &eventsub::channel::ChannelChatMessageV1Payload, |
| 193 | + subscription: &eventsub::EventSubscriptionInformation< |
| 194 | + eventsub::channel::ChannelChatMessageV1, |
| 195 | + >, |
| 196 | + command: &str, |
| 197 | + _rest: Option<&str>, |
| 198 | + token: &UserToken, |
| 199 | + ) -> Result<(), eyre::Report> { |
| 200 | + tracing::info!("Command: {}", command); |
| 201 | + if let Some(response) = self.config.command.iter().find(|c| c.trigger == command) { |
| 202 | + self.client |
| 203 | + .send_chat_message_reply( |
| 204 | + &subscription.condition.broadcaster_user_id, |
| 205 | + &subscription.condition.user_id, |
| 206 | + &payload.message_id, |
| 207 | + response |
| 208 | + .response |
| 209 | + .replace("{user}", &payload.chatter_user_name.as_str()) |
| 210 | + .as_str(), |
| 211 | + token, |
| 212 | + ) |
| 213 | + .await?; |
| 214 | + } |
| 215 | + Ok(()) |
| 216 | + } |
| 217 | +} |
0 commit comments