CC-38979 Add opt-in validation for signal data collection table#413

Open
Animesh Kumar (akanimesh7) wants to merge 10 commits into v3.2.5-hotfix-x from CC-38979-signal-table-validation

Conversation


@akanimesh7 Animesh Kumar (akanimesh7) commented Apr 23, 2026

Summary

  • Add signal.data.collection.validation.enabled (default false). When enabled, connector validate() probes the signaling table for existence and the documented three-column id/type/data shape (compared case-insensitively) and surfaces failures on the signal.data.collection ConfigValue.
  • The validator also enforces the connector-specific canonical form (MySQL: db.table; Postgres: schema.table; SqlServer: db.schema.table) by reusing the runtime's own signal predicate (CommonConnectorConfig.isSignalDataCollection(DataCollectionId)). This is the same check EventDispatcher performs at capture time, so validator and runtime cannot drift.
  • DB-specific identifier resolution is a protected JdbcConnection.resolveSignalDataCollectionTableId(String) hook; the flag / config / try-catch scaffolding lives once in the base. Only BinlogConnectorConnection overrides the hook (catalog-first parse + isTableIdCaseSensitive() fold). Postgres and SqlServer use the default schema-first probe unchanged.
  • Pure shape check is a new helper in debezium-core (SignalDataCollectionChecks).
  • Probe failures (any Exception) are logged and swallowed (no error surfaced) so a validator-side bug can never block connector CREATE/UPDATE — the signal table isn't exercised until a signal is inserted.
  • Backward compatible: the default value of the flag reproduces today's behavior.
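The three-column shape check described above can be sketched roughly as follows. `SignalShapeSketch` and its method body are illustrative assumptions, not the actual `SignalDataCollectionChecks` code (note that a later commit in this PR drops the column-name part of the check):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the pure shape check: exactly three columns,
// compared case-insensitively against the documented id/type/data order.
public class SignalShapeSketch {

    public static List<String> validateShape(String tableId, List<String> columnNames) {
        List<String> errors = new ArrayList<>();
        String[] expected = { "id", "type", "data" };
        if (columnNames.size() != expected.length) {
            errors.add("Signal data collection '" + tableId + "' must have exactly "
                    + expected.length + " columns but found " + columnNames.size() + ".");
            return errors;
        }
        for (int i = 0; i < expected.length; i++) {
            if (!expected[i].equalsIgnoreCase(columnNames.get(i))) {
                errors.add("column at position " + i + " must be named '" + expected[i]
                        + "' but found '" + columnNames.get(i) + "'.");
            }
        }
        return errors;
    }
}
```

A well-formed table yields an empty error list; each mismatched position yields its own error, which matches the "three separate errors, one per position" behavior verified below.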

Rollout / backward compatibility plan

Turning the validation flag on by default would reject CREATE/UPDATE for any existing customer whose signal table doesn't match the documented shape (different names, an extra column, the wrong count). Before flipping the default we will:

  1. Audit the fleet to identify connectors whose current signal tables would fail the new checks.
  2. Use a LaunchDarkly flag to force signal.data.collection.validation.enabled=false for those specific connectors (targeted by connector ID), so they keep today's behavior and keep working.
  3. Flip the global default to true only after the LaunchDarkly targeting is in place, so new connectors and already-conforming existing connectors get the stricter checks while non-conforming ones are explicitly grandfathered via LaunchDarkly.

Test plan

  • 11 unit tests on SignalDataCollectionChecks — all green
  • 7 ITs on PostgreSQL against a real container — all green (also caught + fixed a latent NPE where PostgresConnection.readTableColumn required a TypeRegistry not present at validate time)
  • 7 ITs on SqlServer against a real container — all green
  • 7 ITs on MySQL against a real container, exercised end-to-end through MySqlConnector.validate(...) — all green
  • Regression: with the flag off, misshapen signal tables do not produce validation errors (backward-compat)

Manual end-to-end test (Kafka Connect PUT /connector-plugins/<CLASS>/config/validate)

For each connector, drove four scenarios against a running DB container through the V2 plugin on local Kafka Connect, with signal.data.collection.validation.enabled=true. The errors array on the signal.data.collection ConfigValue was captured from the REST response.

| Scenario | MySqlConnectorV2 | PostgresConnectorV2 | SqlServerConnector (V2) |
|---|---|---|---|
| (a) table missing | `Signal data collection '<id>' was not found in the database.` | same | same |
| (b) non-canonical form | 1-part `sig`: `signal.data.collection must be 'manualdb.sig_manual_test' (got 'sig_manual_test').` | 3-part `postgres.public.sig`: `signal.data.collection must be 'public.sig_manual_test' (got 'postgres.public.sig_manual_test').` | 2-part `dbo.sig`: `signal.data.collection must be 'testDB1.dbo.sig_manual_test' (got 'dbo.sig_manual_test').` |
| (c) wrong column name at position 0 | `column at position 0 must be named 'id' but found 'signal_id'.` | same | same |
| (d) canonical + well-formed | no errors | no errors | no errors |

Additionally verified that all three column-name positions are checked (not only position 0): a table with signal_id/signal_type/payload produced three separate errors, one per position.

🤖 Generated with Claude Code

Introduce signal.data.collection.validation.enabled (default false). When
enabled at connector validate() time, probe the configured signaling table
for existence and the shape mandated by the Debezium docs: exactly three
columns named id/type/data in that order (compared case-insensitively).

DB-specific identifier resolution lives on each *Connection subclass
(PostgresConnection, SqlServerConnection, BinlogConnectorConnection for
MySQL/MariaDB, OracleConnection), reusing the existing readTableNames
polymorphism and, for MySQL, the existing isTableIdCaseSensitive() fold.
The shape check itself is a pure static helper in debezium-core.

Validator failures attach to the signal.data.collection ConfigValue so the
Kafka Connect REST /validate call surfaces them. SQLException during the
probe is logged and swallowed (empty result) so a validator-side bug can
never block CREATE/UPDATE - the signal table is only exercised later when
a signal is inserted.

Backward compatible: the default value of the flag reproduces today's
behavior. Covered by 11 unit tests on the pure checks plus per-connector
ITs (Postgres 8, SQL Server 6, MySQL 6) that exercise the full
connector.validate -> connection.validateSignalDataCollection ->
SignalDataCollectionChecks.attach path against real database containers.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Move the flag/config guard, try/catch, "not found" error, and column-shape
delegation onto JdbcConnection.validateSignalDataCollection, exposing just
the DB-specific identifier resolution as a protected hook
(resolveSignalDataCollectionTableId). PostgresConnection, SqlServerConnection,
and OracleConnection now inherit the base implementation unchanged - the
default schema-first parse + readTableNames probe matches what each of them
was doing. BinlogConnectorConnection keeps a small override for catalog-first
parsing and conditional TableId.toLowercase() per isTableIdCaseSensitive().

Net effect: ~100 lines of duplicated scaffolding removed across four
*Connection subclasses. All 31 existing unit + IT tests still green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
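The template-method split described above might look roughly like this. All class bodies here are simplified assumptions standing in for the real Debezium code; only the class/method names follow the PR text:

```java
import java.sql.SQLException;

// Minimal stand-in for Debezium's TableId.
class SketchTableId {
    final String catalog, schema, table;
    SketchTableId(String catalog, String schema, String table) {
        this.catalog = catalog;
        this.schema = schema;
        this.table = table;
    }
}

abstract class JdbcConnectionSketch {

    // Base template: the flag guard, probe, and try/catch live here exactly once.
    public final boolean validateSignalDataCollection(String raw) {
        try {
            SketchTableId resolved = resolveSignalDataCollectionTableId(raw);
            // ... probe readTableNames for `resolved`, then delegate the shape check ...
            return resolved != null;
        }
        catch (Exception e) {
            // swallowed: a validator-side bug must never block connector CREATE/UPDATE
            return true;
        }
    }

    // Default schema-first parse; Postgres and SqlServer inherit this unchanged.
    protected SketchTableId resolveSignalDataCollectionTableId(String raw) throws SQLException {
        String[] p = raw.split("\\.");
        switch (p.length) {
            case 1: return new SketchTableId(null, null, p[0]);
            case 2: return new SketchTableId(null, p[0], p[1]);
            default: return new SketchTableId(p[0], p[1], p[2]);
        }
    }
}

// MySQL/MariaDB override: catalog-first parse plus a lower-case fold
// (stand-in for the conditional isTableIdCaseSensitive() check).
class BinlogConnectionSketch extends JdbcConnectionSketch {
    @Override
    protected SketchTableId resolveSignalDataCollectionTableId(String raw) {
        String[] p = raw.toLowerCase().split("\\.");
        return p.length == 1 ? new SketchTableId(null, null, p[0])
                : new SketchTableId(p[0], null, p[1]);
    }
}
```

Keeping the hook `protected` and the template `final` is what removes the duplicated scaffolding: subclasses can only vary identifier resolution, never the guard or error handling.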
Oracle is outside the reactor for this branch, so the Oracle connector's
validate() call into the signal-table validator was dead code for this PR.
Revert OracleConnector.validateConnection to the base branch so the diff
stays scoped to the three connectors actually shipped from v3.2.5-hotfix-x
(MySQL, PostgreSQL, SQL Server). The base JdbcConnection template method
and default resolver hook remain available for Oracle / other connectors
in follow-up upstream work.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add targeted DEBUG logs in JdbcConnection.validateSignalDataCollection so
operators can trace the validator end-to-end without pulling in a stack
trace: entry, flag/config short-circuits, resolved TableId, column list,
and the final error list (if any). Existing SQLException WARN is kept.

All new logs are at DEBUG so the default log level is unchanged. Enable
via category io.debezium.jdbc.JdbcConnection.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tegy

FieldTest.shouldNotAllowDuplicateGroupAssignment failed for MySQL/MariaDB/
SqlServer/MongoDB/Postgres because the new signal.data.collection.validation.enabled
was declared at ADVANCED position 23, already occupied by topic.naming.strategy.
Move it to the next unused slot (24).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Verify signal-table validator accepts the short forms customers actually use
in the wild, in addition to the canonical documented form:

- SqlServer: 2-part `schema.table` (resolves against connected database) in
  addition to 3-part `db.schema.table`.
- Postgres: 3-part `db.schema.table` in addition to 2-part `schema.table`.
- MySQL: bare 1-part `table` (MySQL has no schemas; driver scans all catalogs)
  in addition to 2-part `db.table`. The 1-part case is pinned down to make any
  future behavior change visible in review — its contract is not guaranteed.

Tests run against real containers; all three new cases pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@b-goyal
Member

I did not follow; what exactly was tested?

Tested with mysql, postgres and sqlserver with signal.data.collection set to the table in table.include.list along with "signal.data.collection.validation.enabled": "true". Found that the validation is failing.

LOGGER.error(errorMessage);
userValue.addErrorMessage(errorMessage);
}
SignalDataCollectionChecks.attach(connection.validateSignalDataCollection(sqlServerConfig), configValues);
Member

I think it would be fine for SqlServer based on this, but just see whether this introduction can cause a validation timeout for any connector.

Member Author

It should not; it's a single driver call to fetch one table's metadata.
In the worst case, we can turn the validation off and figure this out.

Comment thread debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java Outdated
*/
public static void attach(List<String> errors, Map<String, ConfigValue> configValues) {
final ConfigValue target = configValues.get(CommonConnectorConfig.SIGNAL_DATA_COLLECTION.name());
errors.forEach(target::addErrorMessage);
Member

Just to be safe, should we add a null check for errors?
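A null-guarded version of attach might look like this sketch; `ConfigValue` here is a minimal stand-in for the Kafka Connect class, and the guard behavior is an assumption, not the PR's final code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class AttachSketch {

    // Stand-in for org.apache.kafka.common.config.ConfigValue.
    public static class ConfigValue {
        public final List<String> errorMessages = new ArrayList<>();
        public void addErrorMessage(String msg) { errorMessages.add(msg); }
    }

    public static void attach(List<String> errors, Map<String, ConfigValue> configValues) {
        if (errors == null || errors.isEmpty()) {
            return; // nothing to attach
        }
        // Also guard the lookup: a missing ConfigValue should not NPE the validator.
        ConfigValue target = configValues.get("signal.data.collection");
        if (target != null) {
            errors.forEach(target::addErrorMessage);
        }
    }
}
```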

Comment on lines +1429 to +1431
if (!config.isSignalDataCollectionValidationEnabled()) {
LOGGER.debug("Signal data collection validation disabled; skipping");
return Collections.emptyList();
Member

If the check is disabled by default and we return immediately, how would we audit the fleet?

Member Author

The plan was to enable the check by default, find the connectors that would have failed the check, and override only those via LD to the disabled-check variation.

But now that you've pointed it out, I think another config, one that determines the action taken when validations don't pass, would be the right approach. We can roll out initially with the check enabled but the action set to warn, and decide later.

Comment on lines +1457 to +1460
catch (SQLException e) {
LOGGER.warn("Unable to validate signal data collection '{}'; skipping signal-table check", raw, e);
return Collections.emptyList();
}
Member

Should we add an Exception block too for any non-SqlExceptions? The SqlServerConnector class (caller) has an Exception block but that registers the failure against hostName which may be incorrect in this case.

Member

@b-goyal Bhagyashree (b-goyal) Apr 24, 2026

To add to this, if not already done, can you test some invalid/complex signal table names too? Some examples:

  1. / a (Space)
  2. a.b.c (3 cases where one of a/b/c does not exist)
  3. a.b.c d (table name with space)
  4. [a].[b].[c] (SqlServer supports [])
  5. bad_db.a.b (a table where db is non-existent)
  6. any other you think make sense.

}
catch (SQLException e) {
LOGGER.warn("Unable to validate signal data collection '{}'; skipping signal-table check", raw, e);
return Collections.emptyList();
Member

Is this correct to return an empty list for a SqlException during validation? I assumed the failures may be permissions issue/non-existent db etc. and those sound genuine enough to be reported back to the user if validation is enabled.

Member Author

The idea is that the signal table's structure validation is not FATAL. Connectors don't require the signal table for most of their lifetime, so not letting the connector start just because the signal table could not be validated didn't sound great.

@akanimesh7
Member Author

I did not follow; what exactly was tested?

Tested with mysql, postgres and sqlserver with signal.data.collection set to the table in table.include.list along with "signal.data.collection.validation.enabled": "true". Found that the validation is failing.

Replaced this

Replace inline `resolved.equals(TableId.parse(raw, true))` with
CommonConnectorConfig.isSignalDataCollection(resolved) - the same
check EventDispatcher runs on captured events, so validator and
runtime cannot drift.

Broaden catch from SQLException to Exception so driver-side
RuntimeExceptions are swallowed by the validator rather than
escaping to BinlogConnector.validateConnection's generic block
where they get misattributed to hostname.

ITs tightened to assert exact canonical-form error messages per
connector (1-part MySQL, 2-part SqlServer, 3-part Postgres).
Two redundant Postgres tests (tooFew, jsonb) dropped - same
branches covered by the retained tests. SqlServer IT's
createTestDatabase moved to @BeforeClass to cut setup cost.

Tested end-to-end through Kafka Connect validate REST on all
three V2 connectors; all four scenarios (missing, non-canonical,
wrong-column, happy-path) produce the intended messages.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The validator checks existence + column count + column names (case-insensitive),
but not column types. The config's description claimed "and column types"
which was stale — remove it to match behavior.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add an internal-only `signal.data.collection.validation.action` enum
config (WARN/FAIL, default WARN) modeled after `guardrail.collections.
limit.action`. Routes the validator's outcome:

- WARN: log only via the new `[signal.data.collection.validation]`
  audit prefix; validate() returns 200 with no ConfigValue errors.
- FAIL: surface errors on the `signal.data.collection` ConfigValue
  so connector CREATE/UPDATE is rejected; also emits the audit log
  for fleet observability.

Default WARN keeps the initial fleet rollout observability-only.
Both configs (`validation.enabled` + `validation.action`) are
internal-only; not customer-facing.

Audit visibility additions:

- Probe-failure path (`SQLException` in JdbcConnection.validate
  SignalDataCollection's catch): now uses the audit prefix and
  inlines the driver exception message; full stacktrace also logged.
- Upstream parser failure path (`TableId.parse` from `RelationalTable
  Filters.<init>` and `CommonConnectorConfig.<init>`): wrap with
  try/catch that logs the audit prefix and re-throws so HTTP 500
  behavior is preserved (legacy/back-compat). Customer experience
  unchanged; fleet now sees a greppable line.

ITs: MySQL adds `warnActionDoesNotSurfaceErrorsToConfigValue` to
lock in the WARN→ConfigValue-clean contract through the full
`MySqlConnector.validate(...)` pipeline. Unit tests split into
WARN-action and FAIL-action variants on `attach()`.

Tested end-to-end through Kafka Connect REST validate on all
three V2 connectors (SqlServer/Postgres/MySQL) for: missing
table, wrong canonical form (1/2/3-part variants), wrong
column name, case mismatch, probe failure, and pre-existing
parse-failure 500s. Every case where a problem exists emits a
greppable audit line; happy paths stay silent.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
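The WARN/FAIL routing described above could be sketched as follows; `Action` and `route` are hypothetical names standing in for the internal config, not the actual implementation:

```java
import java.util.List;

public class ValidationActionSketch {

    // Stand-in for the internal signal.data.collection.validation.action enum.
    public enum Action { WARN, FAIL }

    // Returns the errors that should be attached to the ConfigValue.
    // WARN: audit-log only, nothing surfaced; FAIL: surface and audit-log.
    public static List<String> route(Action action, List<String> errors) {
        for (String error : errors) {
            // greppable audit line for fleet observability in both modes
            System.out.println("[signal.data.collection.validation] " + error);
        }
        return action == Action.FAIL ? errors : List.of();
    }
}
```

Under WARN the REST validate call still returns 200 with a clean ConfigValue, which is what keeps the initial fleet rollout observability-only.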
The runtime is positional end-to-end:
- read: parseSignallingMessage extracts row fields by position 0/1/2 and
  SignalRecord builds from positions, never looking up by name.
- write: SignalBasedIncrementalSnapshotChangeEventSource issues
  `INSERT INTO <signal_table> VALUES (?, ?, ?)` with no column list.

A signal table with id/type/data renamed (e.g. signal_id/signal_type/payload)
runs correctly today. Validating names would be a regression in FAIL mode for
customers who happen to have such configs working in production, and a
misleading audit signal in WARN mode.

Drop the name check. Keep table-existence + canonical-form (runtime predicate)
+ exactly-3-columns. Column-type validation deferred to a follow-up — requires
per-connector wiring through each connector's ValueConverter for the only
authoritative answer to "is this JDBC type going to map to Connect STRING?".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
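A minimal illustration of the positional read described above; `parse` and the nested `SignalRecord` are simplified stand-ins for the runtime classes, not the real signatures:

```java
public class PositionalSignalSketch {

    // Stand-in for io.debezium's SignalRecord: built from positions, never names.
    public record SignalRecord(String id, String type, String data) { }

    // Mirrors parseSignallingMessage-style extraction by index 0/1/2,
    // which is why renamed columns still work at runtime.
    public static SignalRecord parse(Object[] row) {
        return new SignalRecord(
                String.valueOf(row[0]),  // position 0: id
                String.valueOf(row[1]),  // position 1: type
                String.valueOf(row[2])); // position 2: data
    }
}
```

Since both the read path and the unnamed-column `INSERT INTO <signal_table> VALUES (?, ?, ?)` write path are positional, a table named signal_id/signal_type/payload behaves identically, which is the argument for dropping the name check.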
columns.stream().map(c -> c.name() + " " + c.typeName()).collect(Collectors.joining(", ")));
final List<String> errors = SignalDataCollectionChecks.validateShape(raw, columns);
if (!errors.isEmpty()) {
LOGGER.debug("Signal data collection '{}' validation produced {} error(s): {}", raw, errors.size(), errors);
Member

Should we add the [signal.data.collection.validation] prefix in the debug logs as well?

}
catch (Exception e) {
LOGGER.warn("[signal.data.collection.validation] Unable to validate signal data collection '{}'; skipping signal-table check ({})", raw, e.getMessage(), e);
return Collections.emptyList();
Member

Animesh Kumar (@akanimesh7) can we add a test case for the failure scenario, if not already added?

* table identifiers.
*/
@Override
protected TableId resolveSignalDataCollectionTableId(String raw) throws SQLException {
Member

Instead of raw, can we use a better variable name like rawTableId?

@@ -0,0 +1 @@
-- Empty DDL. The signal-data-collection validate IT creates its own test table per case.
Member

Is this required?

* probes once via {@link #readTableNames}. Subclasses should override to apply DB-specific
* parsing (e.g. catalog-first on MySQL) or identifier folding before probing.
*/
protected TableId resolveSignalDataCollectionTableId(String raw) throws SQLException {
Member

Can we replace raw with something more informative?
