5151 checkErr bool
5252 pathBR string
5353 pathDumpling string
54+ pathCDC string
55+ addressCDC string
56+ downstream string
57+
58+ downStreamHost string
59+ downStreamPort string
60+ downStreamUser string
61+ downStreamPassword string
62+ downStreamDB string
5463)
5564
5665func init () {
@@ -67,8 +76,11 @@ func init() {
6776 flag .IntVar (& retryConnCount , "retry-connection-count" , 120 , "The max number to retry to connect to the database." )
6877 flag .BoolVar (& checkErr , "check-error" , false , "if --error ERR does not match, return error instead of just warn" )
6978 flag .BoolVar (& collationDisable , "collation-disable" , false , "run collation related-test with new-collation disabled" )
70- flag .StringVar (& pathBR , "path-br" , "" , "Path of BR" )
71- flag .StringVar (& pathDumpling , "path-dumpling" , "" , "Path of Dumpling" )
79+ flag .StringVar (& pathBR , "path-br" , "" , "Path of BR binary" )
80+ flag .StringVar (& pathDumpling , "path-dumpling" , "" , "Path of Dumpling binary" )
81+ flag .StringVar (& pathCDC , "path-cdc" , "" , "Path of TiCDC binary" )
82+ flag .StringVar (& addressCDC , "address-cdc" , "127.0.0.1:8300" , "Address of Server" )
83+ flag .StringVar (& downstream , "downstream" , "" , "Connection string of downstream TiDB cluster" )
7284}
7385
7486const (
@@ -165,6 +177,12 @@ type tester struct {
165177
166178 // dump and import context through --dump_and_import $SOURCE_TABLE as $TARGET_TABLE'
167179 dumpAndImport * SourceAndTarget
180+
181+ // replication checkpoint database name
182+ replicationCheckpointDB string
183+
184+ // replication checkpoint ID
185+ replicationCheckpointID int
168186}
169187
170188func newTester (name string ) * tester {
@@ -179,6 +197,8 @@ func newTester(name string) *tester {
179197 t .enableConcurrent = false
180198 t .enableInfo = false
181199
200+ t .replicationCheckpointDB = "checkpoint-" + uuid .NewString ()
201+ t .replicationCheckpointID = 0
182202 return t
183203}
184204
@@ -219,7 +239,7 @@ func isTiDB(db *sql.DB) bool {
219239 return true
220240}
221241
222- func (t * tester ) addConnection (connName , hostName , userName , password , db string ) {
242+ func (t * tester ) addConnection (connName , hostName , port , userName , password , db string ) {
223243 var (
224244 mdb * sql.DB
225245 err error
@@ -285,6 +305,64 @@ func (t *tester) disconnect(connName string) {
285305 t .currConnName = default_connection
286306}
287307
308+ func parseUserInfo (userInfo string ) (string , string , error ) {
309+ colonIndex := strings .Index (userInfo , ":" )
310+ if colonIndex == - 1 {
311+ return "" , "" , fmt .Errorf ("missing password in userinfo" )
312+ }
313+ return userInfo [:colonIndex ], userInfo [colonIndex + 1 :], nil
314+ }
315+
316+ func parseHostPort (hostPort string ) (string , string , error ) {
317+ colonIndex := strings .Index (hostPort , ":" )
318+ if colonIndex == - 1 {
319+ return "" , "" , fmt .Errorf ("missing port in host:port" )
320+ }
321+ return hostPort [:colonIndex ], hostPort [colonIndex + 1 :], nil
322+ }
323+
324+ func parseDownstream (connStr string ) (dbname string , host string , port string , user string , password string ) {
325+ // Splitting into userinfo and network/database parts
326+ parts := strings .SplitN (connStr , "@" , 2 )
327+ if len (parts ) != 2 {
328+ fmt .Println ("Invalid connection string format" )
329+ return
330+ }
331+
332+ // Parsing userinfo
333+ userInfo := parts [0 ]
334+ user , password , err := parseUserInfo (userInfo )
335+ if err != nil {
336+ fmt .Println ("Error parsing userinfo:" , err )
337+ return
338+ }
339+
340+ // Splitting network type and database part
341+ networkAndDB := parts [1 ]
342+ networkTypeIndex := strings .Index (networkAndDB , "(" )
343+ if networkTypeIndex == - 1 {
344+ fmt .Println ("Invalid connection string format: missing network type" )
345+ return
346+ }
347+
348+ // Extracting host, port, and database name
349+ hostPortDB := networkAndDB [networkTypeIndex + 1 :]
350+ hostPortDBParts := strings .SplitN (hostPortDB , ")/" , 2 )
351+ if len (hostPortDBParts ) != 2 {
352+ fmt .Println ("Invalid connection string format" )
353+ return
354+ }
355+
356+ host , port , err = parseHostPort (hostPortDBParts [0 ])
357+ if err != nil {
358+ fmt .Println ("Error parsing host and port:" , err )
359+ return
360+ }
361+
362+ dbname = hostPortDBParts [1 ]
363+ return
364+ }
365+
288366func (t * tester ) preProcess () {
289367 dbName := "test"
290368 mdb , err := OpenDBWithRetry ("mysql" , user + ":" + passwd + "@tcp(" + host + ":" + port + ")/" + dbName + "?time_zone=%27Asia%2FShanghai%27&allowAllFiles=true" + params , retryConnCount )
@@ -313,13 +391,25 @@ func (t *tester) preProcess() {
313391 log .Fatalf ("Executing create db %s err[%v]" , dbName , err )
314392 }
315393 t .mdb = mdb
394+
316395 conn , err := initConn (mdb , user , passwd , host , dbName )
317396 if err != nil {
318397 log .Fatalf ("Open db err %v" , err )
319398 }
320399 t .conn [default_connection ] = conn
321400 t .curr = conn
322401 t .currConnName = default_connection
402+
403+ if downstream != "" {
404+ // create replication checkpoint database
405+ if _ , err := t .mdb .Exec (fmt .Sprintf ("create database if not exists `%s`" , t .replicationCheckpointDB )); err != nil {
406+ log .Fatalf ("Executing create db %s err[%v]" , t .replicationCheckpointDB , err )
407+ }
408+
409+ downStreamDB , downStreamHost , downStreamPort , downStreamUser , downStreamPassword = parseDownstream (downstream )
410+ t .addConnection ("downstream" , downStreamHost , downStreamPort , downStreamUser , downStreamPassword , downStreamDB )
411+ }
412+ t .switchConnection (default_connection )
323413}
324414
325415func (t * tester ) postProcess () {
@@ -329,6 +419,7 @@ func (t *tester) postProcess() {
329419 }
330420 t .mdb .Close ()
331421 }()
422+ t .switchConnection (default_connection )
332423 if ! reserveSchema {
333424 rows , err := t .mdb .Query ("show databases" )
334425 if err != nil {
@@ -384,6 +475,11 @@ func generateBRStatements(source, target string) (string, string) {
384475}
385476
386477func (t * tester ) dumpTable (source string ) (string , error ) {
478+ // Check if the file exists
479+ if _ , err := os .Stat (pathDumpling ); os .IsNotExist (err ) {
480+ return "" , errors .New (fmt .Sprintf ("path-dumpling [%s] does not exist." , pathDumpling ))
481+ }
482+
387483 log .Warnf ("Start dumping table: %s" , source )
388484 path := "/tmp/" + source + "_" + uuid .NewString ()
389485 cmdArgs := []string {
@@ -392,6 +488,8 @@ func (t *tester) dumpTable(source string) (string, error) {
392488 fmt .Sprintf ("-u%s" , user ),
393489 fmt .Sprintf ("-T%s.%s" , t .name , source ),
394490 fmt .Sprintf ("-o%s" , path ),
491+ "--output-filename-template" ,
492+ "tempDump" ,
395493 "--no-header" ,
396494 "--filetype" ,
397495 "csv" ,
@@ -405,9 +503,7 @@ func (t *tester) dumpTable(source string) (string, error) {
405503
406504 output , err := cmd .CombinedOutput ()
407505 if err != nil {
408- log .Warnf ("Failed executing commands: %s, output: %s)" ,
409- cmd .String (), string (output ))
410- return "" , err
506+ return "" , errors .Annotate (err , fmt .Sprintf ("Dumpling failed: %s, output: %s." , cmd .String (), string (output )))
411507 }
412508 log .Warnf ("Done executing commands: %s, output: %s)" ,
413509 cmd .String (), string (output ))
@@ -417,10 +513,57 @@ func (t *tester) dumpTable(source string) (string, error) {
417513func (t * tester ) importTableStmt (path , target string ) string {
418514 return fmt .Sprintf (`
419515 IMPORT INTO %s
420- FROM '%s/example.t.000000000 .csv'
516+ FROM '%s/tempDump .csv'
421517 ` , target , path )
422518}
423519
520+ func (t * tester ) startReplication (tables string ) error {
521+ return nil
522+ }
523+
524+ func (t * tester ) waitForReplicationCheckpoint () error {
525+ curr := t .currConnName
526+ defer t .switchConnection (curr )
527+
528+ if err := t .executeStmt (fmt .Sprintf ("use `%s`" , t .replicationCheckpointDB )); err != nil {
529+ return err
530+ }
531+
532+ markerTable := fmt .Sprintf ("marker_%d" , t .replicationCheckpointID )
533+ if err := t .executeStmt (fmt .Sprintf ("create table `%s`.`%s` (id int primary key)" , t .replicationCheckpointDB , markerTable )); err != nil {
534+ return err
535+ }
536+
537+ t .switchConnection ("downstream" )
538+
539+ checkInterval := 1 * time .Second
540+ queryTimeout := 10 * time .Second
541+
542+ // Keep querying until the table is found
543+ for {
544+ ctx , cancel := context .WithTimeout (context .Background (), queryTimeout )
545+ defer cancel ()
546+
547+ query := fmt .Sprintf ("select * from information_schema.tables where table_schema = '%s' and table_name = '%s';" , t .replicationCheckpointDB , markerTable )
548+ rows , err := t .mdb .QueryContext (ctx , query )
549+ if err != nil {
550+ log .Printf ("Error checking for table: %v" , err )
551+ return err
552+ }
553+
554+ if rows .Next () {
555+ fmt .Printf ("Table '%s' found!\n " , markerTable )
556+ break
557+ } else {
558+ fmt .Printf ("Table '%s' not found. Retrying in %v...\n " , markerTable , checkInterval )
559+ }
560+
561+ time .Sleep (checkInterval )
562+ }
563+
564+ return nil
565+ }
566+
424567func (t * tester ) Run () error {
425568 t .preProcess ()
426569 defer t .postProcess ()
@@ -543,7 +686,7 @@ func (t *tester) Run() error {
543686 for i := 0 ; i < 4 ; i ++ {
544687 args = append (args , "" )
545688 }
546- t .addConnection (args [0 ], args [1 ], args [2 ], args [3 ], args [4 ])
689+ t .addConnection (args [0 ], args [1 ], port , args [2 ], args [3 ], args [4 ])
547690 case Q_CONNECTION :
548691 q .Query = strings .TrimSpace (q .Query )
549692 if q .Query [len (q .Query )- 1 ] == ';' {
@@ -593,16 +736,17 @@ func (t *tester) Run() error {
593736 }
594737 t .replaceRegex = regex
595738 case Q_BACKUP_AND_RESTORE :
739+ if ! isTiDB (t .mdb ) {
740+ return errors .New (fmt .Sprintf ("backup_and_restore is only supported on TiDB, line: %d sql:%v" , q .Line , q .Query ))
741+ }
596742 t .backupAndRestore , err = parseSourceAndTarget (q .Query )
597743 if err != nil {
598744 return errors .Annotate (err , fmt .Sprintf ("Could not parse backup table and restore table name in --backup_and_restore, line: %d sql:%v" , q .Line , q .Query ))
599745 }
600746 backupStmt , restoreStmt := generateBRStatements (t .backupAndRestore .sourceTable , t .backupAndRestore .targetTable )
601- log .WithFields (log.Fields {"stmt" : backupStmt , "line" : q .Line }).Warn ("Backup started" )
602747 if err := t .executeStmt (backupStmt ); err != nil {
603748 return err
604749 }
605- log .WithFields (log.Fields {"stmt" : backupStmt , "line" : q .Line }).Warn ("Backup end" )
606750 tempTable := t .backupAndRestore .sourceTable + uuid .NewString ()
607751 renameStmt := fmt .Sprintf ("RENAME TABLE `%s` TO `%s`" , t .backupAndRestore .sourceTable , tempTable )
608752 if err := t .executeStmt (renameStmt ); err != nil {
@@ -612,11 +756,9 @@ func (t *tester) Run() error {
612756 if err := t .executeStmt (dupTableStmt ); err != nil {
613757 return err
614758 }
615- log .WithFields (log.Fields {"stmt" : restoreStmt , "line" : q .Line }).Warn ("Restore start" )
616759 if err := t .executeStmt (restoreStmt ); err != nil {
617760 return err
618761 }
619- log .WithFields (log.Fields {"stmt" : restoreStmt , "line" : q .Line }).Warn ("Restore end" )
620762 renameStmt = fmt .Sprintf ("RENAME TABLE `%s` TO `%s`" , t .backupAndRestore .sourceTable , t .backupAndRestore .targetTable )
621763 if err := t .executeStmt (renameStmt ); err != nil {
622764 return err
@@ -626,6 +768,9 @@ func (t *tester) Run() error {
626768 return err
627769 }
628770 case Q_DUMP_AND_IMPORT :
771+ if ! isTiDB (t .mdb ) {
772+ return errors .New (fmt .Sprintf ("dump_and_import is only supported on TiDB, line: %d sql:%v" , q .Line , q .Query ))
773+ }
629774 t .dumpAndImport , err = parseSourceAndTarget (q .Query )
630775 if err != nil {
631776 return err
@@ -634,19 +779,28 @@ func (t *tester) Run() error {
634779 if err != nil {
635780 return err
636781 }
637-
638- dupTableStmt := fmt .Sprintf ("CREATE TABLE `%s` LIKE `%s`" , t .dumpAndImport .targetTable , t .backupAndRestore .sourceTable )
782+ dupTableStmt := fmt .Sprintf ("CREATE TABLE `%s` LIKE `%s`" , t .dumpAndImport .targetTable , t .dumpAndImport .sourceTable )
639783 if err := t .executeStmt (dupTableStmt ); err != nil {
640784 return err
641785 }
642-
643786 importStmt := t .importTableStmt (path , t .dumpAndImport .targetTable )
644- log .WithFields (log.Fields {"stmt" : importStmt , "line" : q .Line }).Warn ("Import start" )
645787 if err = t .executeStmt (importStmt ); err != nil {
646788 return err
647789 }
648- log .WithFields (log.Fields {"stmt" : importStmt , "line" : q .Line }).Warn ("Restore end" )
649-
790+ case Q_REPLICATION :
791+ if ! isTiDB (t .mdb ) {
792+ return errors .New (fmt .Sprintf ("replication is only supported on TiDB, line: %d sql:%v" , q .Line , q .Query ))
793+ }
794+ if err := t .startReplication (q .Query ); err != nil {
795+ return err
796+ }
797+ case Q_REPLICATION_CHECKPOINT :
798+ if ! isTiDB (t .mdb ) {
799+ return errors .New (fmt .Sprintf ("replication_checkpoint is only supported on TiDB, line: %d sql:%v" , q .Line , q .Query ))
800+ }
801+ if err := t .waitForReplicationCheckpoint (); err != nil {
802+ return err
803+ }
650804 default :
651805 log .WithFields (log.Fields {"command" : q .firstWord , "arguments" : q .Query , "line" : q .Line }).Warn ("command not implemented" )
652806 }
@@ -663,7 +817,6 @@ func (t *tester) Run() error {
663817 if xmlPath != "" {
664818 t .addSuccess (& testSuite , & startTime , testCnt )
665819 }
666-
667820 return t .flushResult ()
668821}
669822
0 commit comments