@@ -26,17 +26,26 @@ defmodule StreamTest do
2626 end
2727
2828 test "after" do
29- stream = Stream . after ( [ 1 , 2 , 3 ] , fn -> Process . put ( :after , true ) end )
29+ stream = Stream . after ( [ 1 , 2 , 3 ] , fn -> Process . put ( :stream_after , true ) end )
3030
3131 # Done
32- Process . put ( :after , false )
32+ Process . put ( :stream_after , false )
3333 assert Enum . to_list ( stream ) == [ 1 , 2 , 3 ]
34- assert Process . get ( :after )
34+ assert Process . get ( :stream_after )
3535
3636 # Halted
37- Process . put ( :after , false )
37+ Process . put ( :stream_after , false )
3838 assert Enum . take ( stream , 1 ) == [ 1 ]
39- assert Process . get ( :after )
39+ assert Process . get ( :stream_after )
40+ end
41+
42+ test "after closes on errors" do
43+ stream = Stream . after ( [ 1 , 2 , 3 ] , fn -> Process . put ( :stream_after , true ) end )
44+
45+ Process . put ( :stream_after , false )
46+ stream = Stream . map ( stream , fn x -> if x > 2 , do: throw ( :error ) , else: x end )
47+ assert catch_throw ( Enum . to_list ( stream ) ) == :error
48+ assert Process . get ( :stream_after )
4049 end
4150
4251 test "chunk" do
@@ -256,7 +265,7 @@ defmodule StreamTest do
256265 assert Enum . zip ( list , list ) == Enum . zip ( stream , stream )
257266 end
258267
259- test "flat_map does not leave stream suspended" do
268+ test "flat_map does not leave inner stream suspended" do
260269 stream = Stream . flat_map [ 1 , 2 , 3 ] ,
261270 fn i ->
262271 Stream . resource ( fn -> i end ,
@@ -267,7 +276,9 @@ defmodule StreamTest do
267276 Process . put ( :stream_flat_map , false )
268277 assert stream |> Enum . take ( 3 ) == [ 1 , 2 , 3 ]
269278 assert Process . get ( :stream_flat_map )
279+ end
270280
281+ test "flat_map does not leave outer stream suspended" do
271282 stream = Stream . resource ( fn -> 1 end ,
272283 fn acc -> { acc , acc + 1 } end ,
273284 fn _ -> Process . put ( :stream_flat_map , true ) end )
@@ -278,6 +289,17 @@ defmodule StreamTest do
278289 assert Process . get ( :stream_flat_map )
279290 end
280291
292+ test "flat_map closes on error" do
293+ stream = Stream . resource ( fn -> 1 end ,
294+ fn acc -> { acc , acc + 1 } end ,
295+ fn _ -> Process . put ( :stream_flat_map , true ) end )
296+ stream = Stream . flat_map ( stream , fn _ -> throw ( :error ) end )
297+
298+ Process . put ( :stream_flat_map , false )
299+ assert catch_throw ( Enum . to_list ( stream ) ) == :error
300+ assert Process . get ( :stream_flat_map )
301+ end
302+
281303 test "iterate" do
282304 stream = Stream . iterate ( 0 , & ( & 1 + 2 ) )
283305 assert Enum . take ( stream , 5 ) == [ 0 , 2 , 4 , 6 , 8 ]
@@ -402,6 +424,17 @@ defmodule StreamTest do
402424 assert Enum . to_list ( stream ) == [ 5 , 4 , 3 , 2 , 1 ]
403425 end
404426
427+ test "resource closes on errors" do
428+ stream = Stream . resource ( fn -> 1 end ,
429+ fn acc -> { acc , acc + 1 } end ,
430+ fn _ -> Process . put ( :stream_resource , true ) end )
431+
432+ Process . put ( :stream_resource , false )
433+ stream = Stream . map ( stream , fn x -> if x > 2 , do: throw ( :error ) , else: x end )
434+ assert catch_throw ( Enum . to_list ( stream ) ) == :error
435+ assert Process . get ( :stream_resource )
436+ end
437+
405438 test "resource is zippable" do
406439 # File.stream! uses Stream.resource underneath
407440 stream = File . stream! ( __FILE__ )
@@ -479,6 +512,25 @@ defmodule StreamTest do
479512 assert Process . get ( :stream_zip ) == :done
480513 end
481514
515+ test "zip/2 closes on inner error" do
516+ stream = Stream . after ( [ 1 , 2 , 3 ] , fn -> Process . put ( :stream_zip , true ) end )
517+ stream = Stream . zip ( stream , Stream . map ( [ :a , :b , :c ] , fn _ -> throw ( :error ) end ) )
518+
519+ Process . put ( :stream_zip , false )
520+ assert catch_throw ( Enum . to_list ( stream ) ) == :error
521+ assert Process . get ( :stream_zip )
522+ end
523+
524+ test "zip/2 closes on outer error" do
525+ stream = Stream . after ( [ 1 , 2 , 3 ] , fn -> Process . put ( :stream_zip , true ) end )
526+ |> Stream . zip ( [ :a , :b , :c ] )
527+ |> Stream . map ( fn _ -> throw ( :error ) end )
528+
529+ Process . put ( :stream_zip , false )
530+ assert catch_throw ( Enum . to_list ( stream ) ) == :error
531+ assert Process . get ( :stream_zip )
532+ end
533+
482534 test "with_index" do
483535 stream = Stream . with_index ( [ 1 , 2 , 3 ] )
484536 assert is_lazy ( stream )
0 commit comments