Skip to content

Commit cf0dd31

Browse files
author
José Valim
committed
Add Stream.after/2, Stream.run/2 and File.stream_to!/3 for side effects
Closes #1831
1 parent e3e75e5 commit cf0dd31

File tree

5 files changed

+152
-27
lines changed

5 files changed

+152
-27
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22

33
* Enhancements
44
* [Exception] Allow `exception/1` to be overriden and promote it as the main mechanism to customize exceptions
5+
* [File] Add `File.stream_to/3`
56
* [Kernel] Add `List.delete_at/2` and `List.updated_at/3`
67
* [Kernel] Add `Enum.reverse/2`
78
* [Kernel] Implement `defmodule/2`, `@/1`, `def/2` and friends in Elixir itself. `case/2`, `try/2` and `receive/1` have been made special forms. `var!/1`, `var!/2` and `alias!/1` have also been implemented in Elixir and demoted from special forms
89
* [Record] Support dynamic fields in `defrecordp`
910
* [Stream] Add `Stream.resource/3`
10-
* [Stream] Add `Stream.zip/2`, `Stream.filter_map/3`, `Stream.each/2`, `Stream.take_every/2`, `Stream.chunks/2`, `Stream.chunks/3`, `Stream.chunks/4`, `Stream.chunks_by/2`, `Stream.scan/2`, `Stream.scan/3` and `Stream.uniq/2`
11+
* [Stream] Add `Stream.zip/2`, `Stream.filter_map/3`, `Stream.each/2`, `Stream.take_every/2`, `Stream.chunks/2`, `Stream.chunks/3`, `Stream.chunks/4`, `Stream.chunks_by/2`, `Stream.scan/2`, `Stream.scan/3`, `Stream.uniq/2`, `Stream.after/2` and `Stream.run/1`
1112
* [Stream] Support `Stream.take/2` and `Stream.drop/2` with negative counts
1213
* [Typespec] Support `is_var/1` in typespecs
1314

lib/elixir/lib/file.ex

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -954,12 +954,12 @@ defmodule File do
954954
end
955955

956956
@doc """
957-
Opens the given `file` with the given `mode` and returns
958-
a stream for each `:line` (default) or for a given number
957+
Opens the file at the given `path` with the given `modes` and
958+
returns a stream for each `:line` (default) or for a given number
959959
of bytes given by `line_or_bytes`.
960960
961961
The returned stream will fail for the same reasons as
962-
`File.open!`. Note that the file is opened only and every time
962+
`File.open!/2`. Note that the file is opened only and every time
963963
streaming begins.
964964
965965
Note that stream by default uses `IO.binread/2` unless
@@ -988,6 +988,42 @@ defmodule File do
988988
Stream.resource(start_fun, next_fun, &F.close/1)
989989
end
990990

991+
@doc """
992+
Receives a stream and returns a new stream that will open the file
993+
at the given `path` for write with the extra `modes` and write
994+
each value to the file.
995+
996+
The returned stream will fail for the same reasons as
997+
`File.open!/2`. Note that the file is opened only and every time
998+
streaming begins.
999+
1000+
Note that stream by default uses `IO.binwrite/2` unless
1001+
the file is opened with an encoding, then the slower `IO.write/2`
1002+
is used to do the proper data conversion and guarantees.
1003+
"""
1004+
def stream_to!(stream, path, modes // []) do
1005+
modes = open_defaults([:write|List.delete(modes, :write)], true)
1006+
bin = nil? List.keyfind(modes, :encoding, 0)
1007+
1008+
fn acc, f ->
1009+
case F.open(path, modes) do
1010+
{ :ok, device } ->
1011+
each =
1012+
case bin do
1013+
true -> &IO.binwrite(device, &1)
1014+
false -> &IO.write(device, &1)
1015+
end
1016+
1017+
stream
1018+
|> Stream.each(each)
1019+
|> Stream.after(fn -> F.close(device) end)
1020+
|> Enumerable.Stream.Lazy.reduce(acc, f)
1021+
{ :error, reason } ->
1022+
raise File.Error, reason: reason, action: "stream_to", path: to_string(path)
1023+
end
1024+
end
1025+
end
1026+
9911027
@doc false
9921028
def binstream!(file, mode // [], line_or_bytes // :line) do
9931029
IO.write "File.binstream! is deprecated, simply use File.stream! instead\n" <>

lib/elixir/lib/stream.ex

Lines changed: 64 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ defmodule Stream do
8383
like `Stream.cycle/1`, `Stream.unfold/2`, `Stream.resource/3` and more.
8484
"""
8585

86-
defrecord Lazy, enum: nil, funs: [], accs: [], done: nil
86+
defrecord Lazy, enum: nil, funs: [], accs: [], after: [], last: nil
8787

8888
defimpl Enumerable, for: Lazy do
8989
@compile :inline_list_funs
@@ -103,35 +103,32 @@ defmodule Stream do
103103
{ :error, __MODULE__ }
104104
end
105105

106-
defp do_reduce(Lazy[enum: enum, funs: funs, accs: accs, done: done], acc, fun) do
106+
defp do_reduce(Lazy[enum: enum, funs: funs, accs: accs, last: last, after: after_funs], acc, fun) do
107107
composed = :lists.foldl(fn fun, acc -> fun.(acc) end, fun, funs)
108-
do_each(&Enumerable.reduce(enum, &1, composed), done && { done, fun }, :lists.reverse(accs), acc)
108+
do_each(&Enumerable.reduce(enum, &1, composed), after_funs,
109+
last && { last, fun }, :lists.reverse(accs), acc)
109110
end
110111

111-
defp do_each(_reduce, _done, _accs, { :halt, acc }) do
112-
{ :halted, acc }
113-
end
114-
115-
defp do_each(reduce, done, accs, { :suspend, acc }) do
116-
{ :suspended, acc, &do_each(reduce, done, accs, &1) }
117-
end
118-
119-
defp do_each(reduce, done, accs, { :cont, acc }) do
120-
case reduce.({ :cont, [acc|accs] }) do
112+
defp do_each(reduce, after_funs, last, accs, { command, acc }) do
113+
case reduce.({ command, [acc|accs] }) do
121114
{ :suspended, [acc|accs], continuation } ->
122-
{ :suspended, acc, &do_each(continuation, done, accs, &1) }
115+
{ :suspended, acc, &do_each(continuation, after_funs, last, accs, &1) }
123116
{ :halted, [acc|_] } ->
117+
lc fun inlist after_funs, do: fun.()
124118
{ :halted, acc }
125119
{ :done, [acc|_] = accs } ->
126-
case done do
120+
case last do
127121
nil ->
122+
lc fun inlist after_funs, do: fun.()
128123
{ :done, acc }
129-
{ done, fun } ->
130-
case done.(fun).(accs) do
124+
{ last, fun } ->
125+
res = case last.(fun).(accs) do
131126
{ :cont, [acc|_] } -> { :done, acc }
132127
{ :halt, [acc|_] } -> { :halted, acc }
133128
{ :suspend, [acc|_] } -> { :suspended, acc, &({ :done, &1 |> elem(1) }) }
134129
end
130+
lc fun inlist after_funs, do: fun.()
131+
res
135132
end
136133
end
137134
end
@@ -163,10 +160,30 @@ defmodule Stream do
163160
## Transformers
164161

165162
@doc """
166-
Shortcut to `chunks(coll, n, n)`.
163+
Executes the given function when the stream is done, halted
164+
or an error happened during streaming. Useful for resource
165+
clean up.
166+
167+
Callbacks registered later will be executed earlier.
168+
169+
## Examples
170+
171+
iex> stream = Stream.after [1,2,3], fn -> Process.put(:done, true) end
172+
iex> Enum.to_list(stream)
173+
[1,2,3]
174+
iex> Process.get(:done)
175+
true
176+
177+
"""
178+
@spec unquote(:after)(Enumerable.t, (() -> term)) :: Enumerable.t
179+
def unquote(:after)(Lazy[after: funs] = lazy, fun), do: lazy.after([fun|funs])
180+
def unquote(:after)(enum, fun), do: Lazy[enum: enum, after: [fun]]
181+
182+
@doc """
183+
Shortcut to `chunks(enum, n, n)`.
167184
"""
168185
@spec chunks(Enumerable.t, non_neg_integer) :: Enumerable.t
169-
def chunks(coll, n), do: chunks(coll, n, n, nil)
186+
def chunks(enum, n), do: chunks(enum, n, n, nil)
170187

171188
@doc """
172189
Streams the enumerable in chunks, containing `n` items each, where
@@ -487,6 +504,30 @@ defmodule Stream do
487504
lazy enum, fn(f1) -> R.reject(fun, f1) end
488505
end
489506

507+
@doc """
508+
Runs the given stream.
509+
510+
This is useful when a stream needs to be run, for side effects,
511+
and there is no interest in its return result.
512+
513+
## Examples
514+
515+
Open up a file, replace all # by % and stream to another file
516+
without loading the whole file in memory:
517+
518+
stream = File.stream!("code")
519+
|> Stream.map(&String.replace(&1, "#", "%"))
520+
|> File.stream_to!("new")
521+
522+
No computation will be done until we call one of the Enum functions
523+
or `Stream.run/1`.
524+
"""
525+
@spec run(Enumerable.t) :: :ok
526+
def run(stream) do
527+
Enumerable.reduce(stream, { :cont, nil }, fn(_, _) -> { :cont, nil } end)
528+
:ok
529+
end
530+
490531
@doc """
491532
Lazily takes the next `n` items from the enumerable and stops
492533
enumeration.
@@ -911,8 +952,8 @@ defmodule Stream do
911952
defp lazy(enum, acc, fun),
912953
do: Lazy[enum: enum, funs: [fun], accs: [acc]]
913954

914-
defp lazy(Lazy[done: nil, funs: funs, accs: accs] = lazy, acc, fun, done),
915-
do: lazy.funs([fun|funs]).accs([acc|accs]).done(done)
916-
defp lazy(enum, acc, fun, done),
917-
do: Lazy[enum: enum, funs: [fun], accs: [acc], done: done]
955+
defp lazy(Lazy[last: nil, funs: funs, accs: accs] = lazy, acc, fun, last),
956+
do: lazy.funs([fun|funs]).accs([acc|accs]).last(last)
957+
defp lazy(enum, acc, fun, last),
958+
do: Lazy[enum: enum, funs: [fun], accs: [acc], last: last]
918959
end

lib/elixir/test/elixir/file_test.exs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,23 @@ defmodule FileTest do
995995
end
996996
end
997997

998+
test :stream_to do
999+
src = fixture_path("file.txt")
1000+
dest = tmp_path("tmp_test.txt")
1001+
1002+
try do
1003+
stream = File.stream!(src)
1004+
|> Stream.map(&String.replace(&1, "O", "A"))
1005+
|> File.stream_to!(dest)
1006+
1007+
refute File.exists?(dest)
1008+
assert Stream.run(stream) == :ok
1009+
assert File.read(dest) == { :ok, "FAA\n" }
1010+
after
1011+
File.rm(dest)
1012+
end
1013+
end
1014+
9981015
test :stream_bytes do
9991016
src = fixture_path("file.txt")
10001017
dest = tmp_path("tmp_test.txt")

lib/elixir/test/elixir/stream_test.exs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,20 @@ defmodule StreamTest do
2525
assert Enum.to_list(stream) == [3,5,7]
2626
end
2727

28+
test "after" do
29+
stream = Stream.after([1,2,3], fn -> Process.put(:after, true) end)
30+
31+
# Done
32+
Process.put(:after, false)
33+
assert Enum.to_list(stream) == [1,2,3]
34+
assert Process.get(:after)
35+
36+
# Halted
37+
Process.put(:after, false)
38+
assert Enum.take(stream, 1) == [1]
39+
assert Process.get(:after)
40+
end
41+
2842
test "chunks" do
2943
assert Stream.chunks([1, 2, 3, 4, 5], 2) |> Enum.to_list ==
3044
[[1, 2], [3, 4]]
@@ -395,6 +409,22 @@ defmodule StreamTest do
395409
assert Enum.zip(list, list) == Enum.zip(stream, stream)
396410
end
397411

412+
test "run" do
413+
Process.put(:stream_each, [])
414+
Process.put(:stream_after, false)
415+
416+
stream = [1,2,3]
417+
|> Stream.after(fn -> Process.put(:stream_after, true) end)
418+
|> Stream.each(fn x ->
419+
Process.put(:stream_each, [x|Process.get(:stream_each)])
420+
end)
421+
422+
assert is_lazy(stream)
423+
assert Stream.run(stream) == :ok
424+
assert Process.get(:stream_after)
425+
assert Process.get(:stream_each) == [3,2,1]
426+
end
427+
398428
test "unfold only calculate values if needed" do
399429
stream = Stream.unfold(1, fn x -> if x > 0, do: {x, x-1}, else: throw(:boom) end)
400430
assert Enum.take(stream, 1) == [1]

0 commit comments

Comments
 (0)