OLR loses messages after the Debezium connector restart during batch message replication #324

@JJaneq

Description

Bug reporter information

Full name
Jan Bejgier

Email or GitHub username
jan.bejgier@goldenore.com

Company or organization
Goldenore Sp. z o.o.


Bug description

When the Debezium connector is stopped while OLR is streaming a batch of messages, some messages are lost after the connector resumes.

After receiving a CONTINUE request from the connector, OLR does not resume streaming from the requested c_scn and c_idx. Instead, it skips a portion of the messages and continues from higher values, resulting in data loss.

Step-by-step reproduction instructions

  1. Clone the required repository:

    git clone https://github.com/bersler/OpenLogReplicator-tutorials.git
  2. Create the required images:

    cd OpenLogReplicator-tutorials/images/
    ./oracle_database_21.3.0-xe.sh
    ./bersler_openlogreplicator_tutorial_dev.sh
  3. Start the Oracle database with a modified compose.yaml:

    services:
       oracle:
          image: "${DB_IMAGE}"
          container_name: ORA1
          volumes:
             - ./fra:/opt/oracle/fra
             - ./oradata:/opt/oracle/oradata
             - ./sql:/opt/sql
             - ./setup:/opt/oracle/scripts/setup
          networks:
             - isolated_network
          environment:
             - ORACLE_CHARACTERSET=AL32UTF8
             - ORACLE_PWD=oracle
          build:
             context: .
             shm_size: '2gb'
          shm_size: '2gb'
          ports:
             - "1521:1521"
    
       kafka:
          image: "${KAFKA_IMAGE}"
          container_name: KAFKA1
          profiles: ["kafka"]
          volumes:
             - ./kafka:/var/lib/kafka/data
          networks:
             - isolated_network
          ports:
             - "9092:9092"
          environment:
          # KRaft parameters
             - CLUSTER_ID=${KAFKA_CLUSTER}
             - KAFKA_NODE_ID=1
             - KAFKA_PROCESS_ROLES=broker,controller
             - KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS=kafka:9093
          # Listeners
             - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
             - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
             - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
             - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
          # Performance & configuration
             - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
             - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
             - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
             - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
             - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
          healthcheck:
             test: ["CMD", "/kafka/bin/kafka-topics.sh", "--bootstrap-server", "${KAFKA_BROKER}", "--list"]
             interval: 10s
             timeout: 5s
             retries: 5
             start_period: 60s
    
       openlogreplicator:
          image: "${OLR_IMAGE}"
          container_name: "${OLR_CONTAINER}"
          profiles: ["openlogreplicator"]
          depends_on:
             - oracle
             - kafka
          volumes:
             - ./bin:/opt/bin
             - ./checkpoint:/opt/OpenLogReplicator/checkpoint
             - ./fra:/opt/fra
             - ./log:/opt/OpenLogReplicator/log
             - ./oradata:/opt/oradata
             - ./scripts:/opt/OpenLogReplicator/scripts
          restart: "no"
          ports:
             - "50000:50000"
          networks:
             - isolated_network
    
       kafka-connect:
          image: quay.io/debezium/connect:latest
          container_name: KAFKA-CONNECT
          profiles: ["connect"]
          environment:
             BOOTSTRAP_SERVERS: kafka:9092
             GROUP_ID: 1
             CONFIG_STORAGE_TOPIC: connect_configs
             OFFSET_STORAGE_TOPIC: connect_offsets
             STATUS_STORAGE_TOPIC: connect_statuses
          ports:
             - "8083:8083"
          depends_on:
             - oracle
             - kafka
          networks:
             - isolated_network
    
    networks:
       isolated_network:
    cd ../tutorials/Oracle-to-Kafka/
    ./2.db-start.sh
  4. Create the kafka directory:

    mkdir kafka
    chmod 777 kafka
  5. Add the following line to sql/schema-usrolr.sql:

    GRANT SELECT ON SYS.V_$PDBS TO USROLR;
  6. Configure OpenLogReplicator:

    {
       "version": "1.9.0",
       "log-level": 4,
       "source": [
          {
             "alias": "S1",
             "name": "ORA1",
             "reader": {
                "type": "online",
                "path-mapping": ["/opt/oracle/oradata", "/opt/oradata", "/opt/oracle/fra", "/opt/fra"],
                "user": "USROLR",
                "password": "USROLRPWD",
                "server": "//oracle:1521/XEPDB1"
             },
             "format": {
                "type": "json",
                "db": 3,
                "interval-dts": 9,
                "interval-ytm": 4,
                "message": 2,
                "rid": 1,
                "column": 2,
                "schema": 7,
                "scn-type": 5,
                "timestamp-type": 4,
                "flush-buffer": 0
             },
             "filter": {
                "table": [
                   {"owner": "USRTBL", "table": "TAB1.*"}
                ]
             }
          }
       ],
       "target": [
          {
             "alias": "T1",
             "source": "S1",
             "writer": {
                "type": "network",
                "uri": "0.0.0.0:50000",
                "queue-size": 1000000
             }
          }
       ]
    }
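A malformed config can cause confusing startup failures, so it can help to syntax-check the JSON before starting the container. A minimal sketch (assuming python3 is available; the temp file and fragment here are only illustrative, point the check at your actual OpenLogReplicator.json):

```shell
# Syntax-check an OLR JSON config before starting the container.
# A minimal fragment is written to a temp file purely for illustration;
# in a real setup, run python3 -m json.tool against the real config path.
cfg=$(mktemp)
cat > "$cfg" <<'EOF'
{"version": "1.9.0", "log-level": 4}
EOF
python3 -m json.tool "$cfg" > /dev/null && echo "config OK"
rm -f "$cfg"
```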
  7. Modify 4.olr-start.sh as follows and run it:

    set -e
    
    . cfg.sh
    . ../common/functions.sh
    
    echo "4. creating and starting OpenLogReplicator container"
    olr_files
    db_sql "${DB_CONTAINER}" /opt/sql/schema-usrolr.sql /opt/sql/schema-usrolr.out
    docker_up --profile openlogreplicator --profile kafka --profile connect
    finish
    ./4.olr-start.sh
  8. Create the Debezium connector:

    curl -X POST http://localhost:8083/connectors/ \
    -H "Content-Type: application/json" \
    -d '{
       "name": "oracle-olr-connector",
       "config": {
          "connector.class": "io.debezium.connector.oracle.OracleConnector",
    
          "database.hostname": "oracle",
          "database.port": "1521",
          "database.user": "sys as sysdba",
          "database.password": "oracle",
          "database.dbname": "XEPDB1",
    
          "database.connection.adapter": "olr",
    
          "openlogreplicator.host": "openlogreplicator",
          "openlogreplicator.port": "50000",
          "openlogreplicator.source": "ORA1",
    
          "topic.prefix": "oracle_olr",
          "snapshot.mode": "no_data",
    
          "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
          "schema.history.internal.kafka.topic": "schema-changes.oracle",
    
          "table.include.list": "USRTBL.TAB1",
          "database.history.store.only.captured.tables.ddl": "true"
       }
    }'
  9. Create the table and insert the test data:

    CREATE TABLE USRTBL.TAB1(
       ID         NUMBER NOT NULL,
       COUNT      NUMBER
    );
    ALTER TABLE USRTBL.TAB1 ADD CONSTRAINT ID_PK PRIMARY KEY(ID);
    ALTER TABLE USRTBL.TAB1 ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
    
    INSERT INTO USRTBL.TAB1 (ID, COUNT)
    SELECT 
       LEVEL - 1 AS ID,
       MOD(LEVEL, 100) AS COUNT
    FROM DUAL
    CONNECT BY LEVEL <= 500000;
    
    COMMIT;
  10. Stop and resume the connector while the batch is being replicated:

    curl -X PUT http://localhost:8083/connectors/oracle-olr-connector/stop
    curl -X PUT http://localhost:8083/connectors/oracle-olr-connector/resume
  11. Observe the results:

    docker exec -it KAFKA1 /kafka/bin/kafka-get-offsets.sh \
    --bootstrap-server localhost:9092 \
    --topic oracle_olr.USRTBL.TAB1 \
    --time -1

Actual result:
The offset value may vary, but it is always lower than expected (less than 500000).

oracle_olr.USRTBL.TAB1:0:490545

Expected result:

oracle_olr.USRTBL.TAB1:0:500000
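The size of the loss can be computed directly from the `topic:partition:offset` line printed by kafka-get-offsets.sh; a small sketch, using the actual output observed in this run as the sample line:

```shell
# Compute how many messages are missing from the topic, given a
# "topic:partition:offset" line as printed by kafka-get-offsets.sh.
expected=500000
line="oracle_olr.USRTBL.TAB1:0:490545"   # observed output from this run
actual=${line##*:}                        # strip everything up to the last ':'
echo "missing: $((expected - actual))"    # prints: missing: 9455
```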

It seems that OLR's network writer resets its message queue after receiving a CONTINUE request, even when the queue still contains data that has not yet been sent.
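The suspected behavior can be modeled with a toy sketch (this is not OLR code, just an illustration of the hypothesis): the client confirms messages up to an index, reconnects with CONTINUE, and the expected path resends everything after the confirmed index, while the suspected path drops the still-queued messages.

```shell
# Toy model of the hypothesis (not OLR code): 10 messages are queued and
# the client has confirmed up to index 4 before reconnecting with CONTINUE.
confirmed=4
queued="1 2 3 4 5 6 7 8 9 10"

# Expected behavior: resend every queued message after the confirmed index.
resume_expected() {
  for m in $queued; do
    [ "$m" -gt "$confirmed" ] && printf '%s ' "$m"
  done
  return 0
}

# Suspected behavior: the queue is cleared on CONTINUE, so the buffered
# messages 5..10 are never sent; only messages produced later would follow.
resume_suspected() { :; }

echo "expected:  $(resume_expected)"    # prints: expected:  5 6 7 8 9 10
echo "suspected: $(resume_suspected)"   # prints: suspected:
```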


Environment details

  • Official binary signature/version: v1.9.0
  • OpenLogReplicator version: 1.9.0
  • Oracle version and edition: Oracle Database 21c Express Edition Release 21.0.0.0.0 - Production / Oracle Database 19c Standard Edition 2 Release 19.0.0.0.0 - Production
  • Operating system: Debian 13.0 / Red Hat 9.7
  • Docker version: 29.2.1
