Merged
38 changes: 37 additions & 1 deletion internal/pkg/pipeline/task/file/README.md
@@ -28,13 +28,39 @@ In read mode, the sanitized base filename is stored in the record context under
|-------|------|---------|-------------|
| `name` | string | - | Task name for identification |
| `type` | string | `file` | Must be "file" |
| `path` | string | `/tmp/caterpillar.txt` | File path or S3 URL (s3://bucket/key) supports glob patterns in reading mode
| `path` | string | `/tmp/caterpillar.txt` | File path or S3 URL (`s3://bucket/key`); glob patterns supported in read mode |
| `region` | string | `us-west-2` | AWS region for S3 operations |
| `storage_class` | string | `STANDARD` | S3 **write** mode only; passed to `PutObject`. Ignored for local paths. See [S3 storage class](#s3-storage-class). |
| `delimiter` | string | `\n` | Delimiter used to separate records when reading |
| `success_file` | bool | `false` | Whether to create a success file after writing |
| `success_file_name` | string | `_SUCCESS` | Name of the success file |
| `fail_on_error` | bool | `false` | Whether to stop the pipeline if this task encounters an error |

## S3 storage class

When the write `path` is an S3 URI (`s3://...`), each object is uploaded with the configured `storage_class`. The same class applies to the optional `success_file` marker in that task.

Allowed values are the **PutObject storage class** strings known to the AWS SDK in this build (invalid values fail when the task runs). Typical values include:

| Value | Notes |
|-------|--------|
| `STANDARD` | Default |
| `REDUCED_REDUNDANCY` | RRS |
| `STANDARD_IA` | Infrequent access |
| `ONEZONE_IA` | Single AZ IA |
| `INTELLIGENT_TIERING` | Intelligent-Tiering |
| `GLACIER` | Glacier Flexible Retrieval (objects must be restored before they can be read) |
| `DEEP_ARCHIVE` | Lowest-cost archive |
| `GLACIER_IR` | Glacier Instant Retrieval |
| `EXPRESS_ONEZONE` | S3 Express One Zone |
| `OUTPOSTS` | S3 on Outposts |
| `SNOW` | Snowball / Snow Family edge |
| `FSX_OPENZFS` | FSx for OpenZFS–backed directory buckets |

AWS may add or adjust classes in newer SDK releases; if a value is rejected as unknown, compare with the [S3 PutObject storage class documentation](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#AmazonS3-PutObject-request-header-StorageClass) or upgrade the SDK in this project.

Read mode does not set storage class (objects are read as-is).
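Because the class is validated before any write happens, a task configured with a value outside the SDK's known set fails as soon as it runs, not mid-upload. For example (hypothetical task name; the storage class below contains a deliberate typo):

```yaml
tasks:
  - name: bad_storage_class
    type: file
    path: s3://my-bucket/out.txt
    storage_class: STANDAARD   # typo: task fails with `storage_class: unknown value "STANDAARD"`
```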

## Path Schemes

The task supports different path schemes:
@@ -62,6 +88,16 @@ tasks:
success_file: true
```

### Writing to S3 with a non-default storage class:
```yaml
tasks:
- name: write_to_s3_ia
type: file
path: s3://my-bucket/logs/{{ macro "timestamp" }}.jsonl
region: us-east-1
storage_class: STANDARD_IA
```

### Using macros in path:
```yaml
tasks:
12 changes: 10 additions & 2 deletions internal/pkg/pipeline/task/file/file.go
@@ -46,20 +46,27 @@ type file struct {
	SuccessFile     bool          `yaml:"success_file,omitempty" json:"success_file,omitempty"`
	SuccessFileName config.String `yaml:"success_file_name,omitempty" json:"success_file_name,omitempty"`
	Region          string        `yaml:"region,omitempty" json:"region,omitempty"`
	StorageClass    storageClass  `yaml:"storage_class,omitempty" json:"storage_class,omitempty"`
	Delimiter       string        `yaml:"delimiter,omitempty" json:"delimiter,omitempty"`
}

func New() (task.Task, error) {
	ensureStorageClasses()
	return &file{
		Path:            defaultPath,
		Region:          defaultRegion,
		StorageClass:    defaultStorageClass,
		Delimiter:       defaultDelimiter,
		SuccessFileName: defaultSuccessFileName,
	}, nil
}

func (f *file) Run(input <-chan *record.Record, output chan<- *record.Record) error {

	if err := validateStorageClass(f.StorageClass); err != nil {
		return err
	}

	// let's check if we read file or we write file...
	if input != nil && output != nil {
		return task.ErrPresentInputOutput
@@ -231,8 +238,9 @@ func (f *file) writeSuccessFile() error {
	}

	successFile := &file{
		Path:   config.String(successFileName),
		Region: f.Region,
		Path:         config.String(successFileName),
		Region:       f.Region,
		StorageClass: f.StorageClass,
	}

	return writerFunction(successFile, nil, bytes.NewReader([]byte{}))
7 changes: 4 additions & 3 deletions internal/pkg/pipeline/task/file/s3.go
@@ -90,9 +90,10 @@ func writeS3File(f *file, rec *record.Record, reader io.Reader) error {
}

	_, err = client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: &bucket,
		Key:    &key,
		Body:   reader,
		Bucket:       &bucket,
		Key:          &key,
		Body:         reader,
		StorageClass: f.StorageClass,
	})

	return err
30 changes: 30 additions & 0 deletions internal/pkg/pipeline/task/file/storage_class.go
@@ -0,0 +1,30 @@
package file

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

type storageClass = types.StorageClass

var (
	storageClasses      map[string]storageClass
	defaultStorageClass storageClass
)

func ensureStorageClasses() {
	vals := types.StorageClass("").Values()
	storageClasses = make(map[string]storageClass, len(vals))
	for _, v := range vals {
		storageClasses[string(v)] = v
	}
	defaultStorageClass = storageClasses["STANDARD"]
}

func validateStorageClass(c storageClass) error {
	if _, ok := storageClasses[string(c)]; ok {
		return nil
	}
	return fmt.Errorf("storage_class: unknown value %q", c)
}
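The validation semantics above can be exercised in isolation. Here is a minimal sketch using only the standard library: the hard-coded `known` set is a stand-in for the SDK-generated list returned by `types.StorageClass("").Values()`, and `validate` mirrors the error shape of `validateStorageClass`:

```go
package main

import "fmt"

// known stands in for the SDK-generated list of storage classes;
// in the real task the set is built at runtime by ensureStorageClasses.
var known = map[string]bool{
	"STANDARD":    true,
	"STANDARD_IA": true,
	"GLACIER_IR":  true,
}

// validate mirrors validateStorageClass: nil for a known class,
// a descriptive error otherwise.
func validate(c string) error {
	if known[c] {
		return nil
	}
	return fmt.Errorf("storage_class: unknown value %q", c)
}

func main() {
	fmt.Println(validate("STANDARD_IA")) // <nil>
	fmt.Println(validate("REDUCED"))     // storage_class: unknown value "REDUCED"
}
```

Deriving the set from the SDK's generated `Values()` method (rather than hard-coding it, as in this sketch) means new storage classes become valid simply by upgrading the dependency.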
44 changes: 44 additions & 0 deletions test/pipelines/context_test_with_storage_class.yaml
@@ -0,0 +1,44 @@
tasks:
  - name: fetch_user_data
    type: http
    endpoint: https://jsonplaceholder.typicode.com/users/2
    context:
      user_id: ".data | fromjson | .id"
      user_name: ".data | fromjson | .name"
      user_email: ".data | fromjson | .email"
      user_company: ".data | fromjson | .company.name"

  - name: form_posts_endpoint_with_context
    type: jq
    path: |
      {
        "endpoint": "https://jsonplaceholder.typicode.com/posts?userId={{ context "user_id" }}"
      }

  - name: echo_posts_endpoint
    type: echo
    only_data: true

  - name: hit_endpoint_with_context
    type: http
    context:
      "number_of_posts": ".data | fromjson | length"

  - name: transform_with_context_jq
    type: jq
    path: |
      {
        "user_id": {{ context "user_id" }},
        "user_name": "{{ context "user_name" }}",
        "user_email": "{{ context "user_email" }}",
        "user_company": "{{ context "user_company" }}",
        "headers_content_type": "{{ context "http-header-Content-Type" }}",
        "headers_age": "{{ context "http-header-Age" }}",
        "number_of_posts": {{ context "number_of_posts" }},
        "original_data": .
      }

  - name: file_with_context
    type: file
    path: s3://{{ env "bucket_name" }}/local/output/{{ context "user_name" }}_{{ macro "uuid" }}_record.txt
    storage_class: STANDARD_IA