diff --git a/go-sdk/pkg/interaction-store/client_test.go b/go-sdk/pkg/interaction-store/client_test.go index b5217290..0c9aef5d 100644 --- a/go-sdk/pkg/interaction-store/client_test.go +++ b/go-sdk/pkg/interaction-store/client_test.go @@ -71,6 +71,19 @@ func TestNewClientV1(t *testing.T) { CallerId: "test-caller", }, }, + { + name: "success with keepalive enabled", + config: &Config{ + Host: "localhost", + Port: "50051", + DeadLine: 1000, + PlainText: true, + CallerId: "test-caller", + KeepaliveTimeMs: 20000, + KeepaliveTimeoutMs: 5000, + KeepalivePermitWithoutStream: true, + }, + }, } for _, tt := range tests { diff --git a/go-sdk/pkg/interaction-store/grpc.go b/go-sdk/pkg/interaction-store/grpc.go index 92c0c7f5..8cd79211 100644 --- a/go-sdk/pkg/interaction-store/grpc.go +++ b/go-sdk/pkg/interaction-store/grpc.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "errors" + "fmt" "strconv" "time" @@ -11,12 +12,17 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/resolver" "google.golang.org/grpc/status" ) const ( ResolverDefaultScheme = "dns" + + // roundRobinServiceConfig is the default gRPC service config applied to every + // interaction-store connection. It pins the load-balancing policy to round_robin. + roundRobinServiceConfig = `{"loadBalancingPolicy":"round_robin"}` ) // GRPCClient wraps a gRPC client connection with metrics support @@ -42,23 +48,64 @@ func NewConnFromConfig(config *Config, externalServiceName string, timing func(n func getGRPCConnections(config Config) (*GRPCClient, error) { resolver.SetDefaultScheme(ResolverDefaultScheme) - var gConn *grpc.ClientConn - var err error + opts, err := dialOptions(config) + if err != nil { + return nil, err + } + gConn, err := grpc.NewClient(config.Host+":"+config.Port, opts...) + if err != nil { + return nil, err + } + return &GRPCClient{Conn: gConn, DeadLine: int64(config.DeadLine)}, nil +} + +// dialOptions assembles the gRPC dial options for the connection: transport +// credentials (plaintext vs TLS) and the round-robin service config, plus an +// optional client keepalive that is appended only when configured (see +// keepaliveParamsFromConfig). When keepalive is not configured the option set is +// identical to the pre-keepalive behaviour, so existing callers are unaffected. It +// returns an error when the keepalive configuration is invalid. +func dialOptions(config Config) ([]grpc.DialOption, error) { + var opts []grpc.DialOption if config.PlainText { - gConn, err = grpc.NewClient(config.Host+":"+config.Port, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`), - ) + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } else { creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) - gConn, err = grpc.NewClient(config.Host+":"+config.Port, - grpc.WithTransportCredentials(creds), - grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`)) + opts = append(opts, grpc.WithTransportCredentials(creds)) } + opts = append(opts, grpc.WithDefaultServiceConfig(roundRobinServiceConfig)) + params, enabled, err := keepaliveParamsFromConfig(config) if err != nil { return nil, err } - return &GRPCClient{Conn: gConn, DeadLine: int64(config.DeadLine)}, nil + if enabled { + opts = append(opts, grpc.WithKeepaliveParams(params)) + } + return opts, nil +} + +// keepaliveParamsFromConfig derives the client keepalive parameters from config. +// Keepalive is opt-in: it returns ok=false (and a nil error) when KeepaliveTimeMs <= 0, +// so no keepalive dial option is applied and behaviour is unchanged. +// +// When keepalive is enabled (KeepaliveTimeMs > 0), KeepaliveTimeoutMs must be positive. +// A non-positive timeout is rejected with an error rather than being converted into a +// zero or negative gRPC timer duration, which would make keepalive PINGs fail +// immediately and churn otherwise-healthy connections. +func keepaliveParamsFromConfig(config Config) (keepalive.ClientParameters, bool, error) { + if config.KeepaliveTimeMs <= 0 { + return keepalive.ClientParameters{}, false, nil + } + if config.KeepaliveTimeoutMs <= 0 { + return keepalive.ClientParameters{}, false, fmt.Errorf( + "interaction-store: KeepaliveTimeoutMs must be > 0 when keepalive is enabled "+ + "(KeepaliveTimeMs=%d ms), got %d ms", config.KeepaliveTimeMs, config.KeepaliveTimeoutMs) + } + return keepalive.ClientParameters{ + Time: time.Duration(config.KeepaliveTimeMs) * time.Millisecond, + Timeout: time.Duration(config.KeepaliveTimeoutMs) * time.Millisecond, + PermitWithoutStream: config.KeepalivePermitWithoutStream, + }, true, nil } // Invoke is a wrapper around grpc.ClientConn.Invoke with metrics support diff --git a/go-sdk/pkg/interaction-store/grpc_test.go b/go-sdk/pkg/interaction-store/grpc_test.go new file mode 100644 index 00000000..298a3205 --- /dev/null +++ b/go-sdk/pkg/interaction-store/grpc_test.go @@ -0,0 +1,124 @@ +package interactionstore + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/keepalive" +) + +func TestKeepaliveParamsFromConfig(t *testing.T) { + tests := []struct { + name string + config Config + wantOK bool + wantErr bool + wantParams keepalive.ClientParameters + }{ + { + name: "disabled by default (zero value config)", + config: Config{}, + wantOK: false, + }, + { + name: "disabled when time is zero even if other keepalive fields are set", + config: Config{KeepaliveTimeoutMs: 5000, KeepalivePermitWithoutStream: true}, + wantOK: false, + }, + { + name: "disabled when time is negative", + config: Config{KeepaliveTimeMs: -1, KeepaliveTimeoutMs: 5000}, + wantOK: false, + }, + { + name: "enabled with full params, milliseconds converted to Duration", + config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 5000, KeepalivePermitWithoutStream: true}, + wantOK: true, + wantParams: keepalive.ClientParameters{ + Time: 20 * time.Second, + Timeout: 5 * time.Second, + PermitWithoutStream: true, + }, + }, + { + name: "enabled with permit-without-stream defaulting to false", + config: Config{KeepaliveTimeMs: 10000, KeepaliveTimeoutMs: 3000}, + wantOK: true, + wantParams: keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 3 * time.Second, + PermitWithoutStream: false, + }, + }, + { + name: "enabled with negative timeout is rejected", + config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: -1}, + wantOK: false, + wantErr: true, + }, + { + name: "enabled with zero timeout is rejected", + config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 0}, + wantOK: false, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + params, ok, err := keepaliveParamsFromConfig(tt.config) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.wantOK, ok) + assert.Equal(t, tt.wantParams, params) + }) + } +} + +// TestDefaultServiceConfigIsRoundRobin verifies the default dial behaviour directly: +// the service config must keep the round_robin load-balancing policy. Asserting the +// effective policy (not just an option count) means the test fails if round_robin is +// removed, renamed, or replaced. +func TestDefaultServiceConfigIsRoundRobin(t *testing.T) { + var sc struct { + LoadBalancingPolicy string `json:"loadBalancingPolicy"` + } + require.NoError(t, json.Unmarshal([]byte(roundRobinServiceConfig), &sc)) + assert.Equal(t, "round_robin", sc.LoadBalancingPolicy) +} + +func TestDialOptions(t *testing.T) { + t.Run("default path applies no keepalive and keeps the round-robin config", func(t *testing.T) { + // The PR promises the default dial behaviour is unchanged. Assert it directly: + // keepalive is off for a default config (no keepalive dial option is produced). + _, enabled, err := keepaliveParamsFromConfig(Config{PlainText: true}) + require.NoError(t, err) + assert.False(t, enabled, "default config must not enable keepalive") + + opts, err := dialOptions(Config{PlainText: true}) + require.NoError(t, err) + assert.Len(t, opts, 2, "default plaintext dial = transport-credentials + service-config only") + + tlsOpts, err := dialOptions(Config{PlainText: false}) + require.NoError(t, err) + assert.Len(t, tlsOpts, 2, "default TLS dial = transport-credentials + service-config only") + }) + + t.Run("valid keepalive appends exactly one dial option", func(t *testing.T) { + opts, err := dialOptions(Config{PlainText: true, KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 5000}) + require.NoError(t, err) + assert.Len(t, opts, 3, "enabling keepalive should append exactly one dial option") + }) + + t.Run("invalid keepalive timeout is rejected (no options returned)", func(t *testing.T) { + opts, err := dialOptions(Config{PlainText: true, KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: -1}) + assert.Error(t, err) + assert.Nil(t, opts) + }) +} diff --git a/go-sdk/pkg/interaction-store/models.go b/go-sdk/pkg/interaction-store/models.go index 704598a1..bcb184bc 100644 --- a/go-sdk/pkg/interaction-store/models.go +++ b/go-sdk/pkg/interaction-store/models.go @@ -10,6 +10,22 @@ type Config struct { DeadLine int PlainText bool CallerId string + + // Optional client-side gRPC keepalive. Keepalive is opt-in: when KeepaliveTimeMs + // is <= 0 (the zero value) it stays disabled and the connection behaves exactly as + // before. When set, the client sends an HTTP/2 keepalive PING every KeepaliveTimeMs + // and waits KeepaliveTimeoutMs for the ack before treating the connection as dead. + // KeepalivePermitWithoutStream allows pings on a connection that has no active RPCs. + // When keepalive is enabled, KeepaliveTimeoutMs must be > 0; a zero or negative + // timeout is rejected at client construction (it would otherwise be passed to gRPC + // as a non-positive timer and fail keepalive immediately). + // + // Callers MUST keep KeepaliveTimeMs at or above the server's keepalive + // EnforcementPolicy MinTime; pinging more frequently than the server permits causes + // the server to close the connection with a GOAWAY "too_many_pings". + KeepaliveTimeMs int + KeepaliveTimeoutMs int + KeepalivePermitWithoutStream bool } type InteractionType int