INFOPLAT 3099 chip ingress batching #1756
base: main
Conversation
hendoxc commented Jan 5, 2026
- Adds a chipingress batching client
- Adds unit tests
👋 hendoxc, thanks for creating this pull request! To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team. Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks!
Pull request overview
This PR implements a batching client for chipingress that collects CloudEvent messages and sends them in batches based on configurable size and timeout triggers. The implementation provides non-blocking message queuing with configurable concurrency control.
Key changes:
- Implements a BatchClient with configurable batch size, timeout, buffer size, and concurrency limits (a hypothetical usage sketch follows this list)
- Provides asynchronous batch processing with proper context cancellation and shutdown handling
- Includes comprehensive unit tests covering batch triggers, edge cases, and concurrent operations
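To make the described behavior concrete, here is a hypothetical usage sketch. The constructor, option, and queue method names (`NewClient`, `WithBatchSize`, `WithBatchInterval`, `QueueMessage`) are assumptions for illustration; only the size/interval triggers, the error callback, and the Start/Stop lifecycle come from the PR itself.

```go
bc := batch.NewClient(chipIngressClient,           // chipIngressClient: the underlying chipingress client
    batch.WithBatchSize(100),                      // flush once 100 events are queued...
    batch.WithBatchInterval(500*time.Millisecond), // ...or after 500ms, whichever comes first
)
bc.Start(ctx)
defer bc.Stop()

// Queuing is non-blocking; the callback reports the result of the batch send.
bc.QueueMessage(event, func(err error) {
    if err != nil {
        log.Printf("chipingress batch send failed: %v", err)
    }
})
```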
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| pkg/chipingress/batch/client.go | Implements the core BatchClient with Start/Stop lifecycle, message queuing, and batch sending logic with configurable options |
| pkg/chipingress/batch/client_test.go | Provides comprehensive test coverage for client initialization, message queuing, batch sending, and various trigger scenarios |
✅ API Diff Results - No breaking changes
Fixes test
Force-pushed from e705b9f to 703498f
4of9 left a comment
approved with one small suggestion
        return
    default:
        if msg.callback != nil {
            msg.callback(err)
Suggested change:
-            msg.callback(err)
+            go msg.callback(err)
maybe each callback gets its own goroutine? this way no callback can block other callbacks
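As a rough sketch of that idea (the `sync.WaitGroup` is an illustrative addition, not part of the suggestion), each callback could be dispatched on its own goroutine so one slow callback cannot stall the rest:

```go
var wg sync.WaitGroup
for _, msg := range messages {
    if msg.callback != nil {
        wg.Add(1)
        go func(cb func(error)) {
            defer wg.Done()
            cb(err) // each callback runs independently of the others
        }(msg.callback)
    }
}
wg.Wait() // optional: block until all callbacks have returned
```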
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
go func() {
    for _, msg := range messages {
        select {
        case <-ctx.Done():
            return
        case <-b.shutdownChan:
            return
        default:
            if msg.callback != nil {
                msg.callback(err)
            }
        }
    }
}()
Copilot AI commented Jan 14, 2026
The nested goroutine for invoking callbacks creates a potential race condition. If ctx.Done() or b.shutdownChan are triggered after checking them in the select but before invoking the callback, some callbacks may not be invoked. Consider removing the nested goroutine and invoking callbacks directly in the parent goroutine, or ensuring all callbacks are invoked even during shutdown.
Suggested change:
- go func() {
-     for _, msg := range messages {
-         select {
-         case <-ctx.Done():
-             return
-         case <-b.shutdownChan:
-             return
-         default:
-             if msg.callback != nil {
-                 msg.callback(err)
-             }
-         }
-     }
- }()
+ for _, msg := range messages {
+     if msg.callback != nil {
+         msg.callback(err)
+     }
+ }
}

func (b *Client) Stop() {
    close(b.shutdownChan)
Copilot AI commented Jan 14, 2026
Calling Stop() multiple times will panic due to closing an already-closed channel. Add a sync.Once or check if the channel is already closed to make this method safe for concurrent calls.
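A minimal sketch of one possible fix, assuming a new `stopOnce` field on `Client` (the field name is hypothetical):

```go
type Client struct {
    // ...existing fields...
    shutdownChan chan struct{}
    stopOnce     sync.Once
}

func (b *Client) Stop() {
    // Closing via sync.Once means repeated or concurrent Stop calls
    // cannot panic on an already-closed channel.
    b.stopOnce.Do(func() {
        close(b.shutdownChan)
    })
}
```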
@hendoxc ^^^
func WithCompressionType(compressionType string) Opt {
    return func(c *Client) {
        c.compressionType = compressionType
Copilot AI commented Jan 14, 2026
The compressionType field is set but never used in the client implementation. Either remove this field and option, or implement compression functionality in the sendBatch method.
Suggested change:
-         c.compressionType = compressionType
+         // Compression is currently not implemented; this option is a no-op to
+         // avoid setting an unused field.
    callback func(error)
}

type Client struct {
Would be good to add metrics for batching logic here. Could be optional if otel MeterProvider is passed.
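Something along these lines could work, as a sketch only; `WithMeterProvider`, the counter fields, and the metric names are assumptions, not part of this PR:

```go
import "go.opentelemetry.io/otel/metric"

func WithMeterProvider(mp metric.MeterProvider) Opt {
    return func(c *Client) {
        meter := mp.Meter("chipingress/batch")
        // Errors are ignored here for brevity; a real option should surface them.
        c.batchesSent, _ = meter.Int64Counter("chipingress_batches_sent_total")
        c.messagesQueued, _ = meter.Int64Counter("chipingress_messages_queued_total")
    }
}
```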
    callback func(error)
}

type Client struct {
Do we need a retry mechanism here? Or is that expected to be configured at the gRPC client level?
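If retries are left to the gRPC layer, a default service config roughly like the following could handle transient failures transparently (a sketch; the target, credentials, and policy values are placeholders, not values from this PR):

```go
const retryServiceConfig = `{
  "methodConfig": [{
    "name": [{}],
    "retryPolicy": {
      "maxAttempts": 4,
      "initialBackoff": "0.1s",
      "maxBackoff": "1s",
      "backoffMultiplier": 2.0,
      "retryableStatusCodes": ["UNAVAILABLE"]
    }
  }]
}`

// Applied when dialing the chipingress endpoint (illustrative credentials).
conn, err := grpc.NewClient(target,
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithDefaultServiceConfig(retryServiceConfig),
)
```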
    client:             client,
    batchSize:          1,
    maxConcurrentSends: make(chan struct{}, 1),
    messageBuffer:      make(chan *messageWithCallback, 1000),
Should buffer size be configurable?
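If so, a small option in the existing `Opt` style could expose it; the name `WithMessageBufferSize` is a hypothetical suggestion:

```go
// WithMessageBufferSize replaces the default 1000-element buffer with a caller-chosen size.
func WithMessageBufferSize(size int) Opt {
    return func(c *Client) {
        c.messageBuffer = make(chan *messageWithCallback, size)
    }
}
```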
b.maxConcurrentSends <- struct{}{}

go func() {
    defer func() { <-b.maxConcurrentSends }()
metric for this?
    callback func(error)
}

type Client struct {
Do we need a backpressure mechanism? What should happen if ChIP Ingress is not available for X minutes?
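One possible shape for backpressure, sketched under the assumption of a `QueueMessage` entry point and an `ErrBufferFull` sentinel (both hypothetical): reject new messages once the buffer is saturated and let the caller decide whether to drop, retry, or spill.

```go
var ErrBufferFull = errors.New("chipingress batch: message buffer full")

func (b *Client) QueueMessage(msg *messageWithCallback) error {
    select {
    case b.messageBuffer <- msg:
        return nil
    default:
        // ChIP Ingress is unavailable or slow and the buffer is full;
        // push the decision back to the caller instead of blocking.
        return ErrBufferFull
    }
}
```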
select {
case <-ctx.Done():
    b.flush(batch)
    close(b.shutdownChan)
Is closing shutdownChan needed here?
case <-b.shutdownChan:
    return
default:
    if msg.callback != nil {
Shall we add context to callback function so it can be canceled gracefully?
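For example, the callback signature could carry a context (a sketch of the idea, not the current API):

```go
type messageWithCallback struct {
    // ...existing fields...
    callback func(ctx context.Context, err error)
}

// The sender would then pass its batch context, letting callbacks observe cancellation:
//     msg.callback(ctx, err)
```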
select {
case <-ctx.Done():
    return
case <-b.shutdownChan:
On shutdown or ctx cancellation, should we attempt to execute callbacks and complete them within some timeout?
Maybe flush should take care of that?
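A sketch of what that could look like, assuming `flush` calls the existing `sendBatch` under a bounded deadline (the 5s timeout and the `sendBatch(ctx, batch)` signature are assumptions):

```go
func (b *Client) flush(batch []*messageWithCallback) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    err := b.sendBatch(ctx, batch) // best-effort final send during shutdown
    for _, msg := range batch {
        if msg.callback != nil {
            msg.callback(err) // callbacks always complete, even on shutdown
        }
    }
}
```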
if len(batch) >= b.batchSize {
    batchToSend := batch
    batch = make([]*messageWithCallback, 0, b.batchSize)
    timer.Stop()
Suggested change:
-     timer.Stop()
+     stopAndDrainTimer(timer)
    return
case msg := <-b.messageBuffer:
    if len(batch) == 0 {
        timer.Reset(b.batchInterval)
Suggested change:
-         timer.Reset(b.batchInterval)
+         stopAndDrainTimer(timer)
+         timer.Reset(b.batchInterval)
go func() {
    batch := make([]*messageWithCallback, 0, b.batchSize)
    timer := time.NewTimer(b.batchInterval)
    timer.Stop()
Consider a safer flow: stop and drain the timer channel.
Suggested change:
- timer.Stop()
+ stopAndDrainTimer(timer)
// stopAndDrainTimer stops the timer and, if it had already fired, drains the
// pending value from timer.C so a subsequent Reset cannot observe a stale tick.
func stopAndDrainTimer(timer *time.Timer) {
    if !timer.Stop() {
        select {
        case <-timer.C:
        default:
        }
    }
}
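At the call sites flagged above, the helper would then be used before re-arming the timer, e.g.:

```go
stopAndDrainTimer(timer)     // guarantees timer.C holds no stale tick
timer.Reset(b.batchInterval) // safe to re-arm, per the time.Timer reset guidance
```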