Skip to content

[FLINK-34620] Fix stack overflow in recursive protobuf message handling in flink-protobuf#28108

Open
khaledh wants to merge 1 commit intoapache:masterfrom
khaledh:handle-recursive-proto-fields
Open

[FLINK-34620] Fix stack overflow in recursive protobuf message handling in flink-protobuf#28108
khaledh wants to merge 1 commit intoapache:masterfrom
khaledh:handle-recursive-proto-fields

Conversation

@khaledh
Copy link
Copy Markdown
Contributor

@khaledh khaledh commented May 4, 2026

What is the purpose of the change

Protobuf allows recursive message definitions (e.g., message Node { Node child = 1; }). When Flink's protobuf format encounters such a schema, PbToRowTypeUtil.generateRowType recurses infinitely and crashes with StackOverflowError. This makes it impossible to use the protobuf format with any proto schema that contains recursive references.

This change adds cycle detection to PbToRowTypeUtil. When a message type is encountered that is already being resolved in the current ancestor chain, the recursive field is emitted as VARBINARY (raw protobuf bytes) instead of recursing. The protobuf binary data is preserved and can be deserialized on demand by consumers if needed.

Brief change log

  • PbToRowTypeUtil tracks ancestor message types during schema generation and emits VARBINARY for fields that would cause infinite recursion
  • PbCodegenBytesDeserializer generates code to serialize recursive message fields to their raw protobuf bytes via .toByteArray()
  • PbCodegenDeserializeFactory routes MESSAGE fields mapped to VARBINARY to the new bytes deserializer
  • PbSchemaValidationUtils accepts VARBINARY as a valid mapping for MESSAGE fields at recursive boundaries

Verifying this change

Note: This change has been reviewed and deployed internally at Shopify on production Flink jobs with no regressions since March 13, 2026.

This change added tests and can be verified as follows:

  • Added RecursiveMessageProtoToRowTest with 6 tests covering:
    • Schema generation produces VARBINARY for recursive fields instead of crashing
    • Nested data is preserved as parseable protobuf bytes through the full deserialization pipeline
    • Proto3 unset recursive fields produce empty bytes (default instance) regardless of readDefaultValues setting
    • Proto2 unset recursive fields are null with readDefaultValues=false (explicit field presence)
    • Proto2 unset recursive fields produce empty bytes with readDefaultValues=true
    • Proto2 set recursive fields are preserved as bytes
  • Added test_recursive_message.proto (proto3) and test_recursive_message_proto2.proto (proto2) test protos with self-referencing message definitions
  • All flink-protobuf tests pass with no regressions

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: yes - adds handling for a new case in protobuf deserialization (recursive messages)
  • The runtime per-record code paths (performance sensitive): no - cycle detection runs at planning time only, not per-record
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Claude Code

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented May 4, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants