Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions flow/connectors/mysql/cdc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package connmysql

import (
"bytes"
"cmp"
"context"
"crypto/tls"
Expand Down Expand Up @@ -44,6 +45,10 @@ func (c *MySqlConnector) binlogStalenessThreshold() time.Duration {
return binlogStalenessMultiplier * c.binlogHeartbeatPeriod
}

// trimQueryEventSQL returns the SQL text of a binlog QueryEvent payload with
// every trailing NUL (0x00) byte removed, so it can be handed to the SQL parser.
//
// The conversion goes through shared.UnsafeFastReadOnlyBytesToString; judging by
// that helper's name the returned string presumably aliases query's backing
// memory rather than copying it (TODO confirm), so query should not be mutated
// while the result is still in use.
func trimQueryEventSQL(query []byte) string {
	withoutTrailingNULs := bytes.TrimRight(query, "\x00")
	return shared.UnsafeFastReadOnlyBytesToString(withoutTrailingNULs)
}

func (c *MySqlConnector) GetTableSchema(
ctx context.Context,
env map[string]string,
Expand Down Expand Up @@ -569,20 +574,39 @@ func (c *MySqlConnector) PullRecords(
if mysqlParser == nil {
mysqlParser = parser.New()
}
stmts, warns, err := mysqlParser.ParseSQL(shared.UnsafeFastReadOnlyBytesToString(ev.Query))
stmts, warns, err := mysqlParser.ParseSQL(trimQueryEventSQL(ev.Query))
if err != nil {
c.logger.Warn("failed to parse QueryEvent", slog.String("query", string(ev.Query)), slog.Any("error", err))
break
}
if len(warns) > 0 {
c.logger.Warn("processing QueryEvent with logged warnings", slog.Any("warns", warns))
}
advanceTransactionCheckpoint := func() {
if gset != nil {
gset = ev.GSet
updatedOffset = gset.String()
req.RecordStream.UpdateLatestCheckpointText(updatedOffset)
} else if event.Header.LogPos > pos.Pos {
pos.Pos = event.Header.LogPos
updatedOffset = posToOffsetText(pos)
req.RecordStream.UpdateLatestCheckpointText(updatedOffset)
}
inTx = false
}
for _, stmt := range stmts {
if alterTableStmt, ok := stmt.(*ast.AlterTableStmt); ok {
switch stmt := stmt.(type) {
case *ast.AlterTableStmt:
if err := c.processAlterTableQuery(
ctx, catalogPool, req, alterTableStmt, string(ev.Schema), binlogRowMetadataSupported, req.InternalVersion); err != nil {
ctx, catalogPool, req, stmt, string(ev.Schema), binlogRowMetadataSupported, req.InternalVersion); err != nil {
return fmt.Errorf("failed to process ALTER TABLE query: %w", err)
}
case *ast.CommitStmt:
advanceTransactionCheckpoint()
case *ast.RollbackStmt:
if stmt.SavepointName == "" {
advanceTransactionCheckpoint()
}
}
}
case *replication.RowsEvent:
Expand Down
39 changes: 39 additions & 0 deletions flow/connectors/mysql/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"testing"

"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/stretchr/testify/require"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
Expand Down Expand Up @@ -50,6 +52,43 @@ func createTestDB(t *testing.T, ctx context.Context, c *MySqlConnector, dbName s
})
}

// TestTrimQueryEventSQLParsesTrailingNull verifies, for each sample query that
// ends in a NUL byte, that the parser rejects the raw bytes but yields exactly
// one statement of the expected type (and no warnings) once the query has been
// passed through trimQueryEventSQL.
func TestTrimQueryEventSQLParsesTrailingNull(t *testing.T) {
	cases := []struct {
		wantType ast.StmtNode
		name     string
		query    []byte
	}{
		{
			name:     "alter table",
			query:    []byte("ALTER TABLE t ADD COLUMN c INT\x00"),
			wantType: &ast.AlterTableStmt{},
		},
		{
			name:     "commit",
			query:    []byte("COMMIT\x00"),
			wantType: &ast.CommitStmt{},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			p := parser.New()

			// Untrimmed input, trailing NUL included, is expected to fail parsing.
			_, _, rawErr := p.ParseSQL(string(tc.query))
			require.Error(t, rawErr)

			// The trimmed input must parse cleanly into a single statement.
			parsed, warnings, err := p.ParseSQL(trimQueryEventSQL(tc.query))
			require.NoError(t, err)
			require.Empty(t, warnings)
			require.Len(t, parsed, 1)
			require.IsType(t, tc.wantType, parsed[0])
		})
	}
}

func TestGetTableSchemaCaseSensitiveIdentifiers(t *testing.T) {
t.Parallel()
ctx := t.Context()
Expand Down
42 changes: 42 additions & 0 deletions flow/e2e/clickhouse_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,48 @@ func (s ClickHouseSuite) Test_MySQL_Blobs() {
RequireEnvCanceled(s.t, env)
}

// Test_MySQL_NonTransactionalEngineCDC runs a flow against a MyISAM source table:
// it seeds one row before the flow starts, waits for the snapshot to match the
// destination, inserts a second row afterwards, and waits for the tables to
// match again. The test is skipped unless the source is a *MySqlSource and the
// test version is not MariaDB.
func (s ClickHouseSuite) Test_MySQL_NonTransactionalEngineCDC() {
	_, isMySQL := s.source.(*MySqlSource)
	if !isMySQL || internal.MySQLTestVersionIsMaria() {
		s.t.Skip("only applies to mysql")
	}

	srcTable := "test_non_transactional_engine"
	dstTable := "test_non_transactional_engine_dst"
	qualifiedSrc := s.attachSchemaSuffix(srcTable)

	// Source table uses the MyISAM engine.
	createSQL := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY,
val TEXT NOT NULL
) ENGINE=MyISAM
`, qualifiedSrc)
	require.NoError(s.t, s.source.Exec(s.t.Context(), createSQL))

	// Row that exists before the flow starts, to be picked up by the initial snapshot.
	snapshotInsert := fmt.Sprintf(
		`INSERT INTO %s (id,val) VALUES (1,'snapshot')`, qualifiedSrc)
	require.NoError(s.t, s.source.Exec(s.t.Context(), snapshotInsert))

	gen := FlowConnectionGenerationConfig{
		FlowJobName:      s.attachSuffix(srcTable),
		TableNameMapping: map[string]string{qualifiedSrc: dstTable},
		Destination:      s.Peer().Name,
	}
	cfg := gen.GenerateFlowConnectionConfigs(s)
	cfg.DoInitialSnapshot = true

	temporalClient := NewTemporalClient(s.t)
	env := ExecutePeerflow(s.t, temporalClient, cfg)
	SetupCDCFlowStatusQuery(s.t, env, cfg)

	EnvWaitForEqualTablesWithNames(env, s, "snapshot", srcTable, dstTable, "id,val")

	// Row written after the snapshot wait; the destination must catch up with it too.
	cdcInsert := fmt.Sprintf(
		`INSERT INTO %s (id,val) VALUES (2,'cdc')`, qualifiedSrc)
	require.NoError(s.t, s.source.Exec(s.t.Context(), cdcInsert))

	EnvWaitForEqualTablesWithNames(env, s, "cdc", srcTable, dstTable, "id,val")

	env.Cancel(s.t.Context())
	RequireEnvCanceled(s.t, env)
}

// Test_MySQL_Binary_Trailing_Zeros reproduces trailing-0x00 truncation of fixed-length
// BINARY(N) columns over CDC.
//
Expand Down
Loading