Skip to content
10 changes: 10 additions & 0 deletions flow/alerting/classifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ var (
ErrorNotifyBinlogEventExceededMaxAllowedPacket = ErrorClass{
Class: "NOTIFY_BINLOG_EVENT_EXCEEDED_MAX_ALLOWED_PACKET", action: NotifyUser,
}
ErrorNotifyBinlogPartialRowEventUnsupported = ErrorClass{
Class: "NOTIFY_BINLOG_PARTIAL_ROW_EVENT_UNSUPPORTED", action: NotifyUser,
}
ErrorNotifyBinlogRowMetadataInvalid = ErrorClass{
Class: "NOTIFY_BINLOG_ROW_METADATA_INVALID", action: NotifyUser,
}
Expand Down Expand Up @@ -1154,6 +1157,13 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) {
}
}

if _, ok := errors.AsType[*exceptions.MySQLUnsupportedPartialRowEventError](err); ok {
return ErrorNotifyBinlogPartialRowEventUnsupported, ErrorInfo{
Source: ErrorSourceMySQL,
Code: "UNSUPPORTED_PARTIAL_ROW_EVENT",
}
}

if mysqlGeometryParseError, ok := errors.AsType[*exceptions.MySQLGeometryParseError](err); ok &&
strings.Contains(mysqlGeometryParseError.Error(), mysqlGeometryLinearRingNotClosedError) {
return ErrorUnsupportedDatatype, ErrorInfo{
Expand Down
12 changes: 12 additions & 0 deletions flow/alerting/classifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,3 +1243,15 @@ func TestMySQLBinlogIncidentErrorShouldBeNotifyBinlogInvalid(t *testing.T) {
Code: "BINLOG_INCIDENT",
}, errInfo)
}

func TestMySQLUnsupportedPartialRowEventShouldBeNotifyPartialRowEventUnsupported(t *testing.T) {
t.Parallel()

err := exceptions.NewMySQLUnsupportedPartialRowEventError(172, "e2e_test", "partial_rows")
errorClass, errInfo := GetErrorClass(t.Context(), fmt.Errorf("pulling records failed: %w", err))
assert.Equal(t, ErrorNotifyBinlogPartialRowEventUnsupported, errorClass)
assert.Equal(t, ErrorInfo{
Source: ErrorSourceMySQL,
Code: "UNSUPPORTED_PARTIAL_ROW_EVENT",
}, errInfo)
}
85 changes: 83 additions & 2 deletions flow/connectors/mysql/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"log/slog"
"math/rand/v2"
"slices"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -45,6 +46,46 @@ const (
binlogStalenessMultiplier = 3
)

const mariadbPartialRowDataEvent replication.EventType = 172

const (
partialRowsHeaderLen = 4 + 4 + 1
partialRowFlagOrigEventSize = 0x01
binlogCommonHeaderLen = 19
rowsEventTableIDSize = 6
)

// parsePartialRowEventTableID extracts the source table id from a MariaDB PARTIAL_ROW_DATA_EVENT
//
// [0:4] total_fragments (uint32 LE)
// [4:8] seq_no (uint32 LE, 1-based)
// [8] flags (bit 0 = FL_ORIG_EVENT_SIZE)
// [9:17] original size (present only when FL_ORIG_EVENT_SIZE is set, i.e. on the first fragment)
// then content (first fragment: embedded rows event = 19-byte header + post-header)
//
// The embedded rows-event post-header begins with a 6-byte little-endian table id.
func parsePartialRowEventTableID(data []byte) (uint64, uint32, bool) {
if len(data) < partialRowsHeaderLen {
return 0, 0, false
}
totalFragments := binary.LittleEndian.Uint32(data[0:4])
seqNo := binary.LittleEndian.Uint32(data[4:8])
flags := data[8]
if seqNo != 1 {
// continuation fragment: carries only raw row data, no embedded header, so no table id
return 0, totalFragments, false
}
contentStart := partialRowsHeaderLen
if flags&partialRowFlagOrigEventSize != 0 {
contentStart += 8
}
tableIDStart := contentStart + binlogCommonHeaderLen
if len(data) < tableIDStart+rowsEventTableIDSize {
return 0, totalFragments, false
}
return mysql.FixedLengthInt(data[tableIDStart : tableIDStart+rowsEventTableIDSize]), totalFragments, true
}

const (
queryStatusVarFlags2 = 0
queryStatusVarSQLMode = 1
Expand Down Expand Up @@ -540,6 +581,11 @@ func (c *MySqlConnector) PullRecords(

lastEventAt := time.Now()
var mysqlParser *parser.Parser
// table id -> "schema.table", maintained from TABLE_MAP_EVENTs
tableIdToName := make(map[uint64]string)
// When we ignore an out-of-pipe fragmented rows event, its continuation fragments carry no table
// id; skip this many remaining PARTIAL_ROW_DATA_EVENTs before resolving table ids again.
var partialRowSkipFragments uint32
processEvent := func(event *replication.BinlogEvent) error {
switch ev := event.Event.(type) {
case *replication.GTIDEvent:
Expand All @@ -566,12 +612,45 @@ func (c *MySqlConnector) PullRecords(
c.logger.Info("rotate", slog.String("name", pos.Name), slog.Uint64("pos", uint64(pos.Pos)))
}
case *replication.GenericEvent:
// INCIDENT_EVENT (LOST_EVENTS) - fail and require resync
if event.Header.EventType == replication.INCIDENT_EVENT {
switch event.Header.EventType {
case replication.INCIDENT_EVENT:
// INCIDENT_EVENT (LOST_EVENTS) - fail and require resync
incident, message := parseIncidentEvent(ev.Data)
c.logger.Error("[mysql] received binlog incident event, resync required",
slog.Uint64("incident", uint64(incident)), slog.String("message", message))
return exceptions.NewMySQLBinlogIncidentError(incident, message)
case mariadbPartialRowDataEvent:
if partialRowSkipFragments > 0 {
// continuation fragments of an out-of-pipe group we already decided to ignore
partialRowSkipFragments--
break
}
tableID, totalFragments, ok := parsePartialRowEventTableID(ev.Data)
sourceTableName, known := tableIdToName[tableID]
if !ok || !known {
// couldn't recover/resolve the table, fail loudly
c.logger.Error("[mysql] received unresolvable MariaDB partial row data event, resync required",
slog.Uint64("eventType", uint64(mariadbPartialRowDataEvent)), slog.Bool("parsed", ok))
return exceptions.NewMySQLUnsupportedPartialRowEventError(byte(mariadbPartialRowDataEvent), "", "")
}
if req.TableNameSchemaMapping[req.TableNameMapping[sourceTableName].Name] == nil {
// table not in the pipe - ignore this fragment group and its continuation fragments
if totalFragments > 1 {
partialRowSkipFragments = totalFragments - 1
}
c.logger.Warn("[mysql] ignoring MariaDB partial row data event for table outside the pipe",
slog.String("table", sourceTableName), slog.Uint64("totalFragments", uint64(totalFragments)))
break
}
schemaName, tableName, _ := strings.Cut(sourceTableName, ".")
c.logger.Error("[mysql] received MariaDB partial row data event, resync required",
slog.Uint64("eventType", uint64(mariadbPartialRowDataEvent)), slog.String("table", sourceTableName))
return exceptions.NewMySQLUnsupportedPartialRowEventError(byte(mariadbPartialRowDataEvent), schemaName, tableName)
default:
c.logger.Warn("unknown generic event", slog.Any("type", event.Header.EventType))
otelManager.Metrics.UnsupportedBinlogEventCounter.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
Comment thread
dtunikov marked this conversation as resolved.
attribute.Int(otel_metrics.BinlogEventTypeKey, int(event.Header.EventType)),
)))
}
case *replication.QueryEvent:
if !inTx && gset == nil && event.Header.LogPos > pos.Pos {
Expand Down Expand Up @@ -603,6 +682,8 @@ func (c *MySqlConnector) PullRecords(
c.processRenameTableQuery(ctx, otelManager, req, s, string(ev.Schema))
}
}
case *replication.TableMapEvent:
tableIdToName[ev.TableID] = string(ev.Schema) + "." + string(ev.Table)
case *replication.RowsEvent:
sourceTableName := string(ev.Table.Schema) + "." + string(ev.Table.Table) // TODO this is fragile
destinationTableName := req.TableNameMapping[sourceTableName].Name
Expand Down
33 changes: 33 additions & 0 deletions flow/connectors/mysql/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package connmysql

import (
"context"
"encoding/hex"
"fmt"
"log/slog"
"slices"
Expand Down Expand Up @@ -438,3 +439,35 @@ func TestIntegrationGetTableSchemaPrimaryKeyVariants(t *testing.T) {
})
}
}

func TestParsePartialRowEventTableID(t *testing.T) {
t.Parallel()

// Real PARTIAL_ROW_DATA_EVENT payloads captured from MariaDB 12.3
decode := func(s string) []byte {
b, err := hex.DecodeString(s)
require.NoError(t, err)
return b
}
// first fragment: total=9, seq=1, flags=FL_ORIG_EVENT_SIZE, then origSize + embedded rows event
firstFragment := decode("09000000010000000126200000000000002cc0476a" +

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are real payloads captured during local testing, so I think we can keep this unit test in addition to e2e one (would be faster to run locally when we make changes to parsing logic)

"17010000002a20000000000000000012000000000001000203fc0100000000200000")
// a continuation fragment: total=9, seq=2, no flags, raw row data
continuationFragment := decode("09000000020000000079797979797979797979")

tableID, total, ok := parsePartialRowEventTableID(firstFragment)
require.True(t, ok, "first fragment table id should be recoverable")
require.Equal(t, uint64(18), tableID)
require.Equal(t, uint32(9), total)

_, total, ok = parsePartialRowEventTableID(continuationFragment)
require.False(t, ok, "continuation fragment carries no table id")
require.Equal(t, uint32(9), total, "total_fragments is still readable on continuation fragments")

// too short for even the post-header
_, _, ok = parsePartialRowEventTableID([]byte{0x09, 0x00, 0x00})
require.False(t, ok)
// post-header present but content truncated before the embedded table id
_, _, ok = parsePartialRowEventTableID(firstFragment[:partialRowsHeaderLen+8+binlogCommonHeaderLen+2])
require.False(t, ok)
}
136 changes: 84 additions & 52 deletions flow/e2e/clickhouse_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@ import (
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"

"github.com/PeerDB-io/peerdb/flow/connectors"
connclickhouse "github.com/PeerDB-io/peerdb/flow/connectors/clickhouse"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/internal"
"github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
"github.com/PeerDB-io/peerdb/flow/pkg/common"
mysql_validation "github.com/PeerDB-io/peerdb/flow/pkg/mysql"
"github.com/PeerDB-io/peerdb/flow/shared"
"github.com/PeerDB-io/peerdb/flow/shared/types"
Expand Down Expand Up @@ -1625,57 +1622,11 @@ func (s ClickHouseSuite) Test_MySQL_BinlogIncident() {
s.t.Skip("binlog incident injection requires a MySQL debug build; not available for MariaDB")
}

req := testcontainers.ContainerRequest{
Image: "ghcr.io/peerdb-io/mysql-debug:8.0.46",
Env: map[string]string{
"MYSQL_ROOT_PASSWORD": internal.MySQLTestRootPasswordWithFallback("cipass"),
"MYSQL_ROOT_HOST": "%",
},
// Keep the debug server small for the one-off testcontainers
Cmd: []string{
"mysqld",
"--server-id=1",
"--log-bin=mysql-bin",
"--binlog-format=ROW",
"--innodb-buffer-pool-size=64M",
"--performance-schema=OFF",
"--mysqlx=0",
"--max-connections=20",
"--table-open-cache=64",
"--table-definition-cache=128",
"--innodb-log-buffer-size=8M",
},
ExposedPorts: []string{"3306/tcp"},
WaitingFor: wait.ForListeningPort("3306/tcp").WithStartupTimeout(3 * time.Minute),
}

ctr, err := testcontainers.GenericContainer(s.t.Context(), testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
testcontainers.CleanupContainer(s.t, ctr, testcontainers.StopTimeout(30*time.Second))
require.NoError(s.t, err)

mapped, err := ctr.MappedPort(s.t.Context(), "3306/tcp")
require.NoError(s.t, err)
port, err := strconv.Atoi(mapped.Port())
require.NoError(s.t, err)

suffix := "mydbginc_" + strings.ToLower(common.RandomString(8))
config := &protos.MySqlConfig{
// host.docker.internal resolves both from the test process (to the published port) and
// from the flow worker container (via host-gateway), unlike the container's own host.
Host: internal.MySQLTestHost(),
Port: uint32(port),
User: "root",
Password: internal.MySQLTestRootPasswordWithFallback("cipass"),
DisableTls: true,
src, suffix := SetupMySQLTestContainerSource(s.t, "mydbginc", MySQLTestContainerConfig{
Image: "ghcr.io/peerdb-io/mysql-debug:8.0.46",
Flavor: protos.MySqlFlavor_MYSQL_MYSQL,
ReplicationMechanism: mySource.Config.ReplicationMechanism,
}
src, err := setupMyConnector(s.t, suffix, config, "mysql_debug_"+suffix)
require.NoError(s.t, err)
s.t.Cleanup(func() { src.Teardown(s.t, context.Background(), suffix) })
})

srcTableName := "incident"
srcFullName := fmt.Sprintf("e2e_test_%s.%s", suffix, srcTableName)
Expand Down Expand Up @@ -1724,6 +1675,87 @@ func (s ClickHouseSuite) Test_MySQL_BinlogIncident() {
RequireEnvCanceled(s.t, env)
}

// Test_MariaDB_PartialRowEvent verifies how the mirror handles a MariaDB PARTIAL_ROW_DATA_EVENT
// (MariaDB 12.3+) - an oversized rows event fragmented across multiple binlog events.
func (s ClickHouseSuite) Test_MariaDB_PartialRowEvent() {
if s.cluster {
s.t.Skip("source-side partial row event coverage does not need to run against ClickHouse cluster")
}
mySource, ok := s.source.(*MySqlSource)
if !ok {
s.t.Skip("only applies to mysql")
}
if mySource.Config.Flavor == protos.MySqlFlavor_MYSQL_MYSQL {
s.t.Skip("only applies to maria flavor")
}

// binlog_row_event_fragment_threshold is pinned to its 1024-byte minimum so a modest row
// overflows a single rows event and MariaDB splits it into PARTIAL_ROW_DATA_EVENT fragments.
src, suffix := SetupMySQLTestContainerSource(s.t, "mdbpart", MySQLTestContainerConfig{
Comment thread
dtunikov marked this conversation as resolved.
Image: "mariadb:12.3",
Flavor: protos.MySqlFlavor_MYSQL_MARIA,
ReplicationMechanism: protos.MySqlReplicationMechanism_MYSQL_GTID,
ExtraServerFlags: []string{
Comment thread
dtunikov marked this conversation as resolved.
"--binlog-row-metadata=FULL",
"--binlog-row-event-fragment-threshold=1024",
},
})

srcTableName := "partial_rows"
srcFullName := fmt.Sprintf("e2e_test_%s.%s", suffix, srcTableName)
dstTableName := "partial_rows_dst"

require.NoError(s.t, src.Exec(s.t.Context(), fmt.Sprintf(
`CREATE TABLE %s (id INT PRIMARY KEY, payload LONGTEXT)`, srcFullName)))

connectionGen := FlowConnectionGenerationConfig{
FlowJobName: "test_mariadb_partial_row_" + suffix,
TableMappings: []*protos.TableMapping{{
SourceTableIdentifier: srcFullName,
DestinationTableIdentifier: dstTableName,
ShardingKey: "id",
}},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
// The suite source is the shared CI MySQL; point the mirror at our MariaDB source instead.
flowConnConfig.SourceName = src.GeneratePeer(s.t).Name

tc := NewTemporalClient(s.t)
env := ExecutePeerflow(s.t, tc, flowConnConfig)
SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

catalogPool, err := internal.GetCatalogConnectionPoolFromEnv(s.t.Context())
require.NoError(s.t, err)

// A fragmented rows event on a table OUTSIDE the pipe must NOT fail the mirror
ignoredFullName := fmt.Sprintf("e2e_test_%s.partial_rows_ignored", suffix)
require.NoError(s.t, src.Exec(s.t.Context(), fmt.Sprintf(
`CREATE TABLE %s (id INT PRIMARY KEY, payload LONGTEXT)`, ignoredFullName)))
require.NoError(s.t, src.Exec(s.t.Context(), fmt.Sprintf(
`INSERT INTO %s (id, payload) VALUES (1, REPEAT('y', 8192))`, ignoredFullName)))
require.NoError(s.t, src.Exec(s.t.Context(), fmt.Sprintf(
`INSERT INTO %s (id, payload) VALUES (1, 'small')`, srcFullName)))
EnvWaitForCount(env, s, "waiting for in-pipe row past out-of-pipe partial row event",
dstTableName, "id,payload", 1)

// An oversized row larger than the 1024-byte fragment threshold on the mirrored table itself is
// fragmented into PARTIAL_ROW_DATA_EVENTs, which we don't reassemble and must fail loudly on.
require.NoError(s.t, src.Exec(s.t.Context(), fmt.Sprintf(
`INSERT INTO %s (id, payload) VALUES (2, REPEAT('x', 8192))`, srcFullName)))
EnvWaitFor(s.t, env, 3*time.Minute, "waiting for partial row event error", func() bool {
count, err := GetLogCount(s.t.Context(), catalogPool, flowConnConfig.FlowJobName, "error", "fragmented oversized row events")
if err != nil {
s.t.Log("Error querying flow_errors:", err)
return false
}
return count > 0
})

env.Cancel(s.t.Context())
RequireEnvCanceled(s.t, env)
}

func (s ClickHouseSuite) Test_MySQL_Default_Partition_Key_Parallel_Snapshot() {
if _, ok := s.source.(*MySqlSource); !ok {
s.t.Skip("only applies to mysql")
Expand Down
Loading
Loading