From d59090dfc152e140e43a48f5f0ceba95ac725ff6 Mon Sep 17 00:00:00 2001 From: bdbai Date: Sat, 21 Jun 2025 16:03:23 +0800 Subject: [PATCH] refactor: (WIP) EventHandler trait --- mania-macros/src/lib.rs | 2 +- mania/src/core/business.rs | 115 +++++++++++---------- mania/src/core/business/caching_logic.rs | 6 +- mania/src/core/business/messaging_logic.rs | 22 ++-- mania/src/core/business/wt_logic.rs | 6 +- mania/src/core/event.rs | 2 +- mania/src/core/operation/cache_op.rs | 2 +- mania/src/core/operation/common_op.rs | 2 +- mania/src/core/operation/highway_op.rs | 2 +- mania/src/core/operation/wt_op.rs | 4 +- mania/src/event/mod.rs | 6 ++ mania/src/lib.rs | 40 ++----- 12 files changed, 103 insertions(+), 106 deletions(-) diff --git a/mania-macros/src/lib.rs b/mania-macros/src/lib.rs index 453edf9..ee7ebf7 100644 --- a/mania-macros/src/lib.rs +++ b/mania-macros/src/lib.rs @@ -229,7 +229,7 @@ pub fn handle_event(attr: TokenStream, item: TokenStream) -> TokenStream { fn #wrapper_fn_name<'a>( event: &'a mut dyn crate::core::event::ServerEvent, - handle: std::sync::Arc, + handle: std::sync::Arc>, flow: crate::core::business::LogicFlow, ) -> std::pin::Pin> + Send + 'a>> { Box::pin(#fn_name(event, handle, flow)) diff --git a/mania/src/core/business.rs b/mania/src/core/business.rs index a33736a..108814f 100644 --- a/mania/src/core/business.rs +++ b/mania/src/core/business.rs @@ -61,7 +61,7 @@ impl Display for LogicFlow { type LogicHandleFn = for<'a> fn( &'a mut dyn ServerEvent, - Arc, + Arc>, LogicFlow, ) -> Pin< Box> + Send + 'a>, @@ -91,31 +91,36 @@ static LOGIC_MAP: Lazy = Lazy::new(|| { map }); -pub async fn dispatch_logic( +pub async fn dispatch_logic( event: &mut dyn ServerEvent, - handle: Arc, + handle: Arc>, flow: LogicFlow, ) -> Result<&dyn ServerEvent, BusinessError> { - let tid = event.as_any().type_id(); - if let Some(fns) = LOGIC_MAP.get(&tid) { - tracing::trace!("[{}] Found {} handlers for {:?}.", flow, fns.len(), event); - for handle_fn in fns.iter() { - handle_fn(event, handle.to_owned(), flow).await?; - } - } else { - tracing::trace!("[{}] No handler found for {:?}", flow, event); - } - Ok(event) + todo!() + //let tid = event.as_any().type_id(); + //if let Some(fns) = LOGIC_MAP.get(&tid) { + // tracing::trace!("[{}] Found {} handlers for {:?}.", flow, fns.len(), event); + // for handle_fn in fns.iter() { + // handle_fn(event, handle.to_owned(), flow).await?; + // } + //} else { + // tracing::trace!("[{}] No handler found for {:?}", flow, event); + //} + //Ok(event) } -pub struct Business { +pub struct Business { addr: SocketAddr, receiver: PacketReceiver, - handle: Arc, + handle: Arc>, } -impl Business { - pub async fn new(config: Arc, context: Arc) -> BusinessResult { +impl Business { + pub async fn new( + config: Arc, + context: Context, + handler: H, + ) -> BusinessResult { let addr = optimum_server(config.get_optimum_server, config.use_ipv6_network).await?; let (sender, receiver) = socket::connect(addr).await?; let event_dispatcher = EventDispatcher::new(); @@ -126,6 +131,7 @@ impl Business { pending_requests: DashMap::new(), context, cache: Arc::new(Cache::new(config.cache_mode)), // TODO: construct from context + event_handler: handler, event_dispatcher, event_listener, highway: Arc::new(Highway::default()), @@ -138,10 +144,44 @@ impl Business { }) } - pub fn handle(&self) -> Arc { + pub fn handle(&self) -> Arc> { self.handle.clone() } + async fn try_reconnect(&mut self) -> BusinessResult<()> { + tracing::info!("Reconnecting to server: {}", self.addr); + + let (sender, receiver) = socket::connect(self.addr).await?; + self.handle.sender.store(Arc::new(sender)); + self.receiver = receiver; + + todo!( + "await Collection.Business.WtExchangeLogic.BotOnline(BotOnlineEvent.OnlineReason.Reconnect);" + ) + } + + async fn reconnect(&mut self) { + let handle = self.handle.clone(); + let reconnecting = handle.set_reconnecting().await; + let mut try_interval = Duration::from_secs(1); + loop { + match self.try_reconnect().await { + Ok(_) => break, + Err(e) => { + tracing::error!("Reconnect failed: {}", e); + tracing::info!("Retrying in {} seconds", try_interval.as_secs()); + tokio::time::sleep(try_interval).await; + if try_interval < Duration::from_secs(30) { + try_interval *= 2; + } + } + } + } + drop(reconnecting); + } +} + +impl Business { // TODO: decouple pub async fn spawn(&mut self) { let handle_packets = async { @@ -174,52 +214,21 @@ impl Business { _ = handle_packets => {} } } - - async fn try_reconnect(&mut self) -> BusinessResult<()> { - tracing::info!("Reconnecting to server: {}", self.addr); - - let (sender, receiver) = socket::connect(self.addr).await?; - self.handle.sender.store(Arc::new(sender)); - self.receiver = receiver; - - todo!( - "await Collection.Business.WtExchangeLogic.BotOnline(BotOnlineEvent.OnlineReason.Reconnect);" - ) - } - - async fn reconnect(&mut self) { - let handle = self.handle.clone(); - let reconnecting = handle.set_reconnecting().await; - let mut try_interval = Duration::from_secs(1); - loop { - match self.try_reconnect().await { - Ok(_) => break, - Err(e) => { - tracing::error!("Reconnect failed: {}", e); - tracing::info!("Retrying in {} seconds", try_interval.as_secs()); - tokio::time::sleep(try_interval).await; - if try_interval < Duration::from_secs(30) { - try_interval *= 2; - } - } - } - } - drop(reconnecting); - } } -pub struct BusinessHandle { +pub struct BusinessHandle { sender: ArcSwap, reconnecting: Mutex<()>, pending_requests: DashMap>>, - pub(crate) context: Arc, + pub(crate) context: Context, pub(crate) cache: Arc, + event_handler: H, pub(crate) event_dispatcher: EventDispatcher, pub event_listener: EventListener, pub(crate) highway: Arc, } -impl BusinessHandle { +impl BusinessHandle { /// Wait if the client is reconnecting. async fn wait_reconnecting(&self) { drop(self.reconnecting.lock().await); diff --git a/mania/src/core/business/caching_logic.rs b/mania/src/core/business/caching_logic.rs index bfa5bae..305eeab 100644 --- a/mania/src/core/business/caching_logic.rs +++ b/mania/src/core/business/caching_logic.rs @@ -9,7 +9,7 @@ use std::sync::Arc; #[handle_event(GroupSysIncreaseEvent, GroupSysDecreaseEvent)] async fn caching_logic( event: &mut dyn ServerEvent, - handle: Arc, + handle: Arc>, flow: LogicFlow, ) -> Result<&dyn ServerEvent, BusinessError> { match flow { @@ -18,9 +18,9 @@ async fn caching_logic( } } -async fn caching_logic_incoming( +async fn caching_logic_incoming( event: &mut dyn ServerEvent, - handle: Arc, + handle: Arc>, ) -> &dyn ServerEvent { match event { _ if let Some(increase) = event.as_any_mut().downcast_mut::() => { diff --git a/mania/src/core/business/messaging_logic.rs b/mania/src/core/business/messaging_logic.rs index 1cb2624..4c66eb8 100644 --- a/mania/src/core/business/messaging_logic.rs +++ b/mania/src/core/business/messaging_logic.rs @@ -75,9 +75,9 @@ use std::sync::Arc; FriendSysRequestEvent, BotSysRenameEvent )] -async fn messaging_logic( +async fn messaging_logic( event: &mut dyn ServerEvent, - handle: Arc, + handle: Arc>, flow: LogicFlow, ) -> Result<&dyn ServerEvent, BusinessError> { tracing::trace!("[{}] Handling event: {:?}", flow, event); @@ -89,9 +89,9 @@ async fn messaging_logic( // FIXME: avoid take things from event // FIXME: (TODO) make it return Result(?) -async fn messaging_logic_incoming( +async fn messaging_logic_incoming( event: &mut dyn ServerEvent, - handle: Arc, + handle: Arc>, ) -> &dyn ServerEvent { { if let Some(msg) = event.as_any_mut().downcast_mut::() { @@ -647,7 +647,7 @@ async fn messaging_logic_incoming( event } -async fn resolve_incoming_chain(chain: &mut MessageChain, handle: Arc) { +async fn resolve_incoming_chain(chain: &mut MessageChain, handle: Arc>) { for entity in &mut chain.entities { match *entity { Entity::Image(ref mut image) => { @@ -845,9 +845,9 @@ async fn resolve_incoming_chain(chain: &mut MessageChain, handle: Arc( event: &mut dyn ServerEvent, - handle: Arc, + handle: Arc>, ) -> Result<&dyn ServerEvent, BusinessError> { match event { _ if let Some(send) = event.as_any_mut().downcast_mut::() => { @@ -861,9 +861,9 @@ async fn messaging_logic_outgoing( } // TODO: error handling -async fn resolve_outgoing_chain( +async fn resolve_outgoing_chain( chain: &mut MessageChain, - handle: Arc, + handle: Arc>, ) -> Result<(), BusinessError> { let entities: &mut Vec = chain.entities.as_mut(); for entity in entities { @@ -908,9 +908,9 @@ async fn resolve_outgoing_chain( } // TODO: return result!!! -async fn resolve_chain_metadata( +async fn resolve_chain_metadata( chain: &mut MessageChain, - handle: Arc, + handle: Arc>, ) -> &mut MessageChain { match chain.typ { MessageType::Group(ref mut grp) diff --git a/mania/src/core/business/wt_logic.rs b/mania/src/core/business/wt_logic.rs index a5caa70..582e19e 100644 --- a/mania/src/core/business/wt_logic.rs +++ b/mania/src/core/business/wt_logic.rs @@ -10,7 +10,7 @@ use std::sync::Arc; #[handle_event(KickNTEvent)] async fn messaging_logic( event: &mut dyn ServerEvent, - handle: Arc, + handle: Arc>, flow: LogicFlow, ) -> Result<&dyn ServerEvent, BusinessError> { match flow { @@ -19,9 +19,9 @@ async fn messaging_logic( } } -async fn messaging_logic_incoming( +async fn messaging_logic_incoming( event: &mut dyn ServerEvent, - handle: Arc, + handle: Arc>, ) -> &dyn ServerEvent { match event { _ if let Some(kick) = event.as_any_mut().downcast_mut::() => { diff --git a/mania/src/core/event.rs b/mania/src/core/event.rs index 9341549..1edd0e9 100644 --- a/mania/src/core/event.rs +++ b/mania/src/core/event.rs @@ -70,7 +70,7 @@ static EVENT_MAP: Lazy = Lazy::new(|| { map }); -pub async fn resolve_event(packet: SsoPacket, context: &Arc) -> CEParseResult { +pub async fn resolve_event(packet: SsoPacket, context: &Context) -> CEParseResult { // Lagrange.Core.Internal.Context.ServiceContext.ResolveEventByPacket let payload = PacketReader::new(packet.payload()).section(|p| p.bytes()); let Some(parse) = EVENT_MAP.get(packet.command()) else { diff --git a/mania/src/core/operation/cache_op.rs b/mania/src/core/operation/cache_op.rs index 2832792..fe35bf7 100644 --- a/mania/src/core/operation/cache_op.rs +++ b/mania/src/core/operation/cache_op.rs @@ -11,7 +11,7 @@ use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; -impl BusinessHandle { +impl BusinessHandle { pub async fn uin2uid( self: &Arc, uin: u32, diff --git a/mania/src/core/operation/common_op.rs b/mania/src/core/operation/common_op.rs index e2aeb1d..8188dfb 100644 --- a/mania/src/core/operation/common_op.rs +++ b/mania/src/core/operation/common_op.rs @@ -22,7 +22,7 @@ use futures::future::join_all; use std::sync::Arc; use tokio::join; -impl BusinessHandle { +impl BusinessHandle { pub async fn fetch_rkey(self: &Arc) -> ManiaResult<()> { let mut fetch_event = FetchRKeyEvent {}; let res = self.send_event(&mut fetch_event).await?; diff --git a/mania/src/core/operation/highway_op.rs b/mania/src/core/operation/highway_op.rs index a74e183..08a71fe 100644 --- a/mania/src/core/operation/highway_op.rs +++ b/mania/src/core/operation/highway_op.rs @@ -41,7 +41,7 @@ use std::io::Cursor; use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncSeekExt}; -impl BusinessHandle { +impl BusinessHandle { async fn fetch_sig_session(self: &Arc) -> ManiaResult { let mut req = FetchHighwayTicketEvent::default(); let req = self.send_event(&mut req).await?; diff --git a/mania/src/core/operation/wt_op.rs b/mania/src/core/operation/wt_op.rs index d50ab16..446bdfd 100644 --- a/mania/src/core/operation/wt_op.rs +++ b/mania/src/core/operation/wt_op.rs @@ -20,7 +20,7 @@ use std::time::Duration; use tokio::sync::watch; use tokio::time::{sleep, timeout}; -impl BusinessHandle { +impl BusinessHandle { pub fn update_key_store(&self) -> &KeyStore { &self.context.key_store } @@ -182,7 +182,9 @@ impl BusinessHandle { )), } } +} +impl BusinessHandle { pub async fn online(self: &Arc) -> ManiaResult> { let (tx, mut rx) = watch::channel::<()>(()); let res = self.send_event(&mut InfoSyncEvent).await?; diff --git a/mania/src/event/mod.rs b/mania/src/event/mod.rs index c3082d7..1c83c88 100644 --- a/mania/src/event/mod.rs +++ b/mania/src/event/mod.rs @@ -6,6 +6,12 @@ pub mod system; pub trait ManiaEvent: std::fmt::Debug {} +use crate::event::system::bot_offline; + +pub trait EventHandler { + fn on_bot_offline(&self, ev: bot_offline::BotOfflineEvent); +} + pub(crate) struct EventDispatcher { pub(crate) system: watch::Sender>, pub(crate) friend: watch::Sender>, diff --git a/mania/src/lib.rs b/mania/src/lib.rs index 681e421..ea3809d 100644 --- a/mania/src/lib.rs +++ b/mania/src/lib.rs @@ -10,7 +10,7 @@ pub mod event; pub mod message; pub mod utility; -use crate::core::business::{Business, BusinessHandle}; +pub use crate::core::business::{Business, BusinessHandle}; pub use crate::core::cache::CacheMode; use crate::core::context::Protocol; pub use crate::core::context::{AppInfo, Context, DeviceInfo}; @@ -19,6 +19,7 @@ pub use crate::core::key_store::KeyStore; use crate::core::session::Session; use crate::core::sign::{SignProvider, default_sign_provider}; use crate::entity::bot_group_member::FetchGroupMemberStrategy; +use crate::event::EventHandler; use std::env; use std::sync::Arc; @@ -63,12 +64,11 @@ impl Default for ClientConfig { } pub struct Client { - business: Business, - handle: ClientHandle, + context: Context, } impl Client { - pub async fn new( + pub fn new( mut config: ClientConfig, device: DeviceInfo, key_store: KeyStore, @@ -96,34 +96,14 @@ impl Client { crypto: Default::default(), session: Session::new(), }; - let context = Arc::new(context); - let business = Business::new(config, context.clone()).await?; - let handle = ClientHandle { - business: business.handle(), - context, - }; - - Ok(Self { business, handle }) - } - - pub fn handle(&self) -> ClientHandle { - self.handle.clone() - } + let context = context; - pub async fn spawn(&mut self) { - self.business.spawn().await; + Ok(Self { context }) } -} - -#[derive(Clone)] -pub struct ClientHandle { - business: Arc, - context: Arc, -} -// TODO: (maybe) refactor structure to more user-friendly api? -impl ClientHandle { - pub fn operator(&self) -> Arc { - self.business.clone() + pub async fn connect(self, handler: H) -> Business { + let config = self.context.config.clone(); + Business::new(config, self.context, handler).await.unwrap(); + todo!("return result?") } }