From b185a7b750c4660df967d2fd5f0c1d8953e4446d Mon Sep 17 00:00:00 2001 From: Prasad Lohakpure Date: Mon, 6 Apr 2026 11:00:31 +0530 Subject: [PATCH 1/3] CHORE: AGENTS.md added --- AGENTS.md | 315 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 315 insertions(+) create mode 100644 AGENTS.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..d6c480d --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,315 @@ +# caterpillar + +A Go CLI that reads a YAML pipeline configuration and executes an ordered chain (or DAG) of typed data-processing tasks. Tasks receive `*record.Record` values from an upstream buffered channel, transform or route them, and emit results downstream. + +The binary can operate in two modes depending on the pipeline: +- **Batch mode** (most pipelines) — runs, processes all records, and exits. +- **Server mode** — when the pipeline starts with an `http_server` task, the CLI acts as a long-running HTTP server. Incoming requests are converted to `*record.Record` values and emitted downstream to the rest of the pipeline. The binary does not exit until the server shuts down (or `end_after` is configured). + +Apply these instructions to `/Users/prasadlohakpure/Desktop/go_projects/src/github.com/patterninc/caterpillar`. Treat paths and commands below as relative to that location unless explicitly stated otherwise. + +## Tech Stack + +- **Language:** Go 1.22+ +- **Config format:** YAML (gopkg.in/yaml.v3) +- **Validation:** go-playground/validator/v10 +- **Testing:** standard `go test` +- **CI:** GitHub Actions (`.github/workflows/ci.yaml`) — runs `go build ./cmd/caterpillar/caterpillar.go` on every PR to `main`; does not run tests + +## Commands + +| Action | Command | +|---|---| +| Build | `go build ./cmd/caterpillar/caterpillar.go` | +| Run a pipeline | `./caterpillar -conf ` | + +**Do NOT run `go build ./...`** — three packages currently fail to build (see Known Broken Packages). + +## Directory Map + +| Directory | Purpose | +|---|---| +| `cmd/caterpillar/` | Entry point — parses `-conf` flag, loads YAML into `Pipeline`, calls `p.Run()` | +| `internal/pkg/pipeline/` | Core runtime: `Pipeline` struct, `Run()`, channel wiring, DAG execution engine | +| `internal/pkg/pipeline/tasks.go` | Task registry (`supportedTasks` map) — add new task types here | +| `internal/pkg/pipeline/task/` | `Task` interface and `Base` struct shared by all task packages | +| `internal/pkg/pipeline/task//` | One directory per task type; each exports `New() (task.Task, error)` | +| `internal/pkg/pipeline/task/aws/` | AWS-specific tasks (currently `parameter_store`) | +| `internal/pkg/pipeline/record/` | `Record` type — the unit of data flowing between tasks | +| `internal/pkg/config/` | `config.String` — Go template string that expands `{{ secret }}` and `{{ context }}` at runtime | +| `internal/pkg/jq/` | JQ query wrapper used by `Base.Context` to extract per-record context values | +| `test/pipelines/` | Example YAML pipeline files (most require live AWS/Kafka — not safe to run without dependencies) | +| `docs/` | Onboarding checklist, code-mint framework docs, outcomes, skills-status | +| `.agents/` | code-mint AI infrastructure (skills, rules, reports, status JSON) | + +## How to Add a New Task Type + +Use `internal/pkg/pipeline/task/echo/echo.go` as the canonical minimal example. + +### Step 1 — Create the package + +``` +internal/pkg/pipeline/task// + .go + README.md (document all YAML fields — follow echo/README.md format) +``` + +### Step 2 — Embed `task.Base` and implement `Run` + +```go +package mytask + +import ( + "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" + "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" +) + +type myTask struct { + task.Base `yaml:",inline" json:",inline"` // required — enables YAML decoding and satisfies interface + MyOption string `yaml:"my_option,omitempty" json:"my_option,omitempty"` +} + +func New() (task.Task, error) { + return &myTask{}, nil +} + +func (t *myTask) Run(input <-chan *record.Record, output chan<- *record.Record) error { + for { + r, ok := t.GetRecord(input) + if !ok { + break + } + // transform r.Data here + t.SendRecord(r, output) // also evaluates context: JQ expressions + } + return nil +} +``` + +Rules: +- `task.Base` **must** be embedded with `yaml:",inline" json:",inline"` — YAML decoding breaks without it. +- `New()` signature is always `func() (task.Task, error)` — keep it consistent with the registry even if init cannot fail. +- `Init()` is called once after unmarshaling, before `Run`. Override it only when you need one-time setup (e.g., creating a client). The `Base.Init()` no-op covers the default case. +- No `input` channel: task is a **source** (generates records). No `output` channel: task is a **sink**. Both present: transformer. +- `t.GetRecord(input)` safely reads from the channel and handles nil and close. `t.SendRecord(r, output)` evaluates `context:` JQ expressions and forwards the record. + +### Step 3 — Register in `tasks.go` + +Add an import and one entry to `supportedTasks` in `internal/pkg/pipeline/tasks.go`: + +```go +import ( + "github.com/patterninc/caterpillar/internal/pkg/pipeline/task/mytask" +) + +var supportedTasks = map[string]func() (task.Task, error){ + // existing entries ... + `my_task_name`: mytask.New, +} +``` + +The map key becomes the `type:` value in pipeline YAML. + +### Step 4 — Verify + +```bash +go build ./cmd/caterpillar/caterpillar.go +``` + +## Task Interface + +Defined in `internal/pkg/pipeline/task/task.go`: + +```go +type Task interface { + Run(<-chan *record.Record, chan<- *record.Record) error + GetName() string + GetFailOnError() bool + GetTaskConcurrency() int + Init() error // called once after YAML unmarshal, before pipeline starts +} +``` + +`task.Base` satisfies every method except `Run`. Only override a method if you need non-default behavior. + +## Pipeline YAML Structure + +### Linear pipeline (implicit ordering by declaration) + +```yaml +channel_size: 10000 # optional; default 10000; buffer size between tasks + +tasks: + - name: source # unique within the pipeline; used in DAG expressions + type: file # must match a key in supportedTasks + path: ./input.txt + fail_on_error: false # optional; default false + task_concurrency: 1 # optional; default 1 — runs N concurrent workers for this task + context: + user_id: ".data | fromjson | .id" # JQ expressions stored per-record as context + + - name: transform + type: jq + path: '{ id: "{{ context "user_id" }}", raw: .data }' + + - name: sink + type: file + path: output/{{ context "user_id" }}.json +``` + +The last declared task is the sink (no output channel). Execution flows top-to-bottom when no `dag:` key is present. + +### DAG pipeline (experimental, v2.1.0+) + +Add a `dag:` key. All names in the expression must match `name:` values declared under `tasks:`. + +```yaml +tasks: + - name: read + type: file + path: input.txt + - name: branch_a + type: jq + path: '.data | { a: . }' + - name: branch_b + type: echo + - name: sink + type: file + path: output/result.json + +dag: read >> [branch_a, branch_b] >> sink +``` + +DAG syntax rules: +- `>>` — sequential step +- `[task1, task2]` — parallel fan-out/fan-in; records are broadcast to all branches and merged at the next `>>` +- Brackets must balance; no single-item groups; valid characters: letters, digits, `_`, `-`, `[`, `]`, `,`, `>`, whitespace +- Full syntax reference: `DAG_README.md` + +## Known Broken Packages + +Do not run `go test ./...` or `go build ./...`. + +### 1. `internal/pkg/pipeline/task/kinesis/` — WIP, do not touch + +- **Problem:** Imports `github.com/aws/aws-sdk-go-v2/service/kinesis`, which is not in `go.mod`. Does not build. +- **Status:** Untracked (not committed). Not registered in `tasks.go`. +- **Action:** Leave it alone. Do not add it to `go.mod` or register it in `tasks.go` without explicit instruction from the team. + +### 2. `internal/pkg/pipeline/task/file/` — production code is fine; test build fails + +- **Problem:** `file_success_path_test.go` (untracked) references `resolveSuccessObjectPath` and `writerSchemeFromPath`, which do not yet exist in the production package. The test was written ahead of the implementation. +- **Status:** `file.go` and `s3.go` build and run correctly. Only the test build is broken. +- **Action:** Do not run `go test ./internal/pkg/pipeline/task/file/`. The production package is safe to import and extend. + +### 3. Root package — scratch files with duplicate `main()` declarations + +- **Problem:** `push_sqs_localstack.go` and `push_kafka_message.go` both declare `package main` with a `func main()`, causing a duplicate-symbol error if you compile the root package. +- **Status:** Untracked (not committed). Used locally for manual testing. +- **Action:** Do not delete without asking. Do not attempt `go build .` at the repo root. + +## Conventions + +- `task.Base` must be embedded with `yaml:",inline" json:",inline"` in every task struct. Omitting the inline tag breaks YAML decoding. +- `New()` always returns `(task.Task, error)` — even tasks with no initialization that can fail. +- Template strings in YAML (`config.String`) expand `{{ secret "/path" }}` (fetched at startup) and `{{ context "key" }}` (evaluated per record from values set upstream via `context:` JQ expressions). +- Task names must be unique within a pipeline. In DAG mode, the `name:` field is the identifier used in the `dag:` expression. +- `fail_on_error: true` causes the pipeline to collect the task error and return it after all goroutines finish. Without it, errors are logged but execution continues. +- Concurrency per task is controlled with `task_concurrency`; the pipeline runs N goroutines calling the same `Run()` concurrently. The `Base` methods (`GetRecord`, `SendRecord`) are safe for concurrent use. + +## Smoke Path + +The smoke path is the one end-to-end test an agent can always run without any external dependencies (no AWS, no Kafka, no network). + +```bash +# Build +go build -o caterpillar ./cmd/caterpillar/caterpillar.go + +# Run +./caterpillar -conf test/pipelines/hello_name.yaml +``` + +Expected outcome: exits with code 0, prints name greetings to stdout, no external services required. If this fails, something fundamental is broken before any task-specific investigation. + +## SRE / Operational Model + +Caterpillar is a CLI tool, not a long-running server. There is no process to restart, no service to scale, and no health endpoint to query. + +**During an incident:** +1. Check CI logs first: `gh run list --repo patterninc/caterpillar` then `gh run view ` to inspect a specific run. +2. If CI is passing and runtime behavior is wrong, check the state of the relevant AWS service (S3 bucket access, SQS queue depth, SSM parameter existence) using the AWS Console or CLI. +3. There is no caterpillar daemon to restart — re-running the binary with a corrected YAML is the recovery action. + +**Release pipeline status:** Currently broken. The Docker Hub publish step fails because the `DOCKERHUB_USERNAME` and `DOCKERHUB_TOKEN` secrets are not configured in the repository's GitHub Actions secrets. Do not attempt to trigger a release until those secrets are set. + +## Environment Variables + +| Variable | Required | Description | +|---|---|---| +| `AWS_ACCESS_KEY_ID` | Required for AWS tasks | IAM access key for authenticating S3, SQS, SNS, and SSM tasks | +| `AWS_SECRET_ACCESS_KEY` | Required for AWS tasks | IAM secret key paired with `AWS_ACCESS_KEY_ID` | +| `AWS_REGION` | Required for all AWS tasks | AWS region to target (e.g., `us-east-1`) | +| `AWS_ENDPOINT_URL` | Optional | Override the AWS endpoint URL; use for LocalStack (e.g., `http://localhost:4566`) | + +No credentials are stored in this repository. Set these variables in your shell or CI environment before running pipelines that use AWS-backed tasks. + +## Pull Request Instructions + +All changes must go through a PR targeting `main`. Never push directly to `main`. + +### Branch naming + +``` +/ +``` + +Common types: `feat`, `fix`, `chore`, `refactor`, `docs`. Example: `feat/add-grpc-task`. + +### Before opening a PR + +1. Build must pass: `go build ./cmd/caterpillar/caterpillar.go` +2. If you added a new task type, confirm it is registered in `tasks.go` and has a `README.md` in its package directory. + +### PR title format + +``` +: +``` + +Examples: +- `feat: add grpc task` +- `fix: handle nil record in jq task` +- `chore: update go.mod dependencies` + +Keep titles under 72 characters. Do not include the issue number in the title. + +### PR body + +``` +## What +<1-3 sentences describing what changed> + +## Why + + +## Test plan +- [ ] `go build ./cmd/caterpillar/caterpillar.go` passes +- [ ] +``` + +### Creating the PR + +```bash +git checkout -b feat/my-change +git add +git commit -m "feat: my change" +gh pr create --title "feat: my change" --body "..." +``` + +Scope `git add` to specific files — never use `git add -A` or `git add .` to avoid accidentally staging scratch files or credentials. + +## Agent Permissions + +- **Autonomous:** Read files, run `go build ./cmd/caterpillar/caterpillar.go`, create branches, add new task packages following the pattern above. +- **Supervised:** Modify `internal/pkg/pipeline/tasks.go`, change `go.mod`/`go.sum`, open PRs, modify CI configuration. +- **Restricted:** Delete untracked scratch files at repo root, add anything to `internal/pkg/pipeline/task/kinesis/` or register it, modify `.github/CODEOWNERS`. From ca457e39ae07235d4b418f7daaec3425f9bec53d Mon Sep 17 00:00:00 2001 From: Prasad Lohakpure Date: Mon, 6 Apr 2026 13:33:08 +0530 Subject: [PATCH 2/3] FEAT: Support for storage classes in file task type --- internal/pkg/pipeline/task/file/README.md | 38 +++++++++++++++- internal/pkg/pipeline/task/file/file.go | 12 ++++- internal/pkg/pipeline/task/file/s3.go | 7 +-- .../pkg/pipeline/task/file/storage_class.go | 30 +++++++++++++ .../context_test_with_storage_class.yaml | 44 +++++++++++++++++++ 5 files changed, 125 insertions(+), 6 deletions(-) create mode 100644 internal/pkg/pipeline/task/file/storage_class.go create mode 100644 test/pipelines/context_test_with_storage_class.yaml diff --git a/internal/pkg/pipeline/task/file/README.md b/internal/pkg/pipeline/task/file/README.md index 1612587..e4bcbc9 100644 --- a/internal/pkg/pipeline/task/file/README.md +++ b/internal/pkg/pipeline/task/file/README.md @@ -28,13 +28,39 @@ In read mode, the sanitized base filename is stored in the record context under |-------|------|---------|-------------| | `name` | string | - | Task name for identification | | `type` | string | `file` | Must be "file" | -| `path` | string | `/tmp/caterpillar.txt` | File path or S3 URL (s3://bucket/key) supports glob patterns in reading mode +| `path` | string | `/tmp/caterpillar.txt` | File path or S3 URL (`s3://bucket/key`); glob patterns supported in read mode | | `region` | string | `us-west-2` | AWS region for S3 operations | +| `storage_class` | string | `STANDARD` | S3 **write** only: on `PutObject`. Ignored for local paths. See [S3 storage class](#s3-storage-class). | | `delimiter` | string | `\n` | Delimiter used to separate records when reading | | `success_file` | bool | `false` | Whether to create a success file after writing | | `success_file_name` | string | `_SUCCESS` | Name of the success file | | `fail_on_error` | bool | `false` | Whether to stop the pipeline if this task encounters an error | +## S3 storage class + +When the write `path` is an S3 URI (`s3://...`), each object is uploaded with the configured `storage_class`. The same class applies to the optional `success_file` marker in that task. + +Allowed values are the **PutObject storage class** strings known to the AWS SDK in this build (invalid values fail when the task runs). Typical values include: + +| Value | Notes | +|-------|--------| +| `STANDARD` | Default | +| `REDUCED_REDUNDANCY` | RRS | +| `STANDARD_IA` | Infrequent access | +| `ONEZONE_IA` | Single AZ IA | +| `INTELLIGENT_TIERING` | Intelligent-Tiering | +| `GLACIER` | Glacier Flexible Retrieval (instantiation rules apply) | +| `DEEP_ARCHIVE` | Lowest-cost archive | +| `GLACIER_IR` | Glacier Instant Retrieval | +| `EXPRESS_ONEZONE` | S3 Express One Zone | +| `OUTPOSTS` | S3 on Outposts | +| `SNOW` | Snowball / Snow Family edge | +| `FSX_OPENZFS` | FSx for OpenZFS–backed directory buckets | + +AWS may add or adjust classes in newer SDK releases; if a value is rejected as unknown, compare with the [S3 PutObject storage class documentation](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#AmazonS3-PutObject-request-header-StorageClass) or upgrade the SDK in this project. + +Read mode does not set storage class (objects are read as-is). + ## Path Schemes The task supports different path schemes: @@ -62,6 +88,16 @@ tasks: success_file: true ``` +### Writing to S3 with a non-default storage class: +```yaml +tasks: + - name: write_to_s3_ia + type: file + path: s3://my-bucket/logs/{{ macro "timestamp" }}.jsonl + region: us-east-1 + storage_class: STANDARD_IA +``` + ### Using macros in path: ```yaml tasks: diff --git a/internal/pkg/pipeline/task/file/file.go b/internal/pkg/pipeline/task/file/file.go index 2cbedaf..4b79c20 100644 --- a/internal/pkg/pipeline/task/file/file.go +++ b/internal/pkg/pipeline/task/file/file.go @@ -46,13 +46,16 @@ type file struct { SuccessFile bool `yaml:"success_file,omitempty" json:"success_file,omitempty"` SuccessFileName config.String `yaml:"success_file_name,omitempty" json:"success_file_name,omitempty"` Region string `yaml:"region,omitempty" json:"region,omitempty"` + StorageClass storageClass `yaml:"storage_class,omitempty" json:"storage_class,omitempty"` Delimiter string `yaml:"delimiter,omitempty" json:"delimiter,omitempty"` } func New() (task.Task, error) { + ensureStorageClasses() return &file{ Path: defaultPath, Region: defaultRegion, + StorageClass: defaultStorageClass, Delimiter: defaultDelimiter, SuccessFileName: defaultSuccessFileName, }, nil @@ -60,6 +63,10 @@ func New() (task.Task, error) { func (f *file) Run(input <-chan *record.Record, output chan<- *record.Record) error { + if err := validateStorageClass(f.StorageClass); err != nil { + return err + } + // let's check if we read file or we write file... if input != nil && output != nil { return task.ErrPresentInputOutput @@ -231,8 +238,9 @@ func (f *file) writeSuccessFile() error { } successFile := &file{ - Path: config.String(successFileName), - Region: f.Region, + Path: config.String(successFileName), + Region: f.Region, + StorageClass: f.StorageClass, } return writerFunction(successFile, nil, bytes.NewReader([]byte{})) diff --git a/internal/pkg/pipeline/task/file/s3.go b/internal/pkg/pipeline/task/file/s3.go index fc197ba..183d79d 100644 --- a/internal/pkg/pipeline/task/file/s3.go +++ b/internal/pkg/pipeline/task/file/s3.go @@ -90,9 +90,10 @@ func writeS3File(f *file, rec *record.Record, reader io.Reader) error { } _, err = client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: &bucket, - Key: &key, - Body: reader, + Bucket: &bucket, + Key: &key, + Body: reader, + StorageClass: f.StorageClass, }) return err diff --git a/internal/pkg/pipeline/task/file/storage_class.go b/internal/pkg/pipeline/task/file/storage_class.go new file mode 100644 index 0000000..4f23343 --- /dev/null +++ b/internal/pkg/pipeline/task/file/storage_class.go @@ -0,0 +1,30 @@ +package file + +import ( + "fmt" + + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +type storageClass = types.StorageClass + +var ( + storageClasses map[string]storageClass + defaultStorageClass storageClass +) + +func ensureStorageClasses() { + vals := types.StorageClass("").Values() + storageClasses = make(map[string]storageClass, len(vals)) + for _, v := range vals { + storageClasses[string(v)] = v + } + defaultStorageClass = storageClasses["STANDARD"] +} + +func validateStorageClass(c storageClass) error { + if _, ok := storageClasses[string(c)]; ok { + return nil + } + return fmt.Errorf("storage_class: unknown value %q", c) +} diff --git a/test/pipelines/context_test_with_storage_class.yaml b/test/pipelines/context_test_with_storage_class.yaml new file mode 100644 index 0000000..d98431a --- /dev/null +++ b/test/pipelines/context_test_with_storage_class.yaml @@ -0,0 +1,44 @@ +tasks: + - name: fetch_user_data + type: http + endpoint: https://jsonplaceholder.typicode.com/users/2 + context: + user_id: ".data | fromjson | .id" + user_name: ".data | fromjson | .name" + user_email: ".data | fromjson | .email" + user_company: ".data | fromjson | .company.name" + + - name: form_posts_endpoint_with_context + type: jq + path: | + { + "endpoint": "https://jsonplaceholder.typicode.com/posts?userId={{ context "user_id" }}" + } + + - name: echo_posts_endpoint + type: echo + only_data: true + + - name: hit_endpoint_with_context + type: http + context: + "number_of_posts": ".data | fromjson | length" + + - name: transform_with_context_jq + type: jq + path: | + { + "user_id": {{ context "user_id" }}, + "user_name": "{{ context "user_name" }}", + "user_email": "{{ context "user_email" }}", + "user_company": "{{ context "user_company" }}", + "headers_content_type": "{{ context "http-header-Content-Type" }}", + "headers_age": "{{ context "http-header-Age" }}", + "number_of_posts": {{ context "number_of_posts" }}, + "original_data": . + } + + - name: file_with_context + type: file + path: s3://{{ env "bucket_name" }}/local/output/{{ context "user_name" }}_{{ macro "uuid" }}_record.txt + storage_class: STANDARD_IA From a864c335261e4a3a04887b6925e51b2cbd4403dc Mon Sep 17 00:00:00 2001 From: Prasad Lohakpure Date: Mon, 6 Apr 2026 13:33:08 +0530 Subject: [PATCH 3/3] Removed agents file --- AGENTS.md | 315 ------------------------------------------------------ 1 file changed, 315 deletions(-) delete mode 100644 AGENTS.md diff --git a/AGENTS.md b/AGENTS.md deleted file mode 100644 index d6c480d..0000000 --- a/AGENTS.md +++ /dev/null @@ -1,315 +0,0 @@ -# caterpillar - -A Go CLI that reads a YAML pipeline configuration and executes an ordered chain (or DAG) of typed data-processing tasks. Tasks receive `*record.Record` values from an upstream buffered channel, transform or route them, and emit results downstream. - -The binary can operate in two modes depending on the pipeline: -- **Batch mode** (most pipelines) — runs, processes all records, and exits. -- **Server mode** — when the pipeline starts with an `http_server` task, the CLI acts as a long-running HTTP server. Incoming requests are converted to `*record.Record` values and emitted downstream to the rest of the pipeline. The binary does not exit until the server shuts down (or `end_after` is configured). - -Apply these instructions to `/Users/prasadlohakpure/Desktop/go_projects/src/github.com/patterninc/caterpillar`. Treat paths and commands below as relative to that location unless explicitly stated otherwise. - -## Tech Stack - -- **Language:** Go 1.22+ -- **Config format:** YAML (gopkg.in/yaml.v3) -- **Validation:** go-playground/validator/v10 -- **Testing:** standard `go test` -- **CI:** GitHub Actions (`.github/workflows/ci.yaml`) — runs `go build ./cmd/caterpillar/caterpillar.go` on every PR to `main`; does not run tests - -## Commands - -| Action | Command | -|---|---| -| Build | `go build ./cmd/caterpillar/caterpillar.go` | -| Run a pipeline | `./caterpillar -conf ` | - -**Do NOT run `go build ./...`** — three packages currently fail to build (see Known Broken Packages). - -## Directory Map - -| Directory | Purpose | -|---|---| -| `cmd/caterpillar/` | Entry point — parses `-conf` flag, loads YAML into `Pipeline`, calls `p.Run()` | -| `internal/pkg/pipeline/` | Core runtime: `Pipeline` struct, `Run()`, channel wiring, DAG execution engine | -| `internal/pkg/pipeline/tasks.go` | Task registry (`supportedTasks` map) — add new task types here | -| `internal/pkg/pipeline/task/` | `Task` interface and `Base` struct shared by all task packages | -| `internal/pkg/pipeline/task//` | One directory per task type; each exports `New() (task.Task, error)` | -| `internal/pkg/pipeline/task/aws/` | AWS-specific tasks (currently `parameter_store`) | -| `internal/pkg/pipeline/record/` | `Record` type — the unit of data flowing between tasks | -| `internal/pkg/config/` | `config.String` — Go template string that expands `{{ secret }}` and `{{ context }}` at runtime | -| `internal/pkg/jq/` | JQ query wrapper used by `Base.Context` to extract per-record context values | -| `test/pipelines/` | Example YAML pipeline files (most require live AWS/Kafka — not safe to run without dependencies) | -| `docs/` | Onboarding checklist, code-mint framework docs, outcomes, skills-status | -| `.agents/` | code-mint AI infrastructure (skills, rules, reports, status JSON) | - -## How to Add a New Task Type - -Use `internal/pkg/pipeline/task/echo/echo.go` as the canonical minimal example. - -### Step 1 — Create the package - -``` -internal/pkg/pipeline/task// - .go - README.md (document all YAML fields — follow echo/README.md format) -``` - -### Step 2 — Embed `task.Base` and implement `Run` - -```go -package mytask - -import ( - "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" - "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" -) - -type myTask struct { - task.Base `yaml:",inline" json:",inline"` // required — enables YAML decoding and satisfies interface - MyOption string `yaml:"my_option,omitempty" json:"my_option,omitempty"` -} - -func New() (task.Task, error) { - return &myTask{}, nil -} - -func (t *myTask) Run(input <-chan *record.Record, output chan<- *record.Record) error { - for { - r, ok := t.GetRecord(input) - if !ok { - break - } - // transform r.Data here - t.SendRecord(r, output) // also evaluates context: JQ expressions - } - return nil -} -``` - -Rules: -- `task.Base` **must** be embedded with `yaml:",inline" json:",inline"` — YAML decoding breaks without it. -- `New()` signature is always `func() (task.Task, error)` — keep it consistent with the registry even if init cannot fail. -- `Init()` is called once after unmarshaling, before `Run`. Override it only when you need one-time setup (e.g., creating a client). The `Base.Init()` no-op covers the default case. -- No `input` channel: task is a **source** (generates records). No `output` channel: task is a **sink**. Both present: transformer. -- `t.GetRecord(input)` safely reads from the channel and handles nil and close. `t.SendRecord(r, output)` evaluates `context:` JQ expressions and forwards the record. - -### Step 3 — Register in `tasks.go` - -Add an import and one entry to `supportedTasks` in `internal/pkg/pipeline/tasks.go`: - -```go -import ( - "github.com/patterninc/caterpillar/internal/pkg/pipeline/task/mytask" -) - -var supportedTasks = map[string]func() (task.Task, error){ - // existing entries ... - `my_task_name`: mytask.New, -} -``` - -The map key becomes the `type:` value in pipeline YAML. - -### Step 4 — Verify - -```bash -go build ./cmd/caterpillar/caterpillar.go -``` - -## Task Interface - -Defined in `internal/pkg/pipeline/task/task.go`: - -```go -type Task interface { - Run(<-chan *record.Record, chan<- *record.Record) error - GetName() string - GetFailOnError() bool - GetTaskConcurrency() int - Init() error // called once after YAML unmarshal, before pipeline starts -} -``` - -`task.Base` satisfies every method except `Run`. Only override a method if you need non-default behavior. - -## Pipeline YAML Structure - -### Linear pipeline (implicit ordering by declaration) - -```yaml -channel_size: 10000 # optional; default 10000; buffer size between tasks - -tasks: - - name: source # unique within the pipeline; used in DAG expressions - type: file # must match a key in supportedTasks - path: ./input.txt - fail_on_error: false # optional; default false - task_concurrency: 1 # optional; default 1 — runs N concurrent workers for this task - context: - user_id: ".data | fromjson | .id" # JQ expressions stored per-record as context - - - name: transform - type: jq - path: '{ id: "{{ context "user_id" }}", raw: .data }' - - - name: sink - type: file - path: output/{{ context "user_id" }}.json -``` - -The last declared task is the sink (no output channel). Execution flows top-to-bottom when no `dag:` key is present. - -### DAG pipeline (experimental, v2.1.0+) - -Add a `dag:` key. All names in the expression must match `name:` values declared under `tasks:`. - -```yaml -tasks: - - name: read - type: file - path: input.txt - - name: branch_a - type: jq - path: '.data | { a: . }' - - name: branch_b - type: echo - - name: sink - type: file - path: output/result.json - -dag: read >> [branch_a, branch_b] >> sink -``` - -DAG syntax rules: -- `>>` — sequential step -- `[task1, task2]` — parallel fan-out/fan-in; records are broadcast to all branches and merged at the next `>>` -- Brackets must balance; no single-item groups; valid characters: letters, digits, `_`, `-`, `[`, `]`, `,`, `>`, whitespace -- Full syntax reference: `DAG_README.md` - -## Known Broken Packages - -Do not run `go test ./...` or `go build ./...`. - -### 1. `internal/pkg/pipeline/task/kinesis/` — WIP, do not touch - -- **Problem:** Imports `github.com/aws/aws-sdk-go-v2/service/kinesis`, which is not in `go.mod`. Does not build. -- **Status:** Untracked (not committed). Not registered in `tasks.go`. -- **Action:** Leave it alone. Do not add it to `go.mod` or register it in `tasks.go` without explicit instruction from the team. - -### 2. `internal/pkg/pipeline/task/file/` — production code is fine; test build fails - -- **Problem:** `file_success_path_test.go` (untracked) references `resolveSuccessObjectPath` and `writerSchemeFromPath`, which do not yet exist in the production package. The test was written ahead of the implementation. -- **Status:** `file.go` and `s3.go` build and run correctly. Only the test build is broken. -- **Action:** Do not run `go test ./internal/pkg/pipeline/task/file/`. The production package is safe to import and extend. - -### 3. Root package — scratch files with duplicate `main()` declarations - -- **Problem:** `push_sqs_localstack.go` and `push_kafka_message.go` both declare `package main` with a `func main()`, causing a duplicate-symbol error if you compile the root package. -- **Status:** Untracked (not committed). Used locally for manual testing. -- **Action:** Do not delete without asking. Do not attempt `go build .` at the repo root. - -## Conventions - -- `task.Base` must be embedded with `yaml:",inline" json:",inline"` in every task struct. Omitting the inline tag breaks YAML decoding. -- `New()` always returns `(task.Task, error)` — even tasks with no initialization that can fail. -- Template strings in YAML (`config.String`) expand `{{ secret "/path" }}` (fetched at startup) and `{{ context "key" }}` (evaluated per record from values set upstream via `context:` JQ expressions). -- Task names must be unique within a pipeline. In DAG mode, the `name:` field is the identifier used in the `dag:` expression. -- `fail_on_error: true` causes the pipeline to collect the task error and return it after all goroutines finish. Without it, errors are logged but execution continues. -- Concurrency per task is controlled with `task_concurrency`; the pipeline runs N goroutines calling the same `Run()` concurrently. The `Base` methods (`GetRecord`, `SendRecord`) are safe for concurrent use. - -## Smoke Path - -The smoke path is the one end-to-end test an agent can always run without any external dependencies (no AWS, no Kafka, no network). - -```bash -# Build -go build -o caterpillar ./cmd/caterpillar/caterpillar.go - -# Run -./caterpillar -conf test/pipelines/hello_name.yaml -``` - -Expected outcome: exits with code 0, prints name greetings to stdout, no external services required. If this fails, something fundamental is broken before any task-specific investigation. - -## SRE / Operational Model - -Caterpillar is a CLI tool, not a long-running server. There is no process to restart, no service to scale, and no health endpoint to query. - -**During an incident:** -1. Check CI logs first: `gh run list --repo patterninc/caterpillar` then `gh run view ` to inspect a specific run. -2. If CI is passing and runtime behavior is wrong, check the state of the relevant AWS service (S3 bucket access, SQS queue depth, SSM parameter existence) using the AWS Console or CLI. -3. There is no caterpillar daemon to restart — re-running the binary with a corrected YAML is the recovery action. - -**Release pipeline status:** Currently broken. The Docker Hub publish step fails because the `DOCKERHUB_USERNAME` and `DOCKERHUB_TOKEN` secrets are not configured in the repository's GitHub Actions secrets. Do not attempt to trigger a release until those secrets are set. - -## Environment Variables - -| Variable | Required | Description | -|---|---|---| -| `AWS_ACCESS_KEY_ID` | Required for AWS tasks | IAM access key for authenticating S3, SQS, SNS, and SSM tasks | -| `AWS_SECRET_ACCESS_KEY` | Required for AWS tasks | IAM secret key paired with `AWS_ACCESS_KEY_ID` | -| `AWS_REGION` | Required for all AWS tasks | AWS region to target (e.g., `us-east-1`) | -| `AWS_ENDPOINT_URL` | Optional | Override the AWS endpoint URL; use for LocalStack (e.g., `http://localhost:4566`) | - -No credentials are stored in this repository. Set these variables in your shell or CI environment before running pipelines that use AWS-backed tasks. - -## Pull Request Instructions - -All changes must go through a PR targeting `main`. Never push directly to `main`. - -### Branch naming - -``` -/ -``` - -Common types: `feat`, `fix`, `chore`, `refactor`, `docs`. Example: `feat/add-grpc-task`. - -### Before opening a PR - -1. Build must pass: `go build ./cmd/caterpillar/caterpillar.go` -2. If you added a new task type, confirm it is registered in `tasks.go` and has a `README.md` in its package directory. - -### PR title format - -``` -: -``` - -Examples: -- `feat: add grpc task` -- `fix: handle nil record in jq task` -- `chore: update go.mod dependencies` - -Keep titles under 72 characters. Do not include the issue number in the title. - -### PR body - -``` -## What -<1-3 sentences describing what changed> - -## Why - - -## Test plan -- [ ] `go build ./cmd/caterpillar/caterpillar.go` passes -- [ ] -``` - -### Creating the PR - -```bash -git checkout -b feat/my-change -git add -git commit -m "feat: my change" -gh pr create --title "feat: my change" --body "..." -``` - -Scope `git add` to specific files — never use `git add -A` or `git add .` to avoid accidentally staging scratch files or credentials. - -## Agent Permissions - -- **Autonomous:** Read files, run `go build ./cmd/caterpillar/caterpillar.go`, create branches, add new task packages following the pattern above. -- **Supervised:** Modify `internal/pkg/pipeline/tasks.go`, change `go.mod`/`go.sum`, open PRs, modify CI configuration. -- **Restricted:** Delete untracked scratch files at repo root, add anything to `internal/pkg/pipeline/task/kinesis/` or register it, modify `.github/CODEOWNERS`.