Skip to content

Commit 18939ac

Browse files
committed
*: try leader by label
1 parent 19c80ef commit 18939ac

File tree

4 files changed

+105
-1
lines changed

4 files changed

+105
-1
lines changed

cmd/xenon/main.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,12 @@ func leaderStop() error {
193193
ch <- err
194194
}
195195
log.Info("flushed binary logs:", stmt)
196+
// Step 8: Unlock tables
197+
stmt, err = UnlockTables(conn)
198+
if err != nil {
199+
ch <- err
200+
}
201+
log.Info("unlock tables:", stmt)
196202
pidfile, err = getPidFile(conn)
197203
if err != nil {
198204
ch <- err
@@ -418,6 +424,12 @@ func FlushTablesWithReadLock(db *sql.DB) (string, error) {
418424
return query, err
419425
}
420426

427+
func UnlockTables(db *sql.DB) (string, error) {
428+
query := "UNLOCK TABLES"
429+
_, err := db.Exec(query)
430+
return query, err
431+
}
432+
421433
func FlushBinaryLogs(db *sql.DB) (string, error) {
422434
_, err := db.Exec("FLUSH BINARY LOGS")
423435
return "FLUSH BINARY LOGS", err

internal/pod_executor.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,3 +116,15 @@ func (p *PodExecutor) CloseXenonSemiCheck(namespace, podName string) error {
116116
}
117117
return nil
118118
}
119+
120+
func (p *PodExecutor) XenonTryLeader(namespace, podName string) error {
121+
cmd := []string{"xenoncli", "raft", "trytoleader"}
122+
_, stderr, err := p.Exec(namespace, podName, "xenon", cmd...)
123+
if err != nil {
124+
return err
125+
}
126+
if len(stderr) != 0 {
127+
return fmt.Errorf("run command %s in xenon failed: %s", cmd, stderr)
128+
}
129+
return nil
130+
}

mysqlcluster/syncer/status.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,24 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
114114

115115
// get ready nodes.
116116
var readyNodes []corev1.Pod
117+
var PodLeader, PodTryLeader *corev1.Pod
118+
PodLeader, PodTryLeader = nil, nil
117119
for _, pod := range list.Items {
118120
if len(pod.ObjectMeta.Labels[utils.LableRebuild]) > 0 {
119121
if err := s.AutoRebuild(ctx, &pod, list.Items); err != nil {
120122
s.log.Error(err, "failed to AutoRebuild", "pod", pod.Name, "namespace", pod.Namespace)
121123
}
122124
continue
123125
}
126+
if pod.ObjectMeta.Labels != nil {
127+
if len(pod.ObjectMeta.Labels[utils.LabelTryLeader]) != 0 {
128+
PodTryLeader = &pod
129+
}
130+
if pod.ObjectMeta.Labels["role"] == string(utils.Leader) {
131+
PodLeader = &pod
132+
}
133+
}
134+
124135
for _, cond := range pod.Status.Conditions {
125136
switch cond.Type {
126137
case corev1.ContainersReady:
@@ -142,7 +153,17 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
142153
}
143154
}
144155
}
145-
156+
// try leader
157+
if PodTryLeader != nil {
158+
if PodLeader != nil {
159+
if err := s.SetLeaderReadOnly(PodLeader); err != nil {
160+
s.log.Info("set leader readonly", "error", err.Error())
161+
}
162+
}
163+
if err := s.TryLeader(ctx, PodTryLeader); err != nil {
164+
s.log.Error(err, "failed to Try leader", "pod", PodTryLeader.Name, "namespace", PodTryLeader.Namespace)
165+
}
166+
}
146167
s.Status.ReadyNodes = len(readyNodes)
147168
if s.Status.ReadyNodes == int(*s.Spec.Replicas) && int(*s.Spec.Replicas) != 0 {
148169
if err := s.reconcileXenon(s.Status.ReadyNodes); err != nil {
@@ -802,3 +823,61 @@ func (s *StatusSyncer) DoRoRebuild(ctx context.Context, pod *corev1.Pod, items [
802823
}
803824
return nil
804825
}
826+
827+
func (s *StatusSyncer) TryLeader(ctx context.Context, pod *corev1.Pod) error {
828+
829+
// 1. close the xenon's SemiCheck.
830+
executor, err := internal.NewPodExecutor()
831+
if err != nil {
832+
return err
833+
}
834+
835+
err = executor.XenonTryLeader(s.Namespace, pod.Name)
836+
s.log.Info("the xenon's tryleader", "pod", pod.Name)
837+
delete(pod.ObjectMeta.Labels, utils.LabelTryLeader)
838+
if err != nil {
839+
return err
840+
}
841+
842+
if err := s.cli.Update(ctx, pod); err != nil {
843+
return err
844+
}
845+
return nil
846+
}
847+
848+
func (s *StatusSyncer) SetLeaderReadOnly(pod *corev1.Pod) error {
849+
var sqlRunner internal.SQLRunner
850+
closeCh := make(chan func())
851+
852+
var closeConn func()
853+
errCh := make(chan error)
854+
host := fmt.Sprintf("%s.%s-mysql.%s", pod.Name, s.Name, s.Namespace)
855+
cfg, errOut := internal.NewConfigFromClusterKey(
856+
s.cli, s.MysqlCluster.GetClusterKey(), utils.RootUser, host)
857+
go func(sqlRunner *internal.SQLRunner, errCh chan error, closeCh chan func()) {
858+
var err error
859+
*sqlRunner, closeConn, err = s.SQLRunnerFactory(cfg, errOut)
860+
if err != nil {
861+
s.log.V(1).Info("failed to get sql runner", "error", err)
862+
errCh <- err
863+
return
864+
}
865+
if closeConn != nil {
866+
closeCh <- closeConn
867+
return
868+
}
869+
errCh <- nil
870+
}(&sqlRunner, errCh, closeCh)
871+
872+
select {
873+
case errOut = <-errCh:
874+
return errOut
875+
case closeConn := <-closeCh:
876+
defer closeConn()
877+
case <-time.After(time.Second * 5):
878+
}
879+
if sqlRunner != nil {
880+
return sqlRunner.QueryExec(internal.NewQuery("SET GLOBAL super_read_only=on"))
881+
}
882+
return nil
883+
}

utils/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ const (
203203
const LableRebuild = "rebuild"
204204
const LabelRebuildFrom = "rebuild-from"
205205
const LabelMaintain = "maintain"
206+
const LabelTryLeader = "tryleader"
206207

207208
// XenonHttpUrl is a http url corresponding to the xenon instruction.
208209
type XenonHttpUrl string

0 commit comments

Comments
 (0)