Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions go-sdk/pkg/interaction-store/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ func TestNewClientV1(t *testing.T) {
CallerId: "test-caller",
},
},
{
name: "success with keepalive enabled",
config: &Config{
Host: "localhost",
Port: "50051",
DeadLine: 1000,
PlainText: true,
CallerId: "test-caller",
KeepaliveTimeMs: 20000,
KeepaliveTimeoutMs: 5000,
KeepalivePermitWithoutStream: true,
},
},
}

for _, tt := range tests {
Expand Down
67 changes: 57 additions & 10 deletions go-sdk/pkg/interaction-store/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
"strconv"
"time"

"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
)

const (
ResolverDefaultScheme = "dns"

// roundRobinServiceConfig is the default gRPC service config applied to every
// interaction-store connection. It pins the load-balancing policy to round_robin.
roundRobinServiceConfig = `{"loadBalancingPolicy":"round_robin"}`
)

// GRPCClient wraps a gRPC client connection with metrics support
Expand All @@ -42,23 +48,64 @@ func NewConnFromConfig(config *Config, externalServiceName string, timing func(n

func getGRPCConnections(config Config) (*GRPCClient, error) {
resolver.SetDefaultScheme(ResolverDefaultScheme)
var gConn *grpc.ClientConn
var err error
opts, err := dialOptions(config)
if err != nil {
return nil, err
}
gConn, err := grpc.NewClient(config.Host+":"+config.Port, opts...)
if err != nil {
return nil, err
}
return &GRPCClient{Conn: gConn, DeadLine: int64(config.DeadLine)}, nil
}

// dialOptions assembles the gRPC dial options for the connection: transport
// credentials (plaintext vs TLS) and the round-robin service config, plus an
// optional client keepalive that is appended only when configured (see
// keepaliveParamsFromConfig). When keepalive is not configured the option set is
// identical to the pre-keepalive behaviour, so existing callers are unaffected. It
// returns an error when the keepalive configuration is invalid.
func dialOptions(config Config) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
if config.PlainText {
gConn, err = grpc.NewClient(config.Host+":"+config.Port,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
gConn, err = grpc.NewClient(config.Host+":"+config.Port,
grpc.WithTransportCredentials(creds),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`))
opts = append(opts, grpc.WithTransportCredentials(creds))
}
opts = append(opts, grpc.WithDefaultServiceConfig(roundRobinServiceConfig))
params, enabled, err := keepaliveParamsFromConfig(config)
if err != nil {
return nil, err
}
return &GRPCClient{Conn: gConn, DeadLine: int64(config.DeadLine)}, nil
if enabled {
opts = append(opts, grpc.WithKeepaliveParams(params))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ WARNING — Applying keepalive parameters without any SDK-side lower bound can turn idle connection pools into a fleet-wide background PING load when consumers enable aggressive keepalive, especially with PermitWithoutStream.

The new dial option makes gRPC send periodic HTTP/2 PINGs on each client connection according to caller-provided millisecond values. With no enforced minimum interval or guard for PermitWithoutStream, a misconfigured service can generate continuous idle-connection traffic and server ACK work across every pod and resolved backend connection; the SDK should clamp/reject intervals below the server policy or provide a safe named preset rather than accepting any positive value.

}
return opts, nil
}

// keepaliveParamsFromConfig derives the client keepalive parameters from config.
// Keepalive is opt-in: it returns ok=false (and a nil error) when KeepaliveTimeMs <= 0,
// so no keepalive dial option is applied and behaviour is unchanged.
//
// When keepalive is enabled (KeepaliveTimeMs > 0), KeepaliveTimeoutMs must be positive.
// A non-positive timeout is rejected with an error rather than being converted into a
// zero or negative gRPC timer duration, which would make keepalive PINGs fail
// immediately and churn otherwise-healthy connections.
func keepaliveParamsFromConfig(config Config) (keepalive.ClientParameters, bool, error) {
if config.KeepaliveTimeMs <= 0 {
return keepalive.ClientParameters{}, false, nil
}
if config.KeepaliveTimeoutMs <= 0 {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ WARNING — Enabling keepalive with only KeepaliveTimeMs set will now fail client construction instead of using gRPC's default keepalive timeout.

The new gate treats a zero KeepaliveTimeoutMs as fatal whenever KeepaliveTimeMs is positive. Because these fields are advertised as optional and gRPC keepalive parameters normally have default timeout behavior, a partial rollout that sets only the ping interval turns into a startup panic via NewConnFromConfig.

return keepalive.ClientParameters{}, false, fmt.Errorf(
"interaction-store: KeepaliveTimeoutMs must be > 0 when keepalive is enabled "+
"(KeepaliveTimeMs=%d ms), got %d ms", config.KeepaliveTimeMs, config.KeepaliveTimeoutMs)
}
return keepalive.ClientParameters{
Time: time.Duration(config.KeepaliveTimeMs) * time.Millisecond,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ WARNING — Validate that KeepaliveTimeMs and KeepaliveTimeoutMs fit in time.Duration before multiplying by time.Millisecond, and reject values that would overflow.

Very large positive millisecond values can overflow the duration multiplication into a negative or otherwise incorrect timer, bypassing the current non-positive validation and passing invalid keepalive settings to gRPC.

Also flagged on this line:

  • 💡 SUGGESTION — KeepaliveTimeMs is multiplied into a time.Duration without checking for overflow, which can produce a negative or nonsensical ping interval.

Timeout: time.Duration(config.KeepaliveTimeoutMs) * time.Millisecond,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ WARNING — KeepaliveTimeoutMs is accepted as any positive duration, so very small values can churn otherwise healthy connections under normal RTT or GC pauses.

The validation only rejects non-positive timeouts, then converts the raw millisecond value directly into the gRPC keepalive timeout. A timeout of a few milliseconds can expire before an ACK during routine network jitter, server scheduling delay, or client/server GC pauses, causing reconnects and retries that hurt tail latency; enforce a sane minimum such as seconds-scale or document/configure a safe default when keepalive is enabled.

PermitWithoutStream: config.KeepalivePermitWithoutStream,
}, true, nil
}

// Invoke is a wrapper around grpc.ClientConn.Invoke with metrics support
Expand Down
124 changes: 124 additions & 0 deletions go-sdk/pkg/interaction-store/grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package interactionstore

import (
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/keepalive"
)

func TestKeepaliveParamsFromConfig(t *testing.T) {
tests := []struct {
name string
config Config
wantOK bool
wantErr bool
wantParams keepalive.ClientParameters
}{
{
name: "disabled by default (zero value config)",
config: Config{},
wantOK: false,
},
{
name: "disabled when time is zero even if other keepalive fields are set",
config: Config{KeepaliveTimeoutMs: 5000, KeepalivePermitWithoutStream: true},
wantOK: false,
},
{
name: "disabled when time is negative",
config: Config{KeepaliveTimeMs: -1, KeepaliveTimeoutMs: 5000},
wantOK: false,
},
{
name: "enabled with full params, milliseconds converted to Duration",
config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 5000, KeepalivePermitWithoutStream: true},
wantOK: true,
wantParams: keepalive.ClientParameters{
Time: 20 * time.Second,
Timeout: 5 * time.Second,
PermitWithoutStream: true,
},
},
{
name: "enabled with permit-without-stream defaulting to false",
config: Config{KeepaliveTimeMs: 10000, KeepaliveTimeoutMs: 3000},
wantOK: true,
wantParams: keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: false,
},
},
{
name: "enabled with negative timeout is rejected",
config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: -1},
wantOK: false,
wantErr: true,
},
{
name: "enabled with zero timeout is rejected",
config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 0},
wantOK: false,
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
params, ok, err := keepaliveParamsFromConfig(tt.config)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, tt.wantOK, ok)
assert.Equal(t, tt.wantParams, params)
})
}
}

// TestDefaultServiceConfigIsRoundRobin verifies the default dial behaviour directly:
// the service config must keep the round_robin load-balancing policy. Asserting the
// effective policy (not just an option count) means the test fails if round_robin is
// removed, renamed, or replaced.
func TestDefaultServiceConfigIsRoundRobin(t *testing.T) {
var sc struct {
LoadBalancingPolicy string `json:"loadBalancingPolicy"`
}
require.NoError(t, json.Unmarshal([]byte(roundRobinServiceConfig), &sc))
assert.Equal(t, "round_robin", sc.LoadBalancingPolicy)
}

func TestDialOptions(t *testing.T) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 SUGGESTION — Convert TestDialOptions to a table-driven test instead of separate hand-written subtests for each input case.

This test covers multiple input cases for the same function, and the repository rule requires table-driven tests for that pattern; using a table also makes future dial option cases less repetitive and less error-prone.

t.Run("default path applies no keepalive and keeps the round-robin config", func(t *testing.T) {
// The PR promises the default dial behaviour is unchanged. Assert it directly:
// keepalive is off for a default config (no keepalive dial option is produced).
_, enabled, err := keepaliveParamsFromConfig(Config{PlainText: true})
require.NoError(t, err)
assert.False(t, enabled, "default config must not enable keepalive")

opts, err := dialOptions(Config{PlainText: true})
require.NoError(t, err)
assert.Len(t, opts, 2, "default plaintext dial = transport-credentials + service-config only")

tlsOpts, err := dialOptions(Config{PlainText: false})
require.NoError(t, err)
assert.Len(t, tlsOpts, 2, "default TLS dial = transport-credentials + service-config only")
})

t.Run("valid keepalive appends exactly one dial option", func(t *testing.T) {
opts, err := dialOptions(Config{PlainText: true, KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 5000})
require.NoError(t, err)
assert.Len(t, opts, 3, "enabling keepalive should append exactly one dial option")
})

t.Run("invalid keepalive timeout is rejected (no options returned)", func(t *testing.T) {
opts, err := dialOptions(Config{PlainText: true, KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: -1})
assert.Error(t, err)
assert.Nil(t, opts)
})
}
16 changes: 16 additions & 0 deletions go-sdk/pkg/interaction-store/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,22 @@ type Config struct {
DeadLine int
PlainText bool
CallerId string

// Optional client-side gRPC keepalive. Keepalive is opt-in: when KeepaliveTimeMs
// is <= 0 (the zero value) it stays disabled and the connection behaves exactly as
// before. When set, the client sends an HTTP/2 keepalive PING every KeepaliveTimeMs
// and waits KeepaliveTimeoutMs for the ack before treating the connection as dead.
// KeepalivePermitWithoutStream allows pings on a connection that has no active RPCs.
// When keepalive is enabled, KeepaliveTimeoutMs must be > 0; a zero or negative
// timeout is rejected at client construction (it would otherwise be passed to gRPC
// as a non-positive timer and fail keepalive immediately).
//
// Callers MUST keep KeepaliveTimeMs at or above the server's keepalive
// EnforcementPolicy MinTime; pinging more frequently than the server permits causes
// the server to close the connection with a GOAWAY "too_many_pings".
KeepaliveTimeMs int
KeepaliveTimeoutMs int
KeepalivePermitWithoutStream bool
}

type InteractionType int
Expand Down
Loading