diff --git a/outbox.go b/outbox.go index eb05ecf..075cbbb 100644 --- a/outbox.go +++ b/outbox.go @@ -119,6 +119,14 @@ func purgeOutbox( return wait(ctx, pollingFrequency) } + // Cache senders by topic to avoid creating a new connection per event. + senders := make(map[string]EventSender) + defer func() { + for _, s := range senders { + _ = s.Close() + } + }() + // Send the events to the EventStreamer. for _, e := range events { var outboxRecord outboxpb.OutboxRecord @@ -140,9 +148,13 @@ func purgeOutbox( t0 := clock.Now() topic := headers[HeaderTopic] - producer, err := stream.NewSender(ctx, topic) - if err != nil { - return err + producer, ok := senders[topic] + if !ok { + producer, err = stream.NewSender(ctx, topic) + if err != nil { + return err + } + senders[topic] = producer } err = producer.Send(ctx, foreignID, eventType, headers)