-
Notifications
You must be signed in to change notification settings - Fork 78
feat(interaction-store): add optional client gRPC keepalive #379
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,19 +4,25 @@ import ( | |
| "context" | ||
| "crypto/tls" | ||
| "errors" | ||
| "fmt" | ||
| "strconv" | ||
| "time" | ||
|
|
||
| "github.com/rs/zerolog/log" | ||
| "google.golang.org/grpc" | ||
| "google.golang.org/grpc/credentials" | ||
| "google.golang.org/grpc/credentials/insecure" | ||
| "google.golang.org/grpc/keepalive" | ||
| "google.golang.org/grpc/resolver" | ||
| "google.golang.org/grpc/status" | ||
| ) | ||
|
|
||
| const ( | ||
| ResolverDefaultScheme = "dns" | ||
|
|
||
| // roundRobinServiceConfig is the default gRPC service config applied to every | ||
| // interaction-store connection. It pins the load-balancing policy to round_robin. | ||
| roundRobinServiceConfig = `{"loadBalancingPolicy":"round_robin"}` | ||
| ) | ||
|
|
||
| // GRPCClient wraps a gRPC client connection with metrics support | ||
|
|
@@ -42,23 +48,64 @@ func NewConnFromConfig(config *Config, externalServiceName string, timing func(n | |
|
|
||
| func getGRPCConnections(config Config) (*GRPCClient, error) { | ||
| resolver.SetDefaultScheme(ResolverDefaultScheme) | ||
| var gConn *grpc.ClientConn | ||
| var err error | ||
| opts, err := dialOptions(config) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| gConn, err := grpc.NewClient(config.Host+":"+config.Port, opts...) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return &GRPCClient{Conn: gConn, DeadLine: int64(config.DeadLine)}, nil | ||
| } | ||
|
|
||
| // dialOptions assembles the gRPC dial options for the connection: transport | ||
| // credentials (plaintext vs TLS) and the round-robin service config, plus an | ||
| // optional client keepalive that is appended only when configured (see | ||
| // keepaliveParamsFromConfig). When keepalive is not configured the option set is | ||
| // identical to the pre-keepalive behaviour, so existing callers are unaffected. It | ||
| // returns an error when the keepalive configuration is invalid. | ||
| func dialOptions(config Config) ([]grpc.DialOption, error) { | ||
| var opts []grpc.DialOption | ||
| if config.PlainText { | ||
| gConn, err = grpc.NewClient(config.Host+":"+config.Port, | ||
| grpc.WithTransportCredentials(insecure.NewCredentials()), | ||
| grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`), | ||
| ) | ||
| opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) | ||
| } else { | ||
| creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) | ||
| gConn, err = grpc.NewClient(config.Host+":"+config.Port, | ||
| grpc.WithTransportCredentials(creds), | ||
| grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`)) | ||
| opts = append(opts, grpc.WithTransportCredentials(creds)) | ||
| } | ||
| opts = append(opts, grpc.WithDefaultServiceConfig(roundRobinServiceConfig)) | ||
| params, enabled, err := keepaliveParamsFromConfig(config) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return &GRPCClient{Conn: gConn, DeadLine: int64(config.DeadLine)}, nil | ||
| if enabled { | ||
| opts = append(opts, grpc.WithKeepaliveParams(params)) | ||
| } | ||
| return opts, nil | ||
| } | ||
|
|
||
| // keepaliveParamsFromConfig derives the client keepalive parameters from config. | ||
| // Keepalive is opt-in: it returns ok=false (and a nil error) when KeepaliveTimeMs <= 0, | ||
| // so no keepalive dial option is applied and behaviour is unchanged. | ||
| // | ||
| // When keepalive is enabled (KeepaliveTimeMs > 0), KeepaliveTimeoutMs must be positive. | ||
| // A non-positive timeout is rejected with an error rather than being converted into a | ||
| // zero or negative gRPC timer duration, which would make keepalive PINGs fail | ||
| // immediately and churn otherwise-healthy connections. | ||
| func keepaliveParamsFromConfig(config Config) (keepalive.ClientParameters, bool, error) { | ||
| if config.KeepaliveTimeMs <= 0 { | ||
| return keepalive.ClientParameters{}, false, nil | ||
| } | ||
| if config.KeepaliveTimeoutMs <= 0 { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new gate treats a zero KeepaliveTimeoutMs as fatal whenever KeepaliveTimeMs is positive. Because these fields are advertised as optional and gRPC keepalive parameters normally have default timeout behavior, a partial rollout that sets only the ping interval turns into a startup panic via NewConnFromConfig. |
||
| return keepalive.ClientParameters{}, false, fmt.Errorf( | ||
| "interaction-store: KeepaliveTimeoutMs must be > 0 when keepalive is enabled "+ | ||
| "(KeepaliveTimeMs=%d ms), got %d ms", config.KeepaliveTimeMs, config.KeepaliveTimeoutMs) | ||
| } | ||
| return keepalive.ClientParameters{ | ||
| Time: time.Duration(config.KeepaliveTimeMs) * time.Millisecond, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Very large positive millisecond values can overflow the duration multiplication into a negative or otherwise incorrect timer, bypassing the current non-positive validation and passing invalid keepalive settings to gRPC. Also flagged on this line:
|
||
| Timeout: time.Duration(config.KeepaliveTimeoutMs) * time.Millisecond, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The validation only rejects non-positive timeouts, then converts the raw millisecond value directly into the gRPC keepalive timeout. A timeout of a few milliseconds can expire before an ACK during routine network jitter, server scheduling delay, or client/server GC pauses, causing reconnects and retries that hurt tail latency; enforce a sane minimum such as seconds-scale or document/configure a safe default when keepalive is enabled. |
||
| PermitWithoutStream: config.KeepalivePermitWithoutStream, | ||
| }, true, nil | ||
| } | ||
|
|
||
| // Invoke is a wrapper around grpc.ClientConn.Invoke with metrics support | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,124 @@ | ||
| package interactionstore | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| "google.golang.org/grpc/keepalive" | ||
| ) | ||
|
|
||
| func TestKeepaliveParamsFromConfig(t *testing.T) { | ||
| tests := []struct { | ||
| name string | ||
| config Config | ||
| wantOK bool | ||
| wantErr bool | ||
| wantParams keepalive.ClientParameters | ||
| }{ | ||
| { | ||
| name: "disabled by default (zero value config)", | ||
| config: Config{}, | ||
| wantOK: false, | ||
| }, | ||
| { | ||
| name: "disabled when time is zero even if other keepalive fields are set", | ||
| config: Config{KeepaliveTimeoutMs: 5000, KeepalivePermitWithoutStream: true}, | ||
| wantOK: false, | ||
| }, | ||
| { | ||
| name: "disabled when time is negative", | ||
| config: Config{KeepaliveTimeMs: -1, KeepaliveTimeoutMs: 5000}, | ||
| wantOK: false, | ||
| }, | ||
| { | ||
| name: "enabled with full params, milliseconds converted to Duration", | ||
| config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 5000, KeepalivePermitWithoutStream: true}, | ||
| wantOK: true, | ||
| wantParams: keepalive.ClientParameters{ | ||
| Time: 20 * time.Second, | ||
| Timeout: 5 * time.Second, | ||
| PermitWithoutStream: true, | ||
| }, | ||
| }, | ||
| { | ||
| name: "enabled with permit-without-stream defaulting to false", | ||
| config: Config{KeepaliveTimeMs: 10000, KeepaliveTimeoutMs: 3000}, | ||
| wantOK: true, | ||
| wantParams: keepalive.ClientParameters{ | ||
| Time: 10 * time.Second, | ||
| Timeout: 3 * time.Second, | ||
| PermitWithoutStream: false, | ||
| }, | ||
| }, | ||
| { | ||
| name: "enabled with negative timeout is rejected", | ||
| config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: -1}, | ||
| wantOK: false, | ||
| wantErr: true, | ||
| }, | ||
| { | ||
| name: "enabled with zero timeout is rejected", | ||
| config: Config{KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 0}, | ||
| wantOK: false, | ||
| wantErr: true, | ||
| }, | ||
| } | ||
|
|
||
| for _, tt := range tests { | ||
| t.Run(tt.name, func(t *testing.T) { | ||
| params, ok, err := keepaliveParamsFromConfig(tt.config) | ||
| if tt.wantErr { | ||
| assert.Error(t, err) | ||
| } else { | ||
| assert.NoError(t, err) | ||
| } | ||
| assert.Equal(t, tt.wantOK, ok) | ||
| assert.Equal(t, tt.wantParams, params) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // TestDefaultServiceConfigIsRoundRobin verifies the default dial behaviour directly: | ||
| // the service config must keep the round_robin load-balancing policy. Asserting the | ||
| // effective policy (not just an option count) means the test fails if round_robin is | ||
| // removed, renamed, or replaced. | ||
| func TestDefaultServiceConfigIsRoundRobin(t *testing.T) { | ||
| var sc struct { | ||
| LoadBalancingPolicy string `json:"loadBalancingPolicy"` | ||
| } | ||
| require.NoError(t, json.Unmarshal([]byte(roundRobinServiceConfig), &sc)) | ||
| assert.Equal(t, "round_robin", sc.LoadBalancingPolicy) | ||
| } | ||
|
|
||
| func TestDialOptions(t *testing.T) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 SUGGESTION — Convert TestDialOptions to a table-driven test instead of separate hand-written subtests for each input case. This test covers multiple input cases for the same function, and the repository rule requires table-driven tests for that pattern; using a table also makes future dial option cases less repetitive and less error-prone. |
||
| t.Run("default path applies no keepalive and keeps the round-robin config", func(t *testing.T) { | ||
| // The PR promises the default dial behaviour is unchanged. Assert it directly: | ||
| // keepalive is off for a default config (no keepalive dial option is produced). | ||
| _, enabled, err := keepaliveParamsFromConfig(Config{PlainText: true}) | ||
| require.NoError(t, err) | ||
| assert.False(t, enabled, "default config must not enable keepalive") | ||
|
|
||
| opts, err := dialOptions(Config{PlainText: true}) | ||
| require.NoError(t, err) | ||
| assert.Len(t, opts, 2, "default plaintext dial = transport-credentials + service-config only") | ||
|
|
||
| tlsOpts, err := dialOptions(Config{PlainText: false}) | ||
| require.NoError(t, err) | ||
| assert.Len(t, tlsOpts, 2, "default TLS dial = transport-credentials + service-config only") | ||
| }) | ||
|
|
||
| t.Run("valid keepalive appends exactly one dial option", func(t *testing.T) { | ||
| opts, err := dialOptions(Config{PlainText: true, KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: 5000}) | ||
| require.NoError(t, err) | ||
| assert.Len(t, opts, 3, "enabling keepalive should append exactly one dial option") | ||
| }) | ||
|
|
||
| t.Run("invalid keepalive timeout is rejected (no options returned)", func(t *testing.T) { | ||
| opts, err := dialOptions(Config{PlainText: true, KeepaliveTimeMs: 20000, KeepaliveTimeoutMs: -1}) | ||
| assert.Error(t, err) | ||
| assert.Nil(t, opts) | ||
| }) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new dial option makes gRPC send periodic HTTP/2 PINGs on each client connection according to caller-provided millisecond values. With no enforced minimum interval or guard for PermitWithoutStream, a misconfigured service can generate continuous idle-connection traffic and server ACK work across every pod and resolved backend connection; the SDK should clamp/reject intervals below the server policy or provide a safe named preset rather than accepting any positive value.