Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions go-sdk/pkg/interaction-store/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ func TestNewClientV1(t *testing.T) {
CallerId: "test-caller",
},
},
{
name: "success with keepalive enabled",
config: &Config{
Host: "localhost",
Port: "50051",
DeadLine: 1000,
PlainText: true,
CallerId: "test-caller",
KeepaliveTimeMs: 20000,
KeepaliveTimeoutMs: 5000,
KeepalivePermitWithoutStream: true,
},
},
}

for _, tt := range tests {
Expand Down
67 changes: 57 additions & 10 deletions go-sdk/pkg/interaction-store/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
"strconv"
"time"

"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
)

const (
ResolverDefaultScheme = "dns"

// roundRobinServiceConfig is the default gRPC service config applied to every
// interaction-store connection. It pins the load-balancing policy to round_robin.
roundRobinServiceConfig = `{"loadBalancingPolicy":"round_robin"}`
)

// GRPCClient wraps a gRPC client connection with metrics support
Expand All @@ -42,23 +48,64 @@ func NewConnFromConfig(config *Config, externalServiceName string, timing func(n

func getGRPCConnections(config Config) (*GRPCClient, error) {
resolver.SetDefaultScheme(ResolverDefaultScheme)
var gConn *grpc.ClientConn
var err error
opts, err := dialOptions(config)
if err != nil {
return nil, err
}
gConn, err := grpc.NewClient(config.Host+":"+config.Port, opts...)
if err != nil {
return nil, err
}
return &GRPCClient{Conn: gConn, DeadLine: int64(config.DeadLine)}, nil
}

// dialOptions assembles the gRPC dial options for the connection: transport
// credentials (plaintext vs TLS) and the round-robin service config, plus an
// optional client keepalive that is appended only when configured (see
// keepaliveParamsFromConfig). When keepalive is not configured the option set is
// identical to the pre-keepalive behaviour, so existing callers are unaffected. It
// returns an error when the keepalive configuration is invalid.
func dialOptions(config Config) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
if config.PlainText {
gConn, err = grpc.NewClient(config.Host+":"+config.Port,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
Comment thread
kanhaiya-garg-meesho marked this conversation as resolved.
gConn, err = grpc.NewClient(config.Host+":"+config.Port,
grpc.WithTransportCredentials(creds),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`))
opts = append(opts, grpc.WithTransportCredentials(creds))
}
opts = append(opts, grpc.WithDefaultServiceConfig(roundRobinServiceConfig))
params, enabled, err := keepaliveParamsFromConfig(config)
if err != nil {
return nil, err
}
return &GRPCClient{Conn: gConn, DeadLine: int64(config.DeadLine)}, nil
if enabled {
opts = append(opts, grpc.WithKeepaliveParams(params))
}
return opts, nil
}

// keepaliveParamsFromConfig derives the client keepalive parameters from config.
// Keepalive is opt-in: it returns ok=false (and a nil error) when KeepaliveTimeMs <= 0,
// so no keepalive dial option is applied and behaviour is unchanged.
//
// When keepalive is enabled (KeepaliveTimeMs > 0), KeepaliveTimeoutMs must be positive.
// A non-positive timeout is rejected with an error rather than being converted into a
// zero or negative gRPC timer duration, which would make keepalive PINGs fail
// immediately and churn otherwise-healthy connections.
func keepaliveParamsFromConfig(config Config) (keepalive.ClientParameters, bool, error) {
if config.KeepaliveTimeMs <= 0 {
return keepalive.ClientParameters{}, false, nil
}
if config.KeepaliveTimeoutMs <= 0 {
Comment thread
kanhaiya-garg-meesho marked this conversation as resolved.
return keepalive.ClientParameters{}, false, fmt.Errorf(
"interaction-store: KeepaliveTimeoutMs must be > 0 when keepalive is enabled "+
"(KeepaliveTimeMs=%d ms), got %d ms", config.KeepaliveTimeMs, config.KeepaliveTimeoutMs)
}
return keepalive.ClientParameters{
Time: time.Duration(config.KeepaliveTimeMs) * time.Millisecond,
Comment thread
kanhaiya-garg-meesho marked this conversation as resolved.
Timeout: time.Duration(config.KeepaliveTimeoutMs) * time.Millisecond,
Comment thread
kanhaiya-garg-meesho marked this conversation as resolved.
PermitWithoutStream: config.KeepalivePermitWithoutStream,
Comment thread
kanhaiya-garg-meesho marked this conversation as resolved.
}, true, nil
}

// Invoke is a wrapper around grpc.ClientConn.Invoke with metrics support
Expand Down
124 changes: 124 additions & 0 deletions go-sdk/pkg/interaction-store/grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package interactionstore

import (
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/keepalive"
)

func TestKeepaliveParamsFromConfig(t *testing.T) {
tests := []struct {
name string
config Config
wantOK bool
wantErr bool
wantParams keepalive.ClientParameters
}{
{
name: "disabled by default (zero value config)",
config: Config{},
wantOK: false,
},
{
name: "disabled when time is zero even if other keepalive fields are set",
config: Config{KeepaliveTimeoutMs: 5000, KeepalivePermitWithoutStream: true},
wantOK: false,
},
{
name: "disabled when time is negative",
config: Config{KeepaliveTimeMs: -1, KeepaliveTimeoutMs: 5000},
wantOK: false,
},
{
name: "enabled with full params, milliseconds converted to Duration",
config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 5000, KeepalivePermitWithoutStream: true},
wantOK: true,
wantParams: keepalive.ClientParameters{
Time: 20 * time.Second,
Timeout: 5 * time.Second,
PermitWithoutStream: true,
},
},
{
name: "enabled with permit-without-stream defaulting to false",
config: Config{KeepaliveTimeMs: 10000, KeepaliveTimeoutMs: 3000},
wantOK: true,
wantParams: keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: false,
},
},
{
name: "enabled with negative timeout is rejected",
config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: -1},
wantOK: false,
wantErr: true,
},
{
name: "enabled with zero timeout is rejected",
config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 0},
wantOK: false,
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
params, ok, err := keepaliveParamsFromConfig(tt.config)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, tt.wantOK, ok)
assert.Equal(t, tt.wantParams, params)
})
}
}

// TestDefaultServiceConfigIsRoundRobin verifies the default dial behaviour directly:
// the service config must keep the round_robin load-balancing policy. Asserting the
// effective policy (not just an option count) means the test fails if round_robin is
// removed, renamed, or replaced.
func TestDefaultServiceConfigIsRoundRobin(t *testing.T) {
var sc struct {
LoadBalancingPolicy string `json:"loadBalancingPolicy"`
}
require.NoError(t, json.Unmarshal([]byte(roundRobinServiceConfig), &sc))
assert.Equal(t, "round_robin", sc.LoadBalancingPolicy)
}

func TestDialOptions(t *testing.T) {
t.Run("default path applies no keepalive and keeps the round-robin config", func(t *testing.T) {
// The PR promises the default dial behaviour is unchanged. Assert it directly:
// keepalive is off for a default config (no keepalive dial option is produced).
_, enabled, err := keepaliveParamsFromConfig(Config{PlainText: true})
require.NoError(t, err)
assert.False(t, enabled, "default config must not enable keepalive")

opts, err := dialOptions(Config{PlainText: true})
require.NoError(t, err)
assert.Len(t, opts, 2, "default plaintext dial = transport-credentials + service-config only")

tlsOpts, err := dialOptions(Config{PlainText: false})
require.NoError(t, err)
assert.Len(t, tlsOpts, 2, "default TLS dial = transport-credentials + service-config only")
})

t.Run("valid keepalive appends exactly one dial option", func(t *testing.T) {
opts, err := dialOptions(Config{PlainText: true, KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 5000})
require.NoError(t, err)
assert.Len(t, opts, 3, "enabling keepalive should append exactly one dial option")
})

t.Run("invalid keepalive timeout is rejected (no options returned)", func(t *testing.T) {
opts, err := dialOptions(Config{PlainText: true, KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: -1})
assert.Error(t, err)
assert.Nil(t, opts)
})
}
16 changes: 16 additions & 0 deletions go-sdk/pkg/interaction-store/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,22 @@ type Config struct {
DeadLine int
PlainText bool
CallerId string

// Optional client-side gRPC keepalive. Keepalive is opt-in: when KeepaliveTimeMs
// is <= 0 (the zero value) it stays disabled and the connection behaves exactly as
// before. When set, the client sends an HTTP/2 keepalive PING every KeepaliveTimeMs
// and waits KeepaliveTimeoutMs for the ack before treating the connection as dead.
// KeepalivePermitWithoutStream allows pings on a connection that has no active RPCs.
// When keepalive is enabled, KeepaliveTimeoutMs must be > 0; a zero or negative
// timeout is rejected at client construction (it would otherwise be passed to gRPC
// as a non-positive timer and fail keepalive immediately).
//
// Callers MUST keep KeepaliveTimeMs at or above the server's keepalive
// EnforcementPolicy MinTime; pinging more frequently than the server permits causes
// the server to close the connection with a GOAWAY "too_many_pings".
KeepaliveTimeMs int
KeepaliveTimeoutMs int
KeepalivePermitWithoutStream bool
}

type InteractionType int
Expand Down
Loading