@@ -11,16 +11,29 @@ use crate::registry::WalRegistry;
1111
1212pub ( crate ) type NotifyCheckpointer = mpsc:: Sender < NamespaceName > ;
1313
14- type LibsqlCheckpointer < IO , S > = Checkpointer < WalRegistry < IO , S > > ;
14+ pub enum CheckpointMessage {
15+ /// notify that a namespace may be checkpointable
16+ Namespace ( NamespaceName ) ,
17+ /// shutdown initiated
18+ Shutdown ,
19+ }
20+
21+ impl From < NamespaceName > for CheckpointMessage {
22+ fn from ( value : NamespaceName ) -> Self {
23+ Self :: Namespace ( value)
24+ }
25+ }
26+
27+ pub type LibsqlCheckpointer < IO , S > = Checkpointer < WalRegistry < IO , S > > ;
1528
1629impl < IO , S > LibsqlCheckpointer < IO , S >
1730where
1831 IO : Io ,
1932 S : Sync + Send + ' static ,
2033{
2134 pub fn new (
22- registry : WalRegistry < IO , S > ,
23- notifier : mpsc:: Receiver < NamespaceName > ,
35+ registry : Arc < WalRegistry < IO , S > > ,
36+ notifier : mpsc:: Receiver < CheckpointMessage > ,
2437 max_checkpointing_conccurency : usize ,
2538 ) -> Self {
2639 Self :: new_with_performer ( registry, notifier, max_checkpointing_conccurency)
@@ -70,7 +83,7 @@ pub struct Checkpointer<P> {
7083 checkpointing : HashSet < NamespaceName > ,
7184 /// the checkpointer is notifier whenever there is a change to a namespage that could trigger a
7285 /// checkpoint
73- recv : mpsc:: Receiver < NamespaceName > ,
86+ recv : mpsc:: Receiver < CheckpointMessage > ,
7487 max_checkpointing_conccurency : usize ,
7588 shutting_down : bool ,
7689 join_set : JoinSet < ( NamespaceName , crate :: error:: Result < ( ) > ) > ,
@@ -84,12 +97,12 @@ where
8497 P : PerformCheckpoint + Send + Sync + ' static ,
8598{
8699 fn new_with_performer (
87- perform_checkpoint : P ,
88- notifier : mpsc:: Receiver < NamespaceName > ,
100+ perform_checkpoint : Arc < P > ,
101+ notifier : mpsc:: Receiver < CheckpointMessage > ,
89102 max_checkpointing_conccurency : usize ,
90103 ) -> Self {
91104 Self {
92- perform_checkpoint : Arc :: new ( perform_checkpoint ) ,
105+ perform_checkpoint,
93106 scheduled : Default :: default ( ) ,
94107 checkpointing : Default :: default ( ) ,
95108 recv : notifier,
@@ -141,10 +154,10 @@ where
141154 }
142155 notified = self . recv. recv( ) , if !self . shutting_down => {
143156 match notified {
144- Some ( namespace) => {
157+ Some ( CheckpointMessage :: Namespace ( namespace) ) => {
145158 self . scheduled. insert( namespace) ;
146159 }
147- None => {
160+ None | Some ( CheckpointMessage :: Shutdown ) => {
148161 self . shutting_down = true ;
149162 }
150163 }
@@ -201,10 +214,11 @@ mod test {
201214 }
202215
203216 let ( sender, receiver) = mpsc:: channel ( 8 ) ;
204- let mut checkpointer = Checkpointer :: new_with_performer ( TestPerformCheckoint , receiver, 5 ) ;
217+ let mut checkpointer =
218+ Checkpointer :: new_with_performer ( TestPerformCheckoint . into ( ) , receiver, 5 ) ;
205219 let ns = NamespaceName :: from ( "test" ) ;
206220
207- sender. send ( ns. clone ( ) ) . await . unwrap ( ) ;
221+ sender. send ( ns. clone ( ) . into ( ) ) . await . unwrap ( ) ;
208222
209223 checkpointer. step ( ) . await ;
210224
@@ -233,10 +247,11 @@ mod test {
233247 }
234248
235249 let ( sender, receiver) = mpsc:: channel ( 8 ) ;
236- let mut checkpointer = Checkpointer :: new_with_performer ( TestPerformCheckoint , receiver, 5 ) ;
250+ let mut checkpointer =
251+ Checkpointer :: new_with_performer ( TestPerformCheckoint . into ( ) , receiver, 5 ) ;
237252 let ns = NamespaceName :: from ( "test" ) ;
238253
239- sender. send ( ns. clone ( ) ) . await . unwrap ( ) ;
254+ sender. send ( ns. clone ( ) . into ( ) ) . await . unwrap ( ) ;
240255
241256 checkpointer. step ( ) . await ;
242257 assert_eq ! ( checkpointer. errors, 0 ) ;
@@ -264,7 +279,8 @@ mod test {
264279 }
265280
266281 let ( sender, receiver) = mpsc:: channel ( 8 ) ;
267- let mut checkpointer = Checkpointer :: new_with_performer ( TestPerformCheckoint , receiver, 5 ) ;
282+ let mut checkpointer =
283+ Checkpointer :: new_with_performer ( TestPerformCheckoint . into ( ) , receiver, 5 ) ;
268284
269285 drop ( sender) ;
270286
@@ -290,7 +306,8 @@ mod test {
290306 }
291307
292308 let ( sender, receiver) = mpsc:: channel ( 8 ) ;
293- let mut checkpointer = Checkpointer :: new_with_performer ( TestPerformCheckoint , receiver, 5 ) ;
309+ let mut checkpointer =
310+ Checkpointer :: new_with_performer ( TestPerformCheckoint . into ( ) , receiver, 5 ) ;
294311
295312 drop ( sender) ;
296313
@@ -323,12 +340,13 @@ mod test {
323340 }
324341
325342 let ( sender, receiver) = mpsc:: channel ( 8 ) ;
326- let mut checkpointer = Checkpointer :: new_with_performer ( TestPerformCheckoint , receiver, 5 ) ;
343+ let mut checkpointer =
344+ Checkpointer :: new_with_performer ( TestPerformCheckoint . into ( ) , receiver, 5 ) ;
327345
328346 let ns: NamespaceName = "test" . into ( ) ;
329347
330- sender. send ( ns. clone ( ) ) . await . unwrap ( ) ;
331- sender. send ( ns. clone ( ) ) . await . unwrap ( ) ;
348+ sender. send ( ns. clone ( ) . into ( ) ) . await . unwrap ( ) ;
349+ sender. send ( ns. clone ( ) . into ( ) ) . await . unwrap ( ) ;
332350
333351 checkpointer. step ( ) . await ;
334352
@@ -355,13 +373,14 @@ mod test {
355373 }
356374
357375 let ( sender, receiver) = mpsc:: channel ( 8 ) ;
358- let mut checkpointer = Checkpointer :: new_with_performer ( TestPerformCheckoint , receiver, 5 ) ;
376+ let mut checkpointer =
377+ Checkpointer :: new_with_performer ( TestPerformCheckoint . into ( ) , receiver, 5 ) ;
359378
360379 let ns1: NamespaceName = "test1" . into ( ) ;
361380 let ns2: NamespaceName = "test2" . into ( ) ;
362381
363- sender. send ( ns1. clone ( ) ) . await . unwrap ( ) ;
364- sender. send ( ns2. clone ( ) ) . await . unwrap ( ) ;
382+ sender. send ( ns1. clone ( ) . into ( ) ) . await . unwrap ( ) ;
383+ sender. send ( ns2. clone ( ) . into ( ) ) . await . unwrap ( ) ;
365384
366385 checkpointer. step ( ) . await ;
367386
@@ -390,15 +409,16 @@ mod test {
390409 }
391410
392411 let ( sender, receiver) = mpsc:: channel ( 8 ) ;
393- let mut checkpointer = Checkpointer :: new_with_performer ( TestPerformCheckoint , receiver, 2 ) ;
412+ let mut checkpointer =
413+ Checkpointer :: new_with_performer ( TestPerformCheckoint . into ( ) , receiver, 2 ) ;
394414
395415 let ns1: NamespaceName = "test1" . into ( ) ;
396416 let ns2: NamespaceName = "test2" . into ( ) ;
397417 let ns3: NamespaceName = "test3" . into ( ) ;
398418
399- sender. send ( ns1. clone ( ) ) . await . unwrap ( ) ;
400- sender. send ( ns2. clone ( ) ) . await . unwrap ( ) ;
401- sender. send ( ns3. clone ( ) ) . await . unwrap ( ) ;
419+ sender. send ( ns1. clone ( ) . into ( ) ) . await . unwrap ( ) ;
420+ sender. send ( ns2. clone ( ) . into ( ) ) . await . unwrap ( ) ;
421+ sender. send ( ns3. clone ( ) . into ( ) ) . await . unwrap ( ) ;
402422
403423 checkpointer. step ( ) . await ;
404424 checkpointer. step ( ) . await ;
0 commit comments