Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions internal/pkg/pipeline/task/file/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ In read mode, the sanitized base filename is stored in the record context under
| `path` | string | `/tmp/caterpillar.txt` | File path or S3 URL (`s3://bucket/key`); glob patterns supported in read mode |
| `region` | string | `us-west-2` | AWS region for S3 operations |
| `storage_class` | string | `STANDARD` | S3 **write** only: on `PutObject`. Ignored for local paths. See [S3 storage class](#s3-storage-class). |
| `tags` | map[string]string | - | S3 **write** only: object tags applied on `PutObject`. Ignored for local paths. Values support macros and context templates. See [S3 object tags](#s3-object-tags). |
| `delimiter` | string | `\n` | Delimiter used to separate records when reading |
| `success_file` | bool | `false` | Whether to create a success file after writing |
| `success_file_name` | string | `_SUCCESS` | Name of the success file |
Expand Down Expand Up @@ -61,6 +62,45 @@ AWS may add or adjust classes in newer SDK releases; if a value is rejected as u

Read mode does not set storage class (objects are read as-is).

## S3 object tags

When the write `path` is an S3 URI (`s3://...`), each object is uploaded with the configured `tags` applied as the `x-amz-tagging` header on `PutObject`. The same tags are applied to the optional `success_file` marker.

Tag values are evaluated per record, so macros and context templates (e.g. `{{ macro "timestamp" }}`, `{{ context "user_id" }}`) are resolved against the record being written.

Comment thread
divyanshu-tiwari marked this conversation as resolved.
### Limits

S3 enforces the following constraints ([docs](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html)):

- At most **10 tags** per object.
- Tag keys must be unique (enforced by the YAML map).
- Tag **keys** up to **128 UTF-16 code units**.
- Tag **values** up to **256 UTF-16 code units**.

Tag count, key length, and resolved value length are validated on every S3 write (the count and keys don't change per record, but the checks are cheap and run alongside per-record value validation). In UTF-16, most characters take 1 code unit and supplementary characters (e.g. many emoji) take 2. Validation runs only when actually writing to S3 — local or read-mode runs are not affected by tag configuration.

### `success_file` marker

The `_SUCCESS` marker is not tied to any record, so tag values for the success marker must only use static strings or startup-time templates (`env`, `secret`, `macro`). A tag that references `{{ context "..." }}` will fail at the success-marker write with `context keys were not set: ...`, since there is no record context to resolve against.

If you need record-derived tag values, either drop the context reference from the success-marker tags, or disable `success_file`.
Copy link
Copy Markdown
Contributor

@prasadlohakpure prasadlohakpure Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit pick:

context reference from the success-marker tags

Same tags would be applied for both files, right?
So do you think we should change success-marker tags > tags ?


Read mode does not apply tags (objects are read as-is).

### Example

```yaml
tasks:
- name: write_to_s3_tagged
type: file
path: s3://my-bucket/events/{{ macro "timestamp" }}.jsonl
region: us-east-1
tags:
env: prod
pipeline: events
user_id: '{{ context "user_id" }}'
```

## Path Schemes

The task supports different path schemes:
Expand Down
14 changes: 8 additions & 6 deletions internal/pkg/pipeline/task/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ var (

type file struct {
task.Base `yaml:",inline" json:",inline"`
Path config.String `yaml:"path,omitempty" json:"path,omitempty"`
SuccessFile bool `yaml:"success_file,omitempty" json:"success_file,omitempty"`
SuccessFileName config.String `yaml:"success_file_name,omitempty" json:"success_file_name,omitempty"`
Region string `yaml:"region,omitempty" json:"region,omitempty"`
StorageClass storageClass `yaml:"storage_class,omitempty" json:"storage_class,omitempty"`
Delimiter string `yaml:"delimiter,omitempty" json:"delimiter,omitempty"`
Path config.String `yaml:"path,omitempty" json:"path,omitempty"`
SuccessFile bool `yaml:"success_file,omitempty" json:"success_file,omitempty"`
SuccessFileName config.String `yaml:"success_file_name,omitempty" json:"success_file_name,omitempty"`
Region string `yaml:"region,omitempty" json:"region,omitempty"`
StorageClass storageClass `yaml:"storage_class,omitempty" json:"storage_class,omitempty"`
Tags map[string]config.String `yaml:"tags,omitempty" json:"tags,omitempty"`
Delimiter string `yaml:"delimiter,omitempty" json:"delimiter,omitempty"`
}

func New() (task.Task, error) {
Expand Down Expand Up @@ -241,6 +242,7 @@ func (f *file) writeSuccessFile() error {
Path: config.String(successFileName),
Region: f.Region,
StorageClass: f.StorageClass,
Tags: f.Tags,
}

return writerFunction(successFile, nil, bytes.NewReader([]byte{}))
Expand Down
82 changes: 82 additions & 0 deletions internal/pkg/pipeline/task/file/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,25 @@ package file
import (
"fmt"
"io"
"net/url"
"unicode/utf16"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/patterninc/caterpillar/internal/pkg/config"
"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
s3client "github.com/patterninc/caterpillar/internal/pkg/pipeline/task/file/s3_client"
)

const (
s3Scheme = `s3`

// S3 object tagging limits (see
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html).
// Lengths are measured in UTF-16 code units.
s3MaxTagsPerObject = 10
s3MaxTagKeyLen = 128
s3MaxTagValueLen = 256
)

type s3Reader struct {
Expand Down Expand Up @@ -72,6 +83,13 @@ func (r *s3Reader) parse(glob string) ([]string, error) {

func writeS3File(f *file, rec *record.Record, reader io.Reader) error {

// Validate static tag constraints (count, key length) here rather than
// at task startup so read-mode and local-scheme writes are unaffected
// when tags are configured but never applied.
if err := validateS3Tags(f.Tags); err != nil {
return err
}
Comment thread
divyanshu-tiwari marked this conversation as resolved.

// create s3 client
client, err := s3client.New(ctx, f.Region)
if err != nil {
Expand All @@ -89,13 +107,77 @@ func writeS3File(f *file, rec *record.Record, reader io.Reader) error {
return err
}

tags, err := buildTags(f.Tags, rec)
if err != nil {
return err
}

_, err = client.PutObject(ctx, &s3.PutObjectInput{
Bucket: &bucket,
Key: &key,
Body: reader,
StorageClass: f.StorageClass,
Tagging: tags,
})

return err

}

// buildTags evaluates each tag value against the record and returns a
// URL-encoded query string (key1=value1&key2=value2) as required by the
// S3 PutObject Tagging header. Returns nil if no tags are configured.
//
// When rec is nil (e.g. the _SUCCESS marker write), a synthetic empty
// record is substituted so any unresolved {{ context }} placeholders
// surface as a clear "context keys were not set" error instead of being
// uploaded as the internal placeholder string.
func buildTags(tags map[string]config.String, rec *record.Record) (*string, error) {

if len(tags) == 0 {
return nil, nil
}

evalRec := rec
if evalRec == nil {
evalRec = &record.Record{Context: ctx}
}

values := make(url.Values, len(tags))
for k, v := range tags {
resolved, err := v.Get(evalRec)
if err != nil {
return nil, fmt.Errorf("tag %q: %w", k, err)
}
if n := len(utf16.Encode([]rune(resolved))); n > s3MaxTagValueLen {
return nil, fmt.Errorf("tag %q: value length %d exceeds S3 limit of %d UTF-16 code units", k, n, s3MaxTagValueLen)
}
Comment thread
divyanshu-tiwari marked this conversation as resolved.
values.Set(k, resolved)
}

return aws.String(values.Encode()), nil

}

// validateS3Tags checks the static tag constraints enforced by S3: at most
// 10 tags per object and tag keys up to 128 UTF-16 code units. Uniqueness
// is already guaranteed by the map. Value lengths depend on per-record
// templating and are validated in buildTags.
func validateS3Tags(tags map[string]config.String) error {

if len(tags) > s3MaxTagsPerObject {
return fmt.Errorf("tags: %d tags configured, S3 allows at most %d per object", len(tags), s3MaxTagsPerObject)
}

for k := range tags {
if k == "" {
return fmt.Errorf("tags: empty key is not allowed")
}
if n := len(utf16.Encode([]rune(k))); n > s3MaxTagKeyLen {
return fmt.Errorf("tag %q: key length %d exceeds S3 limit of %d UTF-16 code units", k, n, s3MaxTagKeyLen)
}
}

return nil

}
Loading