
Conversation


@psanzay commented Feb 18, 2021

This PR adds support for deaggregating records when the stream contains aggregated records.


```go
records = output.Records
records, err = deaggregator.DeaggregateRecords(output.Records)
if err != nil {
```
@garethlewin (Contributor) commented:

Not sure about swallowing this error. What are the possible errors from deaggregator?

@psanzay (Author) commented Feb 19, 2021

@garethlewin deaggregator returns an error if it fails to unmarshal the aggregated record:

```go
err := proto.Unmarshal(messageData, aggRecord)
if err != nil {
    return nil, err
}
```

Should we return the actual payload without deaggregation on such an error? It is possible the records were aggregated using some custom logic rather than Amazon's aggregation format, so in those scenarios we should return the records as pushed. It would then be up to the user to deaggregate them.

@psanzay psanzay requested a review from garethlewin February 26, 2021 12:29
@garethlewin (Contributor) commented:

Hi, sorry. I haven't been ignoring this; I'm just stuck in a bit of analysis paralysis here.

This change would make #49 very difficult (or, more accurately, #49 makes this more difficult). I am also really not sure how to handle error cases.

As I see it there are 3 options, and I dislike all 3:

A) On error, just pass through the entire blob. Clients then have to anticipate this happening and deal with it, which means they have to be aware of deaggregation.

B) On error, swallow the record. This means data will be dropped, which seems very bad.

C) On error, return an error from kinsumer. The problem with this is that a checkpoint won't be created (or we are basically back to option B) and thus kinsumer will never be able to handle that shard again until the record expires off it.

I am wondering what the benefit of implicit deaggregation is here vs. having clients do it on their side (which is what we do at Twitch, though we use our own aggregation method and not the one that KCL supplies).

Created pull request template to comply with SOC2.