From 9d6d13d0daef45c947f7b3033fb3f05937e01318 Mon Sep 17 00:00:00 2001 From: kanhaiya-garg-meesho Date: Mon, 15 Jun 2026 11:59:36 +0530 Subject: [PATCH 1/2] feat(interaction-store): add optional client gRPC keepalive Add opt-in client keepalive to the interaction-store gRPC client. Three new optional Config fields (KeepaliveTimeMs, KeepaliveTimeoutMs, KeepalivePermitWithoutStream) drive grpc.WithKeepaliveParams, applied only when KeepaliveTimeMs > 0. When unset (the zero value) the dial options are identical to before, so existing consumers are unaffected. This lets latency-sensitive callers detect a silently dead connection (e.g. a black-holed TCP path) via HTTP/2 keepalive PINGs, instead of waiting for a per-RPC deadline or the kernel's TCP retransmit give-up. Callers must keep KeepaliveTimeMs at or above the server's keepalive EnforcementPolicy MinTime to avoid the server closing the connection with GOAWAY "too_many_pings". gRPC-Go also raises a client keepalive time below 10 seconds to 10 seconds, so a smaller KeepaliveTimeMs does not ping any more often. Dial-option assembly is extracted into dialOptions / keepaliveParamsFromConfig so the opt-in gate, millisecond-to-Duration mapping, and backward-compatible option set are unit tested. 
Co-Authored-By: Claude Opus 4.8 Signed-off-by: kanhaiya-garg-meesho --- go-sdk/pkg/interaction-store/client_test.go | 13 ++++ go-sdk/pkg/interaction-store/grpc.go | 47 +++++++++---- go-sdk/pkg/interaction-store/grpc_test.go | 78 +++++++++++++++++++++ go-sdk/pkg/interaction-store/models.go | 13 ++++ 4 files changed, 139 insertions(+), 12 deletions(-) create mode 100644 go-sdk/pkg/interaction-store/grpc_test.go diff --git a/go-sdk/pkg/interaction-store/client_test.go b/go-sdk/pkg/interaction-store/client_test.go index b5217290..0c9aef5d 100644 --- a/go-sdk/pkg/interaction-store/client_test.go +++ b/go-sdk/pkg/interaction-store/client_test.go @@ -71,6 +71,19 @@ func TestNewClientV1(t *testing.T) { CallerId: "test-caller", }, }, + { + name: "success with keepalive enabled", + config: &Config{ + Host: "localhost", + Port: "50051", + DeadLine: 1000, + PlainText: true, + CallerId: "test-caller", + KeepaliveTimeMs: 20000, + KeepaliveTimeoutMs: 5000, + KeepalivePermitWithoutStream: true, + }, + }, } for _, tt := range tests { diff --git a/go-sdk/pkg/interaction-store/grpc.go b/go-sdk/pkg/interaction-store/grpc.go index 92c0c7f5..487f3432 100644 --- a/go-sdk/pkg/interaction-store/grpc.go +++ b/go-sdk/pkg/interaction-store/grpc.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/resolver" "google.golang.org/grpc/status" ) @@ -42,23 +43,45 @@ func NewConnFromConfig(config *Config, externalServiceName string, timing func(n func getGRPCConnections(config Config) (*GRPCClient, error) { resolver.SetDefaultScheme(ResolverDefaultScheme) - var gConn *grpc.ClientConn - var err error + gConn, err := grpc.NewClient(config.Host+":"+config.Port, dialOptions(config)...) 
+ if err != nil { + return nil, err + } + return &GRPCClient{Conn: gConn, DeadLine: int64(config.DeadLine)}, nil +} + +// dialOptions assembles the gRPC dial options for the connection: transport +// credentials (plaintext vs TLS) and the round-robin service config, plus an +// optional client keepalive that is appended only when configured (see +// keepaliveParamsFromConfig). When keepalive is not configured the option set is +// identical to the pre-keepalive behaviour, so existing callers are unaffected. +func dialOptions(config Config) []grpc.DialOption { + var opts []grpc.DialOption if config.PlainText { - gConn, err = grpc.NewClient(config.Host+":"+config.Port, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`), - ) + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } else { creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) - gConn, err = grpc.NewClient(config.Host+":"+config.Port, - grpc.WithTransportCredentials(creds), - grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`)) + opts = append(opts, grpc.WithTransportCredentials(creds)) } - if err != nil { - return nil, err + opts = append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`)) + if params, ok := keepaliveParamsFromConfig(config); ok { + opts = append(opts, grpc.WithKeepaliveParams(params)) } - return &GRPCClient{Conn: gConn, DeadLine: int64(config.DeadLine)}, nil + return opts +} + +// keepaliveParamsFromConfig derives the client keepalive parameters from config. +// Keepalive is opt-in: ok is false when KeepaliveTimeMs <= 0, and callers must not +// apply a keepalive dial option in that case (preserving the prior behaviour). 
+func keepaliveParamsFromConfig(config Config) (keepalive.ClientParameters, bool) { + if config.KeepaliveTimeMs <= 0 { + return keepalive.ClientParameters{}, false + } + return keepalive.ClientParameters{ + Time: time.Duration(config.KeepaliveTimeMs) * time.Millisecond, + Timeout: time.Duration(config.KeepaliveTimeoutMs) * time.Millisecond, + PermitWithoutStream: config.KeepalivePermitWithoutStream, + }, true } // Invoke is a wrapper around grpc.ClientConn.Invoke with metrics support diff --git a/go-sdk/pkg/interaction-store/grpc_test.go b/go-sdk/pkg/interaction-store/grpc_test.go new file mode 100644 index 00000000..4c72ec68 --- /dev/null +++ b/go-sdk/pkg/interaction-store/grpc_test.go @@ -0,0 +1,78 @@ +package interactionstore + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/keepalive" +) + +func TestKeepaliveParamsFromConfig(t *testing.T) { + tests := []struct { + name string + config Config + wantOK bool + wantParams keepalive.ClientParameters + }{ + { + name: "disabled by default (zero value config)", + config: Config{}, + wantOK: false, + }, + { + name: "disabled when time is zero even if other keepalive fields are set", + config: Config{KeepaliveTimeoutMs: 5000, KeepalivePermitWithoutStream: true}, + wantOK: false, + }, + { + name: "disabled when time is negative", + config: Config{KeepaliveTimeMs: -1, KeepaliveTimeoutMs: 5000}, + wantOK: false, + }, + { + name: "enabled with full params, milliseconds converted to Duration", + config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 5000, KeepalivePermitWithoutStream: true}, + wantOK: true, + wantParams: keepalive.ClientParameters{ + Time: 20 * time.Second, + Timeout: 5 * time.Second, + PermitWithoutStream: true, + }, + }, + { + name: "enabled with permit-without-stream defaulting to false", + config: Config{KeepaliveTimeMs: 10000, KeepaliveTimeoutMs: 3000}, + wantOK: true, + wantParams: keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 3 * 
time.Second, + PermitWithoutStream: false, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + params, ok := keepaliveParamsFromConfig(tt.config) + assert.Equal(t, tt.wantOK, ok) + assert.Equal(t, tt.wantParams, params) + }) + } +} + +func TestDialOptions_KeepaliveAppendedOnlyWhenConfigured(t *testing.T) { + // Backward compatibility: without keepalive config the dial-option set is unchanged + // (transport credentials + service config). Enabling keepalive appends exactly one. + plaintextNoKeepalive := dialOptions(Config{PlainText: true}) + tlsNoKeepalive := dialOptions(Config{PlainText: false}) + withKeepalive := dialOptions(Config{ + PlainText: true, + KeepaliveTimeMs: 20000, + KeepaliveTimeoutMs: 5000, + }) + + assert.Len(t, plaintextNoKeepalive, 2, "plaintext dial should be transport-credentials + service-config only") + assert.Len(t, tlsNoKeepalive, 2, "TLS dial should be transport-credentials + service-config only") + assert.Len(t, withKeepalive, 3, "enabling keepalive should append exactly one dial option") +} diff --git a/go-sdk/pkg/interaction-store/models.go b/go-sdk/pkg/interaction-store/models.go index 704598a1..dc0684d0 100644 --- a/go-sdk/pkg/interaction-store/models.go +++ b/go-sdk/pkg/interaction-store/models.go @@ -10,6 +10,19 @@ type Config struct { DeadLine int PlainText bool CallerId string + + // Optional client-side gRPC keepalive. Keepalive is opt-in: when KeepaliveTimeMs + // is <= 0 (the zero value) it stays disabled and the connection behaves exactly as + // before. When set, the client sends an HTTP/2 keepalive PING every KeepaliveTimeMs + // and waits KeepaliveTimeoutMs for the ack before treating the connection as dead. + // KeepalivePermitWithoutStream allows pings on a connection that has no active RPCs. 
+ // + // Callers MUST keep KeepaliveTimeMs at or above the server's keepalive + // EnforcementPolicy MinTime; pinging more frequently than the server permits causes + // the server to close the connection with a GOAWAY "too_many_pings". + KeepaliveTimeMs int + KeepaliveTimeoutMs int + KeepalivePermitWithoutStream bool } type InteractionType int From 67d781a4cfec2b7600b941bba4618f9e5f427cd8 Mon Sep 17 00:00:00 2001 From: kanhaiya-garg-meesho Date: Mon, 15 Jun 2026 12:40:54 +0530 Subject: [PATCH 2/2] fix(interaction-store): validate keepalive timeout and harden backward-compat test Address review feedback on the optional keepalive change: - Reject a non-positive KeepaliveTimeoutMs when keepalive is enabled (KeepaliveTimeMs > 0) instead of passing it through to gRPC. A negative value would become a negative gRPC timer, making keepalive PINGs fail immediately and churning otherwise-healthy connections; a zero value would be silently replaced by gRPC-Go's default keepalive timeout (20 seconds at the time of writing), hiding the misconfiguration. keepaliveParamsFromConfig now returns an error, propagated through dialOptions and getGRPCConnections so the misconfiguration fails loudly at client construction (consistent with the existing validateConfig behaviour). - Strengthen the backward-compatibility test to assert the effective default dial behaviour rather than only the option count: the round_robin load-balancing policy is now a named constant that is parsed and verified, and the default path is asserted to produce no keepalive option. 
Co-Authored-By: Claude Opus 4.8 Signed-off-by: kanhaiya-garg-meesho --- go-sdk/pkg/interaction-store/grpc.go | 46 +++++++++++---- go-sdk/pkg/interaction-store/grpc_test.go | 72 +++++++++++++++++++---- go-sdk/pkg/interaction-store/models.go | 3 + 3 files changed, 97 insertions(+), 24 deletions(-) diff --git a/go-sdk/pkg/interaction-store/grpc.go b/go-sdk/pkg/interaction-store/grpc.go index 487f3432..8cd79211 100644 --- a/go-sdk/pkg/interaction-store/grpc.go +++ b/go-sdk/pkg/interaction-store/grpc.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "errors" + "fmt" "strconv" "time" @@ -18,6 +19,10 @@ import ( const ( ResolverDefaultScheme = "dns" + + // roundRobinServiceConfig is the default gRPC service config applied to every + // interaction-store connection. It pins the load-balancing policy to round_robin. + roundRobinServiceConfig = `{"loadBalancingPolicy":"round_robin"}` ) // GRPCClient wraps a gRPC client connection with metrics support @@ -43,7 +48,11 @@ func NewConnFromConfig(config *Config, externalServiceName string, timing func(n func getGRPCConnections(config Config) (*GRPCClient, error) { resolver.SetDefaultScheme(ResolverDefaultScheme) - gConn, err := grpc.NewClient(config.Host+":"+config.Port, dialOptions(config)...) + opts, err := dialOptions(config) + if err != nil { + return nil, err + } + gConn, err := grpc.NewClient(config.Host+":"+config.Port, opts...) if err != nil { return nil, err } @@ -54,8 +63,9 @@ func getGRPCConnections(config Config) (*GRPCClient, error) { // credentials (plaintext vs TLS) and the round-robin service config, plus an // optional client keepalive that is appended only when configured (see // keepaliveParamsFromConfig). When keepalive is not configured the option set is -// identical to the pre-keepalive behaviour, so existing callers are unaffected. -func dialOptions(config Config) []grpc.DialOption { +// identical to the pre-keepalive behaviour, so existing callers are unaffected. 
It +// returns an error when the keepalive configuration is invalid. +func dialOptions(config Config) ([]grpc.DialOption, error) { var opts []grpc.DialOption if config.PlainText { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -63,25 +73,39 @@ func dialOptions(config Config) []grpc.DialOption { creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) opts = append(opts, grpc.WithTransportCredentials(creds)) } - opts = append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`)) - if params, ok := keepaliveParamsFromConfig(config); ok { + opts = append(opts, grpc.WithDefaultServiceConfig(roundRobinServiceConfig)) + params, enabled, err := keepaliveParamsFromConfig(config) + if err != nil { + return nil, err + } + if enabled { opts = append(opts, grpc.WithKeepaliveParams(params)) } - return opts + return opts, nil } // keepaliveParamsFromConfig derives the client keepalive parameters from config. -// Keepalive is opt-in: ok is false when KeepaliveTimeMs <= 0, and callers must not -// apply a keepalive dial option in that case (preserving the prior behaviour). -func keepaliveParamsFromConfig(config Config) (keepalive.ClientParameters, bool) { +// Keepalive is opt-in: it returns ok=false (and a nil error) when KeepaliveTimeMs <= 0, +// so no keepalive dial option is applied and behaviour is unchanged. +// +// When keepalive is enabled (KeepaliveTimeMs > 0), KeepaliveTimeoutMs must be positive. +// A non-positive timeout is rejected with an error rather than being converted into a +// zero or negative gRPC timer duration, which would make keepalive PINGs fail +// immediately and churn otherwise-healthy connections. 
+func keepaliveParamsFromConfig(config Config) (keepalive.ClientParameters, bool, error) { if config.KeepaliveTimeMs <= 0 { - return keepalive.ClientParameters{}, false + return keepalive.ClientParameters{}, false, nil + } + if config.KeepaliveTimeoutMs <= 0 { + return keepalive.ClientParameters{}, false, fmt.Errorf( + "interaction-store: KeepaliveTimeoutMs must be > 0 when keepalive is enabled "+ + "(KeepaliveTimeMs=%d ms), got %d ms", config.KeepaliveTimeMs, config.KeepaliveTimeoutMs) } return keepalive.ClientParameters{ Time: time.Duration(config.KeepaliveTimeMs) * time.Millisecond, Timeout: time.Duration(config.KeepaliveTimeoutMs) * time.Millisecond, PermitWithoutStream: config.KeepalivePermitWithoutStream, - }, true + }, true, nil } // Invoke is a wrapper around grpc.ClientConn.Invoke with metrics support diff --git a/go-sdk/pkg/interaction-store/grpc_test.go b/go-sdk/pkg/interaction-store/grpc_test.go index 4c72ec68..298a3205 100644 --- a/go-sdk/pkg/interaction-store/grpc_test.go +++ b/go-sdk/pkg/interaction-store/grpc_test.go @@ -1,10 +1,12 @@ package interactionstore import ( + "encoding/json" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/grpc/keepalive" ) @@ -13,6 +15,7 @@ func TestKeepaliveParamsFromConfig(t *testing.T) { name string config Config wantOK bool + wantErr bool wantParams keepalive.ClientParameters }{ { @@ -50,29 +53,72 @@ func TestKeepaliveParamsFromConfig(t *testing.T) { PermitWithoutStream: false, }, }, + { + name: "enabled with negative timeout is rejected", + config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: -1}, + wantOK: false, + wantErr: true, + }, + { + name: "enabled with zero timeout is rejected", + config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 0}, + wantOK: false, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - params, ok := keepaliveParamsFromConfig(tt.config) + params, ok, err := 
keepaliveParamsFromConfig(tt.config) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } assert.Equal(t, tt.wantOK, ok) assert.Equal(t, tt.wantParams, params) }) } } -func TestDialOptions_KeepaliveAppendedOnlyWhenConfigured(t *testing.T) { - // Backward compatibility: without keepalive config the dial-option set is unchanged - // (transport credentials + service config). Enabling keepalive appends exactly one. - plaintextNoKeepalive := dialOptions(Config{PlainText: true}) - tlsNoKeepalive := dialOptions(Config{PlainText: false}) - withKeepalive := dialOptions(Config{ - PlainText: true, - KeepaliveTimeMs: 20000, - KeepaliveTimeoutMs: 5000, +// TestDefaultServiceConfigIsRoundRobin verifies the default dial behaviour directly: +// the service config must keep the round_robin load-balancing policy. Asserting the +// effective policy (not just an option count) means the test fails if round_robin is +// removed, renamed, or replaced. +func TestDefaultServiceConfigIsRoundRobin(t *testing.T) { + var sc struct { + LoadBalancingPolicy string `json:"loadBalancingPolicy"` + } + require.NoError(t, json.Unmarshal([]byte(roundRobinServiceConfig), &sc)) + assert.Equal(t, "round_robin", sc.LoadBalancingPolicy) +} + +func TestDialOptions(t *testing.T) { + t.Run("default path applies no keepalive and keeps the round-robin config", func(t *testing.T) { + // The PR promises the default dial behaviour is unchanged. Assert it directly: + // keepalive is off for a default config (no keepalive dial option is produced). 
+ _, enabled, err := keepaliveParamsFromConfig(Config{PlainText: true}) + require.NoError(t, err) + assert.False(t, enabled, "default config must not enable keepalive") + + opts, err := dialOptions(Config{PlainText: true}) + require.NoError(t, err) + assert.Len(t, opts, 2, "default plaintext dial = transport-credentials + service-config only") + + tlsOpts, err := dialOptions(Config{PlainText: false}) + require.NoError(t, err) + assert.Len(t, tlsOpts, 2, "default TLS dial = transport-credentials + service-config only") }) - assert.Len(t, plaintextNoKeepalive, 2, "plaintext dial should be transport-credentials + service-config only") - assert.Len(t, tlsNoKeepalive, 2, "TLS dial should be transport-credentials + service-config only") - assert.Len(t, withKeepalive, 3, "enabling keepalive should append exactly one dial option") + t.Run("valid keepalive appends exactly one dial option", func(t *testing.T) { + opts, err := dialOptions(Config{PlainText: true, KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 5000}) + require.NoError(t, err) + assert.Len(t, opts, 3, "enabling keepalive should append exactly one dial option") + }) + + t.Run("invalid keepalive timeout is rejected (no options returned)", func(t *testing.T) { + opts, err := dialOptions(Config{PlainText: true, KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: -1}) + assert.Error(t, err) + assert.Nil(t, opts) + }) } diff --git a/go-sdk/pkg/interaction-store/models.go b/go-sdk/pkg/interaction-store/models.go index dc0684d0..bcb184bc 100644 --- a/go-sdk/pkg/interaction-store/models.go +++ b/go-sdk/pkg/interaction-store/models.go @@ -16,6 +16,9 @@ type Config struct { // before. When set, the client sends an HTTP/2 keepalive PING every KeepaliveTimeMs // and waits KeepaliveTimeoutMs for the ack before treating the connection as dead. // KeepalivePermitWithoutStream allows pings on a connection that has no active RPCs. 
+ // When keepalive is enabled, KeepaliveTimeoutMs must be > 0; a zero or negative + // timeout is rejected at client construction (gRPC-Go would replace a zero timeout + // with its own default and treat a negative one as already expired). // // Callers MUST keep KeepaliveTimeMs at or above the server's keepalive // EnforcementPolicy MinTime; pinging more frequently than the server permits causes