feat - kafka support#243
Open
msmygit wants to merge 55 commits into
Open
Conversation
f28ff47 to
1dad8c4
Compare
2bfd5c2 to
2a04130
Compare
added 25 commits
May 29, 2026 15:10
…Source to original implementation - Reverted NativeSchemaWrapper.encode() to simple pass-through (return bytes) - Reverted CassandraSource JsonValueRecord.getValue() and getKey() to original cast - Root cause: Cannot handle GenericRecord in encode() due to method signature forcing byte[] cast - The original simple implementation works correctly with Pulsar's internal handling
Root Cause: - Phase 3 refactoring created separate MessagingClient instead of using SourceContext's PulsarClient - Connector subscribed to wrong Pulsar instance, never received CDC events from agent - All 24 connector tests failed across 3 Pulsar images Fix: - Reverted connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java to working version - Restored direct Pulsar API usage via sourceContext.newConsumerBuilder() - Removed messaging abstraction initialization methods - Connector remains Pulsar-specific (runs in Pulsar IO framework) Impact: - Fixes all 24 test failures in connector module - Code compiles successfully - Agent messaging abstraction still supports both Pulsar and Kafka Documentation: - Added docs/CONNECTOR_ARCHITECTURE_DECISION.md - Updated docs/BOB_CONTEXT_SUMMARY.md
…re infinite producer timeout
…ka) CDC Make the CDC agent publish change events to either Pulsar or Kafka/Confluent via the messaging abstraction, with configurable serialization. Pulsar wire format is unchanged; the active provider is selected at runtime. Serialization - commons: add MutationValueCodec, a canonical registry-less Avro codec for MutationValue shared by producer and (future) consumer. - messaging-kafka: introduce a KafkaSerde strategy (RawAvroSerde registry-less default + RegistryAvroSerde Confluent), replacing the registry-mandatory KafkaSchemaProvider. Wire producer/consumer/client to it. - agent: AbstractMessagingMutationSender branches by provider — Pulsar keeps its exact schema/format; Kafka uses raw Avro (default) or Confluent registry when kafkaSchemaRegistryUrl is set. Bug fixes (also affecting CI) - Agents passed Platform.PULSAR, rejecting all Kafka params; pass Platform.ALL and treat ALL as a wildcard in AgentConfig.configure. - ProducerConfigBuilder.sendTimeoutMs(0) was rejected though 0 = infinite (Pulsar-compatible); allow 0, reject only negative. - CassandraContainer.createCassandraContainerWithAgentKafka now sets messagingProvider=kafka. - Bump Testcontainers 1.19.1 -> 1.20.6 and force the docker-java API version via the api.version system property (newer Docker engines reject the 1.32 default); CI passes -Papi.version=1.43. Tests & CI - Add unit tests for the messaging modules (codec, raw-avro serde, serde selection, SPI discovery, producer config validation). - Add KafkaSingleNodeC4Tests (Testcontainers cp-kafka), tagged @tag("kafka"); excluded from default runs, included via -PkafkaTests. - Re-enable the test-kafka CI job; add -Papi.version to the test jobs. Docs - docs/KAFKA_SUPPORT.md describing agent Kafka config and serialization modes. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…-> data) New connector-kafka module: a Kafka Connect SinkConnector (com.datastax.oss.kafka.sink.CassandraSinkConnector) that mirrors the Pulsar CassandraSource for Kafka. It consumes the events-<ks>.<table> topic produced by the CDC agent, de-duplicates mutations, queries Cassandra for the current row, and publishes the row to data-<ks>.<table>. Maximal reuse of the existing connector module: CassandraClient, MutationCache, CassandraSourceConnectorConfig and the NativeAvroConverter/NativeJsonConverter (whose toConnectData(Row) already returns raw AVRO/JSON bytes, provider-agnostic). The events key (raw AVRO primary key) is decoded via the key converter's native schema; the data record key reuses the event key bytes and the value is the AVRO/JSON row (or null tombstone for a delete). Registry-less serialization, matching the agent default. - CassandraSinkConfig delegates Cassandra/cache/output settings to CassandraSourceConnectorConfig and injects a default events.topic. - CassandraKafkaSinkE2ETest validates the full pipeline end-to-end (agent -> events -> connector -> data) for both AVRO and JSON output. - CI: test-kafka job matrix extended to ['agent-c4','connector-kafka']. - docs/KAFKA_SUPPORT.md documents deployment, config and known follow-ups. Verified no regressions: Pulsar agent (testSchema) and Pulsar connector (testSinglePk) still pass under the Testcontainers 1.20.6 bump. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
e8b50b8 to
3ecdfb3
Compare
… sender construction) The backfill-ci e2e tests failed across all matrix jobs due to two pre-existing bugs introduced by the messaging-abstraction migration: 1) ProviderRegistry discovered providers via ServiceLoader using only the thread context class loader. In a Pulsar NAR (CLI-extension backfill path) that loader is not the one that bundled the META-INF/services files, so discovery returned "No provider implementation found for: PULSAR. Available providers: []". Fix: load from both the context class loader and the messaging API's own class loader (and the system loader as a last resort). 2) PulsarMutationSenderFactory reflectively instantiated PulsarMutationSender(MessagingClient, boolean) — a constructor that does not exist. The agent senders extend AbstractMessagingMutationSender, which owns its messaging client and is constructed from (AgentConfig, boolean). Fix: translate ImportSettings into an AgentConfig (provider=pulsar, service URL, auth, SSL/TLS, topic prefix) and instantiate via the real (AgentConfig, boolean) constructor. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ro Utf8 testTimestampInCollection failed with ClassCastException: org.apache.avro.util.Utf8 cannot be cast to org.apache.pulsar.shade.org.apache.avro.util.Utf8. The test helper cast collection map keys to Pulsar's shaded Utf8, but depending on deserialization the keys may be plain Avro Utf8. Compare keys via Object::toString to accept either type. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…kfill-ci The custom backfill e2eTest task did not inherit the api.version system-property forwarding configured on the standard `test` task, so the docker-java client used by Testcontainers fell back to API 1.32 and failed on newer Docker engines with "client version 1.32 is too old". Forward -Papi.version to the e2eTest task and pass -Papi.version=1.43 in backfill-ci.yaml. Validated end-to-end locally: backfill-cli e2eTest testBackfillCLISinglePk (apachepulsar/pulsar:2.11.0 standalone path) BUILD SUCCESSFUL — export 100 PKs, Sent mutations=100/Failed=0, connector produced 100 data rows, assertion passed. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…on path The lunastreaming backfill-ci jobs (the Pulsar NAR CLI-extension path) failed testBackfillCLIFullSchema with: CodecNotFoundException: Codec not found for requested operation: [BOOLEAN <-> java.lang.String] table2's primary key includes xboolean, so PulsarImporter builds a dsbulk ConvertingCodecFactory to turn the exported CSV strings back into CQL types. The no-arg ConvertingCodecFactory constructor discovers its ConvertingCodecProviders (dsbulk-codecs-text's StringConvertingCodecProvider handles String<->boolean/int/decimal/...) via ServiceLoader.load(Class), which uses only the thread-context class loader. In a Pulsar NAR that loader is not the NAR loader that bundled the META-INF/services files, so discovery returns no providers and createConvertingCodec falls through to the codec registry, which has no String<->boolean codec. Same SPI/class-loader root cause as the earlier ProviderRegistry and PulsarMutationSenderFactory fixes. Fix: construct the factory with the thread-context class loader temporarily set to PulsarImporter's own loader (the NAR loader that bundles dsbulk), then restore the previous loader. apachepulsar 2.10.3/2.11.0 jobs run backfill as a standalone JAR where the context loader is already correct, so only the LS 2.10_3.4 (NAR) jobs hit this. BackfillUnloadWorkflow uses dsbulk's own codecSettings.createCodecFactory() and the unload phase already succeeds, so it is left untouched. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- Rename the Pulsar test job to "Test Pulsar"; add jdk 17 alongside 11. - Restore the 360-minute job timeout (was temporarily reduced to 90m for faster failure detection during debugging). - Kafka job: expand modules to agent-c3/agent-c4/agent-dse4/connector-kafka, add jdk 17, and bump cp-kafka images to 7.8.8 / 7.9.7 / 8.1.3. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…nfig
determineProvider() previously fell back to PULSAR for any unrecognized
messagingProvider value. A typo (e.g. "kafak"), an unsupported name
("confluent"), or even "kafka " with a trailing space would silently run
the agent against Pulsar, surfacing only as confusing downstream
connection failures.
Changes:
- determineProvider() now trims and matches case-insensitively, defaults to
PULSAR only when unset/blank, and throws IllegalArgumentException listing
the supported values ('pulsar'/'kafka') for any other value.
- Add validateProviderConfig(), invoked at construction (fail fast), which
requires kafkaBootstrapServers when provider=kafka, requires an http(s)
kafkaSchemaRegistryUrl when one is supplied, and requires pulsarServiceUrl
when provider=pulsar -- each with an actionable message instead of a late,
cryptic client-side error.
- Add unit tests covering case/whitespace handling, invalid provider
rejection, missing Kafka bootstrap servers, and malformed registry URL.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…arams
The Antora user docs were entirely Pulsar-only; the Kafka/Confluent
capability was documented only in the internal docs/KAFKA_SUPPORT.md, which
is not part of the published site. Users reading the official docs could not
discover Kafka, its parameters, or its serialization/security options.
Changes:
- New published page docs/modules/ROOT/pages/kafka.adoc ("Stream CDC to
Kafka"): provider selection, enabling Kafka on the agent, registry-less
raw Avro vs Confluent Schema Registry modes, producer tuning, SSL/SASL
security, topic naming/data flow, and Kafka Connect sink deployment.
- Wire the page into nav.adoc; cross-link from index.adoc (overview +
supported platforms) and install.adoc.
- stringMappings.adoc: add messagingProvider and all kafka* parameter
-> CDC_* environment-variable mappings.
- Regenerate partials/agentParams.adoc from AgentConfig: now includes
messagingProvider and the kafka* settings, plus the EnvVar column.
- AgentConfig.main() default output path corrected to the partials/
directory (where the include lives) so future regen lands in place.
- antora.yml: add kafka/confluent attributes. KAFKA_SUPPORT.md: note it is
the implementation reference and points at the published page.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The backfill CLI could only seed historical (pre-CDC) rows into Pulsar, so Kafka deployments had no way to back-fill. The export + import engine was already provider-agnostic (TableExporter + PulsarImporter + the messaging abstraction's AbstractMessagingMutationSender, which branches by provider); only the CLI option surface and the AgentConfig mapping were Pulsar-bound. Code: - ImportSettings: add --messaging-provider (default pulsar) and the --kafka-* options (bootstrap servers, schema registry URL, acks, compression, batch size, linger, max in-flight). - PulsarMutationSenderFactory.buildAgentConfig: map the provider and the Kafka fields instead of hardcoding pulsar. Provider/required-field validation is handled by AbstractMessagingMutationSender at construction. - Generalize CassandraToPulsarMigrator log messages. - The pulsar-admin CLI-extension form stays Pulsar-only (there is no Kafka admin host); Kafka backfill runs as the standalone shadow JAR. Tests/CI: - New CassandraContainer.createCassandraContainer (no-agent node) so the backfill source needs neither an agent build nor DSE Maven creds. - BackfillCLIKafkaE2ETest (@tag kafka): runs the backfill JAR with --messaging-provider=kafka against Kafka + Cassandra, asserts the events topic, then runs the Kafka sink in-process and asserts the data topic -- the Kafka counterpart of BackfillCLIE2ETests. - build.gradle: add connector-kafka/kafka-clients/connect-api/testcontainers- kafka test deps; new e2eTestKafka task (includeTags kafka); the existing e2eTest now excludeTags kafka so the Pulsar job skips it. - backfill-ci.yaml: new test-kafka job over kafkaImage x cassandraFamily. - Unit test PulsarMutationSenderFactoryTest covers the Kafka config mapping. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- backfill-cli.adoc: new "Back-fill to Kafka" section with a standalone-JAR example, a Kafka connectivity parameter table (--messaging-provider, --kafka-*), and a note that the pulsar-admin extension form is Pulsar-only. - kafka.adoc: add a "Back-fill historical data to Kafka" section linking to the backfill reference. - KAFKA_SUPPORT.md: document the backfill Kafka path, its shared engine, and the BackfillCLIKafkaE2ETest / test-kafka CI coverage. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…cassandra.yaml is mounted The e2eTestKafka task set CASSANDRA_IMAGE and cassandraVersion per family but not the cassandraFamily system property the test reads to choose the config-override resource dir. So BackfillCLIKafkaE2ETest fell back to its default "c4" and mounted the 4.x cassandra.yaml onto the c3 (3.11) and dse4 nodes, which reject the version-incompatible properties and never start (AllNodesFailedException). Forward cassandraFamily to the test JVM. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
All Kafka backfill e2e jobs failed at cassandra.start() with "Timed out waiting for container port to open (ports [9042,8000,7199])". The base CassandraContainer exposes the debug port 8000 in addition to CQL/JMX, and the default wait strategy waits for ALL exposed ports. The agent-based factories open 8000 via the JVM debug agent (-agentlib:jdwp=...address=8000), but createCassandraContainer (no agent) never does, so the wait always timed out -- across c3/c4/dse4 (c3/dse4 hit the earlier cassandraFamily yaml issue first, masking this). Wait on the "Starting listening for CQL clients" log line instead (emitted by Cassandra 3.x/4.x and DSE) so node readiness no longer depends on port 8000. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ent node The CQL-readiness log wait fixed c3/dse4 but the c4 Kafka backfill jobs still timed out: the c4 config-override logback.xml has its STDOUT (and all file) root appender-refs commented out, so Cassandra 4.0.4 writes nothing to the container stdout and the log-message wait never matches. (The Pulsar c4 jobs don't hit this because they wait on the debug port the agent opens.) Replace the log-message wait with a host-side TCP probe of the CQL port (9042). This targets only the CQL port -- so it ignores the always-exposed- but-unused debug port (8000) -- and does not depend on Cassandra logging to stdout, so it works uniformly across c3/c4/dse4 regardless of the mounted logback configuration. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…to stdout The host-side CQL-port probe regressed all 9 Kafka backfill jobs (c3/dse4 had passed with the log wait). A host-side TCP connect to a mapped port is unreliable: Docker's port forwarding can accept the connection before Cassandra has bound 9042, so the wait returns prematurely and getCqlSession() then fails -- across every family. Revert to the proven "Starting listening for CQL clients" log wait, and fix the actual c4 outlier: its config-override logback.xml had every root appender-ref (including STDOUT) commented out, so Cassandra 4.0.4 logged nothing to the container stdout and the log wait never matched. Re-enable the STDOUT appender so c4 emits the readiness line like c3/dse4 already do. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…whole-number doubles Two pre-existing failures in the connector PulsarCassandraSourceTests.testSchema (across Avro and JSON variants), surfaced by running the suite on jdk 17 as well as 11. Neither is caused by the Kafka work; they stem from the messaging-abstraction migration and were partially addressed before (see 51f872b, which made assertMapsEqual tolerant of shaded/non-shaded Avro Utf8). 1) Avro: java.lang.ClassCastException casting org.apache.avro.generic.GenericData$Array to the Pulsar-shaded GenericData$Array. Pulsar's GenericRecord.getField() returns collection (array) columns as NON-shaded Avro objects even though nested records come back shaded, so the shaded-typed assertions throw. Fix: normalizeToShadedAvro() round-trips any non-shaded Avro GenericContainer through binary Avro into the shaded type in genericRecordToMap(), deeply converting arrays and everything nested (records, maps, strings, CQL logical-type records) so the existing shaded assertions work unchanged. The writer uses GenericData.get() (no registered conversions) so raw datums are written verbatim. Also make the map-of-tuple assertion look up its key by string and normalize the tuple value. 2) JSON: "double expected:<1.0> but was:<1>". A whole-number double/float is serialized to JSON as `1`, which Jackson reads as an IntNode, so a type-sensitive equals against the expected Double fails. Compare double/float numerically (via asDouble) instead. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…haded Avro Follow-up to the testSchema Avro tolerance fix. After normalizing non-shaded Avro arrays to shaded, testSchema failed one layer deeper at assertGenericMap: ClassCastException: org.apache.avro.util.Utf8 cannot be cast to org.apache.pulsar.shade.org.apache.avro.util.Utf8 A map column (e.g. map<text,double>) comes back from Pulsar as a plain java.util.Map whose keys are non-shaded Avro Utf8. The parameter type Map<Utf8,Object> (shaded Utf8) made the compiler insert a checkcast to the shaded Utf8 on getKey(), which threw even though keys are compared via toString(). Type the parameter as Map<Object,Object> (as assertMapsEqual already does) so no checkcast is inserted. Top-level maps (ymap, ymapoftuple) are the only ones needing this -- maps nested inside arrays (e.g. zmap inside ysetofudt) are already converted to shaded by the binary round-trip in normalizeToShadedAvro. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… shaded UDT Next layer of the testSchema shaded/non-shaded Avro tolerance. After the top-level array/map fixes, testSchema failed at: assertField:857 (cast to shaded GenericData$Array) <- assertGenericRecords:903 (udt field) <- assertField:853 (yudt) Pulsar returns a top-level UDT (yudt) as a SHADED GenericRecord, but the collection fields nested inside it (the UDT's zlist/zset) still come back as NON-shaded Avro arrays. normalizeToShadedAvro is a no-op on the already-shaded record, so those nested arrays were never converted and the shaded-typed assertion threw. Wrap the per-field extraction in assertGenericRecords' udt/udtoptional cases with normalizeToShadedAvro so each nested value is converted before asserting. (Collections nested inside ysetofudt are already handled: that whole array is deep-converted to shaded by the binary round-trip.) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.