@@ -159,14 +159,24 @@ impl WorkerState {
159159 let follower_timeout_ms = self . follower_timeout_ms ;
160160 spawn_local ( async move {
161161 const POLL_INTERVAL_MS : f64 = 250.0 ;
162- let max_attempts = if follower_timeout_ms. is_finite ( ) && follower_timeout_ms > 0.0 {
163- ( follower_timeout_ms / POLL_INTERVAL_MS ) . ceil ( ) as u32
162+ let mut remaining_ms = if follower_timeout_ms. is_finite ( ) {
163+ follower_timeout_ms. max ( 0.0 )
164164 } else {
165- 1
165+ f64 :: INFINITY
166166 } ;
167- let mut attempts = 0 ;
168- while attempts < max_attempts {
169- attempts += 1 ;
167+
168+ if remaining_ms <= 0.0 {
169+ if !* has_leader. borrow ( ) {
170+ let message = format ! (
171+ "Leader election timed out after {:.0}ms" ,
172+ follower_timeout_ms. max( 0.0 )
173+ ) ;
174+ let _ = send_worker_error_message ( & message) ;
175+ }
176+ return ;
177+ }
178+
179+ while remaining_ms. is_infinite ( ) || remaining_ms > 0.0 {
170180 if * has_leader. borrow ( ) {
171181 break ;
172182 }
@@ -177,7 +187,22 @@ impl WorkerState {
177187 let _ = send_worker_error_message ( & err_msg) ;
178188 break ;
179189 }
180- sleep_ms ( POLL_INTERVAL_MS as i32 ) . await ;
190+ if * has_leader. borrow ( ) {
191+ break ;
192+ }
193+
194+ let sleep_duration = if remaining_ms. is_infinite ( ) {
195+ POLL_INTERVAL_MS
196+ } else {
197+ remaining_ms. min ( POLL_INTERVAL_MS )
198+ } ;
199+ if sleep_duration <= 0.0 {
200+ break ;
201+ }
202+ sleep_ms ( sleep_duration. ceil ( ) as i32 ) . await ;
203+ if remaining_ms. is_finite ( ) {
204+ remaining_ms -= sleep_duration;
205+ }
181206 }
182207 if !* has_leader. borrow ( ) {
183208 let timeout = follower_timeout_ms. max ( 0.0 ) ;
0 commit comments