Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 114 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@

# FLAME: FLow frAMEwork

A proof of concept flow processing library built on Go Generics, which were
introduced in 1.18.
A flow processing library built on Go Generics


## Basic Map example
```
## Map example
```go
func Inc(a int) int {
return a + 1
}
Expand Down Expand Up @@ -38,4 +37,114 @@ func main() {
fmt.Printf("out: %s\n", o)
}
}
```
```

## Reduce example
```go
func Sum(a, b int) int { return a + b }

func main() {
in := make(chan int, 5)
wf := flame.NewWorkflow()
src := flame.AddSourceChan(wf, in)
// Reduce starts with initial value 0 and applies Sum to each element.
red := flame.AddReducer[int, int](wf, Sum, 0)
red.Connect(src)
out := red.GetOutput()
wf.Start()
go func(){
for _, n := range []int{1,2,3,4,5} { in <- n }
close(in)
}()
fmt.Printf("sum: %d\n", <-out)
}
```

## FlatMap example
```go
func Split(s string) []string { return strings.Split(s, "") }

func main() {
in := make(chan string, 1)
wf := flame.NewWorkflow()
src := flame.AddSourceChan(wf, in)
fm := flame.AddFlatMapper(wf, Split)
fm.Connect(src)
out := fm.GetOutput()
wf.Start()
in <- "hello"
close(in)
for v := range out { fmt.Println(v) }
}
```

## Join (zip) example
```go
func main() {
// Create source channels
aChan := make(chan int)
bChan := make(chan string)
wf := flame.NewWorkflow()
// Add source nodes
a := flame.AddSourceChan(wf, aChan)
b := flame.AddSourceChan(wf, bChan)
// Join node that zips int and string into a struct
j := flame.AddJoin[int, string, struct{I int; S string}](wf, func(l <-chan int, r <-chan string, out chan struct{I int; S string}) {
for {
lv, lok := <-l
rv, rok := <-r
if !lok || !rok { break }
out <- struct{I int; S string}{lv, rv}
}
})
j.ConnectLeft(a)
j.ConnectRight(b)
out := j.GetOutput()
wf.Start()
// Feed data
go func(){
for _, v := range []int{1,2,3} { aChan <- v }
close(aChan)
}()
go func(){
for _, v := range []string{"a","b","c"} { bChan <- v }
close(bChan)
}()
for v := range out { fmt.Printf("%v\n", v) }
}
```

## Join example (slice sources)
```go
func main() {
a := flame.AddSourceSlice([]int{1,2,3})
b := flame.AddSourceSlice([]string{"a","b","c"})
wf := flame.NewWorkflow()
j := flame.AddJoin[int, string, struct{I int; S string}](wf, func(l <-chan int, r <-chan string, out chan struct{I int; S string}) {
for {
lv, lok := <-l
rv, rok := <-r
if !lok || !rok { break }
out <- struct{I int; S string}{lv, rv}
}
})
j.ConnectLeft(a)
j.ConnectRight(b)
out := j.GetOutput()
wf.Start()
for v := range out { fmt.Printf("%v\n", v) }
}
```

## Source → sink example
```go
func main() {
wf := flame.NewWorkflow()
src := flame.AddSourceSlice([]int{1,2,3,4})
sink := flame.AddSink[int](wf, func(v int){ fmt.Println("got", v) })
sink.Connect(src)
wf.Start()
wf.Wait()
}
```

4 changes: 4 additions & 0 deletions accumulate.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Package flame provides a lightweight, generic flow‑processing framework.
// It defines a set of nodes that can be wired together to build data pipelines.
// The nodes are generic over key and value types, enabling type‑safe collection
// operations such as map, reduce, join, and accumulation.
package flame

import "golang.org/x/exp/constraints"
Expand Down
53 changes: 39 additions & 14 deletions flame.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
// Package flame provides a lightweight, generic flow-processing framework.
// It offers building blocks (nodes) for constructing concurrent data pipelines
// using Go generics. Nodes are wired together via channels and executed in goroutines
// managed by a Workflow.
package flame

import (
Expand All @@ -8,76 +13,92 @@ import (
"golang.org/x/exp/constraints"
)

// Workflow represents a collection of processing nodes that are launched
// together, plus the state those nodes share while running.
type Workflow struct {
// WaitGroup is allocated by Start and is what Wait blocks on; it is nil
// on a workflow returned by NewWorkflow until Start is called.
WaitGroup *sync.WaitGroup
// Nodes holds every registered node; Start calls start on each in order.
Nodes []Process
// WorkDir is the directory recorded by SetWorkDir and passed to
// os.MkdirTemp by GetTmpDir.
WorkDir string
}

// KeyValue is a generic key/value pair intended for keyed operations
// (e.g., join, accumulate). K is constrained to constraints.Ordered; V can be
// any type.
type KeyValue[K constraints.Ordered, V any] struct {
// Key is the ordered key of the pair.
Key K
// Value is the payload associated with Key.
Value V
}

// Node represents a processing element that can be connected to an upstream
// emitter and expose an output channel. X is the input type, Y is the output
// type.
//
// Not every implementation supports both sides: the source nodes in this file
// panic in Connect, and SinkNode returns nil from GetOutput.
type Node[X, Y any] interface {
// GetOutput returns a channel carrying values of the node's output type.
GetOutput() chan Y
// Connect attaches the node's input to the upstream emitter e.
Connect(e Emitter[X])
}

// Receiver is the input-only view of a node: it can be connected to an
// upstream emitter but exposes no output. It is meant for sink-like nodes.
type Receiver[X any] interface {
// Connect attaches the receiver's input to the upstream emitter e.
Connect(e Emitter[X])
}

// Emitter produces a channel of values of type X.
type Emitter[X any] interface {
// GetOutput returns a channel of X for a downstream consumer. The source
// nodes in this file allocate and register a fresh channel on every call,
// so each consumer should call it once and keep the result.
GetOutput() chan X
}

// Process is the interface stored in Workflow.Nodes. Its only method is
// unexported, so it can be implemented only inside this package.
type Process interface {
// start launches the node; Workflow.Start calls it once per registered
// node, passing the owning workflow.
start(wf *Workflow)
}

// NewWorkflow returns a pointer to a zero-valued Workflow: no nodes, no
// working directory, and a nil WaitGroup until Start is called.
func NewWorkflow() *Workflow {
	wf := new(Workflow)
	return wf
}

// Start allocates a fresh WaitGroup for the workflow and then calls start on
// every registered node, in registration order.
func (wf *Workflow) Start() {
	wf.WaitGroup = new(sync.WaitGroup)
	for _, node := range wf.Nodes {
		node.start(wf)
	}
}

// SetWorkDir records path as the workflow's working directory, stored in
// absolute form whenever it can be resolved.
//
// filepath.Abs fails only when the current directory cannot be determined.
// In that case the cleaned path is kept as given instead of silently
// resetting WorkDir to "", which would make GetTmpDir create its directories
// in the system default temp location without any indication.
func (wf *Workflow) SetWorkDir(path string) {
	abs, err := filepath.Abs(path)
	if err != nil {
		abs = filepath.Clean(path)
	}
	wf.WorkDir = abs
}

// Wait blocks until all node goroutines have completed.
//
// The WaitGroup is only allocated by Start, so on a workflow that has not
// been started there is nothing to wait for; Wait returns immediately in
// that case instead of dereferencing a nil pointer.
func (wf *Workflow) Wait() {
	if wf.WaitGroup == nil {
		return
	}
	wf.WaitGroup.Wait()
}

// GetTmpDir creates a new, uniquely named directory whose name starts with
// "flame_" under the workflow's WorkDir and returns its path. When WorkDir is
// empty, os.MkdirTemp falls back to the default temporary directory.
func (wf *Workflow) GetTmpDir() (string, error) {
	const prefix = "flame_"
	parent := wf.WorkDir
	return os.MkdirTemp(parent, prefix)
}

/**************************/
// Source Chan
/**************************/

// SourceChanNode is a source backed by a user-provided channel. The type
// parameter X is not used by any field; AddSourceChan instantiates it as any.
type SourceChanNode[X, Y any] struct {
// Source is the channel supplied to AddSourceChan.
Source chan Y
// Outputs collects the channels handed out by GetOutput, one per call.
Outputs []chan Y
}

// AddSourceChan registers on w a source node fed by the supplied channel i
// and returns it as a Node with output type Y. The input type parameter is
// fixed to any, since a source has nothing upstream of it.
func AddSourceChan[Y any](w *Workflow, i chan Y) Node[any, Y] {
	node := new(SourceChanNode[any, Y])
	node.Source = i
	node.Outputs = make([]chan Y, 0)
	w.Nodes = append(w.Nodes, node)
	return node
}

// Connect always panics: a source node has no upstream, so wiring an emitter
// into it is a programming error. The method exists so that SourceChanNode
// satisfies the Node interface returned by AddSourceChan.
func (n *SourceChanNode[X, Y]) Connect(e Emitter[X]) {
// A source has no input to attach e to.
panic("cannot connect source node")
}

// start launches a goroutine that forwards values from the source channel to all outputs.
func (n *SourceChanNode[X, Y]) start(wf *Workflow) {
wf.WaitGroup.Add(1)
go func() {
Expand All @@ -93,31 +114,33 @@ func (n *SourceChanNode[X, Y]) start(wf *Workflow) {
}()
}

// GetOutput allocates a new unbuffered channel, records it in n.Outputs, and
// returns it. Every call yields a distinct channel.
func (n *SourceChanNode[X, Y]) GetOutput() chan Y {
	out := make(chan Y)
	n.Outputs = append(n.Outputs, out)
	return out
}

/**************************/
// Source
/**************************/

// SourceNode is a source backed by a user-provided generator function of the
// form func() (Y, error). The type parameter X is not used by any field;
// AddSource instantiates it as any.
// NOTE(review): the node is described as stopping when Source returns an
// error — confirm against the start method.
type SourceNode[X, Y any] struct {
// Source is the generator function supplied to AddSource.
Source func() (Y, error)
// Outputs collects the channels handed out by GetOutput, one per call.
Outputs []chan Y
}

// AddSource registers on w a source node driven by the generator function i
// and returns it as a Node with output type Y. The input type parameter is
// fixed to any, since a source has nothing upstream of it.
func AddSource[Y any](w *Workflow, i func() (Y, error)) Node[any, Y] {
	node := new(SourceNode[any, Y])
	node.Source = i
	node.Outputs = make([]chan Y, 0)
	w.Nodes = append(w.Nodes, node)
	return node
}

// Connect always panics: sources cannot be wired to an upstream emitter. The
// method exists so that SourceNode satisfies the Node interface returned by
// AddSource.
func (n *SourceNode[X, Y]) Connect(e Emitter[X]) {
// A source has no input to attach e to.
panic("cannot connect source node")
}

// start runs the source function repeatedly until it returns an error, forwarding each value.
func (n *SourceNode[X, Y]) start(wf *Workflow) {
wf.WaitGroup.Add(1)
go func() {
Expand All @@ -137,32 +160,33 @@ func (n *SourceNode[X, Y]) start(wf *Workflow) {
}()
}

// GetOutput allocates a new unbuffered channel, records it in n.Outputs, and
// returns it. Every call yields a distinct channel.
func (n *SourceNode[X, Y]) GetOutput() chan Y {
	out := make(chan Y)
	n.Outputs = append(n.Outputs, out)
	return out
}

/**************************/
// Sink
/**************************/

// SinkNode consumes input values and passes them to a user-provided sink
// function. The type parameter Y is not used by any field; AddSink
// instantiates it as any.
type SinkNode[X, Y any] struct {
// Input is the upstream channel assigned by Connect; it is nil until then.
Input chan X
// Sink is the callback supplied to AddSink.
Sink func(X)
}

// AddSink registers on w a sink node whose callback is i and returns it as a
// Node with input type X. The output type parameter is fixed to any, since a
// sink emits nothing.
func AddSink[X any](w *Workflow, i func(X)) Node[X, any] {
	node := new(SinkNode[X, any])
	node.Sink = i
	w.Nodes = append(w.Nodes, node)
	return node
}

// Connect requests an output channel from the upstream emitter e and stores
// it as the sink's Input, replacing any channel assigned by an earlier call.
func (n *SinkNode[X, Y]) Connect(e Emitter[X]) {
	n.Input = e.GetOutput()
}

// start launches a goroutine that reads from the input channel and calls the sink function.
func (n *SinkNode[X, Y]) start(wf *Workflow) {
wf.WaitGroup.Add(1)
go func() {
Expand All @@ -175,6 +199,7 @@ func (n *SinkNode[X, Y]) start(wf *Workflow) {
}()
}

// GetOutput always returns a nil channel: a sink has no downstream output.
// The method exists so that SinkNode satisfies the Node interface.
func (n *SinkNode[X, Y]) GetOutput() chan Y {
	var none chan Y
	return none
}
10 changes: 10 additions & 0 deletions map.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Package flame provides a lightweight, generic flow-processing framework.
// A Node represents a step in a directed acyclic graph; each node consumes a
// channel of inputs and emits one or more output channels. The framework is
// deliberately minimal – it only coordinates goroutine launch, channel wiring
// and WaitGroup tracking.
package flame

/**************************/
Expand All @@ -6,13 +10,19 @@ package flame

// MapNode represents a flow step that transforms values of type X into values
// of type Y using the function stored in Proc.
type MapNode[X, Y any] struct {
// Input is the channel on which values of type X arrive.
Input chan X
// Outputs holds the downstream channels for transformed values.
Outputs []chan Y
// Proc is the mapping function supplied to AddMapper.
Proc func(X) Y
}

// AddMapper adds a MapNode step to the workflow. The supplied function `f` is
// applied to every element that arrives on the input channel. The returned
// node can be connected to an upstream emitter and its output retrieved via
// GetOutput.
func AddMapper[X, Y any](w *Workflow, f func(X) Y) Node[X, Y] {
n := &MapNode[X, Y]{Proc: f, Outputs: []chan Y{}}
w.Nodes = append(w.Nodes, n)
Expand Down
2 changes: 2 additions & 0 deletions reduce.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Package flame provides a lightweight, generic flow‑processing framework.
// It defines nodes for reducing streams of data using user‑supplied functions.
package flame

/**************************/
Expand Down
Loading