- Check out this repo.
- Run
docker compose up -d - The run this CURL https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-required-configuration-properties Above link shows all config properties.
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "debezium-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgresql",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "test",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.autocreate.mode": "all_tables",
"publication.name": "dbz_publication",
"database.history.kafka.bootstrap.servers": "kafka1:9092",
"database.history.kafka.topic": "schema-changes.test",
"topic.prefix": "test",
"replica.identity.autoset.values": "*:FULL",
"tombstones.on.delete": "true",
"skipped.operations": "none",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}'
Few things to note:
- Debezium needs a user with
replication_role - It also needs ownership of the table which you want to watch.
Thus, I am using
postgresuser which us a super user in the bitnami images.
- An
employeeshould be automatically created when to bring up docker. - Run following cURL to check the status, it should be in running state.
curl -X GET http://localhost:8083/connectors/debezium-postgres-connector/status | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 183 100 183 0 0 16456 0 --:--:-- --:--:-- --:--:-- 16636
{
"name": "debezium-postgres-connector",
"connector": {
"state": "RUNNING",
"worker_id": "172.19.0.5:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "172.19.0.5:8083"
}
],
"type": "source"
}
- Do a insert like
insert into employee (id, name, email) values (1, 'vivek', 'foo@bar.com'); - Goto
http://localhost:8080/ui/clusters/debz_connector_cluster/all-topics/there should be a topic with nametest.public.employee - The message should be sent to the topic.
- Start
com.github.vivekkothari.cdc.DebeziumConsumerto start consumer. - Now start making some inserts/update/deletes to employee table.
- You should see logs like,
025-03-23 20:17:56 [main] INFO o.a.k.c.c.internals.ConsumerUtils - Setting offset for partition test.public.employee-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 2 rack: null isFenced: false)], epoch=0}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"}],"optional":true,"name":"test.public.employee.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"}],"optional":true,"name":"test.public.employee.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"test.public.employee.Envelope","version":2},"payload":{"before":null,"after":{"id":2,"name":"kothari","email":"foo1@bar.com"},"source":{"version":"3.0.0.Final","connector":"postgresql","name":"test","ts_ms":1742741274875,"snapshot":"false","db":"test","sequence":"[\"26737640\",\"26738904\"]","ts_us":1742741274875866,"ts_ns":1742741274875866000,"schema":"public","table":"employee","txId":752,"lsn":26738904,"xmin":null},"transaction":null,"op":"c","ts_ms":1742741275319,"ts_us":1742741275319425,"ts_ns":1742741275319425296}}

