-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[FLINK-38522][cdc connector mysql] use ssl for BinaryLogClient when searching for binlog offset for starting mysql cdc from timestamp #4156
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…earching for binlog offset for starting mysql cdc from timestamp
...or-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
Show resolved
Hide resolved
...ysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java
Show resolved
Hide resolved
...or-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
Outdated
Show resolved
Hide resolved
…re ssl and tests to ensure ssl config is honored
| try { | ||
| return mode == null ? null : SSLMode.valueOf(mode.name()); | ||
| } catch (IllegalArgumentException e) { | ||
| LOG.error("Invalid SecureConnectionMode provided {}", mode.name(), e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should throw an exception here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @ruanhang1993 , I have thrown an exception here now
… test, throw runtime exception when we have invalid ssl mode
| public static BinlogOffset findBinlogOffset( | ||
| long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) { | ||
| MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig(); | ||
| BinaryLogClient client = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can centralize the SSL configuration in the DebeziumUtils#createBinaryClient method,
Line 96 in 001cba0
| public static BinaryLogClient createBinaryClient(Configuration dbzConfiguration) { |
and based on the implementation of MySqlStreamingChangeEventSource, we also need to add the following code to set SslSocketFactory:
Lines 224 to 231 in 001cba0
| client.setSSLMode(sslModeFor(connectorConfig.sslMode())); | |
| if (connectorConfig.sslModeEnabled()) { | |
| SSLSocketFactory sslSocketFactory = | |
| getBinlogSslSocketFactory(connectorConfig, connection); | |
| if (sslSocketFactory != null) { | |
| client.setSslSocketFactory(sslSocketFactory); | |
| } | |
| } |
https://issues.apache.org/jira/browse/FLINK-38522
What is the purpose of the change
When using StartupOptions.timestamp(), the MySQL CDC connector calls DebeziumUtils.findBinlogOffset() to locate the appropriate binlog position. This method creates a short-lived BinaryLogClient to scan binlog files, but does not configure SSL mode on the client.
If the MySQL server requires SSL connections (e.g., require_secure_transport=ON or SSL mode configured in connection parameters), the connection fails because the BinaryLogClient attempts an unencrypted connection.
This Pull Request uses the ssl mode provided as part of the connection config for the BinaryLogClient as well to prevent this issue in cases where mysql is configured for require_secure_transport.
Changes
Set SSLMode to match what was provided in the connectionConfig for BinaryLogClient
Add unit test to ensure SSLMode is converted correctly.
Verifying this change