Conversation
|
|
||
| type DBInfo struct { | ||
| Conn *client.Conn | ||
| Shard *DBShard |
There was a problem hiding this comment.
1个connection只有1个shard?
There was a problem hiding this comment.
这里是一条 sql 记录对应一个 DBInfo, 包括它所属的 shard 和 这个 shard 对应的connection
go/src/binlogsync/binlog_sync.go
Outdated
|
|
||
| type OutStatus struct { | ||
| err error | ||
| routineIndex int |
There was a problem hiding this comment.
叫goroutine吧...routine的解释有点多
go/src/binlogsync/binlog_sync.go
Outdated
| type OutStatus struct { | ||
| err error | ||
| routineIndex int | ||
| logPos uint32 |
There was a problem hiding this comment.
这里如果可以的话用有符号整数. 所有代码中避免使用无符号数, 一遇到减法就容易溢出
go/src/binlogsync/binlog_sync.go
Outdated
|
|
||
| type Config struct { | ||
| ChCap int | ||
| ChCnt int |
There was a problem hiding this comment.
这2个变量不造是啥意思...配置项当做全局变量,尽量用完整单词.
There was a problem hiding this comment.
所有的channel不放在一个slice里吗?如果放的话用slice的len维护数量就行了...
There was a problem hiding this comment.
这里想的在配置文件里 指定一下 需要创建多少个 channel
go/src/binlogsync/binlog_sync.go
Outdated
| TableIndex []string | ||
|
|
||
| BinlogFile string | ||
| BinlogPos uint32 |
There was a problem hiding this comment.
咱们应该都是用的gtid来追踪binlog的. 文件+pos的方式在多个源上不一样.不太好处理.
不造你这里为啥是要加这2个东西
There was a problem hiding this comment.
之前参考的python版 的同步用的是 文件+pos 的方式,这里也用这样来读binlog 的。我再看看 gtid 的方式
There was a problem hiding this comment.
恩...好像读的话还必须对源还必须制定binlog的file+pos, 不造有没有通过gtid来指定的方式...
drmingdrmer
left a comment
There was a problem hiding this comment.
writeThreadCount 跟 ChannelCapacity 是啥关系...在这里跟整个代码的逻辑的关系是啥...加点注释解释下吧...
go/src/binlogsync/binlog_sync.go
Outdated
| ChCap int | ||
| ChCnt int | ||
| ChannelCapacity int | ||
| WriteThreadCount int |
There was a problem hiding this comment.
thread 似乎不准确 这里用的是goroutine 吧?这里用来控制并发吧,而且控制并发数就这一个变量,干脆叫 worker 怎么样?
There was a problem hiding this comment.
这里的 Config 是用来解 binary -> json 的吧。最好定义下一下 json 字段名
| FileLog = log.New(logFile, "", log.Ldate|log.Ltime|log.Lshortfile) | ||
|
|
||
| // read config | ||
| jsonParser := NewJsonStruct() |
There was a problem hiding this comment.
配置用yaml, rpc消息用json. 分别偏重可读性和解析的效率.
配置允许用json写, 但用yaml解析. 这样支持可读性更好的yaml配置.
go/src/binlogsync/util.go
Outdated
|
|
||
| func calcHashToInt64(src []byte) (int64, error) { | ||
| h := sha256.New() | ||
| h.Write(src) |
There was a problem hiding this comment.
计算hash时, 因为不是为了校验, 只是为了sharding, 可以用弱1点的, 只要差不多均匀就可以了. @templexxx 那应该调研过很多快速的hash. 求1个够快够均匀的, 输出是整数就行.
这个环节可以考虑降低点cpu开销, 字符串不要拼接, 一个一个update到hash函数里, 类似sha1:init(); sha1:update(buf); sha1:final() 酱.
There was a problem hiding this comment.
看上去 murmurhash fnv adler32 包括 crc32 应该都 okay
go/src/binlogsync/sqlUtil.go
Outdated
| } | ||
|
|
||
| func quote(src, quote string) string { | ||
| rst := strings.Replace(src, quote, "\\"+quote, -1) |
go/src/binlogsync/sqlUtil.go
Outdated
| } | ||
|
|
||
| func quote(src, quote string) string { | ||
| rst := strings.Replace(src, quote, "\\"+quote, -1) |
There was a problem hiding this comment.
quote必须也replace \, 否则mysql解析出错. 这类基础操作涉及到很多细节, 需要测试. 另外没有现成的 blabla-escape的函数用咩?
` --> \`
\` --> \\\` ## 不应该是 \\`
There was a problem hiding this comment.
有 escape 函数。这里 quote 的作用只是用 " 把 src 括起来。好像也不太需要这个函数,escape 之后直接 + " 就好...
There was a problem hiding this comment.
what? 不是用来替换table名字中的 反引号的吗?
| "TableField": ["bucket_id", "scope", "key", "ts", "is_del", "owner", "acl", "sha1", | ||
| "ver", "md5", "crc32", "size", "file_meta", "group_id", "origo", "expires", "multipart"], | ||
| "TableShard": [ "bucket_id" , "scope", "key"], | ||
| "TableIndex": [ "bucket_id" , "scope", "key"], |
|
|
||
| func writeToDB(chIdx int, inCh chan *WriteEvent, outCh chan *OutStatus) { | ||
| for ev := range inCh { | ||
| mutex.Lock() |
There was a problem hiding this comment.
getEventConnection 里面查找 shard 时有一段 二分查找的逻辑,会有值的修改和比较 需要锁上
There was a problem hiding this comment.
是根据下标 找 slice 里的 shard ,这个 shard 是全局的,过程中下标的修改可能导致找到错误的 shard 吧
| writeEV := &WriteEvent{ | ||
| event: ev, | ||
| } | ||
| writeChs[0] <- writeEV |
There was a problem hiding this comment.
writeToDB 这个函数应该处理不了 replication.ROTATE_EVENT 这个event
There was a problem hiding this comment.
对的。处理 rotate 和记录已经同步到的位置和恢复执行要再考虑下,这部分后面更新一个提交点。
|
参考一下 ec 项目 go 的文件布局调整一下吧。 在这里 binlogsync 就是一个单纯的 app, 不需要外部引用他吧。那么建立一个 app 目录,下面再建立一个 binlogsync 目录用来存放你的文件。 下面包含: binlogsync.go binlogsync.conf(作为配置示例) sql.go util.go |
go/src/binlogsync/binlog_sync.go
Outdated
| func main() { | ||
|
|
||
| // set log | ||
| ShellLog = log.New(os.Stdout, "", 0) |
| // set log | ||
| ShellLog = log.New(os.Stdout, "", 0) | ||
|
|
||
| logFile, err := os.Create(logFileName) |
go/src/binlogsync/binlog_sync.go
Outdated
| var dbPool = make(map[string]*client.Conn) | ||
| var conf = Config{} | ||
| var logFileName = "binlog_sync.out" | ||
| var confName = "./config.json" |
There was a problem hiding this comment.
logFileName confName 还是作为命令行参数传进来吧,方便调试
go/src/binlogsync/binlog_sync.go
Outdated
| ) | ||
|
|
||
| var dbPool = make(map[string]*client.Conn) | ||
| var conf = Config{} |
| type JsonStruct struct { | ||
| } | ||
|
|
||
| func NewJsonStruct() *JsonStruct { |
go/src/binlogsync/binlog_sync.go
Outdated
| ChCap int | ||
| ChCnt int | ||
| ChannelCapacity int | ||
| WriteThreadCount int |
There was a problem hiding this comment.
thread 似乎不准确 这里用的是goroutine 吧?这里用来控制并发吧,而且控制并发数就这一个变量,干脆叫 worker 怎么样?
go/src/binlogsync/binlog_sync.go
Outdated
| ChCap int | ||
| ChCnt int | ||
| ChannelCapacity int | ||
| WriteThreadCount int |
There was a problem hiding this comment.
这里的 Config 是用来解 binary -> json 的吧。最好定义下一下 json 字段名
go/src/binlogsync/binlog_sync.go
Outdated
| } | ||
|
|
||
| type Config struct { | ||
| ChannelCapacity int |
| return &conf.Shards[0] | ||
| } | ||
|
|
||
| func main() { |
go/src/binlogsync/binlog_sync.go
Outdated
|
|
||
| var logFile *os.File | ||
|
|
||
| var mutex sync.Mutex |
| rst := strings.Replace(src, quote, "\\"+quote, -1) | ||
| func quoteString(src, quote string) string { | ||
| safeQuote := strings.Replace(quote, "\\", "\\\\", -1) | ||
| rst := strings.Replace(src, quote, "\\"+safeQuote, -1) |
There was a problem hiding this comment.
感觉逻辑不太对...替换quote里的\是啥意思
There was a problem hiding this comment.
嗯.. 感觉这里应该限制 len(quote) == 1 ,这样只用替换 src 中的quote就好了。len(quote) > 1 的话不太好处理replace src 里的 quote
There was a problem hiding this comment.
为了解决这种情况出现, 但是没想对..
` -> \`
\` -> \\\` # 不是 \\`
| i := sort.Search(len(conf.Shards), func(i int) bool { | ||
| shard := conf.Shards[i].From | ||
|
|
||
| rst, err := compareStringSlice(shard, tbShards) |
There was a problem hiding this comment.
额...row里的所有field都是string? 没有整数吗?
| func main() { | ||
|
|
||
| // set log | ||
| shellLog = log.New(os.Stdout, "", 0) |
There was a problem hiding this comment.
还是加个shelllog吧.以后替换容易点...最终肯定不会往shell里打印吧...
| var ( | ||
| mutex *sync.Mutex | ||
|
|
||
| wg sync.WaitGroup |
There was a problem hiding this comment.
以面向对象的方式来实现吧, 一个机器上肯定不止1个复制关系在跑, 可能一个进程里要处理多个复制.
| ) | ||
|
|
||
| func validRow(srcRow []interface{}) map[string]string { | ||
| // the first column `id` should not put in new rowValue |
There was a problem hiding this comment.
不太明白, 都处理成string的好处是啥?
函数名不太合适, normalize 好点...valid是形容词
There was a problem hiding this comment.
都变成 string 方便一个row放在一个map。保留原格式像 int []byte 好像也没特别用处..
从一机器读取 mysql binlog 数据向另一数据库执行 sql 语句进行 binlog 同步。
使用 go 实现,依赖 的 go-mysql 包比较大,没有在这个目录。
详细描述: #2