diff --git a/internal/pkg/pipeline/task/file/README.md b/internal/pkg/pipeline/task/file/README.md index e4bcbc9..93d1f9c 100644 --- a/internal/pkg/pipeline/task/file/README.md +++ b/internal/pkg/pipeline/task/file/README.md @@ -31,6 +31,7 @@ In read mode, the sanitized base filename is stored in the record context under | `path` | string | `/tmp/caterpillar.txt` | File path or S3 URL (`s3://bucket/key`); glob patterns supported in read mode | | `region` | string | `us-west-2` | AWS region for S3 operations | | `storage_class` | string | `STANDARD` | S3 **write** only: on `PutObject`. Ignored for local paths. See [S3 storage class](#s3-storage-class). | +| `tags` | map[string]string | - | S3 **write** only: object tags applied on `PutObject`. Ignored for local paths. Values support macros and context templates. See [S3 object tags](#s3-object-tags). | | `delimiter` | string | `\n` | Delimiter used to separate records when reading | | `success_file` | bool | `false` | Whether to create a success file after writing | | `success_file_name` | string | `_SUCCESS` | Name of the success file | @@ -61,6 +62,45 @@ AWS may add or adjust classes in newer SDK releases; if a value is rejected as u Read mode does not set storage class (objects are read as-is). +## S3 object tags + +When the write `path` is an S3 URI (`s3://...`), each object is uploaded with the configured `tags` applied as the `x-amz-tagging` header on `PutObject`. The same tags are applied to the optional `success_file` marker. + +Tag values are evaluated per record, so macros and context templates (e.g. `{{ macro "timestamp" }}`, `{{ context "user_id" }}`) are resolved against the record being written. + +### Limits + +S3 enforces the following constraints ([docs](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html)): + +- At most **10 tags** per object. +- Tag keys must be unique (enforced by the YAML map). +- Tag **keys** up to **128 UTF-16 code units**. 
+- Tag **values** up to **256 UTF-16 code units**. + +Tag count, key length, and resolved value length are validated on every S3 write (the count and keys don't change per record, but the checks are cheap and run alongside per-record value validation). In UTF-16, most characters take 1 code unit and supplementary characters (e.g. many emoji) take 2. Validation runs only when actually writing to S3 — local or read-mode runs are not affected by tag configuration. + +### `success_file` marker + +The `_SUCCESS` marker is not tied to any record, so tag values for the success marker must use only static strings or startup-time templates (`env`, `secret`, `macro`). A tag that references `{{ context "..." }}` will fail at the success-marker write with `context keys were not set: ...`, since there is no record context to resolve against. + +If you need record-derived tag values, either drop the context reference from the success-marker tags, or disable `success_file`. + +Read mode does not apply tags (objects are read as-is). 
+ +### Example + +```yaml +tasks: + - name: write_to_s3_tagged + type: file + path: s3://my-bucket/events/{{ macro "timestamp" }}.jsonl + region: us-east-1 + tags: + env: prod + pipeline: events + user_id: '{{ context "user_id" }}' +``` + ## Path Schemes The task supports different path schemes: diff --git a/internal/pkg/pipeline/task/file/file.go b/internal/pkg/pipeline/task/file/file.go index 4b79c20..633e364 100644 --- a/internal/pkg/pipeline/task/file/file.go +++ b/internal/pkg/pipeline/task/file/file.go @@ -42,12 +42,13 @@ var ( type file struct { task.Base `yaml:",inline" json:",inline"` - Path config.String `yaml:"path,omitempty" json:"path,omitempty"` - SuccessFile bool `yaml:"success_file,omitempty" json:"success_file,omitempty"` - SuccessFileName config.String `yaml:"success_file_name,omitempty" json:"success_file_name,omitempty"` - Region string `yaml:"region,omitempty" json:"region,omitempty"` - StorageClass storageClass `yaml:"storage_class,omitempty" json:"storage_class,omitempty"` - Delimiter string `yaml:"delimiter,omitempty" json:"delimiter,omitempty"` + Path config.String `yaml:"path,omitempty" json:"path,omitempty"` + SuccessFile bool `yaml:"success_file,omitempty" json:"success_file,omitempty"` + SuccessFileName config.String `yaml:"success_file_name,omitempty" json:"success_file_name,omitempty"` + Region string `yaml:"region,omitempty" json:"region,omitempty"` + StorageClass storageClass `yaml:"storage_class,omitempty" json:"storage_class,omitempty"` + Tags map[string]config.String `yaml:"tags,omitempty" json:"tags,omitempty"` + Delimiter string `yaml:"delimiter,omitempty" json:"delimiter,omitempty"` } func New() (task.Task, error) { @@ -241,6 +242,7 @@ func (f *file) writeSuccessFile() error { Path: config.String(successFileName), Region: f.Region, StorageClass: f.StorageClass, + Tags: f.Tags, } return writerFunction(successFile, nil, bytes.NewReader([]byte{})) diff --git a/internal/pkg/pipeline/task/file/s3.go 
b/internal/pkg/pipeline/task/file/s3.go index 183d79d..433f177 100644 --- a/internal/pkg/pipeline/task/file/s3.go +++ b/internal/pkg/pipeline/task/file/s3.go @@ -3,14 +3,25 @@ package file import ( "fmt" "io" + "net/url" + "unicode/utf16" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/patterninc/caterpillar/internal/pkg/config" "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" s3client "github.com/patterninc/caterpillar/internal/pkg/pipeline/task/file/s3_client" ) const ( s3Scheme = `s3` + + // S3 object tagging limits (see + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-tagging.html). + // Lengths are measured in UTF-16 code units. + s3MaxTagsPerObject = 10 + s3MaxTagKeyLen = 128 + s3MaxTagValueLen = 256 ) type s3Reader struct { @@ -72,6 +83,13 @@ func (r *s3Reader) parse(glob string) ([]string, error) { func writeS3File(f *file, rec *record.Record, reader io.Reader) error { + // Validate static tag constraints (count, key length) here rather than + // at task startup so read-mode and local-scheme writes are unaffected + // when tags are configured but never applied. + if err := validateS3Tags(f.Tags); err != nil { + return err + } + // create s3 client client, err := s3client.New(ctx, f.Region) if err != nil { @@ -89,13 +107,77 @@ func writeS3File(f *file, rec *record.Record, reader io.Reader) error { return err } + tags, err := buildTags(f.Tags, rec) + if err != nil { + return err + } + _, err = client.PutObject(ctx, &s3.PutObjectInput{ Bucket: &bucket, Key: &key, Body: reader, StorageClass: f.StorageClass, + Tagging: tags, }) return err } + +// buildTags evaluates each tag value against the record and returns a +// URL-encoded query string (key1=value1&key2=value2) as required by the +// S3 PutObject Tagging header. Returns nil if no tags are configured. +// +// When rec is nil (e.g. 
the _SUCCESS marker write), a synthetic empty +// record is substituted so any unresolved {{ context }} placeholders +// surface as a clear "context keys were not set" error instead of being +// uploaded as the internal placeholder string. +func buildTags(tags map[string]config.String, rec *record.Record) (*string, error) { + + if len(tags) == 0 { + return nil, nil + } + + evalRec := rec + if evalRec == nil { + evalRec = &record.Record{Context: ctx} + } + + values := make(url.Values, len(tags)) + for k, v := range tags { + resolved, err := v.Get(evalRec) + if err != nil { + return nil, fmt.Errorf("tag %q: %w", k, err) + } + if n := len(utf16.Encode([]rune(resolved))); n > s3MaxTagValueLen { + return nil, fmt.Errorf("tag %q: value length %d exceeds S3 limit of %d UTF-16 code units", k, n, s3MaxTagValueLen) + } + values.Set(k, resolved) + } + + return aws.String(values.Encode()), nil + +} + +// validateS3Tags checks the static tag constraints enforced by S3: at most +// 10 tags per object and tag keys up to 128 UTF-16 code units. Uniqueness +// is already guaranteed by the map. Value lengths depend on per-record +// templating and are validated in buildTags. +func validateS3Tags(tags map[string]config.String) error { + + if len(tags) > s3MaxTagsPerObject { + return fmt.Errorf("tags: %d tags configured, S3 allows at most %d per object", len(tags), s3MaxTagsPerObject) + } + + for k := range tags { + if k == "" { + return fmt.Errorf("tags: empty key is not allowed") + } + if n := len(utf16.Encode([]rune(k))); n > s3MaxTagKeyLen { + return fmt.Errorf("tag %q: key length %d exceeds S3 limit of %d UTF-16 code units", k, n, s3MaxTagKeyLen) + } + } + + return nil + +}