diff --git a/flow/connectors/mysql/cdc.go b/flow/connectors/mysql/cdc.go index 77a4cbfc6..6afd11b8c 100644 --- a/flow/connectors/mysql/cdc.go +++ b/flow/connectors/mysql/cdc.go @@ -1,6 +1,7 @@ package connmysql import ( + "bytes" "cmp" "context" "crypto/tls" @@ -44,6 +45,10 @@ func (c *MySqlConnector) binlogStalenessThreshold() time.Duration { return binlogStalenessMultiplier * c.binlogHeartbeatPeriod } +func trimQueryEventSQL(query []byte) string { + return shared.UnsafeFastReadOnlyBytesToString(bytes.TrimRight(query, "\x00")) +} + func (c *MySqlConnector) GetTableSchema( ctx context.Context, env map[string]string, @@ -569,7 +574,7 @@ func (c *MySqlConnector) PullRecords( if mysqlParser == nil { mysqlParser = parser.New() } - stmts, warns, err := mysqlParser.ParseSQL(shared.UnsafeFastReadOnlyBytesToString(ev.Query)) + stmts, warns, err := mysqlParser.ParseSQL(trimQueryEventSQL(ev.Query)) if err != nil { c.logger.Warn("failed to parse QueryEvent", slog.String("query", string(ev.Query)), slog.Any("error", err)) break @@ -577,12 +582,31 @@ func (c *MySqlConnector) PullRecords( if len(warns) > 0 { c.logger.Warn("processing QueryEvent with logged warnings", slog.Any("warns", warns)) } + advanceTransactionCheckpoint := func() { + if gset != nil { + gset = ev.GSet + updatedOffset = gset.String() + req.RecordStream.UpdateLatestCheckpointText(updatedOffset) + } else if event.Header.LogPos > pos.Pos { + pos.Pos = event.Header.LogPos + updatedOffset = posToOffsetText(pos) + req.RecordStream.UpdateLatestCheckpointText(updatedOffset) + } + inTx = false + } for _, stmt := range stmts { - if alterTableStmt, ok := stmt.(*ast.AlterTableStmt); ok { + switch stmt := stmt.(type) { + case *ast.AlterTableStmt: if err := c.processAlterTableQuery( - ctx, catalogPool, req, alterTableStmt, string(ev.Schema), binlogRowMetadataSupported, req.InternalVersion); err != nil { + ctx, catalogPool, req, stmt, string(ev.Schema), binlogRowMetadataSupported, req.InternalVersion); err != nil { return fmt.Errorf("failed to process ALTER TABLE query: %w", err) } + case *ast.CommitStmt: + advanceTransactionCheckpoint() + case *ast.RollbackStmt: + if stmt.SavepointName == "" { + advanceTransactionCheckpoint() + } } } case *replication.RowsEvent: diff --git a/flow/connectors/mysql/cdc_test.go b/flow/connectors/mysql/cdc_test.go index 3ed471271..7daa2e76b 100644 --- a/flow/connectors/mysql/cdc_test.go +++ b/flow/connectors/mysql/cdc_test.go @@ -5,6 +5,8 @@ import ( "fmt" "testing" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" "github.com/stretchr/testify/require" "github.com/PeerDB-io/peerdb/flow/generated/protos" @@ -50,6 +52,43 @@ func createTestDB(t *testing.T, ctx context.Context, c *MySqlConnector, dbName s }) } +func TestTrimQueryEventSQLParsesTrailingNull(t *testing.T) { + for _, tc := range []struct { + name string + query []byte + assert func(*testing.T, ast.StmtNode) + }{ + { + name: "alter table", + query: []byte("ALTER TABLE t ADD COLUMN c INT\x00"), + assert: func(t *testing.T, stmt ast.StmtNode) { + t.Helper() + require.IsType(t, &ast.AlterTableStmt{}, stmt) + }, + }, + { + name: "commit", + query: []byte("COMMIT\x00"), + assert: func(t *testing.T, stmt ast.StmtNode) { + t.Helper() + require.IsType(t, &ast.CommitStmt{}, stmt) + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + mysqlParser := parser.New() + _, _, err := mysqlParser.ParseSQL(string(tc.query)) + require.Error(t, err) + + stmts, warns, err := mysqlParser.ParseSQL(trimQueryEventSQL(tc.query)) + require.NoError(t, err) + require.Empty(t, warns) + require.Len(t, stmts, 1) + tc.assert(t, stmts[0]) + }) + } +} + func TestGetTableSchemaCaseSensitiveIdentifiers(t *testing.T) { t.Parallel() ctx := t.Context() diff --git a/flow/e2e/clickhouse_mysql_test.go b/flow/e2e/clickhouse_mysql_test.go index 727d50d29..25bbac0b9 100644 --- a/flow/e2e/clickhouse_mysql_test.go +++ b/flow/e2e/clickhouse_mysql_test.go @@ -230,6 +230,48 @@ func (s ClickHouseSuite) Test_MySQL_Blobs() { RequireEnvCanceled(s.t, env) } +func (s ClickHouseSuite) Test_MySQL_NonTransactionalEngineCDC() { + if _, ok := s.source.(*MySqlSource); !ok || internal.MySQLTestVersionIsMaria() { + s.t.Skip("only applies to mysql") + } + + srcTableName := "test_non_transactional_engine" + srcFullName := s.attachSchemaSuffix(srcTableName) + dstTableName := "test_non_transactional_engine_dst" + + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY, + val TEXT NOT NULL + ) ENGINE=MyISAM + `, srcFullName))) + + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf( + `INSERT INTO %s (id,val) VALUES (1,'snapshot')`, srcFullName))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix(srcTableName), + TableNameMapping: map[string]string{srcFullName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + + tc := NewTemporalClient(s.t) + env := ExecutePeerflow(s.t, tc, flowConnConfig) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + EnvWaitForEqualTablesWithNames(env, s, "snapshot", srcTableName, dstTableName, "id,val") + + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf( + `INSERT INTO %s (id,val) VALUES (2,'cdc')`, srcFullName))) + + EnvWaitForEqualTablesWithNames(env, s, "cdc", srcTableName, dstTableName, "id,val") + + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + // Test_MySQL_Binary_Trailing_Zeros reproduces trailing-0x00 truncation of fixed-length // BINARY(N) columns over CDC. //