Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
f04a220
Initial plan for adding Kafka support
Mar 17, 2026
43478f8
Phase 1 - implemented
Mar 17, 2026
b29ffcf
Phase 2: Core Abstraction Layer
Mar 17, 2026
b6c21d2
Phase 3: Pulsar Implementation
Mar 18, 2026
e1dc77a
Phase 4: Kafka Implementation - week 1
Mar 18, 2026
5e3e756
Phase 2: Core Abstraction Layer - Week 2-3 Deliverables. deleted ph 3…
Mar 18, 2026
34a7bab
Phase 3 - week1 - Pulsar Impl
Mar 18, 2026
8f0134e
Phase 3 - weeks2&3 - Pulsar Impl
Mar 18, 2026
c928951
Phase 4 - Kafka implementation initial
Mar 18, 2026
46e0fee
Phase 4 - kafka impl basic docs updates
Mar 18, 2026
fab2ee9
Phase 5: Kafka Integration Tests & CI Workflows - Implementation
Mar 19, 2026
76dc7f6
Remove Made with Bob comment on files
Mar 19, 2026
ebd7fb8
Phase 1: CI Stabilization - Disable Kafka tests and add resource limits
Mar 20, 2026
d0fc511
ci failures
Mar 20, 2026
109ec66
backfill-ci failure fix attempt
Mar 20, 2026
479a92c
attempt to fix backfill-ci
Mar 21, 2026
04387e9
temp comment jdk17 from matrix
Mar 21, 2026
a141964
attempt to remove kafka premature
Mar 21, 2026
09ca19c
backfill cli and ci fix attempt
Mar 21, 2026
c4dae70
bob created mess to fix backfill-ci job attempt 5
Mar 21, 2026
8af6fb8
phase 3 impl + refactor
Apr 3, 2026
486ad86
interim
Apr 3, 2026
650df02
interim fixes;slf4j reverts
Apr 3, 2026
e5f7b9b
interim phase3 fix attempts
Apr 6, 2026
c443425
interim fixes backfill-ci failures
Apr 6, 2026
ce07cb8
interim backfill CI fixes
Apr 6, 2026
92f181f
s
Apr 6, 2026
2ef630f
s
Apr 6, 2026
e6290d8
interim backfill-ci fixes
Apr 6, 2026
9c10954
interim ci fixes
Apr 7, 2026
6ccb7e3
Fix ClassCastException by reverting NativeSchemaWrapper and Cassandra…
Apr 7, 2026
2ed932b
Update BOB_CONTEXT_SUMMARY with ClassCastException fix details
Apr 7, 2026
a1df265
backfill-ci fail fix attempt
Apr 7, 2026
a040a14
Fix connector test failures: Revert to direct Pulsar API usage
Apr 8, 2026
d4f26a6
ci failures fixes
Apr 8, 2026
8ae55a8
CI Failure fix attempt
Apr 15, 2026
72601ad
fix(kafka_support): correct SSL keystore/truststore mapping and resto…
Apr 15, 2026
fefd539
feat(kafka): complete & validate agent-side dual-provider (Pulsar+Kaf…
May 30, 2026
3ecdfb3
feat(kafka): add Kafka Connect source connector (events -> Cassandra …
May 30, 2026
6ca0f20
fix(backfill): repair messaging-abstraction backfill (SPI discovery +…
May 30, 2026
51f872b
fix(connector): make assertMapsEqual tolerant of shaded/non-shaded Av…
May 30, 2026
b18ba5f
fix(backfill): forward Docker API version to the e2eTest task and bac…
May 30, 2026
13fbdd8
fix(backfill): discover dsbulk codec providers in the NAR CLI-extensi…
May 30, 2026
5e85195
ci: expand Pulsar/Kafka test matrices and restore 360m timeout
May 30, 2026
fd2d396
fix(agent): reject invalid messagingProvider and validate provider co…
May 30, 2026
d4289b3
docs(kafka): publish Kafka/Confluent user docs and regenerate agent p…
May 30, 2026
e1ba1a6
feat(backfill): support Kafka as a backfill destination + e2e + CI
May 30, 2026
7b1a248
docs(backfill): document Kafka as a backfill destination
May 30, 2026
02849ca
fix(backfill-ci): pass cassandraFamily to the Kafka e2e so the right …
May 30, 2026
03ae4a0
fix(backfill-ci): wait on CQL readiness for the no-agent Cassandra node
May 30, 2026
7dd7f85
fix(backfill-ci): wait on the CQL port, not a log line, for the no-ag…
May 30, 2026
ebdc30f
fix(backfill-ci): revert to CQL-log wait and make the c4 logback log …
May 30, 2026
e2e67b0
fix(connector): make testSchema tolerant of non-shaded Avro and JSON …
May 31, 2026
c733f0d
fix(connector): type assertGenericMap keys as Object for shaded/non-s…
May 31, 2026
38181be
fix(connector): normalize non-shaded Avro collections nested inside a…
May 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion .github/workflows/backfill-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ jobs:
env:
DSE_REPO_USERNAME: ${{ secrets.DSE_REPO_USERNAME }}
DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
MAVEN_OPTS: "-Xmx2g -XX:MaxMetaspaceSize=512m" # PHASE 1: Limit JVM memory
GRADLE_OPTS: "-Xmx2g -Dorg.gradle.daemon=false" # PHASE 1: Limit Gradle memory
run: |
./gradlew -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
backfill-cli:build
Expand Down Expand Up @@ -72,6 +74,8 @@ jobs:
env:
DSE_REPO_USERNAME: ${{ secrets.DSE_REPO_USERNAME }}
DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
MAVEN_OPTS: "-Xmx2g -XX:MaxMetaspaceSize=512m" # PHASE 1: Limit JVM memory
GRADLE_OPTS: "-Xmx2g -Dorg.gradle.daemon=false" # PHASE 1: Limit Gradle memory
run: |
set -e
PREV_IFS=$IFS
Expand All @@ -80,9 +84,58 @@ jobs:
IFS=$PREV_IFS
PULSAR_IMAGE=${PULSAR_FULL_IMAGE[0]}
PULSAR_IMAGE_TAG=${PULSAR_FULL_IMAGE[1]}

./gradlew -Pdse4 -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
-Papi.version=1.43 \
-PtestPulsarImage=$PULSAR_IMAGE \
-PtestPulsarImageTag=$PULSAR_IMAGE_TAG \
-PcassandraFamily=${{ matrix.cassandraFamily }} \
backfill-cli:e2eTest

test-kafka:
needs: build
name: Test Backfill CLI (Kafka)
runs-on: ubuntu-latest
timeout-minutes: 90
strategy:
fail-fast: false
matrix:
jdk: ['11']
kafkaImage: ['confluentinc/cp-kafka:7.8.8', 'confluentinc/cp-kafka:7.9.7', 'confluentinc/cp-kafka:8.1.3']
cassandraFamily: ['c3', 'c4', 'dse4']
steps:
- uses: actions/checkout@v6
- name: Set up JDK ${{ matrix.jdk }}
uses: actions/setup-java@v5
with:
java-version: ${{ matrix.jdk }}
distribution: 'adopt'

- name: Get project version
uses: HardNorth/github-version-generate@v1.4.0
with:
version-source: file
version-file: gradle.properties
version-file-extraction-pattern: '(?<=version=).+'

- name: Cache Docker layers
uses: actions/cache@v5
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-

- name: Test with Gradle
env:
DSE_REPO_USERNAME: ${{ secrets.DSE_REPO_USERNAME }}
DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
MAVEN_OPTS: "-Xmx2g -XX:MaxMetaspaceSize=512m"
GRADLE_OPTS: "-Xmx2g -Dorg.gradle.daemon=false"
run: |
set -e
./gradlew -Pdse4 -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
-Papi.version=1.43 \
-PkafkaImage=${{ matrix.kafkaImage }} \
-PcassandraFamily=${{ matrix.cassandraFamily }} \
backfill-cli:e2eTestKafka
65 changes: 63 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ jobs:
DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
run: |
./gradlew -Pdse4 -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
build -x test -x backfill-cli:compileJava
build -x test -x backfill-cli:compileJava -x license -x licenseMain -x licenseTest

test:
needs: build
name: Test
name: Test Pulsar
runs-on: ubuntu-latest
timeout-minutes: 360
strategy:
fail-fast: false
#max-parallel: 10 # PHASE 1: Limit parallel test execution
matrix:
module: ['agent', 'agent-c3', 'agent-c4', 'agent-dse4', 'connector']
jdk: ['11', '17']
Expand Down Expand Up @@ -72,6 +73,8 @@ jobs:
env:
DSE_REPO_USERNAME: ${{ secrets.DSE_REPO_USERNAME }}
DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
MAVEN_OPTS: "-Xmx2g -XX:MaxMetaspaceSize=512m" # PHASE 1: Limit JVM memory
GRADLE_OPTS: "-Xmx2g -Dorg.gradle.daemon=false" # PHASE 1: Limit Gradle memory, disable daemon
run: |
set -e
PREV_IFS=$IFS
Expand All @@ -82,6 +85,64 @@ jobs:
PULSAR_IMAGE_TAG=${PULSAR_FULL_IMAGE[1]}

./gradlew -Pdse4 -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
-Papi.version=1.43 \
-PtestPulsarImage=$PULSAR_IMAGE \
-PtestPulsarImageTag=$PULSAR_IMAGE_TAG \
${{ matrix.module }}:test

test-kafka:
needs: build
name: Test Kafka
runs-on: ubuntu-latest
timeout-minutes: 360
strategy:
fail-fast: false
matrix:
module: ['agent-c3', 'agent-c4', 'agent-dse4', 'connector-kafka']
jdk: ['11', '17']
kafkaImage: ['confluentinc/cp-kafka:7.8.8', 'confluentinc/cp-kafka:7.9.7', 'confluentinc/cp-kafka:8.1.3']
steps:
- uses: actions/checkout@v6
- name: Set up JDK ${{ matrix.jdk }}
uses: actions/setup-java@v5
with:
java-version: ${{ matrix.jdk }}
distribution: 'adopt'

- name: Get project version
uses: HardNorth/github-version-generate@v1.4.0
with:
version-source: file
version-file: gradle.properties
version-file-extraction-pattern: '(?<=version=).+'

- name: Cache Docker layers
uses: actions/cache@v5
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-

- name: Test with Gradle (Kafka)
env:
DSE_REPO_USERNAME: ${{ secrets.DSE_REPO_USERNAME }}
DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
MAVEN_OPTS: "-Xmx2g -XX:MaxMetaspaceSize=512m"
GRADLE_OPTS: "-Xmx2g -Dorg.gradle.daemon=false"
run: |
set -e
PREV_IFS=$IFS
IFS=':'
read -ra KAFKA_FULL_IMAGE <<< "${{ matrix.kafkaImage }}"
IFS=$PREV_IFS
KAFKA_IMAGE=${KAFKA_FULL_IMAGE[0]}
KAFKA_IMAGE_TAG=${KAFKA_FULL_IMAGE[1]}

# -PkafkaTests includes the @Tag("kafka") integration tests (excluded by default).
./gradlew -Pdse4 -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
-Papi.version=1.43 \
-PkafkaTests \
-PtestKafkaImage=$KAFKA_IMAGE \
-PtestKafkaImageTag=$KAFKA_IMAGE_TAG \
${{ matrix.module }}:test
8 changes: 7 additions & 1 deletion agent-c3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ shadowJar {
manifest {
inheritFrom project.tasks.jar.manifest
}
// Merge service provider files for SPI
mergeServiceFiles()
}

jar.enabled = false
Expand All @@ -37,8 +39,11 @@ assemble.dependsOn(shadowJar)
dependencies {
implementation project(':commons')
implementation project(':agent')
implementation("org.apache.avro:avro:${avroVersion}")
implementation project(':messaging-api')
implementation project(':messaging-pulsar')
implementation project(':messaging-kafka')

implementation("org.apache.avro:avro:${avroVersion}")
implementation("org.apache.pulsar:pulsar-client:${pulsarVersion}")

compileOnly("org.apache.cassandra:cassandra-all:${cassandra3Version}")
Expand Down Expand Up @@ -68,6 +73,7 @@ test {
useJUnitPlatform()

environment 'PULSAR_IMAGE', testPulsarImage + ':' + testPulsarImageTag
environment 'KAFKA_IMAGE', testKafkaImage + ':' + testKafkaImageTag
environment 'CASSANDRA_IMAGE', 'cassandra:' + cassandra3Version

systemProperty "buildDir", buildDir
Expand Down
4 changes: 3 additions & 1 deletion agent-c3/src/main/java/com/datastax/oss/cdc/agent/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ static void main(String agentArgs, Instrumentation inst) throws Exception {

static void startCdcAgent(String agentArgs) throws Exception {
log.info("Starting CDC agent, cdc_raw_directory={}", DatabaseDescriptor.getCDCLogLocation());
AgentConfig config = AgentConfig.create(AgentConfig.Platform.PULSAR, agentArgs);
// Platform.ALL: the agent is provider-agnostic and accepts both Pulsar and Kafka
// parameters; the active provider is selected at runtime via 'messagingProvider'.
AgentConfig config = AgentConfig.create(AgentConfig.Platform.ALL, agentArgs);

// With C* 3.11, CL are immutable, we don't need to keep the last sent position.
SegmentOffsetWriter segmentOffsetFileWriter = new SegmentOffsetDummyWriter(config.cdcWorkingDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import java.util.UUID;

@Slf4j
public class PulsarMutationSender extends AbstractPulsarMutationSender<CFMetaData> {
public class PulsarMutationSender extends AbstractMessagingMutationSender<CFMetaData> {

private static final ImmutableMap<String, org.apache.avro.Schema> avroNativeTypes = ImmutableMap.<String, org.apache.avro.Schema>builder()
.put(UTF8Type.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING))
Expand Down Expand Up @@ -106,6 +106,12 @@ public org.apache.avro.Schema getNativeSchema(String cql3Type) {
*/
@Override
public boolean isSupported(final AbstractMutation<CFMetaData> mutation) {
// Check if metadata is null (table may have been dropped)
if (mutation.metadata == null) {
log.warn("Table metadata is null for mutation key={}, table may have been dropped, skipping mutation", mutation.key());
return false;
}

if (!pkSchemas.containsKey(mutation.key())) {
for (ColumnDefinition cm : mutation.metadata.primaryKeyColumns()) {
if (!avroNativeTypes.containsKey(cm.type.asCQL3Type().toString())) {
Expand Down
6 changes: 6 additions & 0 deletions agent-c4/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ shadowJar {
manifest {
inheritFrom project.tasks.jar.manifest
}
// Merge service provider files for SPI
mergeServiceFiles()
}

jar.enabled = true
Expand All @@ -43,6 +45,9 @@ assemble.dependsOn(shadowJar)
dependencies {
implementation project(':commons')
implementation project(':agent')
implementation project(':messaging-api')
implementation project(':messaging-pulsar')
implementation project(':messaging-kafka')

implementation("org.apache.avro:avro:${avroVersion}")
implementation("commons-io:commons-io:${commonsIOVersion}") // Override transitive dependency version to fix vulnerability
Expand Down Expand Up @@ -77,6 +82,7 @@ test {
useJUnitPlatform()

environment 'PULSAR_IMAGE', testPulsarImage + ':' + testPulsarImageTag
environment 'KAFKA_IMAGE', testKafkaImage + ':' + testKafkaImageTag
environment 'CASSANDRA_IMAGE', 'cassandra:' + cassandra4Version

systemProperty "buildDir", buildDir
Expand Down
4 changes: 3 additions & 1 deletion agent-c4/src/main/java/com/datastax/oss/cdc/agent/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ static void main(String agentArgs, Instrumentation inst) throws Exception {

static void startCdcAgent(String agentArgs) throws Exception {
log.info("Starting CDC agent, cdc_raw_directory={}", DatabaseDescriptor.getCDCLogLocation());
AgentConfig config = AgentConfig.create(AgentConfig.Platform.PULSAR, agentArgs);
// Platform.ALL: the agent is provider-agnostic and accepts both Pulsar and Kafka
// parameters; the active provider is selected at runtime via 'messagingProvider'.
AgentConfig config = AgentConfig.create(AgentConfig.Platform.ALL, agentArgs);

SegmentOffsetFileWriter segmentOffsetFileWriter = new SegmentOffsetFileWriter(config.cdcWorkingDir);
segmentOffsetFileWriter.loadOffsets();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import java.util.UUID;

@Slf4j
public class PulsarMutationSender extends AbstractPulsarMutationSender<TableMetadata> {
public class PulsarMutationSender extends AbstractMessagingMutationSender<TableMetadata> {

private static final ImmutableMap<String, org.apache.avro.Schema> avroSchemaTypes = ImmutableMap.<String, org.apache.avro.Schema>builder()
.put(UTF8Type.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING))
Expand Down Expand Up @@ -110,6 +110,12 @@ public org.apache.avro.Schema getNativeSchema(String cql3Type) {
*/
@Override
public boolean isSupported(final AbstractMutation<TableMetadata> mutation) {
// Check if metadata is null (table may have been dropped)
if (mutation.metadata == null) {
log.warn("Table metadata is null for mutation key={}, table may have been dropped, skipping mutation", mutation.key());
return false;
}

if (!pkSchemas.containsKey(mutation.key())) {
for (ColumnMetadata cm : mutation.metadata.primaryKeyColumns()) {
if (!avroSchemaTypes.containsKey(cm.type.asCQL3Type().toString())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright DataStax, Inc 2021.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.cdc.agent;

import com.datastax.oss.cdc.AgentTestUtil;
import com.datastax.oss.cdc.KafkaSingleNodeTests;
import com.datastax.testcontainers.cassandra.CassandraContainer;
import lombok.extern.slf4j.Slf4j;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

import java.util.Optional;

@Slf4j
public class KafkaSingleNodeC4Tests extends KafkaSingleNodeTests {

public static final DockerImageName CASSANDRA_IMAGE = DockerImageName.parse(
Optional.ofNullable(System.getenv("CASSANDRA_IMAGE"))
.orElse("cassandra:" + System.getProperty("cassandraVersion"))
).asCompatibleSubstituteFor("cassandra");

public KafkaSingleNodeC4Tests() {
super(AgentTestUtil.Version.C4);
}

@Override
public CassandraContainer<?> createCassandraContainer(int nodeIndex, String kafkaBootstrapServers, Network testNetwork) {
return CassandraContainer.createCassandraContainerWithAgentKafka(
CASSANDRA_IMAGE, testNetwork, nodeIndex, "c4", kafkaBootstrapServers);
}

@Override
public int getSegmentSize() {
return 1024 * 1024;
}
}
18 changes: 15 additions & 3 deletions agent-dse4/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
# DSE CDC agent for Apache Pulsar
# DSE CDC agent for Apache Pulsar and Apache Kafka

## Overview

CDC agent for DataStax Enterprise 4.x with support for both Apache Pulsar and Apache Kafka.

## Build

./gradlew agent-dse4:shadowJar

## Run
## Run with Pulsar (Default)

export JVM_EXTRA_OPTS="-javaagent:agent-dse4/build/libs/agent-dse4-<version>-all.jar=pulsarServiceUrl=pulsar://pulsar:6650,cdcWorkingDir=/var/lib/cassandra/cdc"

## Run with Kafka

export JVM_EXTRA_OPTS="-javaagent:agent-dse4/build/libs/agent-dse4-<version>-all.jar=messagingProvider=KAFKA,kafkaBootstrapServers=localhost:9092,cdcWorkingDir=/var/lib/cassandra/cdc"

## Configuration

export JVM_EXTRA_OPTS="-javaagent:agent-dse4/build/libs/agent-dse4-<version>-SNAPSHOT-all.jar=pulsarServiceUrl=pulsar://pulsar:6650,cdcWorkingDir=/var/lib/cassandra/cdc"
See [agent/README.md](../agent/README.md) for full configuration options.


10 changes: 10 additions & 0 deletions agent-dse4/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@ configurations {
dependencies {
custom project(':commons')
custom project(':agent')
custom project(':messaging-api')
custom project(':messaging-pulsar')
custom project(':messaging-kafka')

implementation project(':commons')
implementation project(':agent')
implementation project(':messaging-api')
implementation project(':messaging-pulsar')
implementation project(':messaging-kafka')

implementation("org.apache.avro:avro:${avroVersion}")
implementation("${pulsarGroup}:pulsar-client:${pulsarVersion}")
Expand Down Expand Up @@ -84,6 +90,10 @@ shadowJar {
inheritFrom project.tasks.jar.manifest
}
configurations = [project.configurations.custom]
// Merge service provider files for SPI
mergeServiceFiles()
// Exclude Netty native libraries; DSE provides its own bundled natives
exclude 'META-INF/native/*'
// relocate AVRO because dse-db depends on avro 1.7.7
relocate 'org.apache.avro', 'com.datastax.oss.cdc.avro'
}
Expand Down
Loading
Loading