Skip to content

fix(connector): add gRPC keepalive and default UNAVAILABLE retry policy#7458

Open
pingsutw wants to merge 4 commits into
mainfrom
fix/connector-grpc-keepalive-retry
Open

fix(connector): add gRPC keepalive and default UNAVAILABLE retry policy#7458
pingsutw wants to merge 4 commits into
mainfrom
fix/connector-grpc-keepalive-retry

Conversation

@pingsutw

Copy link
Copy Markdown
Member

Why are the changes needed?

Connector task RPCs (CreateTask/GetTask/DeleteTask) frequently traverse an L7 gateway — e.g. the Knative/Kourier Envoy gateway that fronts connector services. That gateway reaps idle HTTP/2 connections and sends GOAWAY/drain on routing changes (which happen routinely as Knative revisions scale). The connector plugin caches one *grpc.ClientConn per endpoint for the process lifetime, so when the gateway closes the connection the cached client only discovers it on the next RPC, which then fails with:

rpc error: code = Unavailable desc = connection error: desc =
"error reading server preface: ... use of closed network connection"

surfacing to users as failed to create task from connector .... Today the client sets no keepalive and no retry policy (unless a deployment happens to specify DefaultServiceConfig), so a single reaped connection fails the in-flight task RPC.

What changes were proposed in this pull request?

In flyteplugins/go/tasks/plugins/webapi/connector:

  1. Keepalive (primary fix). getGrpcConnection now always sets keepalive.ClientParameters{Time: 30s, Timeout: 10s, PermitWithoutStream: true} so long-lived connector connections are not reaped while idle and a half-dead connection is detected proactively instead of on the next RPC.
  2. Default service config with retry. Introduced a package-level defaultGRPCServiceConfig (round-robin LB + retryPolicy retrying only UNAVAILABLE, with exponential backoff, plus retryThrottling to avoid hammering a connector that is genuinely down). getGrpcConnection falls back to it when a Deployment does not set its own DefaultServiceConfig, so every deployment (DefaultConnector, connectors, connectorApps) gets LB + retry uniformly without per-deployment config. Per-deployment DefaultServiceConfig still overrides.

UNAVAILABLE is the only retried code: the "server preface" failure means the request was never delivered, so retrying is safe; the connector's async create/get/delete are keyed by task-execution-id so retries are idempotent.

How was this patch tested?

Unit tests added/updated in the same package (go test ./go/tasks/plugins/webapi/connector/... passes, go vet and gofmt clean):

  • TestDefaultGRPCServiceConfig — asserts the default config is valid JSON (grpc silently ignores malformed configs), uses round_robin, targets flyteidl2.connector.AsyncConnectorService, retries only UNAVAILABLE, and includes retryThrottling.
  • TestGetGrpcConnection — verifies the empty→fallback path and that a deployment-specific DefaultServiceConfig still takes precedence.
  • Updated TestDefaultAgentConfig for the new empty default.

Labels

  • fixed

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Connector RPCs frequently traverse an L7 gateway (e.g. the Knative/Kourier
Envoy gateway) that reaps idle HTTP/2 connections and sends GOAWAY/drain on
routing changes. The cached *grpc.ClientConn then fails the next RPC with
"error reading server preface: ... use of closed network connection"
(gRPC code Unavailable), surfacing as 'failed to create task from connector'.

- Add client keepalive (Time=30s, Timeout=10s, PermitWithoutStream=true) so
  long-lived connector connections are not reaped while idle and dead
  connections are detected proactively.
- Introduce defaultGRPCServiceConfig (round-robin LB + retryPolicy on
  UNAVAILABLE + retryThrottling) and have getGrpcConnection fall back to it
  when a Deployment does not set DefaultServiceConfig, so every deployment
  (DefaultConnector, connectors, connectorApps) gets LB + retry uniformly
  without per-deployment config.

Signed-off-by: Kevin Su <pingsutw@gmail.com>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Copilot AI review requested due to automatic review settings May 29, 2026 19:24

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the connector gRPC client defaults to make long-lived connector RPC connections more resilient to idle gateway connection reaping and transient UNAVAILABLE errors.

Changes:

  • Adds a default gRPC service config with round-robin load balancing, UNAVAILABLE retries, and retry throttling.
  • Adds client-side keepalive parameters for connector gRPC connections.
  • Updates default config and unit tests to reflect the new fallback behavior.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

File Description
flyteplugins/go/tasks/plugins/webapi/connector/client.go Adds default service config fallback and keepalive dial options.
flyteplugins/go/tasks/plugins/webapi/connector/config.go Leaves default connector service config empty so client fallback applies.
flyteplugins/go/tasks/plugins/webapi/connector/config_test.go Updates default config expectations.
flyteplugins/go/tasks/plugins/webapi/connector/client_test.go Adds tests for default service config structure and connection fallback behavior.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, but this isn't needed for the grpc-go version we use (v1.81.1). The core google.golang.org/grpc package already registers round_robin for us: clientconn.go (which is package grpc) has

// google.golang.org/grpc/clientconn.go:55
_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.

and roundrobin's init() calls balancer.Register(...). Since this file already imports google.golang.org/grpc, the balancer is registered as soon as the package loads — no separate blank import required. The manual blank import was only necessary in much older grpc-go releases (pre ~v1.36) before that import was folded into clientconn.go.

Also, {"loadBalancingConfig":[{"round_robin":{}}]} was already the prior default service config and is in use in production, so there's no silent fallback here.

Comment on lines +55 to +58
var connectorKeepalive = keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — this is valid, and the enforcement is cross-language (grpcio/Python servers run on C-core, whose defaults match grpc-go: min_recv_ping_interval_without_data = 5m, max_ping_strikes = 2, permit_without_calls = false), so unconditional aggressive keepalive could indeed trigger GOAWAY "too_many_pings" on a directly-reached connector.

Fixed: keepalive is now opt-in per Deployment via a new KeepaliveConfig, and getGrpcConnection applies it only when configured (default off). So directly-reached grpc-go/grpcio connectors get no idle pings. Keepalive is enabled only for connectors fronted by an L7 gateway (Knative/Kourier Envoy) where the pings terminate at Envoy rather than the connector's grpc server. The UNAVAILABLE retry policy remains always-on as the universal safety net.

…alive

Connector RPCs frequently traverse an L7 gateway (e.g. the Knative/Kourier
Envoy gateway) that reaps idle HTTP/2 connections and sends GOAWAY/drain on
routing changes. The cached *grpc.ClientConn then fails the next RPC with
"error reading server preface: ... use of closed network connection"
(gRPC code Unavailable), surfacing as 'failed to create task from connector'.

- Introduce DefaultGRPCServiceConfig (round-robin LB + retryPolicy on
  UNAVAILABLE + retryThrottling). getGrpcConnection falls back to it when a
  Deployment does not set DefaultServiceConfig, so every deployment gets LB +
  retry uniformly. Exported so programmatic registrars (operator app
  controller) can set it explicitly.
- Add a per-Deployment, opt-in Keepalive (KeepaliveConfig). It is NOT applied
  unless configured: a directly-reached grpc-go or grpcio (C-core) server
  rejects frequent idle pings with GOAWAY "too_many_pings", so keepalive is
  only safe for connectors fronted by a gateway that terminates the pings.
  Gateway-fronted deployments enable it (e.g. Time=30s, PermitWithoutStream).

Signed-off-by: Kevin Su <pingsutw@gmail.com>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Copilot AI review requested due to automatic review settings May 29, 2026 20:08
@pingsutw pingsutw force-pushed the fix/connector-grpc-keepalive-retry branch from 2d35ab2 to ddc8dde Compare May 29, 2026 20:08

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
// by an L7 gateway (e.g. Knative/Kourier Envoy) that reaps idle connections;
// a directly-reached grpc-go or grpcio (C-core) server rejects frequent idle
// pings with GOAWAY "too_many_pings".
if connector.Keepalive != nil {
@pingsutw pingsutw self-assigned this May 29, 2026
@pingsutw pingsutw added this to the V2 GA milestone May 29, 2026
pingsutw added 2 commits June 1, 2026 23:28
config.Duration only implemented MarshalJSON, so gopkg.in/yaml.v2/v3 (which
does not honor MarshalJSON) serialized the embedded time.Duration as a nested
`duration:` map. Any config that serializes a Duration to YAML and is read
back through the viper/mapstructure config loader (TagName json +
StringToTimeDurationHookFunc, which expects a string like "30s") would fail
to decode.

This first surfaced with the connector keepalive config: the operator writes
the connector configmap with yaml.v2, emitting `time:\n  duration: 30s`,
which the connector plugin could not read back. Add MarshalYAML/UnmarshalYAML
so Duration round-trips as a flat "30s" string under YAML.

Signed-off-by: Kevin Su <pingsutw@apache.org>
Copilot AI review requested due to automatic review settings June 2, 2026 17:18

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Comment on lines +75 to +79
serviceConfig := connector.DefaultServiceConfig
if len(serviceConfig) == 0 {
serviceConfig = DefaultGRPCServiceConfig
}
opts = append(opts, grpc.WithDefaultServiceConfig(serviceConfig))
Comment on lines +69 to +71
func TestGetGrpcConnection(t *testing.T) {
ctx := context.Background()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants