
Conversation


@hendoxc hendoxc commented Jan 5, 2026

  • Adds a chipingress batching client
  • Adds unit tests

Copilot AI review requested due to automatic review settings January 5, 2026 21:58
@hendoxc hendoxc requested a review from a team as a code owner January 5, 2026 21:58

github-actions bot commented Jan 5, 2026

👋 hendoxc, thanks for creating this pull request!

To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team.

Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks!

Copilot AI left a comment

Pull request overview

This PR implements a batching client for chipingress that collects CloudEvent messages and sends them in batches based on configurable size and timeout triggers. The implementation provides non-blocking message queuing with configurable concurrency control.

Key changes:

  • Implements a BatchClient with configurable batch size, timeout, buffer size, and concurrency limits (a usage sketch follows this list)
  • Provides asynchronous batch processing with proper context cancellation and shutdown handling
  • Includes comprehensive unit tests covering batch triggers, edge cases, and concurrent operations
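
A hypothetical usage sketch based on this description; NewClient, QueueMessage, Start/Stop, and the option names below are assumptions inferred from the review, not confirmed API:

client := batch.NewClient(chipIngressClient,
	batch.WithBatchSize(100),
	batch.WithBatchInterval(250*time.Millisecond),
)
client.Start(ctx)
defer client.Stop()

// QueueMessage is assumed non-blocking; the callback reports the outcome
// of the batch send that eventually includes this event.
client.QueueMessage(event, func(err error) {
	if err != nil {
		log.Printf("batch send failed: %v", err)
	}
})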

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

  • pkg/chipingress/batch/client.go: Implements the core BatchClient with Start/Stop lifecycle, message queuing, and batch sending logic with configurable options
  • pkg/chipingress/batch/client_test.go: Provides comprehensive test coverage for client initialization, message queuing, batch sending, and various trigger scenarios



github-actions bot commented Jan 5, 2026

✅ API Diff Results - No breaking changes


📄 View full apidiff report

@hendoxc hendoxc force-pushed the INFOPLAT-3099-chip-ingress-batching branch from e705b9f to 703498f on January 5, 2026 22:12
@4of9 4of9 left a comment

approved with one small suggestion

	return
default:
	if msg.callback != nil {
		msg.callback(err)
Suggested change:
-		msg.callback(err)
+		go msg.callback(err)

Maybe each callback should get its own goroutine? That way, no callback can block the others.
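
A sketch of that idea with a completion bound (not the PR's code; messages and err are taken from the snippet above): a sync.WaitGroup keeps the callbacks concurrent while still letting the sender wait for all of them.

var wg sync.WaitGroup
for _, msg := range messages {
	if msg.callback != nil {
		wg.Add(1)
		go func(cb func(error)) {
			defer wg.Done()
			cb(err) // each callback runs on its own goroutine
		}(msg.callback)
	}
}
wg.Wait() // optional: ensures all callbacks finish before the batch is released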

@pkcll pkcll requested a review from Copilot January 14, 2026 23:57
Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.



Comment on lines +145 to +158
go func() {
	for _, msg := range messages {
		select {
		case <-ctx.Done():
			return
		case <-b.shutdownChan:
			return
		default:
			if msg.callback != nil {
				msg.callback(err)
			}
		}
	}
}()
Copilot AI Jan 14, 2026

The nested goroutine for invoking callbacks creates a potential race condition. If ctx.Done() or b.shutdownChan are triggered after checking them in the select but before invoking the callback, some callbacks may not be invoked. Consider removing the nested goroutine and invoking callbacks directly in the parent goroutine, or ensuring all callbacks are invoked even during shutdown.

Suggested change:
-go func() {
-	for _, msg := range messages {
-		select {
-		case <-ctx.Done():
-			return
-		case <-b.shutdownChan:
-			return
-		default:
-			if msg.callback != nil {
-				msg.callback(err)
-			}
-		}
-	}
-}()
+for _, msg := range messages {
+	if msg.callback != nil {
+		msg.callback(err)
+	}
+}

}

func (b *Client) Stop() {
	close(b.shutdownChan)
Copilot AI Jan 14, 2026

Calling Stop() multiple times will panic due to closing an already-closed channel. Add a sync.Once or check if the channel is already closed to make this method safe for concurrent calls.
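
A minimal sketch of the sync.Once approach (only shutdownChan is taken from the PR; everything else here is illustrative):

type Client struct {
	shutdownChan chan struct{}
	stopOnce     sync.Once
	// ... remaining fields from the PR
}

// Stop is safe to call multiple times and from multiple goroutines:
// the channel is closed exactly once.
func (b *Client) Stop() {
	b.stopOnce.Do(func() {
		close(b.shutdownChan)
	})
}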


@hendoxc ^^^


func WithCompressionType(compressionType string) Opt {
	return func(c *Client) {
		c.compressionType = compressionType
Copilot AI Jan 14, 2026

The compressionType field is set but never used in the client implementation. Either remove this field and option, or implement compression functionality in the sendBatch method.

Suggested change:
-		c.compressionType = compressionType
+		// Compression is currently not implemented; this option is a no-op to
+		// avoid setting an unused field.

	callback func(error)
}

type Client struct {
@pkcll pkcll Jan 15, 2026

It would be good to add metrics for the batching logic here. They could be optional, enabled when an otel MeterProvider is passed.
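
One possible shape for that (a sketch only; the option name, meter name, counter names, and the Client fields are illustrative, with go.opentelemetry.io/otel/metric assumed as the dependency):

// WithMeterProvider is a hypothetical option enabling batching metrics
// when a MeterProvider is supplied; metrics stay disabled otherwise.
func WithMeterProvider(mp metric.MeterProvider) Opt {
	return func(c *Client) {
		meter := mp.Meter("chipingress/batch")
		c.batchesSent, _ = meter.Int64Counter("batch_client.batches_sent")
		c.messagesQueued, _ = meter.Int64Counter("batch_client.messages_queued")
	}
}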

	callback func(error)
}

type Client struct {
@pkcll pkcll Jan 15, 2026

Do we need a retry mechanism here? Or is that expected to be configured at the gRPC client level?
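
If it is left to the gRPC layer (the second option above), a service-config sketch like this would enable transparent retries; all values are illustrative:

conn, err := grpc.NewClient(target,
	grpc.WithDefaultServiceConfig(`{
		"methodConfig": [{
			"name": [{}],
			"retryPolicy": {
				"maxAttempts": 4,
				"initialBackoff": "0.1s",
				"maxBackoff": "1s",
				"backoffMultiplier": 2,
				"retryableStatusCodes": ["UNAVAILABLE"]
			}
		}]
	}`),
)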

	client:             client,
	batchSize:          1,
	maxConcurrentSends: make(chan struct{}, 1),
	messageBuffer:      make(chan *messageWithCallback, 1000),

Should buffer size be configurable?
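
Following the existing Opt pattern, this could be a one-line option (a sketch; the field name is assumed from the constructor defaults above):

// WithBufferSize is a hypothetical option overriding the default
// 1000-message buffer shown above.
func WithBufferSize(size int) Opt {
	return func(c *Client) {
		c.messageBuffer = make(chan *messageWithCallback, size)
	}
}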

b.maxConcurrentSends <- struct{}{}

go func() {
	defer func() { <-b.maxConcurrentSends }()

Metric for this?

	callback func(error)
}

type Client struct {

Do we need a backpressure mechanism? What should happen if ChIP Ingress is not available for X minutes?
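
One common answer, sketched here under the assumption that producers must never block: when the buffer fills, fail fast and report through the callback rather than queueing indefinitely.

select {
case b.messageBuffer <- msg:
default:
	// Buffer full: surface backpressure to the caller instead of blocking.
	if msg.callback != nil {
		msg.callback(errors.New("chipingress batch: buffer full, message dropped"))
	}
}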

select {
case <-ctx.Done():
	b.flush(batch)
	close(b.shutdownChan)

Is closing shutdownChan needed here?

case <-b.shutdownChan:
	return
default:
	if msg.callback != nil {

Shall we add a context to the callback function so it can be canceled gracefully?
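
That would be a small signature change (a sketch; the struct is taken from the snippet above, the rest is illustrative):

type messageWithCallback struct {
	// event field omitted; see the PR for the actual message type
	callback func(context.Context, error)
}

// At invocation time the batch's context is passed through, so a callback
// can observe cancellation:
// msg.callback(ctx, err)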

select {
case <-ctx.Done():
	return
case <-b.shutdownChan:

On shutdown or ctx cancellation, should we attempt to execute the callbacks and complete them within some timeout, maybe?


Maybe flush should take care of that?
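
A sketch of that idea (the name and signature are assumptions): flush runs the remaining callbacks but bounds the total drain time, so shutdown cannot hang on a slow callback.

func (b *Client) flushWithTimeout(batch []*messageWithCallback, err error, drain time.Duration) {
	ctx, cancel := context.WithTimeout(context.Background(), drain)
	defer cancel()
	for _, msg := range batch {
		if msg.callback == nil {
			continue
		}
		done := make(chan struct{})
		go func(cb func(error)) {
			defer close(done)
			cb(err)
		}(msg.callback)
		select {
		case <-done:
		case <-ctx.Done():
			return // drain window exhausted; remaining callbacks are skipped
		}
	}
}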

if len(batch) >= b.batchSize {
	batchToSend := batch
	batch = make([]*messageWithCallback, 0, b.batchSize)
	timer.Stop()

Suggested change:
-	timer.Stop()
+	stopAndDrainTimer(timer)

	return
case msg := <-b.messageBuffer:
	if len(batch) == 0 {
		timer.Reset(b.batchInterval)

Suggested change:
-		timer.Reset(b.batchInterval)
+		stopAndDrainTimer(timer)
+		timer.Reset(b.batchInterval)

go func() {
	batch := make([]*messageWithCallback, 0, b.batchSize)
	timer := time.NewTimer(b.batchInterval)
	timer.Stop()
@pkcll pkcll Jan 15, 2026

Consider a safer flow: stop and drain the timer channel.

Suggested change:
-	timer.Stop()
+	stopAndDrainTimer(timer)

func stopAndDrainTimer(timer *time.Timer) {
	if !timer.Stop() {
		select {
		case <-timer.C:
		default:
		}
	}
}
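
(For context: before Go 1.23, a timer that has already fired leaves its tick buffered in timer.C, so a bare Stop followed by Reset can deliver a stale tick and trigger a premature flush; the drain above guards against that. From Go 1.23 on, Stop and Reset discard any pending tick themselves, making the drain unnecessary but still harmless.)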
