diff --git a/internal/pkg/pipeline/task/file/README.md b/internal/pkg/pipeline/task/file/README.md
index 1612587..e4bcbc9 100644
--- a/internal/pkg/pipeline/task/file/README.md
+++ b/internal/pkg/pipeline/task/file/README.md
@@ -28,13 +28,39 @@ In read mode, the sanitized base filename is stored in the record context under
 |-------|------|---------|-------------|
 | `name` | string | - | Task name for identification |
 | `type` | string | `file` | Must be "file" |
-| `path` | string | `/tmp/caterpillar.txt` | File path or S3 URL (s3://bucket/key) supports glob patterns in reading mode
+| `path` | string | `/tmp/caterpillar.txt` | File path or S3 URL (`s3://bucket/key`); glob patterns supported in read mode |
 | `region` | string | `us-west-2` | AWS region for S3 operations |
+| `storage_class` | string | `STANDARD` | S3 **write** only: applied on `PutObject`. Ignored for local paths. See [S3 storage class](#s3-storage-class). |
 | `delimiter` | string | `\n` | Delimiter used to separate records when reading |
 | `success_file` | bool | `false` | Whether to create a success file after writing |
 | `success_file_name` | string | `_SUCCESS` | Name of the success file |
 | `fail_on_error` | bool | `false` | Whether to stop the pipeline if this task encounters an error |
 
+## S3 storage class
+
+When the write `path` is an S3 URI (`s3://...`), each object is uploaded with the configured `storage_class`. The same class applies to the optional `success_file` marker in that task.
+
+Allowed values are the **PutObject storage class** strings known to the AWS SDK in this build (invalid values fail when the task runs). Typical values include:
+
+| Value | Notes |
+|-------|--------|
+| `STANDARD` | Default |
+| `REDUCED_REDUNDANCY` | RRS |
+| `STANDARD_IA` | Infrequent access |
+| `ONEZONE_IA` | Single AZ IA |
+| `INTELLIGENT_TIERING` | Intelligent-Tiering |
+| `GLACIER` | Glacier Flexible Retrieval (restore required before access) |
+| `DEEP_ARCHIVE` | Lowest-cost archive |
+| `GLACIER_IR` | Glacier Instant Retrieval |
+| `EXPRESS_ONEZONE` | S3 Express One Zone |
+| `OUTPOSTS` | S3 on Outposts |
+| `SNOW` | Snowball / Snow Family edge |
+| `FSX_OPENZFS` | FSx for OpenZFS |
+
+AWS may add or adjust classes in newer SDK releases; if a value is rejected as unknown, compare with the [S3 PutObject storage class documentation](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#AmazonS3-PutObject-request-header-StorageClass) or upgrade the SDK in this project.
+
+Read mode does not set storage class (objects are read as-is).
+
 ## Path Schemes
 
 The task supports different path schemes:
@@ -62,6 +88,16 @@ tasks:
     success_file: true
 ```
 
+### Writing to S3 with a non-default storage class:
+```yaml
+tasks:
+  - name: write_to_s3_ia
+    type: file
+    path: s3://my-bucket/logs/{{ macro "timestamp" }}.jsonl
+    region: us-east-1
+    storage_class: STANDARD_IA
+```
+
 ### Using macros in path:
 ```yaml
 tasks:
diff --git a/internal/pkg/pipeline/task/file/file.go b/internal/pkg/pipeline/task/file/file.go
index 2cbedaf..4b79c20 100644
--- a/internal/pkg/pipeline/task/file/file.go
+++ b/internal/pkg/pipeline/task/file/file.go
@@ -46,13 +46,16 @@ type file struct {
 	SuccessFile     bool          `yaml:"success_file,omitempty" json:"success_file,omitempty"`
 	SuccessFileName config.String `yaml:"success_file_name,omitempty" json:"success_file_name,omitempty"`
 	Region          string        `yaml:"region,omitempty" json:"region,omitempty"`
+	StorageClass    storageClass  `yaml:"storage_class,omitempty" json:"storage_class,omitempty"`
 	Delimiter       string        `yaml:"delimiter,omitempty" json:"delimiter,omitempty"`
 }
 
 func New() (task.Task, error) {
+	ensureStorageClasses()
 	return &file{
 		Path:            defaultPath,
 		Region:          defaultRegion,
+		StorageClass:    defaultStorageClass,
 		Delimiter:       defaultDelimiter,
 		SuccessFileName: defaultSuccessFileName,
 	}, nil
@@ -60,6 +63,10 @@ func New() (task.Task, error) {
 }
 
 func (f *file) Run(input <-chan *record.Record, output chan<- *record.Record) error {
+	if err := validateStorageClass(f.StorageClass); err != nil {
+		return err
+	}
+
 	// let's check if we read file or we write file...
 	if input != nil && output != nil {
 		return task.ErrPresentInputOutput
@@ -231,8 +238,9 @@ func (f *file) writeSuccessFile() error {
 	}
 
 	successFile := &file{
-		Path:   config.String(successFileName),
-		Region: f.Region,
+		Path:         config.String(successFileName),
+		Region:       f.Region,
+		StorageClass: f.StorageClass,
 	}
 
 	return writerFunction(successFile, nil, bytes.NewReader([]byte{}))
diff --git a/internal/pkg/pipeline/task/file/s3.go b/internal/pkg/pipeline/task/file/s3.go
index fc197ba..183d79d 100644
--- a/internal/pkg/pipeline/task/file/s3.go
+++ b/internal/pkg/pipeline/task/file/s3.go
@@ -90,9 +90,10 @@ func writeS3File(f *file, rec *record.Record, reader io.Reader) error {
 	}
 
 	_, err = client.PutObject(ctx, &s3.PutObjectInput{
-		Bucket: &bucket,
-		Key:    &key,
-		Body:   reader,
+		Bucket:       &bucket,
+		Key:          &key,
+		Body:         reader,
+		StorageClass: f.StorageClass,
 	})
 
 	return err
diff --git a/internal/pkg/pipeline/task/file/storage_class.go b/internal/pkg/pipeline/task/file/storage_class.go
new file mode 100644
index 0000000..4f23343
--- /dev/null
+++ b/internal/pkg/pipeline/task/file/storage_class.go
@@ -0,0 +1,41 @@
+package file
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
+)
+
+// storageClass aliases the SDK's storage class type so the rest of the
+// package does not need to import the S3 types package directly.
+type storageClass = types.StorageClass
+
+var (
+	storageClassesOnce  sync.Once
+	storageClasses      map[string]storageClass
+	defaultStorageClass storageClass
+)
+
+// ensureStorageClasses builds the set of storage classes known to the
+// linked AWS SDK exactly once; safe for concurrent callers.
+func ensureStorageClasses() {
+	storageClassesOnce.Do(func() {
+		vals := types.StorageClass("").Values()
+		storageClasses = make(map[string]storageClass, len(vals))
+		for _, v := range vals {
+			storageClasses[string(v)] = v
+		}
+		defaultStorageClass = types.StorageClassStandard
+	})
+}
+
+// validateStorageClass reports an error when c is not a storage class
+// known to the SDK in this build.
+func validateStorageClass(c storageClass) error {
+	ensureStorageClasses()
+	if _, ok := storageClasses[string(c)]; ok {
+		return nil
+	}
+	return fmt.Errorf("storage_class: unknown value %q", c)
+}
diff --git a/test/pipelines/context_test_with_storage_class.yaml b/test/pipelines/context_test_with_storage_class.yaml
new file mode 100644
index 0000000..d98431a
--- /dev/null
+++ b/test/pipelines/context_test_with_storage_class.yaml
@@ -0,0 +1,44 @@
+tasks:
+  - name: fetch_user_data
+    type: http
+    endpoint: https://jsonplaceholder.typicode.com/users/2
+    context:
+      user_id: ".data | fromjson | .id"
+      user_name: ".data | fromjson | .name"
+      user_email: ".data | fromjson | .email"
+      user_company: ".data | fromjson | .company.name"
+
+  - name: form_posts_endpoint_with_context
+    type: jq
+    path: |
+      {
+        "endpoint": "https://jsonplaceholder.typicode.com/posts?userId={{ context "user_id" }}"
+      }
+
+  - name: echo_posts_endpoint
+    type: echo
+    only_data: true
+
+  - name: hit_endpoint_with_context
+    type: http
+    context:
+      "number_of_posts": ".data | fromjson | length"
+
+  - name: transform_with_context_jq
+    type: jq
+    path: |
+      {
+        "user_id": {{ context "user_id" }},
+        "user_name": "{{ context "user_name" }}",
+        "user_email": "{{ context "user_email" }}",
+        "user_company": "{{ context "user_company" }}",
+        "headers_content_type": "{{ context "http-header-Content-Type" }}",
+        "headers_age": "{{ context "http-header-Age" }}",
+        "number_of_posts": {{ context "number_of_posts" }},
+        "original_data": .
+      }
+
+  - name: file_with_context
+    type: file
+    path: s3://{{ env "bucket_name" }}/local/output/{{ context "user_name" }}_{{ macro "uuid" }}_record.txt
+    storage_class: STANDARD_IA