diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 170c2fa..170f987 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -18,12 +18,6 @@ parameters: count: 1 path: src/Pull/InfinitePull.php - - - message: '#^Method Phunkie\\Streams\\Pull\\InfinitePull\:\:toArray\(\) should return array but returns Generator\.$#' - identifier: return.type - count: 1 - path: src/Pull/InfinitePull.php - - message: '#^Call to an undefined method Phunkie\\Streams\\IO\\Resource\:\:pull\(\)\.$#' identifier: method.notFound diff --git a/src/Compilable.php b/src/Compilable.php index 76d50f5..04f4682 100644 --- a/src/Compilable.php +++ b/src/Compilable.php @@ -14,9 +14,22 @@ use Phunkie\Effect\IO\IO; use Phunkie\Types\ImmList; +/** + * Contract for compiling a stream into a materialised collection. + */ interface Compilable { + /** + * Compile the stream into an ImmList, or an IO action for effectful streams. + * + * @return ImmList|IO + */ public function toList(): ImmList | IO; + /** + * Compile the stream into a plain PHP array. + * + * @return array + */ public function toArray(): array; } diff --git a/src/Functions/common.php b/src/Functions/common.php index ca00667..4adbaee 100644 --- a/src/Functions/common.php +++ b/src/Functions/common.php @@ -9,9 +9,15 @@ * file that was distributed with this source code. */ +/** + * Bootstraps all function files in this directory. + */ array_map(function ($file) { require_once $file; }, glob(__DIR__ .'/*')); +/** @var string Effect type constant for pure (non-effectful) streams. */ const Pure = 'Pure'; + +/** @var string Effect type constant for IO (effectful) streams. */ const IO = 'IO'; diff --git a/src/Functions/infinite.php b/src/Functions/infinite.php index 047a973..181afed 100644 --- a/src/Functions/infinite.php +++ b/src/Functions/infinite.php @@ -19,35 +19,78 @@ use Phunkie\Streams\Infinite\Timer; use Phunkie\Streams\Infinite\Unfold; + /** + * Create an infinite stream from a numeric range. + * + * @param int $start Starting value (inclusive) + * @param int $end Ending value (inclusive, defaults to PHP_INT_MAX) + * @param int $step Step increment between values + * @return Infinite + */ function fromRange(int $start, int $end = PHP_INT_MAX, int $step = 1): Infinite { return new Range($start, $end, $step); } - function iterate(int $start) + /** + * Create an infinite stream by repeatedly applying a function to a seed. + * + * Returns a curried function: iterate($start)($f) produces $start, $f($start), $f($f($start)), ... + * + * @param int $start The initial seed value + * @return \Closure(callable): Iterate + */ + function iterate(int $start): \Closure { return function (callable $f) use ($start) { return new Iterate($f, $start); }; } - function unfold($seed) + /** + * Create an infinite stream by unfolding a seed with a function. + * + * Returns a curried function: unfold($seed)($f) where $f returns the next value and new state. + * + * @param mixed $seed The initial state + * @return \Closure(callable): Unfold + */ + function unfold($seed): \Closure { return function (callable $f) use ($seed) { return new Unfold($f, $seed); }; } + /** + * Create an infinite stream that endlessly emits a constant value. + * + * @param mixed $pattern The value to repeat + * @return Infinite + */ function fromConstant(mixed $pattern): Infinite { return new Constant($pattern); } + /** + * Create an infinite stream that cycles through the given values repeatedly. + * + * @param mixed ...$values The values to cycle through + * @return Infinite + */ function repeat(...$values): Infinite { return new Repeat(...$values); } + /** + * Create an infinite stream that emits a tick at a fixed time interval. + * + * @param float $seconds Time interval in seconds between emissions + * @param int|null $stopAt Optional maximum number of ticks before stopping + * @return Infinite + */ function awakeEvery(float $seconds, $stopAt = null): Infinite { return new Timer($seconds, $stopAt); diff --git a/src/Functions/io.php b/src/Functions/io.php index 57f2c2c..1be35e0 100644 --- a/src/Functions/io.php +++ b/src/Functions/io.php @@ -13,6 +13,12 @@ use Phunkie\Effect\IO\IO; + /** + * Wrap a closure in an IO effect. + * + * @param callable $f The side-effecting closure to wrap + * @return IO + */ function io($f): IO { return new IO($f); diff --git a/src/Functions/network.php b/src/Functions/network.php index d5e1270..fe28940 100644 --- a/src/Functions/network.php +++ b/src/Functions/network.php @@ -73,6 +73,7 @@ function httpDelete(string $url, array $headers = []): Stream * Create a TCP socket connection with bracket for safe resource management * * @param SocketAddress $address The socket address to connect to + * @param float $timeout Connection timeout in seconds * @return IO IO action that creates a socket connection */ function socket(SocketAddress $address, float $timeout = 30.0): IO diff --git a/src/Functions/stream.php b/src/Functions/stream.php index d245621..707ca92 100644 --- a/src/Functions/stream.php +++ b/src/Functions/stream.php @@ -16,6 +16,19 @@ use Phunkie\Streams\IO\Resource; use Phunkie\Streams\Type\Stream; + /** + * Factory function for creating streams. + * + * Accepts a Pull, Path, Infinite, Resource, or plain values. + * - Pull: creates a stream backed by a pull-based source + * - Path: creates a stream that reads from a file path + * - Infinite: creates a stream from an infinite generator (range, iterate, unfold, etc.) + * - Resource: creates a stream from a resource object + * - Plain values: creates a finite stream emitting the given values + * + * @param mixed ...$t A single Pull|Path|Infinite|Resource, or one or more plain values + * @return Stream + */ function Stream(...$t): Stream { if (count($t) === 1 && $t[0] instanceof \Phunkie\Streams\Type\Pull) { diff --git a/src/Functions/text.php b/src/Functions/text.php index f1f6131..b779573 100644 --- a/src/Functions/text.php +++ b/src/Functions/text.php @@ -11,12 +11,26 @@ namespace Phunkie\Streams\text { const lines = "\\Phunkie\\Streams\\text\\lines"; + + /** + * Split a string chunk into lines by the platform EOL character. + * + * @param string $chunk The text to split + * @return array Array of lines + */ function lines($chunk): array { return explode(PHP_EOL, $chunk); } const utf8Encode = "\\Phunkie\\Streams\\text\\utf8Encode"; + + /** + * Encode a string chunk to UTF-8. Uses mbstring if available, otherwise a manual fallback. + * + * @param string $chunk The string to encode + * @return string UTF-8 encoded string + */ function utf8Encode(string $chunk): string { static $useMbstring = null; @@ -49,6 +63,13 @@ function utf8Encode(string $chunk): string } const utf8Decode = "\\Phunkie\\Streams\\text\\utf8Decode"; + + /** + * Decode a UTF-8 encoded string chunk. Uses mbstring if available, otherwise a manual fallback. + * + * @param string $chunk The UTF-8 string to decode + * @return string Decoded string + */ function utf8Decode(string $chunk): string { static $useMbstring = null; @@ -78,11 +99,24 @@ function utf8Decode(string $chunk): string } const trim = "\\Phunkie\\Streams\\text\\trim"; + + /** + * Trim whitespace from both ends of a string chunk. + * + * @param string $chunk The string to trim + * @return string Trimmed string + */ function trim($chunk): string { return \trim($chunk); } + /** + * Create a closure that splits a string chunk by the given delimiter. + * + * @param string $delimiter The delimiter to split on + * @return \Closure(string): array + */ function splitBy(string $delimiter): \Closure { return function ($chunk) use ($delimiter) { diff --git a/src/Functions/transformation.php b/src/Functions/transformation.php index 74e7979..516a26c 100644 --- a/src/Functions/transformation.php +++ b/src/Functions/transformation.php @@ -14,18 +14,39 @@ use Phunkie\Streams\Type\Transformation; const map = 'map'; + + /** + * Apply a function to each element in the stream. + * + * @param callable $f The mapping function + * @return Transformation + */ function map($f): Transformation { return new Transformation(fn ($chunk) => array_map($f, $chunk)); } const filter = 'filter'; + + /** + * Keep only elements that satisfy the predicate. + * + * @param callable $f The predicate function + * @return Transformation + */ function filter(callable $f): Transformation { return new Transformation(fn ($chunk) => array_filter($chunk, $f)); } const flatMap = 'flatMap'; + + /** + * Map each element to a Stream or array and flatten the results. + * + * @param callable $f Function returning a Stream, array, or scalar per element + * @return Transformation + */ function flatMap(callable $f): Transformation { return new Transformation(function ($chunk) use ($f) { @@ -47,6 +68,12 @@ function flatMap(callable $f): Transformation } const flatten = 'flatten'; + + /** + * Flatten one level of nested Streams or arrays. + * + * @return Transformation + */ function flatten(): Transformation { return new Transformation(function ($chunk) { @@ -67,6 +94,13 @@ function flatten(): Transformation } const interleave = 'interleave'; + + /** + * Interleave elements from this stream with one or more other streams, alternating round-robin. + * + * @param \Phunkie\Streams\Type\Stream ...$others Streams to interleave with + * @return Transformation + */ function interleave(...$others): Transformation { return new Transformation(function ($chunk) use ($others) { @@ -93,23 +127,50 @@ function interleave(...$others): Transformation } const evalMap = 'evalMap'; + + /** + * Map each element through an effectful function (returning IO) and run the effect. + * + * @param callable $f Function returning an IO per element + * @return Transformation + */ function evalMap(callable $f): Transformation { return new Transformation(fn ($chunk) => ImmList(...array_map(fn ($x) => $f($x)->unsafeRun(), $chunk))); } const evalFlatMap = 'evalFlatMap'; + + /** + * Map each element through a function and flatten the resulting Streams. + * + * @param callable $f Function returning a value to be wrapped in a Stream per element + * @return Transformation + */ function evalFlatMap(callable $f): Transformation { return new Transformation(fn ($chunk) => Stream(...array_map($f, $chunk))); } const evalFilter = 'evalFilter'; + + /** + * Filter elements using an effectful predicate (returning IO) and run the effect. + * + * @param callable $f Predicate function returning an IO per element + * @return Transformation + */ function evalFilter(callable $f): Transformation { return new Transformation(fn ($chunk) => ImmList(...array_filter($chunk, fn ($v) => $f($v)->unsafeRun()))); } + /** + * Run an effectful function (returning IO) on each element for its side effect, passing elements through unchanged. + * + * @param callable $f Function returning an IO per element (result is discarded) + * @return Transformation + */ function evalTap($f): Transformation { $transformation = new Transformation( @@ -128,6 +189,13 @@ function ($chunk) use ($f) { } const takeWhile = 'takeWhile'; + + /** + * Emit elements while the predicate holds, then stop. + * + * @param callable $predicate The predicate to test each element + * @return Transformation + */ function takeWhile(callable $predicate): Transformation { return new Transformation(function ($chunk) use ($predicate) { @@ -144,6 +212,13 @@ function takeWhile(callable $predicate): Transformation } const dropWhile = 'dropWhile'; + + /** + * Skip elements while the predicate holds, then emit the rest. + * + * @param callable $predicate The predicate to test each element + * @return Transformation + */ function dropWhile(callable $predicate): Transformation { $dropping = true; @@ -164,6 +239,13 @@ function dropWhile(callable $predicate): Transformation } const chunk = 'chunk'; + + /** + * Group elements into fixed-size sub-arrays (chunks). The last chunk may be smaller. + * + * @param int $size Number of elements per chunk + * @return Transformation + */ function chunk(int $size): Transformation { $buffer = []; diff --git a/src/IO/File/Path.php b/src/IO/File/Path.php index 24784c4..4762b41 100644 --- a/src/IO/File/Path.php +++ b/src/IO/File/Path.php @@ -11,12 +11,21 @@ namespace Phunkie\Streams\IO\File; +/** + * Value object representing a filesystem path. + */ class Path { + /** + * @param string $pathname The filesystem path + */ public function __construct(private readonly string $pathname) { } + /** + * @return string The filesystem path as a string + */ public function toString(): string { return $this->pathname; diff --git a/src/IO/Network/HttpRequest.php b/src/IO/Network/HttpRequest.php index 5c5b1c3..41db814 100644 --- a/src/IO/Network/HttpRequest.php +++ b/src/IO/Network/HttpRequest.php @@ -15,13 +15,25 @@ use Phunkie\Streams\Type\Stream; /** - * HTTP Request resource for making HTTP calls + * HTTP request as a pullable stream Resource. + * + * Supports GET, POST, PUT, DELETE, PATCH, and HEAD methods. + * The request is executed lazily on the first pull. Uses PHP stream + * contexts internally. */ class HttpRequest implements Resource { private $handle; private bool $executed = false; + /** + * @param string $url Request URL + * @param string $method HTTP method (GET, POST, PUT, DELETE, PATCH, HEAD) + * @param array $headers Request headers as strings or key-value pairs + * @param string|null $body Request body (used with POST, PUT, PATCH) + * @param float $timeout Connection timeout in seconds + * @throws \InvalidArgumentException If the HTTP method is not supported + */ public function __construct( private string $url, private string $method = 'GET', @@ -34,6 +46,7 @@ public function __construct( } } + /** Close the response stream if still open. */ public function __destruct() { if ($this->isOpen()) { @@ -41,27 +54,66 @@ public function __destruct() } } + /** + * Create a Stream for an HTTP GET request. + * + * @param string $url Request URL + * @param array $headers Optional request headers + * @return Stream + */ public static function get(string $url, array $headers = []): Stream { return Stream(new HttpRequest($url, 'GET', $headers)); } + /** + * Create a Stream for an HTTP POST request. + * + * @param string $url Request URL + * @param string $body Request body + * @param array $headers Optional request headers + * @return Stream + */ public static function post(string $url, string $body, array $headers = []): Stream { return Stream(new HttpRequest($url, 'POST', $headers, $body)); } + /** + * Create a Stream for an HTTP PUT request. + * + * @param string $url Request URL + * @param string $body Request body + * @param array $headers Optional request headers + * @return Stream + */ public static function put(string $url, string $body, array $headers = []): Stream { return Stream(new HttpRequest($url, 'PUT', $headers, $body)); } + /** + * Create a Stream for an HTTP DELETE request. + * + * @param string $url Request URL + * @param array $headers Optional request headers + * @return Stream + */ public static function delete(string $url, array $headers = []): Stream { return Stream(new HttpRequest($url, 'DELETE', $headers)); } - public function pull($bytes) + /** + * Pull the next chunk of response data. + * + * Executes the HTTP request on first call. Returns Resource::EOF + * when the response has been fully consumed. + * + * @param int $bytes Number of bytes to read + * @return string Data chunk, or Resource::EOF when the response is exhausted + */ + public function pull($bytes): string { if (!$this->executed) { $this->execute(); @@ -70,11 +122,21 @@ public function pull($bytes) return $this->read($bytes); } + /** + * Check whether the response handle is an open stream resource. + * + * @return bool + */ private function isOpen(): bool { return is_resource($this->handle) && get_resource_type($this->handle) === 'stream'; } + /** + * Execute the HTTP request and open the response stream. + * + * @return void + */ private function execute(): void { $options = [ @@ -100,6 +162,11 @@ private function execute(): void $this->executed = true; } + /** + * Format the headers array into an HTTP header string. + * + * @return string + */ private function formatHeaders(): string { if (empty($this->headers)) { @@ -121,7 +188,13 @@ private function formatHeaders(): string return implode("\r\n", $formatted); } - private function read($bytes) + /** + * Read up to $bytes from the response, returning Resource::EOF at end-of-stream. + * + * @param int $bytes Number of bytes to read. + * @return string + */ + private function read($bytes): string { if (!is_resource($this->handle)) { throw new \Error("HTTP handle is not a valid resource"); @@ -136,6 +209,11 @@ private function read($bytes) return $data; } + /** + * Close the response stream handle. + * + * @return void + */ private function close(): void { if (is_resource($this->handle)) { @@ -144,8 +222,11 @@ private function close(): void } /** - * Get response headers from the HTTP request - * Only available after execution + * Get response headers from the HTTP request. + * + * Only available after the request has been executed (after the first pull). + * + * @return array Response headers, or empty array if not yet executed */ public function getResponseHeaders(): array { diff --git a/src/IO/Network/SocketAddress.php b/src/IO/Network/SocketAddress.php index 0338579..419b9e5 100644 --- a/src/IO/Network/SocketAddress.php +++ b/src/IO/Network/SocketAddress.php @@ -12,10 +12,17 @@ namespace Phunkie\Streams\IO\Network; /** - * Represents a socket address (host:port combination) + * Value object representing a TCP socket address (host:port). + * + * Port is validated to be within the range 1-65535. */ class SocketAddress { + /** + * @param string $host Hostname or IP address + * @param int $port TCP port (1-65535) + * @throws \InvalidArgumentException If port is out of range + */ public function __construct( private readonly string $host, private readonly int $port @@ -25,21 +32,33 @@ public function __construct( } } + /** + * @return string The address formatted as a TCP URI (e.g. "tcp://host:port") + */ public function toString(): string { return "tcp://{$this->host}:{$this->port}"; } + /** + * @return string The hostname or IP address + */ public function getHost(): string { return $this->host; } + /** + * @return int The port number + */ public function getPort(): int { return $this->port; } + /** + * @return string The address formatted as a TCP URI + */ public function __toString(): string { return $this->toString(); diff --git a/src/IO/Network/SocketRead.php b/src/IO/Network/SocketRead.php index 21dd5e4..b91770c 100644 --- a/src/IO/Network/SocketRead.php +++ b/src/IO/Network/SocketRead.php @@ -15,13 +15,19 @@ use Phunkie\Streams\Type\Stream; /** - * TCP Socket client for reading data from a remote server + * TCP socket reader as a pullable stream Resource. + * + * Connects lazily on first pull and reads data with a configurable timeout. */ class SocketRead implements Resource { private $socket; private float $timeout; + /** + * @param SocketAddress $address Remote TCP address to connect to + * @param float $timeout Connection timeout in seconds + */ public function __construct( private SocketAddress $address, float $timeout = 30.0 @@ -29,6 +35,7 @@ public function __construct( $this->timeout = $timeout; } + /** Close the socket if still open. */ public function __destruct() { if ($this->isOpen()) { @@ -36,6 +43,14 @@ public function __destruct() } } + /** + * Create a Stream that reads all data from a TCP socket in chunks. + * + * @param SocketAddress $address Remote TCP address + * @param int $bytes Chunk size in bytes + * @param float $timeout Connection timeout in seconds + * @return Stream + */ public static function readAll(SocketAddress $address, int $bytes = 4096, float $timeout = 30.0): Stream { $stream = Stream(new SocketRead($address, $timeout)); @@ -43,7 +58,15 @@ public static function readAll(SocketAddress $address, int $bytes = 4096, float return $stream->setBytes($bytes); } - public function pull($bytes) + /** + * Pull the next chunk of bytes from the socket. + * + * Connects on first call. Returns Resource::EOF when the remote end closes. + * + * @param int $bytes Number of bytes to read + * @return string Data chunk, or Resource::EOF when the connection is closed + */ + public function pull($bytes): string { if (!$this->isOpen()) { $this->connect(); @@ -52,11 +75,21 @@ public function pull($bytes) return $this->read($bytes); } + /** + * Check whether the socket is an open stream resource. + * + * @return bool + */ private function isOpen(): bool { return is_resource($this->socket) && get_resource_type($this->socket) === 'stream'; } + /** + * Open a TCP connection to the remote address. + * + * @return void + */ private function connect(): void { $errno = 0; @@ -79,7 +112,13 @@ private function connect(): void stream_set_blocking($this->socket, false); } - private function read($bytes) + /** + * Read up to $bytes from the socket, returning Resource::EOF when closed. + * + * @param int $bytes Number of bytes to read. + * @return string + */ + private function read($bytes): string { if (!is_resource($this->socket)) { throw new \Error("Socket is not a valid resource"); @@ -97,6 +136,11 @@ private function read($bytes) return $data; } + /** + * Close the socket connection. + * + * @return void + */ private function close(): void { if (is_resource($this->socket)) { diff --git a/src/IO/Network/SocketServer.php b/src/IO/Network/SocketServer.php index ceaa7f9..969af86 100644 --- a/src/IO/Network/SocketServer.php +++ b/src/IO/Network/SocketServer.php @@ -15,14 +15,21 @@ use Phunkie\Streams\Type\Stream; /** - * TCP Socket server for accepting client connections + * TCP server listener as a pullable stream Resource. * - * Each pull() returns a client socket resource + * Binds to a host:port, listens for connections, and yields accepted + * client socket resources on each pull. */ class SocketServer implements Resource { private $serverSocket; + /** + * @param string $host Hostname or IP to bind to + * @param int $port TCP port to listen on (1-65535) + * @param int $backlog Maximum length of the pending connections queue + * @throws \InvalidArgumentException If port is out of range + */ public function __construct( private string $host, private int $port, @@ -33,6 +40,7 @@ public function __construct( } } + /** Close the server socket if still open. */ public function __destruct() { if ($this->isOpen()) { @@ -40,6 +48,14 @@ public function __destruct() } } + /** + * Create a Stream that listens for incoming TCP connections. + * + * @param string $host Hostname or IP to bind to + * @param int $port TCP port to listen on + * @param int $backlog Maximum pending connections queue length + * @return Stream + */ public static function listen(string $host, int $port, int $backlog = SOMAXCONN): Stream { $stream = Stream(new SocketServer($host, $port, $backlog)); @@ -47,7 +63,16 @@ public static function listen(string $host, int $port, int $backlog = SOMAXCONN) return $stream; } - public function pull($bytes) + /** + * Accept the next client connection. + * + * Binds and listens on first call. Returns a client socket resource, + * or Resource::EOF if the accept fails. + * + * @param int $bytes Unused (required by Resource interface) + * @return mixed Client socket resource, or Resource::EOF on failure + */ + public function pull($bytes): mixed { if (!$this->isOpen()) { $this->bind(); @@ -56,11 +81,21 @@ public function pull($bytes) return $this->accept(); } + /** + * Check whether the server socket is an open stream resource. + * + * @return bool + */ private function isOpen(): bool { return is_resource($this->serverSocket) && get_resource_type($this->serverSocket) === 'stream'; } + /** + * Create and bind a TCP server socket, then start listening. + * + * @return void + */ private function bind(): void { $errno = 0; @@ -84,7 +119,12 @@ private function bind(): void stream_set_blocking($this->serverSocket, false); } - private function accept() + /** + * Accept the next incoming connection, returning the client socket or Resource::EOF. + * + * @return mixed + */ + private function accept(): mixed { if (!is_resource($this->serverSocket)) { throw new \Error("Server socket is not a valid resource"); @@ -102,6 +142,11 @@ private function accept() return $client; } + /** + * Close the server socket. + * + * @return void + */ private function close(): void { if (is_resource($this->serverSocket)) { @@ -109,6 +154,9 @@ private function close(): void } } + /** + * @return string The bound address as a TCP URI (e.g. "tcp://host:port") + */ public function getAddress(): string { return "tcp://{$this->host}:{$this->port}"; diff --git a/src/IO/Read.php b/src/IO/Read.php index 2d6f371..ae80d81 100644 --- a/src/IO/Read.php +++ b/src/IO/Read.php @@ -13,14 +13,23 @@ use Phunkie\Streams\Type\Stream; +/** + * Reads from a file resource chunk by chunk. + * + * Opens the file lazily on first pull and closes it on destruction. + */ class Read implements Resource { private $handle; + /** + * @param string $path Filesystem path to read from + */ public function __construct(private string $path) { } + /** Close the file handle if still open. */ public function __destruct() { if ($this->isOpen()) { @@ -28,6 +37,13 @@ public function __destruct() } } + /** + * Create a Stream that reads the entire file in chunks. + * + * @param string $path Filesystem path to read from + * @param int $bytes Chunk size in bytes + * @return Stream + */ public static function readAll($path, $bytes = 256): Stream { $stream = Stream(new Read($path)); @@ -35,7 +51,15 @@ public static function readAll($path, $bytes = 256): Stream return $stream->setBytes($bytes); } - public function pull($bytes) + /** + * Pull the next chunk of bytes from the file. + * + * Opens the file on first call. Returns Resource::EOF at end-of-file. + * + * @param int $bytes Number of bytes to read + * @return string Data chunk, or Resource::EOF when the file is exhausted + */ + public function pull($bytes): string { if (!$this->isOpen()) { $this->open(); @@ -44,17 +68,33 @@ public function pull($bytes) return $this->read($bytes); } - private function isOpen() + /** + * Check whether the file handle is an open stream resource. + * + * @return bool + */ + private function isOpen(): bool { return is_resource($this->handle) && get_resource_type($this->handle) === 'stream'; } - private function open() + /** + * Open the file for reading. + * + * @return void + */ + private function open(): void { $this->handle = fopen($this->path, 'r'); } - private function read($bytes) + /** + * Read up to $bytes from the handle, returning Resource::EOF at end-of-file. + * + * @param int $bytes Number of bytes to read. + * @return string + */ + private function read($bytes): string { if (is_resource($this->handle)) { $data = fread($this->handle, $bytes); @@ -69,6 +109,11 @@ private function read($bytes) throw new \Error("Not a valid resource"); } + /** + * Close the file handle. + * + * @return void + */ private function close(): void { fclose($this->handle); diff --git a/src/IO/Resource.php b/src/IO/Resource.php index 46df710..1dabbbb 100644 --- a/src/IO/Resource.php +++ b/src/IO/Resource.php @@ -11,7 +11,14 @@ namespace Phunkie\Streams\IO; +/** + * Interface for pullable stream resources. + * + * Implementations provide a pull-based mechanism for reading data + * from external sources (files, sockets, HTTP, etc.). + */ interface Resource { + /** Sentinel value returned when a resource has reached end-of-stream. */ public const EOF = 'Phunkie@Reserverd@Constant@EOF'; } diff --git a/src/Infinite/Constant.php b/src/Infinite/Constant.php index 3b59768..b25dddf 100644 --- a/src/Infinite/Constant.php +++ b/src/Infinite/Constant.php @@ -11,12 +11,19 @@ namespace Phunkie\Streams\Infinite; +/** + * Infinite stream that yields a constant value indefinitely. + */ class Constant implements Infinite { + /** + * @param mixed $pattern The value to yield on every iteration + */ public function __construct(private mixed $pattern) { } + /** {@inheritdoc} */ public function getValues(): \Generator { while (true) { @@ -24,6 +31,7 @@ public function getValues(): \Generator } } + /** {@inheritdoc} */ public function reset(): void { // do nothing diff --git a/src/Infinite/Infinite.php b/src/Infinite/Infinite.php index 6dce02a..6f1bad1 100644 --- a/src/Infinite/Infinite.php +++ b/src/Infinite/Infinite.php @@ -11,9 +11,22 @@ namespace Phunkie\Streams\Infinite; +/** + * Interface for infinite stream sources (generators). + * + * Implementations produce an unbounded sequence of values via a Generator. + */ interface Infinite { + /** + * Get the values from this infinite source as a Generator. + * + * @return \Generator + */ public function getValues(): \Generator; + /** + * Reset the source to its initial state. + */ public function reset(): void; } diff --git a/src/Infinite/Iterate.php b/src/Infinite/Iterate.php index 4996f2c..a4cecdf 100644 --- a/src/Infinite/Iterate.php +++ b/src/Infinite/Iterate.php @@ -11,20 +11,33 @@ namespace Phunkie\Streams\Infinite; +/** + * Infinite stream that repeatedly applies a function to a seed value. + * + * Produces the sequence: seed, f(seed), f(f(seed)), ... + */ class Iterate implements Infinite { private \Generator $values; + /** + * @param \Closure|string $f Function applied to produce the next value + * @param int $start Initial seed value + */ public function __construct(private readonly \Closure|string $f, private readonly int $start) { $this->values = $this->generate(); } + /** {@inheritdoc} */ public function getValues(): \Generator { return $this->values; } + /** + * @return \Generator Yields seed, f(seed), f(f(seed)), ... + */ private function generate(): \Generator { for ($i = $this->start; true; $i = ($this->f)($i)) { @@ -32,6 +45,7 @@ private function generate(): \Generator } } + /** {@inheritdoc} */ public function reset(): void { $this->values = $this->generate(); diff --git a/src/Infinite/Range.php b/src/Infinite/Range.php index 3e69b9c..f73f972 100644 --- a/src/Infinite/Range.php +++ b/src/Infinite/Range.php @@ -11,15 +11,29 @@ namespace Phunkie\Streams\Infinite; +/** + * Generates a numeric range from start (inclusive) to end (exclusive). + * + * Can represent very large ranges (up to PHP_INT_MAX) without + * allocating memory for all values at once. + */ final class Range implements Infinite { private \Generator $values; + /** + * @param int $start Start of the range (inclusive) + * @param int $end End of the range (exclusive) + * @param int $step Step increment between values + */ public function __construct(private int $start, private int $end, private int $step = 1) { $this->values = $this->generate(); } + /** + * @return \Generator Yields integers from start to end-1 by step + */ private function generate(): \Generator { for ($i = $this->start; $i < $this->end; $i += $this->step) { @@ -27,11 +41,13 @@ private function generate(): \Generator } } + /** {@inheritdoc} */ public function getValues(): \Generator { return $this->values; } + /** {@inheritdoc} */ public function reset(): void { $this->values = $this->generate(); diff --git a/src/Infinite/Repeat.php b/src/Infinite/Repeat.php index 567f2ec..f7bf0cf 100644 --- a/src/Infinite/Repeat.php +++ b/src/Infinite/Repeat.php @@ -11,15 +11,25 @@ namespace Phunkie\Streams\Infinite; +/** + * Infinite stream that cycles through a pattern of values repeatedly. + * + * Given values (a, b, c), yields a, b, c, a, b, c, ... indefinitely. + */ class Repeat implements Infinite { + /** @var array The pattern of values to cycle through */ private array $pattern; + /** + * @param mixed ...$pattern Values to cycle through + */ public function __construct(...$pattern) { $this->pattern = $pattern; } + /** {@inheritdoc} */ public function getValues(): \Generator { while (true) { @@ -31,6 +41,7 @@ public function getValues(): \Generator } } + /** {@inheritdoc} */ public function reset(): void { reset($this->pattern); diff --git a/src/Infinite/Timer.php b/src/Infinite/Timer.php index 44ebe77..2208d5d 100644 --- a/src/Infinite/Timer.php +++ b/src/Infinite/Timer.php @@ -11,6 +11,12 @@ namespace Phunkie\Streams\Infinite; +/** + * Timer-based infinite stream that yields elapsed nanoseconds at regular intervals. + * + * When the interval is a fractional second, stopAt is treated as an emission count. + * When the interval is a whole second, stopAt is treated as a duration in seconds. + */ class Timer implements Infinite { private ?float $stopAt; @@ -22,6 +28,10 @@ class Timer implements Infinite private ?int $limit; private bool $isFraction; + /** + * @param float $seconds Interval between emissions in seconds + * @param float|null $stopAt Stop condition: emission count (fractional interval) or duration in seconds (whole interval), null for infinite + */ public function __construct(float $seconds, ?float $stopAt) { $this->start = microtime(true); @@ -38,6 +48,11 @@ public function __construct(float $seconds, ?float $stopAt) $this->limit = $this->isFraction ? $this->stopAt : $stopAt; } + /** + * Yield elapsed nanoseconds (as a string) at each interval tick. + * + * {@inheritdoc} + */ public function getValues(): \Generator { while ($this->stopAt === null || $this->end()) { @@ -51,18 +66,24 @@ public function getValues(): \Generator } } + /** {@inheritdoc} */ public function reset(): void { $this->start = microtime(true); } + /** + * @return float The interval in seconds between emissions + */ public function getSeconds(): float { return $this->seconds; } /** - * @return bool + * Check whether the timer should continue emitting. + * + * @return bool True if the timer has not yet reached its stop condition */ public function end(): bool { diff --git a/src/Infinite/Unfold.php b/src/Infinite/Unfold.php index dde1618..923bc12 100644 --- a/src/Infinite/Unfold.php +++ b/src/Infinite/Unfold.php @@ -46,6 +46,7 @@ public function getValues(): \Generator return $this->values; } + /** @return \Generator */ private function generate(): \Generator { $currentSeed = $this->seed; @@ -69,7 +70,7 @@ public function reset(): void * * @return mixed The seed value */ - public function getSeed() + public function getSeed(): mixed { return $this->seed; } @@ -79,7 +80,7 @@ public function getSeed() * * @return callable The unfold function */ - public function getUnfoldFn() + public function getUnfoldFn(): callable { return $this->f; } @@ -90,7 +91,7 @@ public function getUnfoldFn() * @param mixed $seed The current seed value * @return array [currentValue, nextSeed] */ - public function next($seed) + public function next($seed): array { $pair = ($this->f)($seed); diff --git a/src/Ops/Pull/EffectfulOps.php b/src/Ops/Pull/EffectfulOps.php index e2ef550..e5798d4 100644 --- a/src/Ops/Pull/EffectfulOps.php +++ b/src/Ops/Pull/EffectfulOps.php @@ -33,28 +33,46 @@ trait EffectfulOps * @param callable $f A => IO * @return $this */ - public function evalMap($f) + public function evalMap($f): static { $this->appendTransformation(evalMap($f)[IO::class]); return $this; } - public function evalTap($f) + /** + * Apply an effectful function for side effects, keeping the original element. + * + * @param callable $f A => IO + * @return $this + */ + public function evalTap($f): static { $this->appendTransformation(evalTap($f)[IO::class]); return $this; } - public function evalFilter($f) + /** + * Filter elements using an effectful predicate that returns IO. + * + * @param callable $f A => IO + * @return $this + */ + public function evalFilter($f): static { $this->appendTransformation(evalFilter($f)[IO::class]); return $this; } - public function evalFlatMap($f) + /** + * FlatMap with an effectful function. Delegates to evalFilter transformation. + * + * @param callable $f A => IO> + * @return $this + */ + public function evalFlatMap($f): static { $this->appendTransformation(evalFilter($f)[IO::class]); diff --git a/src/Ops/Pull/InfinitePull/CompileOps.php b/src/Ops/Pull/InfinitePull/CompileOps.php index 90d4b98..b850fa1 100644 --- a/src/Ops/Pull/InfinitePull/CompileOps.php +++ b/src/Ops/Pull/InfinitePull/CompileOps.php @@ -17,8 +17,22 @@ use Phunkie\Streams\IO\Resource; use Phunkie\Types\ImmList; +/** + * Compilation operations for InfinitePull. Materialises infinite sequences + * with truncation (first 10 elements + "...") to prevent unbounded evaluation. + * + * @method \Phunkie\Streams\Infinite\Infinite getInfinite() + * @method array getValues() + * @method mixed pull() + */ trait CompileOps { + /** + * Compile into an ImmList, truncating to 10 elements for infinite sources. + * Returns IO wrapping the list for infinite generators. + * + * @return ImmList|IO + */ public function toList(): ImmList | IO { $list = $this->runTransformations($this->getInfinite()->getValues()); @@ -43,12 +57,23 @@ public function toList(): ImmList | IO return $list instanceof IO ? $list : new ImmList(...$list); } + /** + * Compile into a plain PHP array. + * + * @return array + */ public function toArray(): array { return $this->getValues(); } - public function runLog($bytes) + /** + * Run the pull and log output as IO, capped at 10 elements for safety. + * + * @param mixed $bytes Byte size hint + * @return IO + */ + public function runLog($bytes): IO { return new IO(function () use ($bytes) { $log = []; diff --git a/src/Ops/Pull/InfinitePull/FunctorOps.php b/src/Ops/Pull/InfinitePull/FunctorOps.php index 616d2e2..1b0d81c 100644 --- a/src/Ops/Pull/InfinitePull/FunctorOps.php +++ b/src/Ops/Pull/InfinitePull/FunctorOps.php @@ -16,15 +16,29 @@ use Phunkie\Streams\Type\Scope; /** + * Functor operations for InfinitePull. Registers map transformations on the scope. + * * @method Scope getScope() */ trait FunctorOps { + /** + * Alias for map. Apply a function to each output element. + * + * @param callable $f A => B + * @return static + */ public function mapOutput($f): static { return $this->map($f); } + /** + * Register a map transformation to be applied when the pull is compiled. + * + * @param callable $f A => B + * @return static + */ public function map($f): static { $this->appendTransformation(map($f)); diff --git a/src/Ops/Pull/InfinitePull/ImmListOps.php b/src/Ops/Pull/InfinitePull/ImmListOps.php index 466387e..5c33d7b 100644 --- a/src/Ops/Pull/InfinitePull/ImmListOps.php +++ b/src/Ops/Pull/InfinitePull/ImmListOps.php @@ -15,8 +15,23 @@ use Phunkie\Streams\Pull\InfinitePull; use Phunkie\Streams\Pull\ValuesPull; +/** + * List-like operations for InfinitePull. Converts to finite ValuesPull when bounded. + * + * @method \Phunkie\Streams\Infinite\Infinite getInfinite() + * @method \Phunkie\Streams\Type\Scope getScope() + * @method bool valid() + * @method mixed pull() + */ trait ImmListOps { + /** + * Take the first $n elements from the infinite source. + * Returns InfinitePull for Timer sources (preserving timing), or ValuesPull otherwise. + * + * @param int $n Number of elements to take + * @return InfinitePull|ValuesPull + */ public function take(int $n): InfinitePull | ValuesPull { $infinite = $this->getInfinite(); diff --git a/src/Ops/Pull/InfinitePull/IteratorOps.php b/src/Ops/Pull/InfinitePull/IteratorOps.php index 57109c2..887124e 100644 --- a/src/Ops/Pull/InfinitePull/IteratorOps.php +++ b/src/Ops/Pull/InfinitePull/IteratorOps.php @@ -14,30 +14,57 @@ use Phunkie\Streams\Infinite\Infinite; /** + * Iterator interface implementation for InfinitePull. Delegates to the underlying Infinite generator. + * * @method Infinite getInfinite() */ trait IteratorOps { + /** + * Return the current element from the infinite source. + * + * @return mixed + */ public function current(): mixed { return $this->getInfinite()->getValues()->current(); } + /** + * Advance to the next element. + * + * @return void + */ public function next(): void { $this->getInfinite()->getValues()->next(); } + /** + * Return the current key. + * + * @return mixed + */ public function key(): mixed { return $this->getInfinite()->getValues()->key(); } + /** + * Check if the current position is valid. + * + * @return bool + */ public function valid(): bool { return $this->getInfinite()->getValues()->valid(); } + /** + * Reset the iterator to the beginning. + * + * @return void + */ public function rewind(): void { $this->getInfinite()->getValues()->rewind(); diff --git a/src/Ops/Pull/InfinitePull/ShowOps.php b/src/Ops/Pull/InfinitePull/ShowOps.php index 9b2a07c..fe8d429 100644 --- a/src/Ops/Pull/InfinitePull/ShowOps.php +++ b/src/Ops/Pull/InfinitePull/ShowOps.php @@ -11,13 +11,29 @@ namespace Phunkie\Streams\Ops\Pull\InfinitePull; +/** + * Show operations for InfinitePull. Previews the first 10 elements for display. + * + * @method \Phunkie\Streams\Infinite\Infinite getInfinite() + */ trait ShowOps { + /** + * Return the type representation. + * + * @return string + */ public function showType(): string { return 'Byte'; } + /** + * Return a string showing the first 10 elements followed by "...". + * Resets the infinite source after peeking. + * + * @return string + */ public function toString(): string { $values = []; diff --git a/src/Ops/Pull/ResourcePull/CompileOps.php b/src/Ops/Pull/ResourcePull/CompileOps.php index 2aa48b0..2a5040e 100644 --- a/src/Ops/Pull/ResourcePull/CompileOps.php +++ b/src/Ops/Pull/ResourcePull/CompileOps.php @@ -15,8 +15,20 @@ use Phunkie\Streams\IO\Resource; use Phunkie\Types\ImmList; +/** + * Compilation operations for ResourcePull. Materialises resource-backed data into collections. + * + * @method array getValues() + * @method \Phunkie\Streams\Type\Scope getScope() + * @method mixed pull() + */ trait CompileOps { + /** + * Compile into an ImmList, applying any scope-registered maps. + * + * @return ImmList + */ public function toList(): ImmList { $list = ImmList(...$this->getValues()); @@ -28,12 +40,23 @@ public function toList(): ImmList return $list; } + /** + * Compile into a plain PHP array. + * + * @return array + */ public function toArray(): array { return $this->getValues(); } - public function runLog($bytes) + /** + * Run the resource pull and log output as IO, reading up to 10 chunks. + * + * @param mixed $bytes Byte size hint for reading + * @return IO + */ + public function runLog($bytes): IO { return new IO(function () use ($bytes) { $log = []; diff --git a/src/Ops/Pull/ResourcePull/FunctorOps.php b/src/Ops/Pull/ResourcePull/FunctorOps.php index 6469674..1a7dee8 100644 --- a/src/Ops/Pull/ResourcePull/FunctorOps.php +++ b/src/Ops/Pull/ResourcePull/FunctorOps.php @@ -14,10 +14,18 @@ use Phunkie\Streams\Type\Scope; /** + * Functor operations for ResourcePull. Registers maps on the scope for deferred execution. + * * @method Scope getScope() */ trait FunctorOps { + /** + * Register a mapping function to be applied to each output element at compile time. + * + * @param callable $f A => B + * @return static + */ public function mapOutput($f): static { $this->getScope()->addMap($f); diff --git a/src/Ops/Pull/ResourcePull/ShowOps.php b/src/Ops/Pull/ResourcePull/ShowOps.php index 294c4e6..20691ad 100644 --- a/src/Ops/Pull/ResourcePull/ShowOps.php +++ b/src/Ops/Pull/ResourcePull/ShowOps.php @@ -11,13 +11,26 @@ namespace Phunkie\Streams\Ops\Pull\ResourcePull; +/** + * Show operations for ResourcePull. Displays opaque representation since resource data is lazy. + */ trait ShowOps { + /** + * Return the type representation for resource-backed pulls. + * + * @return string + */ public function showType(): string { return 'Byte'; } + /** + * Return an opaque string representation (resource content is not eagerly available). + * + * @return string + */ public function toString(): string { return '...'; diff --git a/src/Ops/Pull/ResourcePullConcat/CompileOps.php b/src/Ops/Pull/ResourcePullConcat/CompileOps.php index 5d7c1a2..dd510c1 100644 --- a/src/Ops/Pull/ResourcePullConcat/CompileOps.php +++ b/src/Ops/Pull/ResourcePullConcat/CompileOps.php @@ -17,10 +17,19 @@ use Phunkie\Types\ImmList; /** + * Compilation operations for ResourcePullConcat. Materialises concatenated resource pulls. + * * @method Pull pull() + * @method array getValues() + * @method \Phunkie\Streams\Type\Scope getScope() */ trait CompileOps { + /** + * Compile into an ImmList, applying any scope-registered maps. + * + * @return ImmList + */ public function toList(): ImmList { $list = ImmList(...$this->getValues()); @@ -32,12 +41,23 @@ public function toList(): ImmList return $list; } + /** + * Compile into a plain PHP array. + * + * @return array + */ public function toArray(): array { return $this->getValues(); } - public function runLog($bytes) + /** + * Run the concatenated resource pulls and log output as IO, reading up to 10 chunks. + * + * @param mixed $bytes Byte size hint for reading + * @return IO + */ + public function runLog($bytes): IO { return new IO(function () use ($bytes) { $log = []; diff --git a/src/Ops/Pull/ResourcePullConcat/FunctorOps.php b/src/Ops/Pull/ResourcePullConcat/FunctorOps.php index e608cc6..5d7d619 100644 --- a/src/Ops/Pull/ResourcePullConcat/FunctorOps.php +++ b/src/Ops/Pull/ResourcePullConcat/FunctorOps.php @@ -14,10 +14,18 @@ use Phunkie\Streams\Type\Scope; /** + * Functor operations for ResourcePullConcat. Registers maps on the scope for deferred execution. + * * @method Scope getScope() */ trait FunctorOps { + /** + * Register a mapping function to be applied to each output element at compile time. + * + * @param callable $f A => B + * @return static + */ public function mapOutput($f): static { $this->getScope()->addMap($f); diff --git a/src/Ops/Pull/ResourcePullConcat/ShowOps.php b/src/Ops/Pull/ResourcePullConcat/ShowOps.php index e02cc25..de4df5b 100644 --- a/src/Ops/Pull/ResourcePullConcat/ShowOps.php +++ b/src/Ops/Pull/ResourcePullConcat/ShowOps.php @@ -14,16 +14,28 @@ use Phunkie\Streams\Type\Pull; /** + * Show operations for ResourcePullConcat. Displays concatenated pull representations. + * * @method Pull getPull1() * @method Pull getPull2() */ trait ShowOps { + /** + * Return the type representation for concatenated resource pulls. + * + * @return string + */ public function showType(): string { return 'Byte'; } + /** + * Return a string showing both pulls joined by "++". + * + * @return string + */ public function toString(): string { return $this->getPull1()->toString() . " ++ " . $this->getPull2()->toString(); diff --git a/src/Ops/Pull/TransformationOps.php b/src/Ops/Pull/TransformationOps.php index 9f5f73a..f434158 100644 --- a/src/Ops/Pull/TransformationOps.php +++ b/src/Ops/Pull/TransformationOps.php @@ -16,10 +16,19 @@ use Phunkie\Streams\Type\Transformation; /** + * Transformation pipeline operations for Pull. Manages the lazy transformation chain + * that is applied when the pull is compiled. + * * @method Scope getScope() */ trait TransformationOps { + /** + * Append a transformation to the scope's pipeline, to be applied at compile time. + * + * @param Transformation $transformation The transformation to append + * @return static + */ public function appendTransformation(Transformation $transformation): static { $this->getScope()->appendTransformation($transformation); @@ -27,6 +36,14 @@ public function appendTransformation(Transformation $transformation): static return $this; } + /** + * Execute all pending transformations against the given chunk of data. + * Returns IO if effectful transformations are present and $acceptIo is true. + * + * @param iterable $chunk The input data to transform + * @param bool $acceptIo Whether to allow IO return (false forces array) + * @return iterable|IO + */ public function runTransformations(iterable $chunk, $acceptIo = true): iterable | IO { return $this->getScope()->runTransformations($chunk, $acceptIo); diff --git a/src/Ops/Pull/ValuesPull/CompileOps.php b/src/Ops/Pull/ValuesPull/CompileOps.php index 38a503a..95db2d1 100644 --- a/src/Ops/Pull/ValuesPull/CompileOps.php +++ b/src/Ops/Pull/ValuesPull/CompileOps.php @@ -17,12 +17,20 @@ use Phunkie\Types\ImmList; /** + * Compilation operations for ValuesPull. Materialises pull values into collections. + * * @method array getValues() * @method Scope getScope() * @method Pull pull() */ trait CompileOps { + /** + * Compile values into an ImmList, applying all pending transformations. + * Returns IO if effectful transformations are present. + * + * @return ImmList|IO + */ public function toList(): ImmList | IO { $list = $this->runTransformations($this->getValues(), false); @@ -30,16 +38,32 @@ public function toList(): ImmList | IO return $list instanceof IO ? $list : new ImmList(...$list); } + /** + * Compile values into a plain PHP array, applying all pending transformations. + * + * @return array + */ public function toArray(): array { return $this->runTransformations($this->getValues()); } + /** + * Run and log output. For ValuesPull, delegates to toArray. + * + * @param mixed $bytes Byte size (unused for value pulls) + * @return array + */ public function runLog($bytes): array { return $this->toArray(); } + /** + * Run all transformations for side effects, discarding output. Returns IO. + * + * @return IO + */ public function drain(): IO { return new IO(function () { diff --git a/src/Ops/Pull/ValuesPull/FunctorOps.php b/src/Ops/Pull/ValuesPull/FunctorOps.php index 55a98cd..6aec3a9 100644 --- a/src/Ops/Pull/ValuesPull/FunctorOps.php +++ b/src/Ops/Pull/ValuesPull/FunctorOps.php @@ -16,15 +16,29 @@ use Phunkie\Streams\Type\Scope; /** + * Functor operations for ValuesPull. Registers map transformations on the scope. + * * @method Scope getScope() */ trait FunctorOps { + /** + * Alias for map. Apply a function to each output element. + * + * @param callable $f A => B + * @return static + */ public function mapOutput($f): static { return $this->map($f); } + /** + * Register a map transformation to be applied when the pull is compiled. + * + * @param callable $f A => B + * @return static + */ public function map($f): static { $this->appendTransformation(map($f)); diff --git a/src/Ops/Pull/ValuesPull/ImmListOps.php b/src/Ops/Pull/ValuesPull/ImmListOps.php index 33894bc..c241a41 100644 --- a/src/Ops/Pull/ValuesPull/ImmListOps.php +++ b/src/Ops/Pull/ValuesPull/ImmListOps.php @@ -19,8 +19,20 @@ use Phunkie\Streams\Pull\ValuesPull; +/** + * List-like operations for ValuesPull. Provides take, filter, interleave, and more. + * + * @method array getValues() + * @method \Phunkie\Streams\Type\Scope getScope() + */ trait ImmListOps { + /** + * Take the first $n elements, returning a new ValuesPull with the same scope. + * + * @param int $n Number of elements to take + * @return ValuesPull + */ public function take(int $n): ValuesPull { $valuesPull = new ValuesPull( @@ -31,6 +43,12 @@ public function take(int $n): ValuesPull return $valuesPull; } + /** + * Register a filter transformation to keep elements matching the predicate. + * + * @param callable $f A => bool + * @return ValuesPull + */ public function filter(callable $f): ValuesPull { $this->appendTransformation(filter($f)); @@ -38,6 +56,12 @@ public function filter(callable $f): ValuesPull return $this; } + /** + * Register an interleave transformation to alternate elements with other pulls. + * + * @param \Phunkie\Streams\Type\Pull ...$other Pulls to interleave with + * @return ValuesPull + */ public function interleave(...$other): ValuesPull { $this->appendTransformation(interleave(...$other)); @@ -45,6 +69,12 @@ public function interleave(...$other): ValuesPull return $this; } + /** + * Register a takeWhile transformation to emit elements while the predicate holds. + * + * @param callable $predicate A => bool + * @return ValuesPull + */ public function takeWhile(callable $predicate): ValuesPull { $this->appendTransformation(takeWhile($predicate)); @@ -52,6 +82,12 @@ public function takeWhile(callable $predicate): ValuesPull return $this; } + /** + * Register a dropWhile transformation to skip elements while the predicate holds. + * + * @param callable $predicate A => bool + * @return ValuesPull + */ public function dropWhile(callable $predicate): ValuesPull { $this->appendTransformation(dropWhile($predicate)); @@ -59,6 +95,12 @@ public function dropWhile(callable $predicate): ValuesPull return $this; } + /** + * Register a chunk transformation to group elements into arrays of the given size. + * + * @param int $size Number of elements per chunk + * @return ValuesPull + */ public function chunk(int $size): ValuesPull { $this->appendTransformation(chunk($size)); diff --git a/src/Ops/Pull/ValuesPull/IteratorOps.php b/src/Ops/Pull/ValuesPull/IteratorOps.php index 0ffc9b2..9ee3ee9 100644 --- a/src/Ops/Pull/ValuesPull/IteratorOps.php +++ b/src/Ops/Pull/ValuesPull/IteratorOps.php @@ -11,28 +11,61 @@ namespace Phunkie\Streams\Ops\Pull\ValuesPull; +/** + * Iterator interface implementation for ValuesPull. Enables sequential element access. + * + * @method array getValues() + * @method int getIndex() + * @method void setIndex(int $index) + */ trait IteratorOps { + /** + * Reset the iterator to the first element. + * + * @return void + */ public function rewind(): void { $this->setIndex(0); } + /** + * Return the current element. + * + * @return mixed + */ public function current(): mixed { return $this->getValues()[$this->index]; } + /** + * Return the current index. + * + * @return mixed + */ public function key(): mixed { return $this->getIndex(); } + /** + * Check if the current position is valid. + * + * @return bool + */ public function valid(): bool { return $this->getIndex() < count($this->getValues()); } + /** + * Advance to the next element. + * + * @return void + * @throws \OutOfBoundsException If no more elements are available + */ public function next(): void { if (!$this->valid()) { diff --git a/src/Ops/Pull/ValuesPull/MonadOps.php b/src/Ops/Pull/ValuesPull/MonadOps.php index c453b5e..9a95e3c 100644 --- a/src/Ops/Pull/ValuesPull/MonadOps.php +++ b/src/Ops/Pull/ValuesPull/MonadOps.php @@ -17,10 +17,18 @@ use Phunkie\Streams\Type\Scope; /** + * Monad operations for ValuesPull. Registers flatMap and flatten transformations. + * * @method Scope getScope() */ trait MonadOps { + /** + * Register a flatMap transformation. The function should return an iterable for each element. + * + * @param callable $f A => iterable + * @return static + */ public function flatMap(callable $f): static { $this->appendTransformation(flatMap($f)); @@ -28,6 +36,11 @@ public function flatMap(callable $f): static return $this; } + /** + * Register a flatten transformation to unwrap nested iterables. + * + * @return static + */ public function flatten(): static { $this->appendTransformation(flatten()); diff --git a/src/Ops/Pull/ValuesPull/ShowOps.php b/src/Ops/Pull/ValuesPull/ShowOps.php index 1cc7d1b..b55d171 100644 --- a/src/Ops/Pull/ValuesPull/ShowOps.php +++ b/src/Ops/Pull/ValuesPull/ShowOps.php @@ -14,13 +14,28 @@ use function Phunkie\Functions\show\showArrayType; use function Phunkie\Functions\show\showValue; +/** + * Show operations for ValuesPull. Provides type and string representations. + * + * @method array getValues() + */ trait ShowOps { + /** + * Return the type representation based on the contained values. + * + * @return string + */ public function showType(): string { return showArrayType($this->getValues()); } + /** + * Return a comma-separated string of the contained values. + * + * @return string + */ public function toString(): string { return join(', ', array_map(fn ($x) => showValue($x), $this->getValues())); diff --git a/src/Ops/Stream/CompileOps.php b/src/Ops/Stream/CompileOps.php index fe7d25d..9374bdf 100644 --- a/src/Ops/Stream/CompileOps.php +++ b/src/Ops/Stream/CompileOps.php @@ -16,21 +16,39 @@ use Phunkie\Types\ImmList; /** + * Compilation operations for Stream. Materialises the stream into concrete values. + * * @method Pull getPull() + * @method int getBytes() */ trait CompileOps { + /** + * Compile the stream into an ImmList, or an IO if effectful transformations are present. + * + * @return ImmList|IO + */ public function toList(): ImmList | IO { return $this->getPull()->toList(); } + /** + * Compile the stream into a plain PHP array. + * + * @return array + */ public function toArray(): array { return $this->getPull()->toArray(); } - public function runLog() + /** + * Run the stream and collect output as a log. Used for resource-based streams. + * + * @return array|IO + */ + public function runLog(): array|IO { return $this->getPull()->runLog($this->getBytes()); } diff --git a/src/Ops/Stream/EffectfulOps.php b/src/Ops/Stream/EffectfulOps.php index 32285d0..f918a82 100644 --- a/src/Ops/Stream/EffectfulOps.php +++ b/src/Ops/Stream/EffectfulOps.php @@ -13,8 +13,19 @@ use Phunkie\Streams\Type\Stream; +/** + * Effectful operations for Stream. Wraps IO-producing functions into the stream pipeline. + * + * @method \Phunkie\Streams\Type\Pull getPull() + */ trait EffectfulOps { + /** + * Apply an effectful function to each element, using the IO result as the new value. + * + * @param callable $f A => IO + * @return Stream + */ public function evalMap($f): Stream { $this->getPull()->evalMap($f); @@ -22,6 +33,12 @@ public function evalMap($f): Stream return $this; } + /** + * Apply an effectful function for side effects, keeping the original element. + * + * @param callable $f A => IO + * @return Stream + */ public function evalTap($f): Stream { $this->getPull()->evalTap($f); @@ -29,6 +46,12 @@ public function evalTap($f): Stream return $this; } + /** + * Filter elements using an effectful predicate that returns IO. + * + * @param callable $f A => IO + * @return Stream + */ public function evalFilter($f): Stream { $this->getPull()->evalFilter($f); @@ -36,6 +59,12 @@ public function evalFilter($f): Stream return $this; } + /** + * FlatMap with an effectful function that returns IO. + * + * @param callable $f A => IO> + * @return Stream + */ public function evalFlatMap($f): Stream { $this->getPull()->evalFlatMap($f); diff --git a/src/Ops/Stream/FunctorOps.php b/src/Ops/Stream/FunctorOps.php index 3234925..7024eba 100644 --- a/src/Ops/Stream/FunctorOps.php +++ b/src/Ops/Stream/FunctorOps.php @@ -15,6 +15,8 @@ use Phunkie\Types\Kind; /** + * Functor operations for Stream. Provides element-wise mapping over stream values. + * * @method getPull() Phunkie\Streams\Type\Pull * @method as($b) Phunkie\Streams\Type\Stream */ @@ -22,6 +24,12 @@ trait FunctorOps { use \Phunkie\Ops\FunctorOps; + /** + * Apply a function to each element of the stream. + * + * @param callable $f A => B + * @return Kind|Stream + */ public function map($f): Kind | Stream { $this->getPull()->map($f); @@ -29,6 +37,13 @@ public function map($f): Kind | Stream return $this; } + /** + * Invariant map: apply covariant function $f (ignores contravariant $g). + * + * @param callable $f A => B + * @param callable $g B => A (unused, kept for interface compliance) + * @return Kind|Stream + */ public function imap(callable $f, callable $g): Kind | Stream { return $this->map($f); diff --git a/src/Ops/Stream/ImmListOps.php b/src/Ops/Stream/ImmListOps.php index df75629..7ac875a 100644 --- a/src/Ops/Stream/ImmListOps.php +++ b/src/Ops/Stream/ImmListOps.php @@ -18,10 +18,20 @@ use Phunkie\Streams\Type\Stream; /** + * List-like operations for Stream. Provides concat, interleave, take, filter, and more. + * * @method Pull getPull() + * @method int getBytes() */ trait ImmListOps { + /** + * Concatenate this stream with another stream of the same type. + * + * @param Stream $stream The stream to append + * @return Stream + * @throws \Error If mixing pure and non-pure streams + */ public function concat(Stream $stream): Stream { return match(get_class($this->getPull())) { @@ -36,6 +46,12 @@ public function concat(Stream $stream): Stream }; } + /** + * Interleave elements from this stream with elements from other streams. + * + * @param Stream ...$streams Streams to interleave with + * @return Stream + */ public function interleave(...$streams): Stream { $pulls = array_map(fn ($x) => $x->getPull(), $streams); @@ -43,36 +59,78 @@ public function interleave(...$streams): Stream return new Stream($this->getPull()->interleave(...$pulls), $this->getBytes()); } + /** + * Take the first $n elements from the stream. + * + * @param int $n Number of elements to take + * @return Stream + */ public function take(int $n): Stream { return new Stream($this->getPull()->take($n), $this->getBytes()); } + /** + * Keep only elements that satisfy the predicate. + * + * @param callable $f A => bool + * @return Stream + */ public function filter(callable $f): Stream { return new Stream($this->getPull()->filter($f), $this->getBytes()); } + /** + * Pass this stream through a pipe function (Stream => Stream). + * + * @param callable $pipe Stream => Stream + * @return Stream + */ public function through(callable $pipe): Stream { return $pipe($this); } + /** + * Take elements while the predicate holds true, then stop. + * + * @param callable $predicate A => bool + * @return Stream + */ public function takeWhile(callable $predicate): Stream { return new Stream($this->getPull()->takeWhile($predicate), $this->getBytes()); } + /** + * Drop elements while the predicate holds true, then emit the rest. + * + * @param callable $predicate A => bool + * @return Stream + */ public function dropWhile(callable $predicate): Stream { return new Stream($this->getPull()->dropWhile($predicate), $this->getBytes()); } + /** + * Group elements into chunks of the given size. + * + * @param int $size Number of elements per chunk + * @return Stream + */ public function chunk(int $size): Stream { return new Stream($this->getPull()->chunk($size), $this->getBytes()); } + /** + * Merge this stream with other streams by concatenating all values sequentially. + * + * @param Stream ...$streams Streams to merge + * @return Stream + */ public function merge(Stream ...$streams): Stream { // Merge is similar to concat but combines all streams @@ -84,6 +142,13 @@ public function merge(Stream ...$streams): Stream return Stream(...$allValues); } + /** + * Zip this stream with another, pairing corresponding elements as [a, b] arrays. + * Stops at the length of the shorter stream. + * + * @param Stream $other The stream to zip with + * @return Stream + */ public function zip(Stream $other): Stream { $thisValues = $this->getPull()->getValues(); diff --git a/src/Ops/Stream/MonadOps.php b/src/Ops/Stream/MonadOps.php index a8cbcc8..55197e9 100644 --- a/src/Ops/Stream/MonadOps.php +++ b/src/Ops/Stream/MonadOps.php @@ -15,10 +15,18 @@ use Phunkie\Types\Kind; /** + * Monad operations for Stream. Provides flatMap, flatten, ap, and bind. + * * @method getPull() Phunkie\Streams\Type\Pull */ trait MonadOps { + /** + * Apply a function that returns a stream to each element, then flatten the results. + * + * @param callable $f A => Stream + * @return Kind|Stream + */ public function flatMap(callable $f): Kind | Stream { $this->getPull()->flatMap($f); @@ -26,6 +34,11 @@ public function flatMap(callable $f): Kind | Stream return $this; } + /** + * Flatten a stream of streams into a single stream. + * + * @return Kind|Stream + */ public function flatten(): Kind | Stream { $this->getPull()->flatten(); @@ -33,11 +46,23 @@ public function flatten(): Kind | Stream return $this; } + /** + * Applicative apply: apply a stream of functions to this stream of values. + * + * @param Kind $f Stream of callable (A => B) + * @return Kind|Stream + */ public function ap(Kind $f): Kind | Stream { return $f->flatMap(fn ($g) => $this->map($g)); } + /** + * Alias for flatMap. + * + * @param callable $f A => Stream + * @return Kind|Stream + */ public function bind(callable $f): Kind | Stream { return $this->flatMap($f); diff --git a/src/Ops/Stream/ShowOps.php b/src/Ops/Stream/ShowOps.php index 567f288..8de22bc 100644 --- a/src/Ops/Stream/ShowOps.php +++ b/src/Ops/Stream/ShowOps.php @@ -14,20 +14,38 @@ use Phunkie\Streams\Type\Pull; /** + * Show operations for Stream. Provides type and string representations. + * * @method Pull getPull() + * @method string getEffect() */ trait ShowOps { + /** + * Return the type representation, e.g. "Stream". + * + * @return string + */ public function showType(): string { return sprintf("Stream<%s, %s>", $this->getTypeVariables()[0], $this->getTypeVariables()[1]); } + /** + * Return the type variables [effect, pullType] for this stream. + * + * @return array{0: string, 1: string} + */ public function getTypeVariables(): array { return [$this->getEffect(), $this->getPull()->showType()]; } + /** + * Return a short string representation of the stream. + * + * @return string + */ public function toString(): string { return 'Stream(..)'; diff --git a/src/Pull/InfinitePull.php b/src/Pull/InfinitePull.php index 4310817..29f8b48 100644 --- a/src/Pull/InfinitePull.php +++ b/src/Pull/InfinitePull.php @@ -25,6 +25,12 @@ use Phunkie\Streams\Type\Scope; use Phunkie\Streams\Type\Stream; +/** + * Pull backed by an Infinite generator source. + * + * Produces values lazily from a generator, suitable for unbounded or + * computationally-defined streams (e.g. repeat, iterate, unfold). + */ class InfinitePull implements Pull { use CompileOps; @@ -39,6 +45,10 @@ class InfinitePull implements Pull private int $bytes; private Scope $scope; + /** + * @param Infinite $infinite The generator-based infinite source. + * @param int $bytes Chunk size hint for downstream consumers. + */ public function __construct(Infinite $infinite, int $bytes = 256) { $this->infinite = $infinite; @@ -46,7 +56,15 @@ public function __construct(Infinite $infinite, int $bytes = 256) $this->scope = new Scope(identity); } - public function pull() + /** + * Returns the current value and advances to the next element. + * + * Unlike ValuesPull::pull(), this consumes the element so that successive + * calls yield successive generator values. + * + * @return mixed The value before advancing. + */ + public function pull(): mixed { $current = $this->current(); @@ -55,11 +73,21 @@ public function pull() return $current; } + /** + * Returns the underlying generator of values. + * + * @return \Generator + */ public function getValues(): \Generator { return $this->infinite->getValues(); } + /** + * Converts this pull into an infinite Stream, preserving the current scope. + * + * @return Stream + */ public function toStream(): Stream { $stream = Stream::fromInfinite($this->infinite, $this->bytes); @@ -68,16 +96,32 @@ public function toStream(): Stream return $stream; } + /** + * Returns the wrapped Infinite source. + * + * @return Infinite + */ public function getInfinite(): Infinite { return $this->infinite; } + /** + * Returns the current scope. + * + * @return Scope + */ public function getScope(): Scope { return $this->scope; } + /** + * Replace the current scope. + * + * @param Scope $scope The new scope. + * @return static + */ public function setScope(Scope $scope): static { $this->scope = $scope; diff --git a/src/Pull/PDOPull.php b/src/Pull/PDOPull.php index 1fba934..0ff510e 100644 --- a/src/Pull/PDOPull.php +++ b/src/Pull/PDOPull.php @@ -18,6 +18,12 @@ use Phunkie\Streams\Type\Scope; use Phunkie\Streams\Type\Stream; +/** + * Pull backed by a PDOStatement result set. + * + * Iterates over query results row-by-row, fetching each row as an + * associative array (PDO::FETCH_ASSOC). + */ class PDOPull implements Pull { use CompileOps; @@ -27,21 +33,39 @@ class PDOPull implements Pull private int $key = 0; private Scope $scope; + /** + * @param PDOStatement $stmt An already-executed statement to iterate over. + */ public function __construct(private PDOStatement $stmt) { $this->scope = new Scope(); } - public function pull() + /** + * Returns the current row without advancing. + * + * @return array|null The current row as an associative array, or null if exhausted. + */ + public function pull(): mixed { return $this->current(); } + /** + * Returns the current scope. + * + * @return Scope + */ public function getScope(): Scope { return $this->scope; } + /** + * Converts this pull into a Stream, preserving the current scope. + * + * @return Stream + */ public function toStream(): Stream { $stream = \Stream($this); @@ -50,6 +74,12 @@ public function toStream(): Stream return $stream; } + /** + * Replace the current scope. + * + * @param Scope $scope The new scope. + * @return static + */ public function setScope(Scope $scope): static { $this->scope = $scope; @@ -57,29 +87,53 @@ public function setScope(Scope $scope): static return $this; } + /** + * Returns the current row. + * + * @return mixed + */ #[\ReturnTypeWillChange] public function current() { return $this->current; } + /** + * Alias for valid(); returns true while rows remain. + * + * @return bool + */ public function hasNext(): bool { return $this->valid(); } - // Iterator interface implementation + /** + * Resets the key and fetches the first row. + * + * @return void + */ public function rewind(): void { $this->key = 0; $this->next(); // Fetch first row } + /** + * Returns the current row index (1-based after first next()). + * + * @return int + */ public function key(): int { return $this->key; } + /** + * Fetches the next row as an associative array, or sets current to null at end of results. + * + * @return void + */ public function next(): void { $result = $this->stmt->fetch(\PDO::FETCH_ASSOC); @@ -91,11 +145,23 @@ public function next(): void } } + /** + * Returns true while the current row is not null. + * + * @return bool + */ public function valid(): bool { return $this->current !== null; } + /** + * Consumes the remaining result set and returns all rows. + * + * If iteration has not started yet, rewinds first. + * + * @return array> All remaining rows. + */ public function getValues(): array { $values = []; diff --git a/src/Pull/ResourceObjectPull.php b/src/Pull/ResourceObjectPull.php index f6db441..879079c 100644 --- a/src/Pull/ResourceObjectPull.php +++ b/src/Pull/ResourceObjectPull.php @@ -20,9 +20,11 @@ use Phunkie\Streams\Type\Stream; /** - * Pull for Resource objects (like SocketRead, HttpRequest, etc.) - * Unlike ResourcePull which expects a raw stream resource, - * this works with Resource interface implementations + * Pull backed by a Resource interface object (SocketRead, HttpRequest, etc.). + * + * Unlike ResourcePull which wraps a raw PHP stream resource, this class + * works with Resource interface implementations that define their own + * pull($chunkSize) method and EOF sentinel. */ class ResourceObjectPull implements Pull { @@ -35,6 +37,10 @@ class ResourceObjectPull implements Pull private Scope $scope; private bool $eof = false; + /** + * @param Resource $resource The Resource implementation to read from. + * @param int $chunkSize Bytes to request per pull. + */ public function __construct( private Resource $resource, private int $chunkSize = 4096 @@ -43,21 +49,42 @@ public function __construct( $this->scope = new Scope(); } - public function pull() + /** + * Returns the current chunk without advancing. + * + * @return mixed The current chunk, or null if EOF reached or before first next(). + */ + public function pull(): mixed { return $this->current(); } + /** + * Returns the current scope. + * + * @return Scope + */ public function getScope(): Scope { return $this->scope; } + /** + * Converts this pull into a Stream backed by the same Resource object. + * + * @return Stream + */ public function toStream(): Stream { return Stream::fromResourceObject($this->resource); } + /** + * Replace the current scope. + * + * @param Scope $scope The new scope. + * @return static + */ public function setScope(Scope $scope): static { $this->scope = $scope; @@ -65,27 +92,54 @@ public function setScope(Scope $scope): static return $this; } + /** + * Returns the last-read chunk. + * + * @return mixed + */ public function current(): mixed { return $this->current; } + /** + * Returns true while EOF has not been reached. + * + * @return bool + */ public function hasNext(): bool { return !$this->eof; } + /** + * Resets the key counter. Resource objects typically cannot rewind. + * + * @return void + */ public function rewind(): void { // Resource objects typically can't rewind $this->key = 0; } + /** + * Returns the current chunk index. + * + * @return int + */ public function key(): int { return $this->key; } + /** + * Pulls the next chunk from the Resource object. + * + * Sets EOF when Resource::EOF is returned. + * + * @throws \OutOfBoundsException If already at EOF. + */ public function next(): void { if ($this->eof) { @@ -103,11 +157,21 @@ public function next(): void } } + /** + * Returns true while EOF has not been reached. + * + * @return bool + */ public function valid(): bool { return $this->hasNext(); } + /** + * Consumes the entire resource, applying scope maps and filters to each chunk. + * + * @return array All transformed and filtered chunks. + */ public function getValues(): array { $values = []; @@ -141,6 +205,12 @@ public function getValues(): array return $values; } + /** + * Registers a mapping function to be applied on getValues(). + * + * @param callable $f The mapping function. + * @return static + */ public function map($f): static { $this->getScope()->addMap($f); @@ -148,6 +218,12 @@ public function map($f): static return $this; } + /** + * Registers a filter predicate to be applied on getValues(). + * + * @param callable $f The filter predicate. + * @return static + */ public function filter(callable $f): static { $this->getScope()->addFilter($f); diff --git a/src/Pull/ResourcePull.php b/src/Pull/ResourcePull.php index c4cfef8..d80acd8 100644 --- a/src/Pull/ResourcePull.php +++ b/src/Pull/ResourcePull.php @@ -17,6 +17,12 @@ use Phunkie\Streams\Type\Scope; use Phunkie\Streams\Type\Stream; +/** + * Pull backed by a raw PHP stream resource (file handles, php://memory, etc.). + * + * Reads data in fixed-size chunks via fread(). The resource is closed + * automatically when this object is destroyed. + */ class ResourcePull implements Pull { use CompileOps; @@ -27,6 +33,12 @@ class ResourcePull implements Pull private $key; private Scope $scope; + /** + * @param resource $resource A PHP stream resource (e.g. from fopen()). + * @param int $chunkSize Bytes to read per iteration. + * + * @throws \InvalidArgumentException If $resource is not a valid stream resource. + */ public function __construct($resource, int $chunkSize = 1024) { if (!is_resource($resource) || get_resource_type($resource) !== 'stream') { @@ -39,16 +51,33 @@ public function __construct($resource, int $chunkSize = 1024) $this->scope = new Scope(); } - public function pull() + /** + * Returns the current chunk without advancing the iterator. + * + * Call next() first to populate the current value after construction. + * + * @return string|null The current chunk, or null before the first next(). + */ + public function pull(): mixed { return $this->current(); } + /** + * Returns the current scope. + * + * @return Scope + */ public function getScope(): Scope { return $this->scope; } + /** + * Converts this pull into a Stream, preserving the current scope. + * + * @return Stream + */ public function toStream(): Stream { $stream = Stream($this->resource, $this->chunkSize); @@ -57,6 +86,12 @@ public function toStream(): Stream return $stream; } + /** + * Replace the current scope. + * + * @param Scope $scope The new scope. + * @return static + */ public function setScope(Scope $scope): static { $this->scope = $scope; @@ -64,17 +99,32 @@ public function setScope(Scope $scope): static return $this; } + /** + * Returns the last-read chunk. + * + * @return mixed + */ #[\ReturnTypeWillChange] public function current() { return $this->current; } + /** + * Checks whether the underlying resource has more data. + * + * @return bool + */ public function hasNext(): bool { return !feof($this->resource); } + /** + * Rewinds the resource pointer and resets the key counter. + * + * @return void + */ public function rewind(): void { if (is_resource($this->resource)) { @@ -83,11 +133,22 @@ public function rewind(): void $this->key = 0; } + /** + * Returns the current chunk index. + * + * @return int + */ public function key(): int { return $this->key; } + /** + * Reads the next chunk from the resource. + * + * @throws \OutOfBoundsException If no more data is available. + * @throws \RuntimeException If fread() fails. + */ public function next(): void { if (!$this->hasNext()) { @@ -104,11 +165,23 @@ public function next(): void $this->key++; } + /** + * Returns true while the resource has not reached EOF. + * + * @return bool + */ public function valid(): bool { return $this->hasNext(); } + /** + * Consumes the entire resource and returns all chunks as an array. + * + * Rewinds the resource before reading. + * + * @return array + */ public function getValues(): array { $values = []; @@ -124,6 +197,7 @@ public function getValues(): array return $values; } + /** Closes the underlying resource handle. */ public function __destruct() { if (is_resource($this->resource)) { diff --git a/src/Pull/ResourcePullConcat.php b/src/Pull/ResourcePullConcat.php index 3387c6f..013e98b 100644 --- a/src/Pull/ResourcePullConcat.php +++ b/src/Pull/ResourcePullConcat.php @@ -17,6 +17,12 @@ use Phunkie\Streams\Type\Scope; use Phunkie\Streams\Type\Stream; +/** + * Concatenation of two ResourcePulls into a single Pull. + * + * Drains the first ResourcePull completely before switching to the second. + * Delegates all Iterator operations to the currently active pull. + */ class ResourcePullConcat implements Pull { use CompileOps; @@ -27,6 +33,10 @@ class ResourcePullConcat implements Pull private ResourcePull $currentPull; private Scope $scope; + /** + * @param ResourcePull $pull1 The first pull to drain. + * @param ResourcePull $pull2 The second pull, consumed after $pull1 is exhausted. + */ public function __construct(ResourcePull $pull1, ResourcePull $pull2) { $this->pull1 = $pull1; @@ -35,7 +45,14 @@ public function __construct(ResourcePull $pull1, ResourcePull $pull2) $this->scope = new Scope(); } - public function pull() + /** + * Pulls the current value, switching to pull2 when pull1 is exhausted. + * + * @return mixed The current chunk from the active pull. + * + * @throws \OutOfBoundsException If both pulls are exhausted. + */ + public function pull(): mixed { try { return $this->currentPull->pull(); @@ -50,11 +67,21 @@ public function pull() } } + /** + * Returns true if either pull still has data. + * + * @return bool + */ public function hasNext(): bool { return $this->currentPull->hasNext() || ($this->currentPull === $this->pull1 && $this->pull2->hasNext()); } + /** + * Rewinds both underlying pulls and resets to pull1. + * + * @return void + */ public function rewind(): void { $this->pull1->rewind(); @@ -62,11 +89,22 @@ public function rewind(): void $this->currentPull = $this->pull1; } + /** + * Returns the current scope. + * + * @return Scope + */ public function getScope(): Scope { return $this->scope; } + /** + * Replace the current scope. + * + * @param Scope $scope The new scope. + * @return static + */ public function setScope(Scope $scope): static { $this->scope = $scope; @@ -74,6 +112,11 @@ public function setScope(Scope $scope): static return $this; } + /** + * Converts this concatenated pull into a Stream, preserving the current scope. + * + * @return Stream + */ public function toStream(): Stream { $stream = Stream($this->pull1, $this->pull2); @@ -82,26 +125,51 @@ public function toStream(): Stream return $stream; } + /** + * Returns the first ResourcePull. + * + * @return ResourcePull + */ public function getPull1(): ResourcePull { return $this->pull1; } + /** + * Returns the second ResourcePull. + * + * @return ResourcePull + */ public function getPull2(): ResourcePull { return $this->pull2; } + /** + * Returns the current value from the active pull. + * + * @return mixed + */ public function current(): mixed { return $this->currentPull->current(); } + /** + * Returns the current key from the active pull. + * + * @return mixed + */ public function key(): mixed { return $this->currentPull->key(); } + /** + * Advances the active pull, switching to pull2 when pull1 is exhausted. + * + * @throws \OutOfBoundsException If both pulls are exhausted. + */ public function next(): void { try { @@ -116,11 +184,23 @@ public function next(): void } } + /** + * Returns true if either pull still has data. + * + * @return bool + */ public function valid(): bool { return $this->hasNext(); } + /** + * Consumes both pulls and returns all chunks as a single array. + * + * Rewinds before reading. + * + * @return array + */ public function getValues(): array { $values = []; diff --git a/src/Pull/ValuesPull.php b/src/Pull/ValuesPull.php index 484a164..402efc8 100644 --- a/src/Pull/ValuesPull.php +++ b/src/Pull/ValuesPull.php @@ -23,6 +23,12 @@ use Phunkie\Streams\Type\Scope; use Phunkie\Streams\Type\Stream; +/** + * Pull backed by an in-memory array of values. + * + * Wraps a variadic list of values and exposes them through the Pull/Iterator interface. + * Supports functor, monad, and compilation operations via traits. + */ class ValuesPull implements Pull { use ShowOps; @@ -39,9 +45,7 @@ class ValuesPull implements Pull private Scope $scope; /** - * ValuesPull constructor. - * - * @param mixed ...$values The values to be pulled from. + * @param mixed ...$values Values to expose through this pull. */ public function __construct(...$values) { @@ -50,21 +54,41 @@ public function __construct(...$values) $this->scope = new Scope(); } - public function pull() + /** + * Returns the current value without advancing the iterator. + * + * @return mixed The value at the current index. + */ + public function pull(): mixed { return $this->current(); } - public function getValues() + /** + * Returns the underlying values array. + * + * @return array + */ + public function getValues(): array { return $this->values; } + /** + * Returns the current scope. + * + * @return Scope + */ public function getScope(): Scope { return $this->scope; } + /** + * Converts this pull into a Stream, preserving the current scope. + * + * @return Stream + */ public function toStream(): Stream { $stream = Stream(...$this->values); @@ -73,6 +97,12 @@ public function toStream(): Stream return $stream; } + /** + * Replace the current scope. + * + * @param Scope $scope The new scope. + * @return static + */ public function setScope(Scope $scope): static { $this->scope = $scope; @@ -80,11 +110,22 @@ public function setScope(Scope $scope): static return $this; } + /** + * Returns the current iterator index. + * + * @return int + */ public function getIndex(): int { return $this->index; } + /** + * Set the iterator index to a specific position. + * + * @param int $index The new index. + * @return static + */ public function setIndex(int $index): static { $this->index = $index; diff --git a/src/Showable.php b/src/Showable.php index 80b46fe..932e8fa 100644 --- a/src/Showable.php +++ b/src/Showable.php @@ -11,9 +11,22 @@ namespace Phunkie\Streams; +/** + * Contract for types that can render themselves as human-readable strings. + */ interface Showable { + /** + * Return the value representation as a string. + * + * @return string + */ public function toString(): string; + /** + * Return a string describing this type (e.g. "Stream[IO, String]"). + * + * @return string + */ public function showType(): string; } diff --git a/src/Stream/Compiler.php b/src/Stream/Compiler.php index 40e78af..7b6f7ce 100644 --- a/src/Stream/Compiler.php +++ b/src/Stream/Compiler.php @@ -17,20 +17,35 @@ use Phunkie\Types\ImmList; /** - * @property ImmList|IO $toList - * @property array $toArray - * @property array $runLog + * Compiles a stream's Pull into a materialised value (list, array, or execution log). * + * Accessible as a property on Stream via $stream->compile, then chained + * with ->toList, ->toArray, ->drain, or ->runLog (as properties or methods). + * + * @property ImmList|IO $toList Compile the stream to an ImmList (or IO for effectful streams). + * @property array $toArray Compile the stream to a plain PHP array. + * @property array $runLog Execute the stream and return the execution log. */ class Compiler { use CompileOps; + /** + * @param Pull $pull The pull source to compile. + * @param int $bytes Chunk size in bytes for resource-based pulls. + */ public function __construct(private Pull $pull, private int $bytes) { } - public function __get($property) + /** + * Magic property accessor for drain, toList, toArray, and runLog. + * + * @param string $property The property name. + * @return mixed + * @throws \Error If the property is not recognised. + */ + public function __get($property): mixed { return match($property) { 'drain' => $this->drain(), @@ -41,17 +56,32 @@ public function __get($property) }; } + /** + * Return the Pull being compiled. + * + * @return Pull + */ public function getPull(): Pull { return $this->pull; } + /** + * Return the configured chunk size in bytes. + * + * @return int + */ public function getBytes(): int { return $this->bytes; } - private function drain() + /** + * Execute the stream for its side effects, discarding the output. + * + * @return \Phunkie\Effect\IO\IO + */ + private function drain(): IO { return $this->getPull()->drain(); } diff --git a/src/Type/Pull.php b/src/Type/Pull.php index 963f195..e30a741 100644 --- a/src/Type/Pull.php +++ b/src/Type/Pull.php @@ -15,7 +15,11 @@ use Phunkie\Streams\Showable; /** - * Pull represents a pull-based stream that can be compiled to produce values. + * Pull-based stream interface that lazily produces values on demand. + * + * Combines Iterator for traversal, Showable for display, and Compilable + * for materialising results. Concrete pulls (values, resources, infinite) + * implement the actual data-sourcing logic. * * @method static map(callable $f) * @method static flatMap(callable $f) @@ -35,7 +39,17 @@ */ interface Pull extends Showable, Compilable, \Iterator { - public function pull(); + /** + * Pull the next value from the underlying source. + * + * @return mixed + */ + public function pull(): mixed; + /** + * Lift this Pull back into a Stream. + * + * @return Stream + */ public function toStream(): Stream; } diff --git a/src/Type/Scope.php b/src/Type/Scope.php index 428456c..46c0fa2 100644 --- a/src/Type/Scope.php +++ b/src/Type/Scope.php @@ -13,13 +13,35 @@ use Phunkie\Effect\IO\IO; +/** + * Manages the transformation pipeline and filtering state for a stream. + * + * A Scope accumulates map, filter, and composed Transformation operations + * that are applied to each chunk when the stream is evaluated. + */ class Scope { + /** @var callable[] */ private array $callables = []; + + /** @var callable[] Accumulated map functions. */ private array $maps = []; + + /** @var callable[] Accumulated filter predicates. */ private array $filters = []; + + /** @var Transformation Composed transformation pipeline. */ private Transformation $transformation; + /** + * Append a transformation to the pipeline. + * + * If no transformation exists yet, the given one becomes the root; + * otherwise it is composed after the existing pipeline via andThen(). + * + * @param Transformation $transformation The transformation to append. + * @return void + */ public function appendTransformation(Transformation $transformation): void { if (!isset($this->transformation)) { @@ -30,26 +52,55 @@ public function appendTransformation(Transformation $transformation): void $this->transformation = $this->transformation->andThen($transformation); } + /** + * Register a map function to be applied to stream elements. + * + * @param callable $f The mapping function. + * @return void + */ public function addMap(callable $f): void { $this->maps[] = $f; } + /** + * @return callable[] + */ public function getMaps(): array { return $this->maps; } + /** + * Register a filter predicate to be applied to stream elements. + * + * @param callable $f The filter predicate. + * @return void + */ public function addFilter(callable $f): void { $this->filters[] = $f; } + /** + * @return callable[] + */ public function getFilters(): array { return $this->filters; } + /** + * Run the composed transformation pipeline against a chunk of data. + * + * For passthrough transformations (side-effect-only), the IO action is + * executed immediately. When $acceptIo is false, the IO result is + * unwrapped synchronously instead of being returned as an IO value. + * + * @param iterable $chunk The data chunk to transform. + * @param bool $acceptIo When true, passthrough results may be returned as IO. + * @return iterable|IO + */ public function runTransformations(iterable $chunk, $acceptIo = true): iterable | IO { if (!isset($this->transformation)) { diff --git a/src/Type/Stream.php b/src/Type/Stream.php index 1e3727b..a5c2d02 100644 --- a/src/Type/Stream.php +++ b/src/Type/Stream.php @@ -60,18 +60,38 @@ private function __construct(private Pull $pull, private int $bytes) { } + /** + * Create a pure stream from in-memory values. + * + * @param mixed ...$pull The values to stream. + * @return Stream + */ public static function fromValues(...$pull): Stream { return new Stream(new ValuesPull(...$pull), 256); } - public static function fromResource(Path $path, int $bytes = 256) + /** + * Create an effectful stream that reads from a file-system resource. + * + * @param Path $path Path to the resource to read. + * @param int $bytes Chunk size in bytes for each pull. + * @return Stream + */ + public static function fromResource(Path $path, int $bytes = 256): Stream { $resourcePull = new ResourcePull($path, $bytes); return new Stream($resourcePull, $bytes); } + /** + * Create an effectful stream from an already-opened Resource object. + * + * @param \Phunkie\Streams\IO\Resource $resource An open resource handle. + * @param int $bytes Chunk size in bytes for each pull. + * @return Stream + */ public static function fromResourceObject(\Phunkie\Streams\IO\Resource $resource, int $bytes = 4096): Stream { $resourceObjectPull = new ResourceObjectPull($resource, $bytes); @@ -79,22 +99,48 @@ public static function fromResourceObject(\Phunkie\Streams\IO\Resource $resource return new Stream($resourceObjectPull, $bytes); } + /** + * Create a stream from an arbitrary Pull implementation. + * + * @param Pull $pull The pull source to wrap. + * @param int $bytes Chunk size in bytes. + * @return Stream + */ public static function fromPull(Pull $pull, int $bytes = 256): Stream { return new Stream($pull, $bytes); } + /** + * Create a stream backed by an infinite generator. + * + * @param Infinite $infinite The infinite source. + * @param int $bytes Chunk size in bytes. + * @return Stream + */ public static function fromInfinite(Infinite $infinite, int $bytes = 256): Stream { return new Stream(new InfinitePull($infinite, $bytes), $bytes); } + /** + * Return the underlying Pull that drives this stream. + * + * @return Pull + */ protected function getPull(): Pull { return $this->pull; } - public function __get($property) + /** + * Magic property accessor for compile, repeat, runLog, toList, and toArray. + * + * @param string $property The property name. + * @return mixed + * @throws \Error If the property is not recognised. + */ + public function __get($property): mixed { return match($property) { 'compile' => $this->compile(), @@ -106,16 +152,32 @@ public function __get($property) }; } + /** + * Create a Compiler to materialise this stream's output. + * + * @return Compiler + */ public function compile(): Compiler { return new Compiler($this->getPull(), $this->getBytes()); } + /** + * Return a new stream that infinitely repeats this stream's values. + * + * @return Stream + */ public function repeat(): Stream { return self::fromInfinite(repeat(...$this->getPull()->getValues()), $this->getBytes()); } + /** + * Execute an effectful (resource-backed) stream and return its execution log. + * + * @return array + * @throws \Error If called on a pure stream. + */ public function runLog(): array { return $this->getPull() instanceof ResourcePull ? @@ -123,11 +185,23 @@ public function runLog(): array throw new \Error("Cannot call runlog on Pure Streams"); } - public function setScope(Scope $scope) + /** + * Assign a Scope (transformation pipeline) to this stream's pull. + * + * @param Scope $scope The scope to assign. + * @return void + */ + public function setScope(Scope $scope): void { $this->getPull()->setScope($scope); } + /** + * Compile a pure (values-backed) stream into an ImmList. + * + * @return ImmList + * @throws \Error If called on a resource-backed stream. + */ public function toList(): ImmList { if ($this->getPull() instanceof ValuesPull) { @@ -137,16 +211,32 @@ public function toList(): ImmList } } + /** + * Return the Kind type arity (always 2: effect type + element type). + * + * @return int + */ public function getTypeArity(): int { return 2; } + /** + * Get the chunk size in bytes used for internal processing. + * + * @return int + */ public function getBytes(): int { return $this->bytes; } + /** + * Set the chunk size in bytes used for internal processing. + * + * @param int $bytes The chunk size in bytes. + * @return static + */ public function setBytes(int $bytes): static { $this->bytes = $bytes; @@ -154,11 +244,32 @@ public function setBytes(int $bytes): static return $this; } + /** + * Return the underlying values from this stream's pull. + * + * @return array + */ + public function getValues(): array + { + return $this->pull->getValues(); + } + + /** + * Return the effect type identifier: "IO" for resource streams, "Pure" otherwise. + * + * @return string + */ public function getEffect(): string { return ($this->getPull() instanceof ResourcePull || $this->getPull() instanceof ResourceObjectPull) ? IO : Pure; } + /** + * Compile a pure (values-backed) stream into a plain PHP array. + * + * @return array + * @throws \Error If called on a resource-backed stream. + */ public function toArray(): array { if ($this->getPull() instanceof ValuesPull) { diff --git a/src/Type/Transformation.php b/src/Type/Transformation.php index 4424e20..6c7c161 100644 --- a/src/Type/Transformation.php +++ b/src/Type/Transformation.php @@ -13,18 +13,41 @@ use Phunkie\Effect\IO\IO; +/** + * Wraps a closure as a composable stream transformation. + * + * Transformations can be chained via andThen() and optionally bound to an + * effect class (e.g. IO) using array-access syntax: $transformation[IO::class]. + * Passthrough mode marks a transformation as side-effect-only, meaning the + * original data passes through unchanged after the effect executes. + */ class Transformation implements \ArrayAccess { + /** @var \Closure The transformation function applied to each chunk. */ private \Closure $f; + + /** @var class-string|null The effect class to wrap the result in, or null for pure transforms. */ private mixed $effect = null; + /** @var bool When true, the transformation is side-effect-only (data passes through). */ private bool $isPassthrough = false; + /** @param \Closure $f The transformation function to apply to each chunk. */ public function __construct(\Closure $f) { $this->f = $f; } + /** + * Apply this transformation to a chunk of data. + * + * If an effect class is set, the result is wrapped in a new instance + * of that effect. Otherwise the closure is invoked directly. + * + * @param iterable $chunk The data chunk to transform. + * @return iterable|IO + * @throws \Error If the configured effect class cannot be instantiated. + */ public function run(iterable $chunk): iterable | IO { if ($this->isEffectful()) { @@ -40,16 +63,29 @@ public function run(iterable $chunk): iterable | IO return ($this->f)($chunk); } + /** + * Compose this transformation with another, producing a new pipeline. + * + * @param Transformation $transformation The transformation to chain after this one. + * @return Transformation + */ public function andThen(Transformation $transformation): Transformation { return new Transformation(fn ($chunk) => $transformation->run($this->run($chunk))); } + /** @throws \Error Always -- Transformation does not support isset via ArrayAccess. */ public function offsetExists(mixed $offset): bool { throw new \Error('Transformation is not an array'); } + /** + * Set the effect class via array-access syntax (e.g. $t[IO::class]). + * + * @param class-string $offset Fully-qualified effect class name. + * @return $this + */ public function offsetGet(mixed $offset): mixed { $this->effect = $offset; @@ -57,36 +93,61 @@ public function offsetGet(mixed $offset): mixed return $this; } + /** @throws \Error Always -- Transformation does not support []= assignment. */ public function offsetSet(mixed $offset, mixed $value): void { throw new \Error('Transformation is not an array'); } + /** @throws \Error Always -- Transformation does not support unset via ArrayAccess. */ public function offsetUnset(mixed $offset): void { throw new \Error('Transformation is not an array'); } + /** + * Whether this transformation wraps its result in an effect. + * + * @return bool + */ private function isEffectful(): bool { return !is_null($this->effect); } + /** + * Whether this transformation is side-effect-only (data passes through). + * + * @return bool + */ public function isPassthrough(): bool { return $this->isPassthrough; } + /** + * Mark this transformation as passthrough (side-effect-only) or not. + * + * @param bool $isPassthrough Whether this transformation is passthrough. + * @return void + */ public function setPassthrough(bool $isPassthrough): void { $this->isPassthrough = $isPassthrough; } + /** + * @return class-string|null The effect class name, or null if pure. + */ public function getEffect(): mixed { return $this->effect; } + /** + * @param class-string|null $effect The effect class name to wrap results in. + * @return $this + */ public function setEffect(mixed $effect): Transformation { $this->effect = $effect;