You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: README.md
+46-7Lines changed: 46 additions & 7 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -144,8 +144,28 @@ def say_bid_stream_hello(request, materializer) do
144
144
|>GRPC.Stream.run_with(materializer)
145
145
end
146
146
```
147
-
148
-
The Stream API supports composable stream transformations via `ask`, `map`, `run` and other functions, enabling clean and declarative stream pipelines. For a complete list of available operators and detailed documentation, see [`GRPC.Stream`](lib/grpc/stream.ex).
147
+
The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. See the table below:
|**`from(input, opts \\\\ [])`**| Converts a gRPC stream (or list) into a `Flow` with backpressure support. Allows joining with external `GenStage` producers. |**Parameters:**<br>• `input` — stream, list, or gRPC struct.<br>**Options:**<br>• `:join_with` — PID or name of an external `GenStage` producer.<br>• `:dispatcher` — dispatcher module (default: `GenStage.DemandDispatcher`).<br>• `:propagate_context` — if `true`, propagates the materializer context.<br>• `:materializer` — the current `%GRPC.Server.Stream{}`.<br>• Other options supported by `Flow`. |
152
+
|**`unary(input, opts \\\\ [])`**| Creates a `Flow` from a single gRPC request (unary). Useful for non-streaming calls that still leverage the Flow API. |**Parameters:**<br>• `input` — single gRPC message.<br>**Options:** same as `from/2`. |
153
+
|**`to_flow(stream)`**| Returns the underlying `Flow` from a `GRPC.Stream`. If uninitialized, returns `Flow.from_enumerable([])`. |**Parameters:**<br>• `stream` — `%GRPC.Stream{}` struct. |
154
+
|**`run(stream)`**| Executes the `Flow` for a unary stream and returns the first materialized result. |**Parameters:**<br>• `stream` — `%GRPC.Stream{}` with `unary: true` option. |
155
+
|**`run_with(stream, materializer, opts \\\\ [])`**| Executes the `Flow` and sends responses into the gRPC server stream. Supports `:dry_run` for test mode without sending messages. |**Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `materializer` — `%GRPC.Server.Stream{}`.<br>**Options:**<br>• `:dry_run` — if `true`, responses are not sent. |
156
+
|**`ask(stream, target, timeout \\\\ 5000)`**| Sends a request to an external process (`PID` or named process) and waits for a response (`{:response, msg}`). Returns an updated stream or an error. |**Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `target` — PID or atom.<br>• `timeout` — in milliseconds. |
157
+
|**`ask!(stream, target, timeout \\\\ 5000)`**| Same as `ask/3`, but raises an exception on failure (aborts the Flow). | Same parameters as `ask/3`. |
158
+
|**`filter(stream, fun)`**| Filters items in the stream by applying a concurrent predicate function. |**Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `fun` — function `(item -> boolean)`. |
159
+
|**`flat_map(stream, fun)`**| Applies a function returning a list or enumerable, flattening the results. |**Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `fun` — `(item -> Enumerable.t())`. |
160
+
|**`map(stream, fun)`**| Applies a transformation function to each item in the stream. |**Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `fun` — `(item -> term)`. |
161
+
|**`map_with_context(stream, fun)`**| Applies a function to each item, passing the stream context (e.g., headers) as an additional argument. |**Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `fun` — `(context, item -> term)`. |
162
+
|**`partition(stream, opts \\\\ [])`**| Partitions the stream to group items by key or condition before stateful operations like `reduce/3`. |**Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `opts` — partitioning options (`Flow.partition/2`). |
163
+
|**`reduce(stream, acc_fun, reducer_fun)`**| Reduces the stream using an accumulator, useful for aggregations. |**Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `acc_fun` — initializer function `() -> acc`.<br>• `reducer_fun` — `(item, acc -> acc)`. |
164
+
|**`uniq(stream)`**| Emits only distinct items from the stream (no custom uniqueness criteria). |**Parameters:**<br>• `stream` — `%GRPC.Stream{}`. |
165
+
|**`uniq_by(stream, fun)`**| Emits only unique items based on the return value of the provided function. |**Parameters:**<br>• `stream` — `%GRPC.Stream{}`.<br>• `fun` — `(item -> term)` for uniqueness determination. |
166
+
|**`get_headers(stream)`**| Retrieves HTTP/2 headers from a `%GRPC.Server.Stream{}`. |**Parameters:**<br>• `stream` — `%GRPC.Server.Stream{}`.<br>**Returns:**`map` containing decoded headers. |
167
+
168
+
For a complete list of available operators see [here](lib/grpc/stream.ex).
149
169
150
170
---
151
171
@@ -186,6 +206,25 @@ This section demonstrates how to establish client connections and perform RPC ca
186
206
187
207
## Basic Connection and RPC
188
208
209
+
210
+
Typically, you start this client supervisor as part of your application's supervision tree:
0 commit comments