Skip to content

feat - kafka support#243

Open
msmygit wants to merge 55 commits into
masterfrom
feat/kafka_support
Open

feat - kafka support#243
msmygit wants to merge 55 commits into
masterfrom
feat/kafka_support

Conversation

@msmygit
Copy link
Copy Markdown
Member

@msmygit msmygit commented Mar 17, 2026

No description provided.

@msmygit msmygit changed the title Feat/kafka support feat - test support Mar 18, 2026
@msmygit msmygit force-pushed the feat/kafka_support branch from f28ff47 to 1dad8c4 Compare March 20, 2026 20:38
@msmygit msmygit force-pushed the feat/kafka_support branch 3 times, most recently from 2bfd5c2 to 2a04130 Compare April 7, 2026 21:03
Madhavan and others added 11 commits May 29, 2026 15:10
…Source to original implementation

- Reverted NativeSchemaWrapper.encode() to simple pass-through (return bytes)
- Reverted CassandraSource JsonValueRecord.getValue() and getKey() to original cast
- Root cause: Cannot handle GenericRecord in encode() due to method signature forcing byte[] cast
- The original simple implementation works correctly with Pulsar's internal handling
Root Cause:
- Phase 3 refactoring created separate MessagingClient instead of using SourceContext's PulsarClient
- Connector subscribed to wrong Pulsar instance, never received CDC events from agent
- All 24 connector tests failed across 3 Pulsar images

Fix:
- Reverted connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java to working version
- Restored direct Pulsar API usage via sourceContext.newConsumerBuilder()
- Removed messaging abstraction initialization methods
- Connector remains Pulsar-specific (runs in Pulsar IO framework)

Impact:
- Fixes all 24 test failures in connector module
- Code compiles successfully
- Agent messaging abstraction still supports both Pulsar and Kafka

Documentation:
- Added docs/CONNECTOR_ARCHITECTURE_DECISION.md
- Updated docs/BOB_CONTEXT_SUMMARY.md
…ka) CDC

Make the CDC agent publish change events to either Pulsar or Kafka/Confluent
via the messaging abstraction, with configurable serialization. Pulsar wire
format is unchanged; the active provider is selected at runtime.

Serialization
- commons: add MutationValueCodec, a canonical registry-less Avro codec for
  MutationValue shared by producer and (future) consumer.
- messaging-kafka: introduce a KafkaSerde strategy (RawAvroSerde registry-less
  default + RegistryAvroSerde Confluent), replacing the registry-mandatory
  KafkaSchemaProvider. Wire producer/consumer/client to it.
- agent: AbstractMessagingMutationSender branches by provider — Pulsar keeps its
  exact schema/format; Kafka uses raw Avro (default) or Confluent registry when
  kafkaSchemaRegistryUrl is set.

Bug fixes (also affecting CI)
- Agents passed Platform.PULSAR, rejecting all Kafka params; pass Platform.ALL
  and treat ALL as a wildcard in AgentConfig.configure.
- ProducerConfigBuilder.sendTimeoutMs(0) was rejected though 0 = infinite
  (Pulsar-compatible); allow 0, reject only negative.
- CassandraContainer.createCassandraContainerWithAgentKafka now sets
  messagingProvider=kafka.
- Bump Testcontainers 1.19.1 -> 1.20.6 and force the docker-java API version via
  the api.version system property (newer Docker engines reject the 1.32 default);
  CI passes -Papi.version=1.43.

Tests & CI
- Add unit tests for the messaging modules (codec, raw-avro serde, serde
  selection, SPI discovery, producer config validation).
- Add KafkaSingleNodeC4Tests (Testcontainers cp-kafka), tagged @tag("kafka");
  excluded from default runs, included via -PkafkaTests.
- Re-enable the test-kafka CI job; add -Papi.version to the test jobs.

Docs
- docs/KAFKA_SUPPORT.md describing agent Kafka config and serialization modes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…-> data)

New connector-kafka module: a Kafka Connect SinkConnector
(com.datastax.oss.kafka.sink.CassandraSinkConnector) that mirrors the Pulsar
CassandraSource for Kafka. It consumes the events-<ks>.<table> topic produced by
the CDC agent, de-duplicates mutations, queries Cassandra for the current row,
and publishes the row to data-<ks>.<table>.

Maximal reuse of the existing connector module: CassandraClient, MutationCache,
CassandraSourceConnectorConfig and the NativeAvroConverter/NativeJsonConverter
(whose toConnectData(Row) already returns raw AVRO/JSON bytes, provider-agnostic).
The events key (raw AVRO primary key) is decoded via the key converter's native
schema; the data record key reuses the event key bytes and the value is the
AVRO/JSON row (or null tombstone for a delete). Registry-less serialization,
matching the agent default.

- CassandraSinkConfig delegates Cassandra/cache/output settings to
  CassandraSourceConnectorConfig and injects a default events.topic.
- CassandraKafkaSinkE2ETest validates the full pipeline end-to-end (agent ->
  events -> connector -> data) for both AVRO and JSON output.
- CI: test-kafka job matrix extended to ['agent-c4','connector-kafka'].
- docs/KAFKA_SUPPORT.md documents deployment, config and known follow-ups.

Verified no regressions: Pulsar agent (testSchema) and Pulsar connector
(testSinglePk) still pass under the Testcontainers 1.20.6 bump.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@msmygit msmygit force-pushed the feat/kafka_support branch from e8b50b8 to 3ecdfb3 Compare May 30, 2026 01:18
Madhavan and others added 5 commits May 29, 2026 22:38
… sender construction)

The backfill-ci e2e tests failed across all matrix jobs due to two pre-existing
bugs introduced by the messaging-abstraction migration:

1) ProviderRegistry discovered providers via ServiceLoader using only the thread
   context class loader. In a Pulsar NAR (CLI-extension backfill path) that loader
   is not the one that bundled the META-INF/services files, so discovery returned
   "No provider implementation found for: PULSAR. Available providers: []".
   Fix: load from both the context class loader and the messaging API's own class
   loader (and the system loader as a last resort).

2) PulsarMutationSenderFactory reflectively instantiated
   PulsarMutationSender(MessagingClient, boolean) — a constructor that does not
   exist. The agent senders extend AbstractMessagingMutationSender, which owns its
   messaging client and is constructed from (AgentConfig, boolean).
   Fix: translate ImportSettings into an AgentConfig (provider=pulsar, service URL,
   auth, SSL/TLS, topic prefix) and instantiate via the real (AgentConfig, boolean)
   constructor.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ro Utf8

testTimestampInCollection failed with ClassCastException: org.apache.avro.util.Utf8
cannot be cast to org.apache.pulsar.shade.org.apache.avro.util.Utf8. The test helper
cast collection map keys to Pulsar's shaded Utf8, but depending on deserialization the
keys may be plain Avro Utf8. Compare keys via Object::toString to accept either type.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…kfill-ci

The custom backfill e2eTest task did not inherit the api.version system-property
forwarding configured on the standard `test` task, so the docker-java client used by
Testcontainers fell back to API 1.32 and failed on newer Docker engines with
"client version 1.32 is too old". Forward -Papi.version to the e2eTest task and pass
-Papi.version=1.43 in backfill-ci.yaml.

Validated end-to-end locally: backfill-cli e2eTest testBackfillCLISinglePk
(apachepulsar/pulsar:2.11.0 standalone path) BUILD SUCCESSFUL — export 100 PKs,
Sent mutations=100/Failed=0, connector produced 100 data rows, assertion passed.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…on path

The lunastreaming backfill-ci jobs (the Pulsar NAR CLI-extension path)
failed testBackfillCLIFullSchema with:

  CodecNotFoundException: Codec not found for requested operation:
  [BOOLEAN <-> java.lang.String]

table2's primary key includes xboolean, so PulsarImporter builds a dsbulk
ConvertingCodecFactory to turn the exported CSV strings back into CQL
types. The no-arg ConvertingCodecFactory constructor discovers its
ConvertingCodecProviders (dsbulk-codecs-text's StringConvertingCodecProvider
handles String<->boolean/int/decimal/...) via ServiceLoader.load(Class),
which uses only the thread-context class loader. In a Pulsar NAR that loader
is not the NAR loader that bundled the META-INF/services files, so discovery
returns no providers and createConvertingCodec falls through to the codec
registry, which has no String<->boolean codec.

Same SPI/class-loader root cause as the earlier ProviderRegistry and
PulsarMutationSenderFactory fixes. Fix: construct the factory with the
thread-context class loader temporarily set to PulsarImporter's own loader
(the NAR loader that bundles dsbulk), then restore the previous loader.

apachepulsar 2.10.3/2.11.0 jobs run backfill as a standalone JAR where the
context loader is already correct, so only the LS 2.10_3.4 (NAR) jobs hit
this. BackfillUnloadWorkflow uses dsbulk's own codecSettings.createCodecFactory()
and the unload phase already succeeds, so it is left untouched.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- Rename the Pulsar test job to "Test Pulsar"; add jdk 17 alongside 11.
- Restore the 360-minute job timeout (was temporarily reduced to 90m for
  faster failure detection during debugging).
- Kafka job: expand modules to agent-c3/agent-c4/agent-dse4/connector-kafka,
  add jdk 17, and bump cp-kafka images to 7.8.8 / 7.9.7 / 8.1.3.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@msmygit msmygit changed the title feat - test support feat - kafka support May 30, 2026
Madhavan and others added 11 commits May 30, 2026 13:06
…nfig

determineProvider() previously fell back to PULSAR for any unrecognized
messagingProvider value. A typo (e.g. "kafak"), an unsupported name
("confluent"), or even "kafka " with a trailing space would silently run
the agent against Pulsar, surfacing only as confusing downstream
connection failures.

Changes:
- determineProvider() now trims and matches case-insensitively, defaults to
  PULSAR only when unset/blank, and throws IllegalArgumentException listing
  the supported values ('pulsar'/'kafka') for any other value.
- Add validateProviderConfig(), invoked at construction (fail fast), which
  requires kafkaBootstrapServers when provider=kafka, requires an http(s)
  kafkaSchemaRegistryUrl when one is supplied, and requires pulsarServiceUrl
  when provider=pulsar -- each with an actionable message instead of a late,
  cryptic client-side error.
- Add unit tests covering case/whitespace handling, invalid provider
  rejection, missing Kafka bootstrap servers, and malformed registry URL.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…arams

The Antora user docs were entirely Pulsar-only; the Kafka/Confluent
capability was documented only in the internal docs/KAFKA_SUPPORT.md, which
is not part of the published site. Users reading the official docs could not
discover Kafka, its parameters, or its serialization/security options.

Changes:
- New published page docs/modules/ROOT/pages/kafka.adoc ("Stream CDC to
  Kafka"): provider selection, enabling Kafka on the agent, registry-less
  raw Avro vs Confluent Schema Registry modes, producer tuning, SSL/SASL
  security, topic naming/data flow, and Kafka Connect sink deployment.
- Wire the page into nav.adoc; cross-link from index.adoc (overview +
  supported platforms) and install.adoc.
- stringMappings.adoc: add messagingProvider and all kafka* parameter
  -> CDC_* environment-variable mappings.
- Regenerate partials/agentParams.adoc from AgentConfig: now includes
  messagingProvider and the kafka* settings, plus the EnvVar column.
- AgentConfig.main() default output path corrected to the partials/
  directory (where the include lives) so future regen lands in place.
- antora.yml: add kafka/confluent attributes. KAFKA_SUPPORT.md: note it is
  the implementation reference and points at the published page.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The backfill CLI could only seed historical (pre-CDC) rows into Pulsar, so
Kafka deployments had no way to back-fill. The export + import engine was
already provider-agnostic (TableExporter + PulsarImporter + the messaging
abstraction's AbstractMessagingMutationSender, which branches by provider);
only the CLI option surface and the AgentConfig mapping were Pulsar-bound.

Code:
- ImportSettings: add --messaging-provider (default pulsar) and the --kafka-*
  options (bootstrap servers, schema registry URL, acks, compression, batch
  size, linger, max in-flight).
- PulsarMutationSenderFactory.buildAgentConfig: map the provider and the
  Kafka fields instead of hardcoding pulsar. Provider/required-field
  validation is handled by AbstractMessagingMutationSender at construction.
- Generalize CassandraToPulsarMigrator log messages.
- The pulsar-admin CLI-extension form stays Pulsar-only (there is no Kafka
  admin host); Kafka backfill runs as the standalone shadow JAR.

Tests/CI:
- New CassandraContainer.createCassandraContainer (no-agent node) so the
  backfill source needs neither an agent build nor DSE Maven creds.
- BackfillCLIKafkaE2ETest (@tag kafka): runs the backfill JAR with
  --messaging-provider=kafka against Kafka + Cassandra, asserts the events
  topic, then runs the Kafka sink in-process and asserts the data topic --
  the Kafka counterpart of BackfillCLIE2ETests.
- build.gradle: add connector-kafka/kafka-clients/connect-api/testcontainers-
  kafka test deps; new e2eTestKafka task (includeTags kafka); the existing
  e2eTest now excludeTags kafka so the Pulsar job skips it.
- backfill-ci.yaml: new test-kafka job over kafkaImage x cassandraFamily.
- Unit test PulsarMutationSenderFactoryTest covers the Kafka config mapping.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- backfill-cli.adoc: new "Back-fill to Kafka" section with a standalone-JAR
  example, a Kafka connectivity parameter table (--messaging-provider,
  --kafka-*), and a note that the pulsar-admin extension form is Pulsar-only.
- kafka.adoc: add a "Back-fill historical data to Kafka" section linking to
  the backfill reference.
- KAFKA_SUPPORT.md: document the backfill Kafka path, its shared engine, and
  the BackfillCLIKafkaE2ETest / test-kafka CI coverage.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…cassandra.yaml is mounted

The e2eTestKafka task set CASSANDRA_IMAGE and cassandraVersion per family but
not the cassandraFamily system property the test reads to choose the
config-override resource dir. So BackfillCLIKafkaE2ETest fell back to its
default "c4" and mounted the 4.x cassandra.yaml onto the c3 (3.11) and dse4
nodes, which reject the version-incompatible properties and never start
(AllNodesFailedException). Forward cassandraFamily to the test JVM.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
All Kafka backfill e2e jobs failed at cassandra.start() with "Timed out
waiting for container port to open (ports [9042,8000,7199])". The base
CassandraContainer exposes the debug port 8000 in addition to CQL/JMX, and
the default wait strategy waits for ALL exposed ports. The agent-based
factories open 8000 via the JVM debug agent (-agentlib:jdwp=...address=8000),
but createCassandraContainer (no agent) never does, so the wait always timed
out -- across c3/c4/dse4 (c3/dse4 hit the earlier cassandraFamily yaml issue
first, masking this).

Wait on the "Starting listening for CQL clients" log line instead (emitted by
Cassandra 3.x/4.x and DSE) so node readiness no longer depends on port 8000.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ent node

The CQL-readiness log wait fixed c3/dse4 but the c4 Kafka backfill jobs still
timed out: the c4 config-override logback.xml has its STDOUT (and all file)
root appender-refs commented out, so Cassandra 4.0.4 writes nothing to the
container stdout and the log-message wait never matches. (The Pulsar c4 jobs
don't hit this because they wait on the debug port the agent opens.)

Replace the log-message wait with a host-side TCP probe of the CQL port
(9042). This targets only the CQL port -- so it ignores the always-exposed-
but-unused debug port (8000) -- and does not depend on Cassandra logging to
stdout, so it works uniformly across c3/c4/dse4 regardless of the mounted
logback configuration.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…to stdout

The host-side CQL-port probe regressed all 9 Kafka backfill jobs (c3/dse4 had
passed with the log wait). A host-side TCP connect to a mapped port is
unreliable: Docker's port forwarding can accept the connection before
Cassandra has bound 9042, so the wait returns prematurely and getCqlSession()
then fails -- across every family.

Revert to the proven "Starting listening for CQL clients" log wait, and fix
the actual c4 outlier: its config-override logback.xml had every root
appender-ref (including STDOUT) commented out, so Cassandra 4.0.4 logged
nothing to the container stdout and the log wait never matched. Re-enable the
STDOUT appender so c4 emits the readiness line like c3/dse4 already do.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…whole-number doubles

Two pre-existing failures in the connector PulsarCassandraSourceTests.testSchema
(across Avro and JSON variants), surfaced by running the suite on jdk 17 as
well as 11. Neither is caused by the Kafka work; they stem from the
messaging-abstraction migration and were partially addressed before (see
51f872b, which made assertMapsEqual tolerant of shaded/non-shaded Avro Utf8).

1) Avro: java.lang.ClassCastException casting org.apache.avro.generic.GenericData$Array
   to the Pulsar-shaded GenericData$Array. Pulsar's GenericRecord.getField()
   returns collection (array) columns as NON-shaded Avro objects even though
   nested records come back shaded, so the shaded-typed assertions throw.
   Fix: normalizeToShadedAvro() round-trips any non-shaded Avro GenericContainer
   through binary Avro into the shaded type in genericRecordToMap(), deeply
   converting arrays and everything nested (records, maps, strings, CQL
   logical-type records) so the existing shaded assertions work unchanged. The
   writer uses GenericData.get() (no registered conversions) so raw datums are
   written verbatim. Also make the map-of-tuple assertion look up its key by
   string and normalize the tuple value.

2) JSON: "double expected:<1.0> but was:<1>". A whole-number double/float is
   serialized to JSON as `1`, which Jackson reads as an IntNode, so a
   type-sensitive equals against the expected Double fails. Compare double/float
   numerically (via asDouble) instead.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…haded Avro

Follow-up to the testSchema Avro tolerance fix. After normalizing non-shaded
Avro arrays to shaded, testSchema failed one layer deeper at assertGenericMap:

  ClassCastException: org.apache.avro.util.Utf8 cannot be cast to
  org.apache.pulsar.shade.org.apache.avro.util.Utf8

A map column (e.g. map<text,double>) comes back from Pulsar as a plain
java.util.Map whose keys are non-shaded Avro Utf8. The parameter type
Map<Utf8,Object> (shaded Utf8) made the compiler insert a checkcast to the
shaded Utf8 on getKey(), which threw even though keys are compared via
toString(). Type the parameter as Map<Object,Object> (as assertMapsEqual
already does) so no checkcast is inserted.

Top-level maps (ymap, ymapoftuple) are the only ones needing this -- maps
nested inside arrays (e.g. zmap inside ysetofudt) are already converted to
shaded by the binary round-trip in normalizeToShadedAvro.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… shaded UDT

Next layer of the testSchema shaded/non-shaded Avro tolerance. After the
top-level array/map fixes, testSchema failed at:

  assertField:857 (cast to shaded GenericData$Array)
   <- assertGenericRecords:903 (udt field)
   <- assertField:853 (yudt)

Pulsar returns a top-level UDT (yudt) as a SHADED GenericRecord, but the
collection fields nested inside it (the UDT's zlist/zset) still come back as
NON-shaded Avro arrays. normalizeToShadedAvro is a no-op on the already-shaded
record, so those nested arrays were never converted and the shaded-typed
assertion threw.

Wrap the per-field extraction in assertGenericRecords' udt/udtoptional cases
with normalizeToShadedAvro so each nested value is converted before asserting.
(Collections nested inside ysetofudt are already handled: that whole array is
deep-converted to shaded by the binary round-trip.)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@msmygit msmygit marked this pull request as ready for review June 1, 2026 14:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant