Skip to content

Conversation

@kelnage
Copy link
Contributor

@kelnage kelnage commented Dec 31, 2025

Lambda-Promtail currently offers the ability to manipulate labels using Prometheus's relabelling configuration, but does not yet support Promtail/Loki's wider pipeline stages.

This PR introduces support for them into Lambda-Promtail. It provides support for all the stages defined in the logentry package (Apache 2 licensed), using a JSON configuration format (rather than YAML that pipeline requires, given the existing use of JSON configuration).

The implementation adds a new LokiStages type that stores all the relevant stages defined in the LOKI_STAGE_CONFIGS environment variable. It updates the batch processing to process each entry with the configured stages and adds the processed entries to the batch that is sent.

The current Stages implementation is predominantly designed for asynchronous usage (via channels and Run), but Lambda-Promtail uses synchronous functions. If a synchronous Process function is provided for a stage, it will be used, otherwise we use short-lived channels to achieve the equivalent. Where asynchronous calls are used, a PIPELINE_TIMEOUT configures the maximum duration (currently 1s by default) before producing an empty entry.

Closes #7

Copy link
Collaborator

@jeschkies jeschkies left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work. I have a few suggestions.

}

// NewLokiStages creates a new instance of LokiStages
func NewLokiStages(logger log.Logger, stgs []map[string]any, jobName *string, registerer prometheus.Registerer) (*LokiStages, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there a method like this in promtail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but it requires:

  • A YAML format configuration (Lambda-Promtail seems to prefer JSON?) - and simply translating the YAML to JSON wasn't working due to these lines
  • Their pipeline only provides access to asynchronous Run methods, whereas Lambda-Promtail is fully synchronous

My initial implementation did try to use this Pipeline class, but the necessary changes to Lambda-Promtail felt too large to me?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the info. Promtail isn't really maintained well so I think it's fine to move the bits here.

}
entry = processedEntry
case <-time.After(timeout):
return stages.Entry{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you log the timeout? Do we have to cancel anything here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. I was debating whether, if the processing does timeout, we should simply return the unprocessed entry rather than an dropping the result - thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should error instead. Otherwise users will get unexpected log lines.

pkg/promtail.go Outdated
stream.Entries = append(stream.Entries, e.entry)
// Apply pipeline stages to entry
stageEntry := stages.Entry{
Extracted: map[string]interface{}{},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this used? I'm wondering if we could avoud allocations here.

Copy link
Contributor Author

@kelnage kelnage Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I included this based on this code - from the comment above, it seems some stages expect the Extracted labels to already contain any existing labels. I agree it's not ideal, but not something I think we can necessarily fix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a bit of reflection, I've updated this code to skip this entire step if there are no processing stages specified.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[lambda-promtail] Add support for log line parsing/transforming/filtering feature

2 participants