Introduce support for Loki logentry Stages for event processing #70
base: main
jeschkies left a comment
Nice work. I have a few suggestions.
    }

    // NewLokiStages creates a new instance of LokiStages
    func NewLokiStages(logger log.Logger, stgs []map[string]any, jobName *string, registerer prometheus.Registerer) (*LokiStages, error) {
Isn't there a method like this in promtail?
Yes, but it requires:
- A YAML format configuration (Lambda-Promtail seems to prefer JSON?) - and simply translating the YAML to JSON wasn't working due to these lines
- Their pipeline only provides access to asynchronous `Run` methods, whereas Lambda-Promtail is fully synchronous
My initial implementation did try to use this Pipeline class, but the necessary changes to Lambda-Promtail felt too large to me?
Thanks for the info. Promtail isn't really maintained well so I think it's fine to move the bits here.
    }
    entry = processedEntry
    case <-time.After(timeout):
    return stages.Entry{}
Could you log the timeout? Do we have to cancel anything here?
Good idea. I was debating whether, if the processing does time out, we should simply return the unprocessed entry rather than dropping the result - thoughts?
I wonder if we should error instead. Otherwise users will get unexpected log lines.
pkg/promtail.go
Outdated
    stream.Entries = append(stream.Entries, e.entry)
    // Apply pipeline stages to entry
    stageEntry := stages.Entry{
        Extracted: map[string]interface{}{},
Where is this used? I'm wondering if we could avoid allocations here.
I included this based on this code - from the comment above, it seems some stages expect the Extracted labels to already contain any existing labels. I agree it's not ideal, but not something I think we can necessarily fix?
On a bit of reflection, I've updated this code to skip this entire step if there are no processing stages specified.
Lambda-Promtail currently offers the ability to manipulate labels using Prometheus's relabelling configuration, but does not yet support Promtail/Loki's wider pipeline stages.
This PR introduces support for them into Lambda-Promtail. It provides support for all the stages defined in the logentry package (Apache 2 licensed), using a JSON configuration format (rather than the YAML that the upstream pipeline requires, to match Lambda-Promtail's existing use of JSON configuration).
The implementation adds a new `LokiStages` type that stores all the relevant stages defined in the `LOKI_STAGE_CONFIGS` environment variable. It updates the batch processing to process each entry with the configured stages and adds the processed entries to the batch that is sent.

The current Stages implementation is predominantly designed for asynchronous usage (via channels and `Run`), but Lambda-Promtail uses synchronous functions. If a synchronous `Process` function is provided for a stage, it will be used, otherwise we use short-lived channels to achieve the equivalent. Where asynchronous calls are used, a `PIPELINE_TIMEOUT` configures the maximum duration (currently 1s by default) before producing an empty entry.

Closes #7