2121
2222import fiber
2323from fiber .backend import get_backend
24+ from typing import Any , NoReturn , Callable , Sequence , Dict , Iterator , Optional , List
25+
26+ _manager : Any
2427
2528
2629__all__ = [
3235_manager = None
3336
3437
35- def _get_manager ():
38+ def _get_manager () -> fiber . managers . BaseManager :
3639 global _manager
3740 if _manager is None :
38- _manager = fiber .Manager ()
41+ _manager = fiber .Manager () # type: ignore
3942
4043 return _manager
4144
@@ -48,7 +51,7 @@ class RingNode:
4851 :param rank: The id assigned to this node. Each node will be assigned a
4952 unique id called `rank`. Rank 0 is the control node of the `Ring`.
5053 """
51- def __init__ (self , rank ):
54+ def __init__ (self , rank ) -> None :
5255 self .rank = rank
5356 self .connected = False
5457 self .ip = None
@@ -68,7 +71,17 @@ class Ring:
6871 :param initargs: positional arguments that are passed to initializer.
6972 Currently this is not used.
7073 """
71- def __init__ (self , processes , func , initializer , initargs = None ):
74+
75+ __fiber_meta__ : Dict
76+
77+ def __init__ (
78+ self ,
79+ processes : int ,
80+ func : Callable [[int , int ], Any ],
81+ initializer : Callable [[Optional [Iterator [Any ]]], None ],
82+ initargs : Iterator [Any ] = None
83+ ) -> None :
84+
7285 self .size = processes
7386 self .initializer = initializer
7487 self .initargs = initargs
@@ -79,12 +92,12 @@ def __init__(self, processes, func, initializer, initargs=None):
7992 # Propogate meta info
8093 # We can't set attributes to bound/unbound methods (PEP 232),
8194 # so we set it to Ring object
82- self .__fiber_meta__ = func .__fiber_meta__
95+ self .__fiber_meta__ = func .__fiber_meta__ # type: ignore
8396
8497 manager = _get_manager ()
85- self .members = manager .list ([RingNode (i ) for i in range (self .size )])
98+ self .members = manager .list ([RingNode (i ) for i in range (self .size )]) # type: ignore
8699
87- def _target (self ):
100+ def _target (self ) -> None :
88101 rank = self .rank
89102 node = self .members [rank ]
90103
@@ -97,10 +110,10 @@ def _target(self):
97110 node .port = port
98111 self .members [rank ] = node
99112
100- self .initializer (self )
113+ self .initializer (self . initargs )
101114 self .func (rank , self .size )
102115
103- def run (self ):
116+ def run (self ) -> None :
104117 """
105118 Start this Ring. This will start the ring 0 process on the same machine
106119 and start all the other ring nodes with Fiber processes.
@@ -113,15 +126,15 @@ def run(self):
113126 # Start process rank 0
114127 self .rank = 0
115128 ctx = mp .get_context ("spawn" )
116- p = ctx .Process (target = self ._target )
117- p .start ()
118- procs .append (p )
129+ p0 = ctx .Process (target = self ._target )
130+ p0 .start ()
131+ procs .append (p0 )
119132
120133 for i in range (1 , self .size ):
121134 self .rank = i
122135 p = fiber .Process (target = self ._target )
123136 p .start ()
124- procs .append (p )
137+ procs .append (p ) # type: ignore[arg-type]
125138
126139 self .rank = rank
127140 # wait for all processes to finish
0 commit comments