@@ -20,11 +20,6 @@ defmodule Task.Supervisor do
2020 | { :restart , :supervisor . restart ( ) }
2121 | { :shutdown , :supervisor . shutdown ( ) }
2222
23- @ typedoc "Supervisor spec used by `async_stream`"
24- @ type async_stream_supervisor ::
25- Supervisor . supervisor ( )
26- | ( term -> Supervisor . supervisor ( ) )
27-
2823 @ doc false
2924 def child_spec ( arg ) do
3025 % {
@@ -158,15 +153,6 @@ defmodule Task.Supervisor do
158153 own task. The tasks will be spawned under the given `supervisor` and
159154 linked to the current process, similarly to `async/4`.
160155
161- You may also provide a function as the `supervisor`. Before each task is
162- started, the function will be invoked (in a new process which is linked to
163- the current process) with the stream entry that the to-be-spawned task will
164- process as its argument. The function should return a supervisor pid or name,
165- which will be used to spawn the task. This allows one to dynamically start
166- tasks in different locations in the supervision tree(s) on the local (or
167- another) node. Notably, this enables the distribution of concurrent stream
168- tasks over multiple nodes.
169-
170156 When streamed, each task will emit `{:ok, value}` upon successful
171157 completion or `{:exit, reason}` if the caller is trapping exits.
172158 Results are emitted in the same order as the original `enumerable`.
@@ -207,7 +193,7 @@ defmodule Task.Supervisor do
207193 Enum.to_list(stream)
208194
209195 """
210- @ spec async_stream ( async_stream_supervisor , Enumerable . t ( ) , module , atom , [ term ] , keyword ) ::
196+ @ spec async_stream ( Supervisor . supervisor ( ) , Enumerable . t ( ) , module , atom , [ term ] , keyword ) ::
211197 Enumerable . t ( )
212198 def async_stream ( supervisor , enumerable , module , function , args , options \\ [ ] )
213199 when is_atom ( module ) and is_atom ( function ) and is_list ( args ) do
@@ -224,7 +210,7 @@ defmodule Task.Supervisor do
224210
225211 See `async_stream/6` for discussion, options, and examples.
226212 """
227- @ spec async_stream ( async_stream_supervisor , Enumerable . t ( ) , ( term -> term ) , keyword ) ::
213+ @ spec async_stream ( Supervisor . supervisor ( ) , Enumerable . t ( ) , ( term -> term ) , keyword ) ::
228214 Enumerable . t ( )
229215 def async_stream ( supervisor , enumerable , fun , options \\ [ ] ) when is_function ( fun , 1 ) do
230216 build_stream ( supervisor , :link , enumerable , fun , options )
@@ -241,7 +227,7 @@ defmodule Task.Supervisor do
241227 See `async_stream/6` for discussion, options, and examples.
242228 """
243229 @ spec async_stream_nolink (
244- async_stream_supervisor ,
230+ Supervisor . supervisor ( ) ,
245231 Enumerable . t ( ) ,
246232 module ,
247233 atom ,
@@ -263,7 +249,7 @@ defmodule Task.Supervisor do
263249
264250 See `async_stream/6` for discussion and examples.
265251 """
266- @ spec async_stream_nolink ( async_stream_supervisor , Enumerable . t ( ) , ( term -> term ) , keyword ) ::
252+ @ spec async_stream_nolink ( Supervisor . supervisor ( ) , Enumerable . t ( ) , ( term -> term ) , keyword ) ::
267253 Enumerable . t ( )
268254 def async_stream_nolink ( supervisor , enumerable , fun , options \\ [ ] ) when is_function ( fun , 1 ) do
269255 build_stream ( supervisor , :nolink , enumerable , fun , options )
@@ -354,27 +340,12 @@ defmodule Task.Supervisor do
354340 % Task { pid: pid , ref: ref , owner: owner }
355341 end
356342
357- defp supervisor_fun ( supervisor , { _module , _fun , _args } )
358- when is_function ( supervisor , 1 ) do
359- fn { _module , _fun , [ entry | _rest_args ] } -> supervisor . ( entry ) end
360- end
361-
362- defp supervisor_fun ( supervisor , fun )
363- when is_function ( supervisor , 1 ) and is_function ( fun , 1 ) do
364- fn { _erlang , _apply , [ _fun , [ entry ] ] } -> supervisor . ( entry ) end
365- end
366-
367- defp supervisor_fun ( supervisor , _fun ) do
368- fn _mfa -> supervisor end
369- end
370-
371343 defp build_stream ( supervisor , link_type , enumerable , fun , options ) do
372- supervisor_fun = supervisor_fun ( supervisor , fun )
373344 shutdown = options [ :shutdown ]
374345
375346 & Task.Supervised . stream ( enumerable , & 1 , & 2 , fun , options , fn owner , mfa ->
376347 args = [ owner , :monitor , get_info ( owner ) , mfa ]
377- { :ok , pid } = start_child_with_spec ( supervisor_fun . ( mfa ) , args , :temporary , shutdown )
348+ { :ok , pid } = start_child_with_spec ( supervisor , args , :temporary , shutdown )
378349 if link_type == :link , do: Process . link ( pid )
379350 { link_type , pid }
380351 end )
0 commit comments