Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ parameters:
count: 1
path: src/Pull/InfinitePull.php

-
message: '#^Method Phunkie\\Streams\\Pull\\InfinitePull\:\:toArray\(\) should return array but returns Generator\.$#'
identifier: return.type
count: 1
path: src/Pull/InfinitePull.php

-
message: '#^Call to an undefined method Phunkie\\Streams\\IO\\Resource\:\:pull\(\)\.$#'
identifier: method.notFound
Expand Down
13 changes: 13 additions & 0 deletions src/Compilable.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,22 @@
use Phunkie\Effect\IO\IO;
use Phunkie\Types\ImmList;

/**
* Contract for compiling a stream into a materialised collection.
*/
interface Compilable
{
/**
* Compile the stream into an ImmList, or an IO action for effectful streams.
*
* @return ImmList|IO
*/
public function toList(): ImmList | IO;

/**
* Compile the stream into a plain PHP array.
*
* @return array
*/
public function toArray(): array;
}
6 changes: 6 additions & 0 deletions src/Functions/common.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@
* file that was distributed with this source code.
*/

/**
* Bootstraps all function files in this directory.
*/
array_map(function ($file) {
require_once $file;
}, glob(__DIR__ .'/*'));

/** @var string Effect type constant for pure (non-effectful) streams. */
const Pure = 'Pure';

/** @var string Effect type constant for IO (effectful) streams. */
const IO = 'IO';
47 changes: 45 additions & 2 deletions src/Functions/infinite.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,78 @@
use Phunkie\Streams\Infinite\Timer;
use Phunkie\Streams\Infinite\Unfold;

/**
* Create an infinite stream from a numeric range.
*
* @param int $start Starting value (inclusive)
* @param int $end Ending value (inclusive, defaults to PHP_INT_MAX)
* @param int $step Step increment between values
* @return Infinite
*/
function fromRange(int $start, int $end = PHP_INT_MAX, int $step = 1): Infinite
{
return new Range($start, $end, $step);
}

function iterate(int $start)
/**
* Create an infinite stream by repeatedly applying a function to a seed.
*
* Returns a curried function: iterate($start)($f) produces $start, $f($start), $f($f($start)), ...
*
* @param int $start The initial seed value
* @return \Closure(callable): Iterate
*/
function iterate(int $start): \Closure
{
return function (callable $f) use ($start) {
return new Iterate($f, $start);
};
}

function unfold($seed)
/**
* Create an infinite stream by unfolding a seed with a function.
*
* Returns a curried function: unfold($seed)($f) where $f returns the next value and new state.
*
* @param mixed $seed The initial state
* @return \Closure(callable): Unfold
*/
function unfold($seed): \Closure
{
return function (callable $f) use ($seed) {
return new Unfold($f, $seed);
};
}

/**
* Create an infinite stream that endlessly emits a constant value.
*
* @param mixed $pattern The value to repeat
* @return Infinite
*/
function fromConstant(mixed $pattern): Infinite
{
return new Constant($pattern);
}

/**
* Create an infinite stream that cycles through the given values repeatedly.
*
* @param mixed ...$values The values to cycle through
* @return Infinite
*/
function repeat(...$values): Infinite
{
return new Repeat(...$values);
}

/**
* Create an infinite stream that emits a tick at a fixed time interval.
*
* @param float $seconds Time interval in seconds between emissions
* @param int|null $stopAt Optional maximum number of ticks before stopping
* @return Infinite
*/
function awakeEvery(float $seconds, $stopAt = null): Infinite
{
return new Timer($seconds, $stopAt);
Expand Down
6 changes: 6 additions & 0 deletions src/Functions/io.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@

use Phunkie\Effect\IO\IO;

/**
* Wrap a closure in an IO effect.
*
* @param callable $f The side-effecting closure to wrap
* @return IO
*/
function io($f): IO
{
return new IO($f);
Expand Down
1 change: 1 addition & 0 deletions src/Functions/network.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ function httpDelete(string $url, array $headers = []): Stream
* Create a TCP socket connection with bracket for safe resource management
*
* @param SocketAddress $address The socket address to connect to
* @param float $timeout Connection timeout in seconds
* @return IO<resource> IO action that creates a socket connection
*/
function socket(SocketAddress $address, float $timeout = 30.0): IO
Expand Down
13 changes: 13 additions & 0 deletions src/Functions/stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@
use Phunkie\Streams\IO\Resource;
use Phunkie\Streams\Type\Stream;

/**
* Factory function for creating streams.
*
* Accepts a Pull, Path, Infinite, Resource, or plain values.
* - Pull: creates a stream backed by a pull-based source
* - Path: creates a stream that reads from a file path
* - Infinite: creates a stream from an infinite generator (range, iterate, unfold, etc.)
* - Resource: creates a stream from a resource object
* - Plain values: creates a finite stream emitting the given values
*
* @param mixed ...$t A single Pull|Path|Infinite|Resource, or one or more plain values
* @return Stream
*/
function Stream(...$t): Stream
{
if (count($t) === 1 && $t[0] instanceof \Phunkie\Streams\Type\Pull) {
Expand Down
34 changes: 34 additions & 0 deletions src/Functions/text.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,26 @@

namespace Phunkie\Streams\text {
const lines = "\\Phunkie\\Streams\\text\\lines";

/**
* Split a string chunk into lines by the platform EOL character.
*
* @param string $chunk The text to split
* @return array<string> Array of lines
*/
function lines($chunk): array
{
return explode(PHP_EOL, $chunk);
}

const utf8Encode = "\\Phunkie\\Streams\\text\\utf8Encode";

/**
* Encode a string chunk to UTF-8. Uses mbstring if available, otherwise a manual fallback.
*
* @param string $chunk The string to encode
* @return string UTF-8 encoded string
*/
function utf8Encode(string $chunk): string
{
static $useMbstring = null;
Expand Down Expand Up @@ -49,6 +63,13 @@ function utf8Encode(string $chunk): string
}

const utf8Decode = "\\Phunkie\\Streams\\text\\utf8Decode";

/**
* Decode a UTF-8 encoded string chunk. Uses mbstring if available, otherwise a manual fallback.
*
* @param string $chunk The UTF-8 string to decode
* @return string Decoded string
*/
function utf8Decode(string $chunk): string
{
static $useMbstring = null;
Expand Down Expand Up @@ -78,11 +99,24 @@ function utf8Decode(string $chunk): string
}

const trim = "\\Phunkie\\Streams\\text\\trim";

/**
* Trim whitespace from both ends of a string chunk.
*
* @param string $chunk The string to trim
* @return string Trimmed string
*/
function trim($chunk): string
{
return \trim($chunk);
}

/**
* Create a closure that splits a string chunk by the given delimiter.
*
* @param string $delimiter The delimiter to split on
* @return \Closure(string): array<string>
*/
function splitBy(string $delimiter): \Closure
{
return function ($chunk) use ($delimiter) {
Expand Down
82 changes: 82 additions & 0 deletions src/Functions/transformation.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,39 @@
use Phunkie\Streams\Type\Transformation;

const map = 'map';

/**
* Apply a function to each element in the stream.
*
* @param callable $f The mapping function
* @return Transformation
*/
function map($f): Transformation
{
return new Transformation(fn ($chunk) => array_map($f, $chunk));
}

const filter = 'filter';

/**
* Keep only elements that satisfy the predicate.
*
* @param callable $f The predicate function
* @return Transformation
*/
function filter(callable $f): Transformation
{
return new Transformation(fn ($chunk) => array_filter($chunk, $f));
}

const flatMap = 'flatMap';

/**
* Map each element to a Stream or array and flatten the results.
*
* @param callable $f Function returning a Stream, array, or scalar per element
* @return Transformation
*/
function flatMap(callable $f): Transformation
{
return new Transformation(function ($chunk) use ($f) {
Expand All @@ -47,6 +68,12 @@ function flatMap(callable $f): Transformation
}

const flatten = 'flatten';

/**
* Flatten one level of nested Streams or arrays.
*
* @return Transformation
*/
function flatten(): Transformation
{
return new Transformation(function ($chunk) {
Expand All @@ -67,6 +94,13 @@ function flatten(): Transformation
}

const interleave = 'interleave';

/**
* Interleave elements from this stream with one or more other streams, alternating round-robin.
*
* @param \Phunkie\Streams\Type\Stream ...$others Streams to interleave with
* @return Transformation
*/
function interleave(...$others): Transformation
{
return new Transformation(function ($chunk) use ($others) {
Expand All @@ -93,23 +127,50 @@ function interleave(...$others): Transformation
}

const evalMap = 'evalMap';

/**
* Map each element through an effectful function (returning IO) and run the effect.
*
* @param callable $f Function returning an IO per element
* @return Transformation
*/
function evalMap(callable $f): Transformation
{
return new Transformation(fn ($chunk) => ImmList(...array_map(fn ($x) => $f($x)->unsafeRun(), $chunk)));
}

const evalFlatMap = 'evalFlatMap';

/**
* Map each element through a function and flatten the resulting Streams.
*
* @param callable $f Function returning a value to be wrapped in a Stream per element
* @return Transformation
*/
function evalFlatMap(callable $f): Transformation
{
return new Transformation(fn ($chunk) => Stream(...array_map($f, $chunk)));
}

const evalFilter = 'evalFilter';

/**
* Filter elements using an effectful predicate (returning IO<bool>) and run the effect.
*
* @param callable $f Predicate function returning an IO<bool> per element
* @return Transformation
*/
function evalFilter(callable $f): Transformation
{
return new Transformation(fn ($chunk) => ImmList(...array_filter($chunk, fn ($v) => $f($v)->unsafeRun())));
}

/**
* Run an effectful function (returning IO) on each element for its side effect, passing elements through unchanged.
*
* @param callable $f Function returning an IO per element (result is discarded)
* @return Transformation
*/
function evalTap($f): Transformation
{
$transformation = new Transformation(
Expand All @@ -128,6 +189,13 @@ function ($chunk) use ($f) {
}

const takeWhile = 'takeWhile';

/**
* Emit elements while the predicate holds, then stop.
*
* @param callable $predicate The predicate to test each element
* @return Transformation
*/
function takeWhile(callable $predicate): Transformation
{
return new Transformation(function ($chunk) use ($predicate) {
Expand All @@ -144,6 +212,13 @@ function takeWhile(callable $predicate): Transformation
}

const dropWhile = 'dropWhile';

/**
* Skip elements while the predicate holds, then emit the rest.
*
* @param callable $predicate The predicate to test each element
* @return Transformation
*/
function dropWhile(callable $predicate): Transformation
{
$dropping = true;
Expand All @@ -164,6 +239,13 @@ function dropWhile(callable $predicate): Transformation
}

const chunk = 'chunk';

/**
* Group elements into fixed-size sub-arrays (chunks). The last chunk may be smaller.
*
* @param int $size Number of elements per chunk
* @return Transformation
*/
function chunk(int $size): Transformation
{
$buffer = [];
Expand Down
Loading