CC-38979 Add opt-in validation for signal data collection table #413
Animesh Kumar (akanimesh7) wants to merge 10 commits into v3.2.5-hotfix-x from
Conversation
Introduce signal.data.collection.validation.enabled (default false). When enabled at connector validate() time, probe the configured signaling table for existence and the shape mandated by the Debezium docs: exactly three columns named id/type/data in that order (compared case-insensitively). DB-specific identifier resolution lives on each *Connection subclass (PostgresConnection, SqlServerConnection, BinlogConnectorConnection for MySQL/MariaDB, OracleConnection), reusing the existing readTableNames polymorphism and, for MySQL, the existing isTableIdCaseSensitive() fold. The shape check itself is a pure static helper in debezium-core. Validator failures attach to the signal.data.collection ConfigValue so the Kafka Connect REST /validate call surfaces them. SQLException during the probe is logged and swallowed (empty result) so a validator-side bug can never block CREATE/UPDATE - the signal table is only exercised later when a signal is inserted. Backward compatible: the default value of the flag reproduces today's behavior. Covered by 11 unit tests on the pure checks plus per-connector ITs (Postgres 8, SQL Server 6, MySQL 6) that exercise the full connector.validate -> connection.validateSignalDataCollection -> SignalDataCollectionChecks.attach path against real database containers. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
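For orientation, here is a minimal sketch of what the pure column-shape check described above could look like. The names and signature are illustrative stand-ins, not the actual `SignalDataCollectionChecks` API; only the rule itself (exactly three columns named id/type/data in order, compared case-insensitively) comes from the commit message.

```java
import java.util.ArrayList;
import java.util.List;

final class SignalTableShapeSketch {

    private static final String[] EXPECTED = { "id", "type", "data" };

    // columnNames: the probed column names, in ordinal position order.
    static List<String> validateShape(String rawId, List<String> columnNames) {
        final List<String> errors = new ArrayList<>();
        if (columnNames.size() != EXPECTED.length) {
            errors.add("Signal data collection '" + rawId + "' must have exactly "
                    + EXPECTED.length + " columns but has " + columnNames.size() + ".");
            return errors;
        }
        for (int i = 0; i < EXPECTED.length; i++) {
            // Names are compared case-insensitively, per the commit message.
            if (!EXPECTED[i].equalsIgnoreCase(columnNames.get(i))) {
                errors.add("column at position " + i + " must be named '"
                        + EXPECTED[i] + "' but found '" + columnNames.get(i) + "'.");
            }
        }
        return errors;
    }
}
```

Being a pure static function over a list of names, this is what makes the 11 shape-check unit tests cheap to write without a database.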
Move the flag/config guard, try/catch, "not found" error, and column-shape delegation onto JdbcConnection.validateSignalDataCollection, exposing just the DB-specific identifier resolution as a protected hook (resolveSignalDataCollectionTableId). PostgresConnection, SqlServerConnection, and OracleConnection now inherit the base implementation unchanged - the default schema-first parse + readTableNames probe matches what each of them was doing. BinlogConnectorConnection keeps a small override for catalog-first parsing and conditional TableId.toLowercase() per isTableIdCaseSensitive(). Net effect: ~100 lines of duplicated scaffolding removed across four *Connection subclasses. All 31 existing unit + IT tests still green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
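A condensed sketch of the template-method split this commit describes, with hypothetical stand-in types (the real `JdbcConnection` differs; the point is the base/hook division of labor):

```java
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

abstract class JdbcConnectionSketch {

    // Template method: the flag guard, probe, try/catch, and column-shape
    // delegation live once here, for every connector.
    public List<String> validateSignalDataCollection(boolean validationEnabled, String raw) {
        if (!validationEnabled || raw == null) {
            return Collections.emptyList(); // short-circuit: behavior unchanged
        }
        try {
            TableIdSketch resolved = resolveSignalDataCollectionTableId(raw);
            if (resolved == null) {
                return List.of("Signal data collection '" + raw + "' was not found in the database.");
            }
            return validateShape(raw, resolved); // pure column-shape check
        }
        catch (SQLException e) {
            // Probe failures never block CREATE/UPDATE.
            return Collections.emptyList();
        }
    }

    // Protected hook: the only DB-specific part. The default is a schema-first
    // parse + readTableNames probe; BinlogConnectorConnection would override
    // for catalog-first parsing plus the isTableIdCaseSensitive() fold.
    protected abstract TableIdSketch resolveSignalDataCollectionTableId(String raw) throws SQLException;

    protected abstract List<String> validateShape(String raw, TableIdSketch resolved);

    record TableIdSketch(String catalog, String schema, String table) {
    }
}
```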
Oracle is outside the reactor for this branch, so the Oracle connector's validate() call into the signal-table validator was dead code for this PR. Revert OracleConnector.validateConnection to the base branch so the diff stays scoped to the three connectors actually shipped from v3.2.5-hotfix-x (MySQL, PostgreSQL, SQL Server). The base JdbcConnection template method and default resolver hook remain available for Oracle / other connectors in follow-up upstream work. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add targeted DEBUG logs in JdbcConnection.validateSignalDataCollection so operators can trace the validator end-to-end without pulling in a stack trace: entry, flag/config short-circuits, resolved TableId, column list, and the final error list (if any). Existing SQLException WARN is kept. All new logs are at DEBUG so the default log level is unchanged. Enable via category io.debezium.jdbc.JdbcConnection. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tegy FieldTest.shouldNotAllowDuplicateGroupAssignment failed for MySQL/MariaDB/SqlServer/MongoDB/Postgres because the new signal.data.collection.validation.enabled was declared at ADVANCED position 23, already occupied by topic.naming.strategy. Move it to the next unused slot (24).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Verify the signal-table validator accepts the short forms customers actually use in the wild, in addition to the canonical documented form:
- SqlServer: 2-part `schema.table` (resolves against the connected database) in addition to 3-part `db.schema.table`.
- Postgres: 3-part `db.schema.table` in addition to 2-part `schema.table`.
- MySQL: bare 1-part `table` (MySQL has no schemas; the driver scans all catalogs) in addition to 2-part `db.table`. The 1-part case is pinned down to make any future behavior change visible in review; its contract is not guaranteed.
Tests run against real containers; all three new cases pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
I did not follow; what exactly was tested?
```java
    LOGGER.error(errorMessage);
    userValue.addErrorMessage(errorMessage);
}
SignalDataCollectionChecks.attach(connection.validateSignalDataCollection(sqlServerConfig), configValues);
```
I think it would be fine for SqlServer based on this, but please check whether this introduction can cause a validation timeout for any connector.
It should not; it's a single driver call to fetch one table's metadata.
In the worst case, we can turn the validation off and figure it out.
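For context on the cost question: the probe boils down to one JDBC metadata round trip. A sketch using plain `DatabaseMetaData` (the actual Debezium probe goes through `readTableNames`; this stand-in just shows the single call involved):

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;

final class SignalTableProbeSketch {

    static boolean tableExists(Connection conn, String catalog, String schema, String table)
            throws SQLException {
        DatabaseMetaData metadata = conn.getMetaData();
        // One driver call, filtered down to the single signal table.
        try (ResultSet rs = metadata.getTables(catalog, schema, table, new String[]{ "TABLE" })) {
            return rs.next();
        }
    }
}
```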
```java
 */
public static void attach(List<String> errors, Map<String, ConfigValue> configValues) {
    final ConfigValue target = configValues.get(CommonConnectorConfig.SIGNAL_DATA_COLLECTION.name());
    errors.forEach(target::addErrorMessage);
```
Just to be safe, should we add a null check for `errors`?
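A sketch of what the suggested defensive version could look like. The null checks are the reviewer's suggestion, not necessarily the merged code, and the `key` parameter is a stand-in for the hard-coded `SIGNAL_DATA_COLLECTION.name()` lookup:

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigValue;

final class AttachSketch {

    static void attach(List<String> errors, Map<String, ConfigValue> configValues, String key) {
        if (errors == null || errors.isEmpty()) {
            return; // nothing to report
        }
        ConfigValue target = configValues.get(key);
        if (target == null) {
            return; // key absent from this connector's config definition
        }
        errors.forEach(target::addErrorMessage);
    }
}
```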
```java
if (!config.isSignalDataCollectionValidationEnabled()) {
    LOGGER.debug("Signal data collection validation disabled; skipping");
    return Collections.emptyList();
```
If the check is disabled by default and we return immediately, how would we audit the fleet?
The plan was to enable the check by default and find the connectors that would have failed it, then override only those via LD to the disabled-check variation.
But now that you've pointed this out, I think another config, one that determines the action to take when validations don't pass, would be the right approach. We can roll out initially with the check enabled but the action set to warn, and decide later.
```java
catch (SQLException e) {
    LOGGER.warn("Unable to validate signal data collection '{}'; skipping signal-table check", raw, e);
    return Collections.emptyList();
}
```
Should we add an `Exception` block too, for any non-`SQLException`s? The `SqlServerConnector` class (the caller) has an `Exception` block, but that registers the failure against `hostName`, which may be incorrect in this case.
To add to this, if not already done, can you test some invalid/complex signal table names too? Some examples (a parameterized-test sketch follows this list):
- `/a` (Space)
- `a.b.c` (3 cases, where one of a/b/c does not exist)
- `a.b.c d` (table name with a space)
- `[a].[b].[c]` (SqlServer supports `[]`)
- `bad_db.a.b` (a table where the db is non-existent)
- any others you think make sense.
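A sketch of how these cases could be pinned down in a JUnit 5 parameterized test. The names come from the list above; the `validate` hook is hypothetical wiring into the real validator, left unimplemented here:

```java
import java.util.List;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.assertFalse;

class MalformedSignalTableNamesIT {

    @ParameterizedTest
    @ValueSource(strings = {
            "a.b.c",        // one of a/b/c does not exist
            "a.b.c d",      // table name with a space
            "[a].[b].[c]",  // SqlServer bracket quoting
            "bad_db.a.b"    // non-existent database
    })
    void malformedOrMissingNamesProduceErrors(String raw) {
        List<String> errors = validate(raw); // hypothetical call into the validator
        assertFalse(errors.isEmpty(), "expected a validation error for: " + raw);
    }

    private List<String> validate(String raw) {
        // Stand-in: wire to connection.validateSignalDataCollection(...) in a real IT.
        throw new UnsupportedOperationException("not wired in this sketch");
    }
}
```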
```java
}
catch (SQLException e) {
    LOGGER.warn("Unable to validate signal data collection '{}'; skipping signal-table check", raw, e);
    return Collections.emptyList();
```
Is it correct to return an empty list for a SQLException during validation? I assumed the failures may be permissions issues, a non-existent db, etc., and those sound genuine enough to be reported back to the user if validation is enabled.
The idea is that the signal table's structure validation is not FATAL. Connectors don't require the signal table for most of their lifetime, so refusing to let the connector start just because the signal table could not be validated didn't sound great.
Replaced this.
Replace the inline `resolved.equals(TableId.parse(raw, true))` with `CommonConnectorConfig.isSignalDataCollection(resolved)` - the same check `EventDispatcher` runs on captured events, so validator and runtime cannot drift. Broaden the catch from SQLException to Exception so driver-side RuntimeExceptions are swallowed by the validator rather than escaping to `BinlogConnector.validateConnection`'s generic block, where they get misattributed to hostname. ITs tightened to assert exact canonical-form error messages per connector (1-part MySQL, 2-part SqlServer, 3-part Postgres). Two redundant Postgres tests (tooFew, jsonb) dropped - the same branches are covered by the retained tests. The SqlServer IT's createTestDatabase moved to @BeforeClass to cut setup cost. Tested end-to-end through Kafka Connect validate REST on all three V2 connectors; all four scenarios (missing, non-canonical, wrong-column, happy-path) produce the intended messages.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
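A self-contained sketch of the swap this commit describes. `TableId.parse(raw, true)` and `isSignalDataCollection(...)` are the calls named above; the interfaces are stand-ins so the snippet compiles on its own:

```java
import java.util.List;

final class CanonicalFormSketch {

    interface TableId { /* stand-in for io.debezium.relational.TableId */ }

    interface ConnectorConfig {
        // stand-in for CommonConnectorConfig.isSignalDataCollection(DataCollectionId)
        boolean isSignalDataCollection(TableId id);
    }

    static List<String> checkCanonicalForm(ConnectorConfig config, TableId resolved, String raw) {
        // Before: inline structural comparison, which can drift from runtime:
        //   resolved.equals(TableId.parse(raw, true))
        // After: the exact predicate EventDispatcher evaluates at capture time:
        if (!config.isSignalDataCollection(resolved)) {
            return List.of("signal.data.collection must be '" + resolved + "' (got '" + raw + "').");
        }
        return List.of();
    }
}
```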
The validator checks existence + column count + column names (case-insensitive), but not column types. The config's description claimed "and column types" which was stale — remove it to match behavior. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add an internal-only `signal.data.collection.validation.action` enum config (WARN/FAIL, default WARN) modeled after `guardrail.collections.limit.action`. Routes the validator's outcome:
- WARN: log only, via the new `[signal.data.collection.validation]` audit prefix; validate() returns 200 with no ConfigValue errors.
- FAIL: surface errors on the `signal.data.collection` ConfigValue so connector CREATE/UPDATE is rejected; also emits the audit log for fleet observability.
Default WARN keeps the initial fleet rollout observability-only. Both configs (`validation.enabled` + `validation.action`) are internal-only, not customer-facing.
Audit visibility additions:
- Probe-failure path (the `SQLException` catch in `JdbcConnection.validateSignalDataCollection`): now uses the audit prefix and inlines the driver exception message; the full stacktrace is also logged.
- Upstream parser failure path (`TableId.parse` from `RelationalTableFilters.<init>` and `CommonConnectorConfig.<init>`): wrap with a try/catch that logs the audit prefix and re-throws, so HTTP 500 behavior is preserved (legacy/back-compat). Customer experience unchanged; the fleet now sees a greppable line.
ITs: MySQL adds `warnActionDoesNotSurfaceErrorsToConfigValue` to lock in the WARN-to-ConfigValue-clean contract through the full `MySqlConnector.validate(...)` pipeline. Unit tests split into WARN-action and FAIL-action variants on `attach()`.
Tested end-to-end through Kafka Connect REST validate on all three V2 connectors (SqlServer/Postgres/MySQL) for: missing table, wrong canonical form (1/2/3-part variants), wrong column name, case mismatch, probe failure, and pre-existing parse-failure 500s. Every case where a problem exists emits a greppable audit line; happy paths stay silent.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
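A sketch of the WARN/FAIL routing described above. The enum and wiring are illustrative stand-ins; only the audit prefix string is taken from the commit message:

```java
import java.util.List;
import org.apache.kafka.common.config.ConfigValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ValidationActionSketch {

    enum Action { WARN, FAIL }

    private static final Logger LOGGER = LoggerFactory.getLogger(ValidationActionSketch.class);
    private static final String AUDIT = "[signal.data.collection.validation]";

    static void route(Action action, List<String> errors, ConfigValue target) {
        if (errors.isEmpty()) {
            return; // happy path stays silent
        }
        // Both actions emit the greppable audit line for fleet observability.
        LOGGER.warn("{} {}", AUDIT, errors);
        if (action == Action.FAIL) {
            // Only FAIL surfaces errors to the ConfigValue, rejecting CREATE/UPDATE;
            // WARN leaves validate() returning 200 with a clean ConfigValue.
            errors.forEach(target::addErrorMessage);
        }
    }
}
```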
The runtime is positional end-to-end:
- read: parseSignallingMessage extracts row fields by position 0/1/2 and SignalRecord builds from positions, never looking up by name.
- write: SignalBasedIncrementalSnapshotChangeEventSource issues `INSERT INTO <signal_table> VALUES (?, ?, ?)` with no column list.
A signal table with id/type/data renamed (e.g. signal_id/signal_type/payload) runs correctly today. Validating names would be a regression in FAIL mode for customers who happen to have such configs working in production, and a misleading audit signal in WARN mode.
Drop the name check. Keep table-existence + canonical-form (runtime predicate) + exactly-3-columns. Column-type validation is deferred to a follow-up; it requires per-connector wiring through each connector's ValueConverter for the only authoritative answer to "is this JDBC type going to map to Connect STRING?".
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
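A sketch of the positional contract this commit relies on. The `INSERT` shape is quoted from the commit message; the surrounding types are illustrative stand-ins:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class PositionalSignalSketch {

    // Read side: fields taken by position 0/1/2, mirroring how
    // parseSignallingMessage is described above; names never consulted.
    record Signal(String id, String type, String data) {
        static Signal fromRow(Object[] row) {
            return new Signal((String) row[0], (String) row[1], (String) row[2]);
        }
    }

    // Write side: the column-list-free statement quoted in the commit
    // message, so renamed columns (signal_id/signal_type/payload) still work.
    static void insertSignal(Connection conn, String signalTable, Signal s) throws SQLException {
        String sql = "INSERT INTO " + signalTable + " VALUES (?, ?, ?)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, s.id());
            ps.setString(2, s.type());
            ps.setString(3, s.data());
            ps.executeUpdate();
        }
    }
}
```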
```java
        columns.stream().map(c -> c.name() + " " + c.typeName()).collect(Collectors.joining(", ")));
final List<String> errors = SignalDataCollectionChecks.validateShape(raw, columns);
if (!errors.isEmpty()) {
    LOGGER.debug("Signal data collection '{}' validation produced {} error(s): {}", raw, errors.size(), errors);
```
Should we add `[signal.data.collection.validation]` in the DEBUG logs as well?
```java
}
catch (Exception e) {
    LOGGER.warn("[signal.data.collection.validation] Unable to validate signal data collection '{}'; skipping signal-table check ({})", raw, e.getMessage(), e);
    return Collections.emptyList();
```
Animesh Kumar (@akanimesh7) can we add a test case for the failure scenario, if not already added?
```java
 * table identifiers.
 */
@Override
protected TableId resolveSignalDataCollectionTableId(String raw) throws SQLException {
```
Instead of `raw`, can we use a better variable name like `rawTableId`?
```diff
@@ -0,0 +1 @@
+-- Empty DDL. The signal-data-collection validate IT creates its own test table per case.
```
Is this required?
```java
 * probes once via {@link #readTableNames}. Subclasses should override to apply DB-specific
 * parsing (e.g. catalog-first on MySQL) or identifier folding before probing.
 */
protected TableId resolveSignalDataCollectionTableId(String raw) throws SQLException {
```
Can we replace `raw` with something more informative?
Summary
- `signal.data.collection.validation.enabled` (default `false`). When enabled, connector `validate()` probes the signaling table for existence and the documented three-column `id`/`type`/`data` shape (compared case-insensitively) and surfaces failures on the `signal.data.collection` ConfigValue.
- Canonical-form enforcement per connector (MySQL: `db.table`; Postgres: `schema.table`; SqlServer: `db.schema.table`) by reusing the runtime's own signal predicate (`CommonConnectorConfig.isSignalDataCollection(DataCollectionId)`). This is the same check `EventDispatcher` performs at capture time, so validator and runtime cannot drift.
- DB-specific identifier resolution is a single protected `JdbcConnection.resolveSignalDataCollectionTableId(String)` hook; the flag / config / try-catch scaffolding lives once in the base. Only `BinlogConnectorConnection` overrides the hook (catalog-first parse + `isTableIdCaseSensitive()` fold). Postgres and SqlServer use the default schema-first probe unchanged.
- The pure shape check lives in `debezium-core` (`SignalDataCollectionChecks`).
- Probe failures (any `Exception`) are logged and swallowed (no error surfaced) so a validator-side bug can never block connector CREATE/UPDATE; the signal table isn't exercised until a signal is inserted.

Rollout / backward compatibility plan
Enabling the validation flag by default would reject CREATE/UPDATE for any existing customer whose signal table doesn't match the documented shape (different names, an extra column, the wrong count). Before flipping the default, we will:
- Identify the connectors that would fail the check (the WARN-action audit logs exist for this).
- Pin `signal.data.collection.validation.enabled=false` for those specific connectors (targeted by connector ID), so they keep today's behavior and keep working.
- Flip the default to `true` only after the LaunchDarkly targeting is in place, so new connectors and already-conforming existing connectors get the stricter checks while non-conforming ones are explicitly grandfathered via LaunchDarkly.

Test plan
- Unit tests on `SignalDataCollectionChecks`: all green.
- Per-connector ITs against real database containers (a caveat: `PostgresConnection.readTableColumn` required a `TypeRegistry` not present at validate time).
- Full-pipeline IT through `MySqlConnector.validate(...)`: all green.

Manual end-to-end test (Kafka Connect `PUT /connector-plugins/<CLASS>/config/validate`)

For each connector, drove four scenarios against a running DB container through the V2 plugin on local Kafka Connect, with `signal.data.collection.validation.enabled=true`. The `errors` array on the `signal.data.collection` ConfigValue was captured from the REST response:
- Missing table → `Signal data collection '<id>' was not found in the database.`
- MySQL, `sig` → `signal.data.collection must be 'manualdb.sig_manual_test' (got 'sig_manual_test').`
- Postgres, `postgres.public.sig` → `signal.data.collection must be 'public.sig_manual_test' (got 'postgres.public.sig_manual_test').`
- SqlServer, `dbo.sig` → `signal.data.collection must be 'testDB1.dbo.sig_manual_test' (got 'dbo.sig_manual_test').`
- Wrong column name → `column at position 0 must be named 'id' but found 'signal_id'.`
- Happy path → no errors.
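For reproducibility, the kind of request driven in these scenarios, sketched with Java 11's `HttpClient`. The endpoint is Kafka Connect's standard validate API; the host, port, and config values are assumptions for illustration:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ValidateSignalTable {

    public static void main(String[] args) throws Exception {
        // Minimal illustrative config; Connect reports any other missing
        // required fields in the same validate response.
        String body = """
                {
                  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
                  "database.hostname": "localhost",
                  "signal.data.collection": "manualdb.sig_manual_test",
                  "signal.data.collection.validation.enabled": "true"
                }""";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connector-plugins/"
                        + "io.debezium.connector.mysql.MySqlConnector/config/validate"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
        // The response JSON contains a configs[] array; the errors for the
        // signal.data.collection ConfigValue appear under its value.errors.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```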
signal_id/signal_type/payloadproduced three separate errors, one per position.🤖 Generated with Claude Code