fix(connector): add gRPC keepalive and default UNAVAILABLE retry policy#7458
fix(connector): add gRPC keepalive and default UNAVAILABLE retry policy#7458pingsutw wants to merge 4 commits into
Conversation
Connector RPCs frequently traverse an L7 gateway (e.g. the Knative/Kourier Envoy gateway) that reaps idle HTTP/2 connections and sends GOAWAY/drain on routing changes. The cached *grpc.ClientConn then fails the next RPC with "error reading server preface: ... use of closed network connection" (gRPC code Unavailable), surfacing as 'failed to create task from connector'. - Add client keepalive (Time=30s, Timeout=10s, PermitWithoutStream=true) so long-lived connector connections are not reaped while idle and dead connections are detected proactively. - Introduce defaultGRPCServiceConfig (round-robin LB + retryPolicy on UNAVAILABLE + retryThrottling) and have getGrpcConnection fall back to it when a Deployment does not set DefaultServiceConfig, so every deployment (DefaultConnector, connectors, connectorApps) gets LB + retry uniformly without per-deployment config. Signed-off-by: Kevin Su <pingsutw@gmail.com> Signed-off-by: Kevin Su <pingsutw@apache.org>
There was a problem hiding this comment.
Pull request overview
This PR updates the connector gRPC client defaults to make long-lived connector RPC connections more resilient to idle gateway connection reaping and transient UNAVAILABLE errors.
Changes:
- Adds a default gRPC service config with round-robin load balancing,
UNAVAILABLEretries, and retry throttling. - Adds client-side keepalive parameters for connector gRPC connections.
- Updates default config and unit tests to reflect the new fallback behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
flyteplugins/go/tasks/plugins/webapi/connector/client.go |
Adds default service config fallback and keepalive dial options. |
flyteplugins/go/tasks/plugins/webapi/connector/config.go |
Leaves default connector service config empty so client fallback applies. |
flyteplugins/go/tasks/plugins/webapi/connector/config_test.go |
Updates default config expectations. |
flyteplugins/go/tasks/plugins/webapi/connector/client_test.go |
Adds tests for default service config structure and connection fallback behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| "google.golang.org/grpc/credentials" | ||
| "google.golang.org/grpc/credentials/insecure" | ||
| "google.golang.org/grpc/grpclog" | ||
| "google.golang.org/grpc/keepalive" |
There was a problem hiding this comment.
Thanks, but this isn't needed for the grpc-go version we use (v1.81.1). The core google.golang.org/grpc package already registers round_robin for us: clientconn.go (which is package grpc) has
// google.golang.org/grpc/clientconn.go:55
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.and roundrobin's init() calls balancer.Register(...). Since this file already imports google.golang.org/grpc, the balancer is registered as soon as the package loads — no separate blank import required. The manual blank import was only necessary in much older grpc-go releases (pre ~v1.36) before that import was folded into clientconn.go.
Also, {"loadBalancingConfig":[{"round_robin":{}}]} was already the prior default service config and is in use in production, so there's no silent fallback here.
| var connectorKeepalive = keepalive.ClientParameters{ | ||
| Time: 30 * time.Second, | ||
| Timeout: 10 * time.Second, | ||
| PermitWithoutStream: true, |
There was a problem hiding this comment.
Good catch — this is valid, and the enforcement is cross-language (grpcio/Python servers run on C-core, whose defaults match grpc-go: min_recv_ping_interval_without_data = 5m, max_ping_strikes = 2, permit_without_calls = false), so unconditional aggressive keepalive could indeed trigger GOAWAY "too_many_pings" on a directly-reached connector.
Fixed: keepalive is now opt-in per Deployment via a new KeepaliveConfig, and getGrpcConnection applies it only when configured (default off). So directly-reached grpc-go/grpcio connectors get no idle pings. Keepalive is enabled only for connectors fronted by an L7 gateway (Knative/Kourier Envoy) where the pings terminate at Envoy rather than the connector's grpc server. The UNAVAILABLE retry policy remains always-on as the universal safety net.
…alive Connector RPCs frequently traverse an L7 gateway (e.g. the Knative/Kourier Envoy gateway) that reaps idle HTTP/2 connections and sends GOAWAY/drain on routing changes. The cached *grpc.ClientConn then fails the next RPC with "error reading server preface: ... use of closed network connection" (gRPC code Unavailable), surfacing as 'failed to create task from connector'. - Introduce DefaultGRPCServiceConfig (round-robin LB + retryPolicy on UNAVAILABLE + retryThrottling). getGrpcConnection falls back to it when a Deployment does not set DefaultServiceConfig, so every deployment gets LB + retry uniformly. Exported so programmatic registrars (operator app controller) can set it explicitly. - Add a per-Deployment, opt-in Keepalive (KeepaliveConfig). It is NOT applied unless configured: a directly-reached grpc-go or grpcio (C-core) server rejects frequent idle pings with GOAWAY "too_many_pings", so keepalive is only safe for connectors fronted by a gateway that terminates the pings. Gateway-fronted deployments enable it (e.g. Time=30s, PermitWithoutStream). Signed-off-by: Kevin Su <pingsutw@gmail.com> Signed-off-by: Kevin Su <pingsutw@apache.org>
2d35ab2 to
ddc8dde
Compare
| "google.golang.org/grpc/credentials" | ||
| "google.golang.org/grpc/credentials/insecure" | ||
| "google.golang.org/grpc/grpclog" | ||
| "google.golang.org/grpc/keepalive" |
| // by an L7 gateway (e.g. Knative/Kourier Envoy) that reaps idle connections; | ||
| // a directly-reached grpc-go or grpcio (C-core) server rejects frequent idle | ||
| // pings with GOAWAY "too_many_pings". | ||
| if connector.Keepalive != nil { |
…nector-grpc-keepalive-retry
config.Duration only implemented MarshalJSON, so gopkg.in/yaml.v2/v3 (which does not honor MarshalJSON) serialized the embedded time.Duration as a nested `duration:` map. Any config that serializes a Duration to YAML and is read back through the viper/mapstructure config loader (TagName json + StringToTimeDurationHookFunc, which expects a string like "30s") would fail to decode. This first surfaced with the connector keepalive config: the operator writes the connector configmap with yaml.v2, emitting `time:\n duration: 30s`, which the connector plugin could not read back. Add MarshalYAML/UnmarshalYAML so Duration round-trips as a flat "30s" string under YAML. Signed-off-by: Kevin Su <pingsutw@apache.org>
| serviceConfig := connector.DefaultServiceConfig | ||
| if len(serviceConfig) == 0 { | ||
| serviceConfig = DefaultGRPCServiceConfig | ||
| } | ||
| opts = append(opts, grpc.WithDefaultServiceConfig(serviceConfig)) |
| func TestGetGrpcConnection(t *testing.T) { | ||
| ctx := context.Background() | ||
|
|
Why are the changes needed?
Connector task RPCs (
CreateTask/GetTask/DeleteTask) frequently traverse an L7 gateway — e.g. the Knative/Kourier Envoy gateway that fronts connector services. That gateway reaps idle HTTP/2 connections and sendsGOAWAY/drain on routing changes (which happen routinely as Knative revisions scale). The connector plugin caches one*grpc.ClientConnper endpoint for the process lifetime, so when the gateway closes the connection the cached client only discovers it on the next RPC, which then fails with:surfacing to users as
failed to create task from connector .... Today the client sets no keepalive and no retry policy (unless a deployment happens to specifyDefaultServiceConfig), so a single reaped connection fails the in-flight task RPC.What changes were proposed in this pull request?
In
flyteplugins/go/tasks/plugins/webapi/connector:getGrpcConnectionnow always setskeepalive.ClientParameters{Time: 30s, Timeout: 10s, PermitWithoutStream: true}so long-lived connector connections are not reaped while idle and a half-dead connection is detected proactively instead of on the next RPC.defaultGRPCServiceConfig(round-robin LB +retryPolicyretrying onlyUNAVAILABLE, with exponential backoff, plusretryThrottlingto avoid hammering a connector that is genuinely down).getGrpcConnectionfalls back to it when aDeploymentdoes not set its ownDefaultServiceConfig, so every deployment (DefaultConnector,connectors,connectorApps) gets LB + retry uniformly without per-deployment config. Per-deploymentDefaultServiceConfigstill overrides.UNAVAILABLEis the only retried code: the "server preface" failure means the request was never delivered, so retrying is safe; the connector's async create/get/delete are keyed by task-execution-id so retries are idempotent.How was this patch tested?
Unit tests added/updated in the same package (
go test ./go/tasks/plugins/webapi/connector/...passes,go vetandgofmtclean):TestDefaultGRPCServiceConfig— asserts the default config is valid JSON (grpc silently ignores malformed configs), usesround_robin, targetsflyteidl2.connector.AsyncConnectorService, retries onlyUNAVAILABLE, and includesretryThrottling.TestGetGrpcConnection— verifies the empty→fallback path and that a deployment-specificDefaultServiceConfigstill takes precedence.TestDefaultAgentConfigfor the new empty default.Labels
Check all the applicable boxes