From 5451058a2532de194eaf071c2c2be256936f97d5 Mon Sep 17 00:00:00 2001 From: Divyanshu Tiwari Date: Mon, 20 Apr 2026 11:38:50 +0530 Subject: [PATCH 1/3] FEAT: Support for S3 object tagging in file task Adds a `tags` field to the file task that is applied as the `x-amz-tagging` header on S3 PutObject (including the optional _SUCCESS marker). Tag values support macro/context templating so they can be evaluated per record. Validates S3 limits: up to 10 tags per object, key length up to 128 UTF-16 code units, and value length up to 256 UTF-16 code units. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/pkg/pipeline/task/file/README.md | 34 ++++++++++ internal/pkg/pipeline/task/file/file.go | 18 ++++-- internal/pkg/pipeline/task/file/s3.go | 77 +++++++++++++++++++++++ 3 files changed, 123 insertions(+), 6 deletions(-) diff --git a/internal/pkg/pipeline/task/file/README.md b/internal/pkg/pipeline/task/file/README.md index e4bcbc9..bb11aab 100644 --- a/internal/pkg/pipeline/task/file/README.md +++ b/internal/pkg/pipeline/task/file/README.md @@ -31,6 +31,7 @@ In read mode, the sanitized base filename is stored in the record context under | `path` | string | `/tmp/caterpillar.txt` | File path or S3 URL (`s3://bucket/key`); glob patterns supported in read mode | | `region` | string | `us-west-2` | AWS region for S3 operations | | `storage_class` | string | `STANDARD` | S3 **write** only: on `PutObject`. Ignored for local paths. See [S3 storage class](#s3-storage-class). | +| `tags` | map[string]string | - | S3 **write** only: object tags applied on `PutObject`. Ignored for local paths. Values support macros and context templates. See [S3 object tags](#s3-object-tags). | | `delimiter` | string | `\n` | Delimiter used to separate records when reading | | `success_file` | bool | `false` | Whether to create a success file after writing | | `success_file_name` | string | `_SUCCESS` | Name of the success file | @@ -61,6 +62,39 @@ AWS may add or adjust classes in newer SDK releases; if a value is rejected as u Read mode does not set storage class (objects are read as-is). +## S3 object tags + +When the write `path` is an S3 URI (`s3://...`), each object is uploaded with the configured `tags` applied as the `x-amz-tagging` header on `PutObject`. The same tags are applied to the optional `success_file` marker. + +Tag values are evaluated per record, so macros and context templates (e.g. `{{ macro "timestamp" }}`, `{{ context "user_id" }}`) are resolved against the record being written. + +### Limits + +S3 enforces the following constraints ([docs](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html)): + +- At most **10 tags** per object. +- Tag keys must be unique (enforced by the YAML map). +- Tag **keys** up to **128 UTF-16 code units**. +- Tag **values** up to **256 UTF-16 code units**. + +Tag counts and key lengths are validated at task startup; resolved value lengths are validated per record. In UTF-16, most characters take 1 code unit and supplementary characters (e.g. many emoji) take 2. + +Read mode does not apply tags (objects are read as-is). + +### Example + +```yaml +tasks: + - name: write_to_s3_tagged + type: file + path: s3://my-bucket/events/{{ macro "timestamp" }}.jsonl + region: us-east-1 + tags: + env: prod + pipeline: events + user_id: '{{ context "user_id" }}' +``` + ## Path Schemes The task supports different path schemes: diff --git a/internal/pkg/pipeline/task/file/file.go b/internal/pkg/pipeline/task/file/file.go index 4b79c20..7036eb7 100644 --- a/internal/pkg/pipeline/task/file/file.go +++ b/internal/pkg/pipeline/task/file/file.go @@ -42,12 +42,13 @@ var ( type file struct { task.Base `yaml:",inline" json:",inline"` - Path config.String `yaml:"path,omitempty" json:"path,omitempty"` - SuccessFile bool `yaml:"success_file,omitempty" json:"success_file,omitempty"` - SuccessFileName config.String `yaml:"success_file_name,omitempty" json:"success_file_name,omitempty"` - Region string `yaml:"region,omitempty" json:"region,omitempty"` - StorageClass storageClass `yaml:"storage_class,omitempty" json:"storage_class,omitempty"` - Delimiter string `yaml:"delimiter,omitempty" json:"delimiter,omitempty"` + Path config.String `yaml:"path,omitempty" json:"path,omitempty"` + SuccessFile bool `yaml:"success_file,omitempty" json:"success_file,omitempty"` + SuccessFileName config.String `yaml:"success_file_name,omitempty" json:"success_file_name,omitempty"` + Region string `yaml:"region,omitempty" json:"region,omitempty"` + StorageClass storageClass `yaml:"storage_class,omitempty" json:"storage_class,omitempty"` + Tags map[string]config.String `yaml:"tags,omitempty" json:"tags,omitempty"` + Delimiter string `yaml:"delimiter,omitempty" json:"delimiter,omitempty"` } func New() (task.Task, error) { @@ -67,6 +68,10 @@ func (f *file) Run(input <-chan *record.Record, output chan<- *record.Record) er return err } + if err := validateS3Tags(f.Tags); err != nil { + return err + } + // let's check if we read file or we write file... if input != nil && output != nil { return task.ErrPresentInputOutput @@ -241,6 +246,7 @@ func (f *file) writeSuccessFile() error { Path: config.String(successFileName), Region: f.Region, StorageClass: f.StorageClass, + Tags: f.Tags, } return writerFunction(successFile, nil, bytes.NewReader([]byte{})) diff --git a/internal/pkg/pipeline/task/file/s3.go b/internal/pkg/pipeline/task/file/s3.go index 183d79d..1250906 100644 --- a/internal/pkg/pipeline/task/file/s3.go +++ b/internal/pkg/pipeline/task/file/s3.go @@ -3,14 +3,25 @@ package file import ( "fmt" "io" + "net/url" + "unicode/utf16" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/patterninc/caterpillar/internal/pkg/config" "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" s3client "github.com/patterninc/caterpillar/internal/pkg/pipeline/task/file/s3_client" ) const ( s3Scheme = `s3` + + // S3 object tagging limits (see + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html). + // Lengths are measured in UTF-16 code units. + s3MaxTagsPerObject = 10 + s3MaxTagKeyLen = 128 + s3MaxTagValueLen = 256 ) type s3Reader struct { @@ -89,13 +100,79 @@ func writeS3File(f *file, rec *record.Record, reader io.Reader) error { return err } + tags, err := buildTags(f.Tags, rec) + if err != nil { + return err + } + _, err = client.PutObject(ctx, &s3.PutObjectInput{ Bucket: &bucket, Key: &key, Body: reader, StorageClass: f.StorageClass, + Tagging: tags, }) return err } + +// buildTags evaluates each tag value against the record and returns a +// URL-encoded query string (key1=value1&key2=value2) as required by the +// S3 PutObject Tagging header. Returns nil if no tags are configured. +func buildTags(tags map[string]config.String, rec *record.Record) (*string, error) { + + if len(tags) == 0 { + return nil, nil + } + + values := make(url.Values, len(tags)) + for k, v := range tags { + resolved, err := v.Get(rec) + if err != nil { + return nil, fmt.Errorf("tag %q: %w", k, err) + } + if n := utf16Len(resolved); n > s3MaxTagValueLen { + return nil, fmt.Errorf("tag %q: value length %d exceeds S3 limit of %d UTF-16 code units", k, n, s3MaxTagValueLen) + } + values.Set(k, resolved) + } + + return aws.String(values.Encode()), nil + +} + +// validateS3Tags checks the static tag constraints enforced by S3: at most +// 10 tags per object and tag keys up to 128 UTF-16 code units. Uniqueness +// is already guaranteed by the map. Value lengths depend on per-record +// templating and are validated in buildTags. +func validateS3Tags(tags map[string]config.String) error { + + if len(tags) > s3MaxTagsPerObject { + return fmt.Errorf("tags: %d tags configured, S3 allows at most %d per object", len(tags), s3MaxTagsPerObject) + } + + for k := range tags { + if k == "" { + return fmt.Errorf("tags: empty key is not allowed") + } + if n := utf16Len(k); n > s3MaxTagKeyLen { + return fmt.Errorf("tag %q: key length %d exceeds S3 limit of %d UTF-16 code units", k, n, s3MaxTagKeyLen) + } + } + + return nil + +} + +// utf16Len returns the number of UTF-16 code units that would encode s, +// which is how S3 measures tag key and value lengths. +func utf16Len(s string) int { + + n := 0 + for _, r := range s { + n += utf16.RuneLen(r) + } + return n + +} From 4ceef9714f5e868585bd01960d66c395c9bb0e55 Mon Sep 17 00:00:00 2001 From: Divyanshu Tiwari Date: Mon, 20 Apr 2026 11:50:10 +0530 Subject: [PATCH 2/3] address review feedback for S3 object tagging - Move tag validation out of task startup into writeS3File so read mode and local-scheme writes are unaffected by tag config. - Fix UTF-16 length accounting to use utf16.Encode (handles surrogate code points safely instead of letting RuneLen return -1). - Substitute an empty record when buildTags is called with nil (the _SUCCESS marker case) so unresolved {{ context }} placeholders surface as an explicit error rather than being uploaded verbatim. - Document the success-marker restriction in the file task README. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/pkg/pipeline/task/file/README.md | 8 +++++- internal/pkg/pipeline/task/file/file.go | 4 --- internal/pkg/pipeline/task/file/s3.go | 35 +++++++++++++---------- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/internal/pkg/pipeline/task/file/README.md b/internal/pkg/pipeline/task/file/README.md index bb11aab..c4aa3cd 100644 --- a/internal/pkg/pipeline/task/file/README.md +++ b/internal/pkg/pipeline/task/file/README.md @@ -77,7 +77,13 @@ S3 enforces the following constraints ([docs](https://docs.aws.amazon.com/Amazon - Tag **keys** up to **128 UTF-16 code units**. - Tag **values** up to **256 UTF-16 code units**. -Tag counts and key lengths are validated at task startup; resolved value lengths are validated per record. In UTF-16, most characters take 1 code unit and supplementary characters (e.g. many emoji) take 2. +Tag count and key length are validated on the first S3 write; resolved value length is validated per write. In UTF-16, most characters take 1 code unit and supplementary characters (e.g. many emoji) take 2. Validation runs only when actually writing to S3 — local or read-mode runs are not affected by tag configuration. + +### `success_file` marker + +The `_SUCCESS` marker is not tied to any record, so tag values for the success marker must only use static strings or startup-time templates (`env`, `secret`, `macro`). A tag that references `{{ context "..." }}` will fail at the success-marker write with `context keys were not set: ...`, since there is no record context to resolve against. + +If you need record-derived tag values, either drop the context reference from the success-marker tags, or disable `success_file`. Read mode does not apply tags (objects are read as-is). diff --git a/internal/pkg/pipeline/task/file/file.go b/internal/pkg/pipeline/task/file/file.go index 7036eb7..633e364 100644 --- a/internal/pkg/pipeline/task/file/file.go +++ b/internal/pkg/pipeline/task/file/file.go @@ -68,10 +68,6 @@ func (f *file) Run(input <-chan *record.Record, output chan<- *record.Record) er return err } - if err := validateS3Tags(f.Tags); err != nil { - return err - } - // let's check if we read file or we write file... if input != nil && output != nil { return task.ErrPresentInputOutput diff --git a/internal/pkg/pipeline/task/file/s3.go b/internal/pkg/pipeline/task/file/s3.go index 1250906..433f177 100644 --- a/internal/pkg/pipeline/task/file/s3.go +++ b/internal/pkg/pipeline/task/file/s3.go @@ -83,6 +83,13 @@ func (r *s3Reader) parse(glob string) ([]string, error) { func writeS3File(f *file, rec *record.Record, reader io.Reader) error { + // Validate static tag constraints (count, key length) here rather than + // at task startup so read-mode and local-scheme writes are unaffected + // when tags are configured but never applied. + if err := validateS3Tags(f.Tags); err != nil { + return err + } + // create s3 client client, err := s3client.New(ctx, f.Region) if err != nil { @@ -120,19 +127,29 @@ func writeS3File(f *file, rec *record.Record, reader io.Reader) error { // buildTags evaluates each tag value against the record and returns a // URL-encoded query string (key1=value1&key2=value2) as required by the // S3 PutObject Tagging header. Returns nil if no tags are configured. +// +// When rec is nil (e.g. the _SUCCESS marker write), a synthetic empty +// record is substituted so any unresolved {{ context }} placeholders +// surface as a clear "context keys were not set" error instead of being +// uploaded as the internal placeholder string. func buildTags(tags map[string]config.String, rec *record.Record) (*string, error) { if len(tags) == 0 { return nil, nil } + evalRec := rec + if evalRec == nil { + evalRec = &record.Record{Context: ctx} + } + values := make(url.Values, len(tags)) for k, v := range tags { - resolved, err := v.Get(rec) + resolved, err := v.Get(evalRec) if err != nil { return nil, fmt.Errorf("tag %q: %w", k, err) } - if n := utf16Len(resolved); n > s3MaxTagValueLen { + if n := len(utf16.Encode([]rune(resolved))); n > s3MaxTagValueLen { return nil, fmt.Errorf("tag %q: value length %d exceeds S3 limit of %d UTF-16 code units", k, n, s3MaxTagValueLen) } values.Set(k, resolved) @@ -156,7 +173,7 @@ func validateS3Tags(tags map[string]config.String) error { if k == "" { return fmt.Errorf("tags: empty key is not allowed") } - if n := utf16Len(k); n > s3MaxTagKeyLen { + if n := len(utf16.Encode([]rune(k))); n > s3MaxTagKeyLen { return fmt.Errorf("tag %q: key length %d exceeds S3 limit of %d UTF-16 code units", k, n, s3MaxTagKeyLen) } } @@ -164,15 +181,3 @@ func validateS3Tags(tags map[string]config.String) error { return nil } - -// utf16Len returns the number of UTF-16 code units that would encode s, -// which is how S3 measures tag key and value lengths. -func utf16Len(s string) int { - - n := 0 - for _, r := range s { - n += utf16.RuneLen(r) - } - return n - -} From 6f297532d27f640fbbe02b834c51dff9680935c7 Mon Sep 17 00:00:00 2001 From: Divyanshu Tiwari Date: Mon, 20 Apr 2026 12:05:20 +0530 Subject: [PATCH 3/3] docs: align tag validation description with implementation validateS3Tags runs on every writeS3File call, not just the first. Update the README to match so the described timing matches the code. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/pkg/pipeline/task/file/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/pipeline/task/file/README.md b/internal/pkg/pipeline/task/file/README.md index c4aa3cd..93d1f9c 100644 --- a/internal/pkg/pipeline/task/file/README.md +++ b/internal/pkg/pipeline/task/file/README.md @@ -77,7 +77,7 @@ S3 enforces the following constraints ([docs](https://docs.aws.amazon.com/Amazon - Tag **keys** up to **128 UTF-16 code units**. - Tag **values** up to **256 UTF-16 code units**. -Tag count and key length are validated on the first S3 write; resolved value length is validated per write. In UTF-16, most characters take 1 code unit and supplementary characters (e.g. many emoji) take 2. Validation runs only when actually writing to S3 — local or read-mode runs are not affected by tag configuration. +Tag count, key length, and resolved value length are validated on every S3 write (the count and keys don't change per record, but the checks are cheap and run alongside per-record value validation). In UTF-16, most characters take 1 code unit and supplementary characters (e.g. many emoji) take 2. Validation runs only when actually writing to S3 — local or read-mode runs are not affected by tag configuration. ### `success_file` marker