From 2aeecc22532be4a6c0cb39c65f0b558446519c15 Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Tue, 20 Jan 2026 23:36:36 -0800 Subject: [PATCH 1/2] Adding AI generated source comments --- accumulate.go | 4 ++++ flame.go | 53 +++++++++++++++++++++++++++++++++++++-------------- map.go | 10 ++++++++++ reduce.go | 2 ++ 4 files changed, 55 insertions(+), 14 deletions(-) diff --git a/accumulate.go b/accumulate.go index afef868..2da0bcd 100644 --- a/accumulate.go +++ b/accumulate.go @@ -1,3 +1,7 @@ +// Package flame provides a lightweight, generic flow‑processing framework. +// It defines a set of nodes that can be wired together to build data pipelines. +// The nodes are generic over key and value types, enabling type‑safe collection +// operations such as map, reduce, join, and accumulation. package flame import "golang.org/x/exp/constraints" diff --git a/flame.go b/flame.go index 7256f68..4919f60 100644 --- a/flame.go +++ b/flame.go @@ -1,3 +1,8 @@ +// Package flame provides a lightweight, generic flow‑processing framework. +// It offers building blocks (nodes) for constructing concurrent data pipelines +// using Go generics. Nodes are wired together via channels and executed in goroutines +// managed by a Workflow. + package flame import ( @@ -8,38 +13,50 @@ import ( "golang.org/x/exp/constraints" ) +// Workflow represents a collection of processing nodes that will be executed concurrently. +// It holds a WaitGroup to synchronize node goroutines and an optional working directory. type Workflow struct { WaitGroup *sync.WaitGroup Nodes []Process WorkDir string } +// KeyValue is a generic pair used by keyed operations (e.g., join, accumulate). +// K must be an ordered type, V can be any type. type KeyValue[K constraints.Ordered, V any] struct { Key K Value V } +// Node represents a processing element that can be connected to an upstream emitter and produce an output channel. +// X is the input type, Y is the output type. type Node[X, Y any] interface { GetOutput() chan Y Connect(e Emitter[X]) } +// Receiver is a node that only consumes input; it does not produce output. +// It is used for sink‑like nodes. type Receiver[X any] interface { Connect(e Emitter[X]) } +// Emitter produces a channel of values of type X. type Emitter[X any] interface { GetOutput() chan X } +// Process is an internal interface implemented by all node types; start launches the node. type Process interface { start(wf *Workflow) } +// NewWorkflow constructs an empty workflow ready for nodes to be added. func NewWorkflow() *Workflow { return &Workflow{} } +// Start initializes the WaitGroup and launches all nodes in the workflow. func (wf *Workflow) Start() { wf.WaitGroup = &sync.WaitGroup{} for i := range wf.Nodes { @@ -47,37 +64,41 @@ func (wf *Workflow) Start() { } } +// SetWorkDir sets the working directory for the workflow, converting it to an absolute path. func (wf *Workflow) SetWorkDir(path string) { wf.WorkDir, _ = filepath.Abs(path) } +// Wait blocks until all node goroutines have completed. func (wf *Workflow) Wait() { wf.WaitGroup.Wait() } +// GetTmpDir creates a temporary directory inside the workflow's working directory. func (wf *Workflow) GetTmpDir() (string, error) { return os.MkdirTemp(wf.WorkDir, "flame_") } -/**************************/ -// Source Chan -/**************************/ - +// SourceChanNode receives values from a user‑provided channel and forwards them downstream. type SourceChanNode[X, Y any] struct { Source chan Y Outputs []chan Y } +// AddSourceChan adds a source node that reads from the supplied channel `i`. +// It returns a Node that can be connected to downstream processors. func AddSourceChan[Y any](w *Workflow, i chan Y) Node[any, Y] { n := &SourceChanNode[any, Y]{Source: i, Outputs: []chan Y{}} w.Nodes = append(w.Nodes, n) return n } +// Connect is a no‑op for SourceChanNode because it is a source; attempting to connect will panic. func (n *SourceChanNode[X, Y]) Connect(e Emitter[X]) { - //this should throw an error + panic("cannot connect source node") } +// start launches a goroutine that forwards values from the source channel to all outputs. func (n *SourceChanNode[X, Y]) start(wf *Workflow) { wf.WaitGroup.Add(1) go func() { @@ -93,31 +114,33 @@ func (n *SourceChanNode[X, Y]) start(wf *Workflow) { }() } +// GetOutput creates a new output channel for downstream nodes. func (n *SourceChanNode[X, Y]) GetOutput() chan Y { m := make(chan Y) n.Outputs = append(n.Outputs, m) return m } -/**************************/ -// Source -/**************************/ - +// SourceNode creates values by calling a user‑provided function that returns (Y, error). +// The node stops when the function returns an error. type SourceNode[X, Y any] struct { Source func() (Y, error) Outputs []chan Y } +// AddSource adds a source node that invokes the supplied function to produce values. func AddSource[Y any](w *Workflow, i func() (Y, error)) Node[any, Y] { n := &SourceNode[any, Y]{Source: i, Outputs: []chan Y{}} w.Nodes = append(w.Nodes, n) return n } +// Connect is a no‑op for SourceNode; sources cannot be wired upstream. func (n *SourceNode[X, Y]) Connect(e Emitter[X]) { - //this should throw an error + panic("cannot connect source node") } +// start runs the source function repeatedly until it returns an error, forwarding each value. func (n *SourceNode[X, Y]) start(wf *Workflow) { wf.WaitGroup.Add(1) go func() { @@ -137,32 +160,33 @@ func (n *SourceNode[X, Y]) start(wf *Workflow) { }() } +// GetOutput creates and registers an output channel for downstream nodes. func (n *SourceNode[X, Y]) GetOutput() chan Y { m := make(chan Y) n.Outputs = append(n.Outputs, m) return m } -/**************************/ -// Sink -/**************************/ - +// SinkNode consumes input values and passes them to a user‑provided sink function. type SinkNode[X, Y any] struct { Input chan X Sink func(X) } +// AddSink adds a sink node that calls the provided function `i` for each received value. func AddSink[X any](w *Workflow, i func(X)) Node[X, any] { n := &SinkNode[X, any]{Sink: i} w.Nodes = append(w.Nodes, n) return n } +// Connect wires the sink's input to the output of an upstream emitter. func (n *SinkNode[X, Y]) Connect(e Emitter[X]) { o := e.GetOutput() n.Input = o } +// start launches a goroutine that reads from the input channel and calls the sink function. func (n *SinkNode[X, Y]) start(wf *Workflow) { wf.WaitGroup.Add(1) go func() { @@ -175,6 +199,7 @@ func (n *SinkNode[X, Y]) start(wf *Workflow) { }() } +// GetOutput returns nil because sink nodes do not produce downstream output. func (n *SinkNode[X, Y]) GetOutput() chan Y { return nil } diff --git a/map.go b/map.go index 88cd6e5..72c347a 100644 --- a/map.go +++ b/map.go @@ -1,3 +1,7 @@ +// Package flame provides a lightweight, generic flow‑processing framework. +// A Node represents a step in a directed acyclic graph; each node consumes a channel of inputs and emits one or more output channels. +// The framework is deliberately minimal – it only coordinates goroutine launch, channel wiring and WaitGroup tracking. + package flame /**************************/ @@ -6,6 +10,11 @@ package flame // MapNode represents a flow step that takes an input X calls a function f(X) that // returns Y +// MapNode represents a flow step that transforms values of type X into values +// of type Y using the provided function `Proc`. It receives input values on the +// `Input` channel and forwards the transformed results to all registered output +// channels. + type MapNode[X, Y any] struct { Input chan X Outputs []chan Y @@ -13,6 +22,7 @@ type MapNode[X, Y any] struct { } // AddMapper adds a MapNode step to the workflow +// AddMapper adds a MapNode step to the workflow. The supplied function `f` is applied to every element that arrives on the input channel. The returned node can be connected to an upstream emitter and its output retrieved via GetOutput. func AddMapper[X, Y any](w *Workflow, f func(X) Y) Node[X, Y] { n := &MapNode[X, Y]{Proc: f, Outputs: []chan Y{}} w.Nodes = append(w.Nodes, n) diff --git a/reduce.go b/reduce.go index fbc16b4..c78e413 100644 --- a/reduce.go +++ b/reduce.go @@ -1,3 +1,5 @@ +// Package flame provides a lightweight, generic flow‑processing framework. +// It defines nodes for reducing streams of data using user‑supplied functions. package flame /**************************/ From 8ec53605091fd2fd2aa5240f10fb42f746acdf2f Mon Sep 17 00:00:00 2001 From: Kyle Ellrott Date: Tue, 20 Jan 2026 23:48:45 -0800 Subject: [PATCH 2/2] More examples in the README.md --- README.md | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 114 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 2f1d7e8..9317e81 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,11 @@ # FLAME: FLow frAMEwork -A proof of concept flow processing library built on Go Generics, which were -introduced in 1.18. +A flow processing library built on Go Generics -## Basic Map example -``` +## Map example +```go func Inc(a int) int { return a + 1 } @@ -38,4 +37,114 @@ func main() { fmt.Printf("out: %s\n", o) } } -``` \ No newline at end of file +``` + +## Reduce example +```go +func Sum(a, b int) int { return a + b } + +func main() { + in := make(chan int, 5) + wf := flame.NewWorkflow() + src := flame.AddSourceChan(wf, in) + // Reduce starts with initial value 0 and applies Sum to each element. + red := flame.AddReducer[int, int](wf, Sum, 0) + red.Connect(src) + out := red.GetOutput() + wf.Start() + go func(){ + for _, n := range []int{1,2,3,4,5} { in <- n } + close(in) + }() + fmt.Printf("sum: %d\n", <-out) +} +``` + +## FlatMap example +```go +func Split(s string) []string { return strings.Split(s, "") } + +func main() { + in := make(chan string, 1) + wf := flame.NewWorkflow() + src := flame.AddSourceChan(wf, in) + fm := flame.AddFlatMapper(wf, Split) + fm.Connect(src) + out := fm.GetOutput() + wf.Start() + in <- "hello" + close(in) + for v := range out { fmt.Println(v) } +} +``` + +## Join (zip) example +```go +func main() { + // Create source channels + aChan := make(chan int) + bChan := make(chan string) + wf := flame.NewWorkflow() + // Add source nodes + a := flame.AddSourceChan(wf, aChan) + b := flame.AddSourceChan(wf, bChan) + // Join node that zips int and string into a struct + j := flame.AddJoin[int, string, struct{I int; S string}](wf, func(l <-chan int, r <-chan string, out chan struct{I int; S string}) { + for { + lv, lok := <-l + rv, rok := <-r + if !lok || !rok { break } + out <- struct{I int; S string}{lv, rv} + } + }) + j.ConnectLeft(a) + j.ConnectRight(b) + out := j.GetOutput() + wf.Start() + // Feed data + go func(){ + for _, v := range []int{1,2,3} { aChan <- v } + close(aChan) + }() + go func(){ + for _, v := range []string{"a","b","c"} { bChan <- v } + close(bChan) + }() + for v := range out { fmt.Printf("%v\n", v) } +} +``` + +## Join Example +```go +func main() { + a := flame.AddSourceSlice([]int{1,2,3}) + b := flame.AddSourceSlice([]string{"a","b","c"}) + wf := flame.NewWorkflow() + j := flame.AddJoin[int, string, struct{I int; S string}](wf, func(l <-chan int, r <-chan string, out chan struct{I int; S string}) { + for { + lv, lok := <-l + rv, rok := <-r + if !lok || !rok { break } + out <- struct{I int; S string}{lv, rv} + } + }) + j.ConnectLeft(a) + j.ConnectRight(b) + out := j.GetOutput() + wf.Start() + for v := range out { fmt.Printf("%v\n", v) } +} +``` + +## Source → sink example +```go +func main() { + wf := flame.NewWorkflow() + src := flame.AddSourceSlice([]int{1,2,3,4}) + sink := flame.AddSink[int](wf, func(v int){ fmt.Println("got", v) }) + sink.Connect(src) + wf.Start() + wf.Wait() +} +``` +