一、概述

分库分表的悲观协调方法是 2018 年开发的特性,是 DM 首次支持 MySQL 分库分表的迁移。由于分库分表在各个公司中的应用实在太过广泛,所以只有在支持分库分表迁移后,DM 才有了工程实践的意义,否则只能算作一个玩具。这对 DM 来说意义重大。

由于悲观协调的内容庞大,本节只讲述悲观协调的准备过程:

  • Master 的准备过程
  • Worker 的准备过程

以及在悲观协调过程中至关重要的结构体:

  • Lock
  • Info
  • Operation
  • ShardingMeta
  • ShardingGroup

注:为了专注于我们的目的(悲观协调),本文不会对无关代码进行解读

这一节外链中的代码,读者可能会产生这个逻辑为什么是这里的疑问。这是因为本节并没有完全按照顺序读代码。所以读者可以仅带着学习的心态阅读本节,暂时不需要知道它为什么在代码的这个地方。在下一节中会顺序阅读代码

本节基于 DM release-6.0.0

二、Master

1、Pessimist 和 LockKeeper

  • NewPessimist:Pessimist 是 Master 处理悲观协调的结构体,其中最重要的成员是 LockKeeper
  • LockKeeper 的作用是管理各种各样的 lock。
1
2
3
4
5
6
7
8
9
10
11
// LockKeeper used to keep and handle DDL lock conveniently.

// The lock information do not need to be persistent, and can be re-constructed from the shard DDL info.

type LockKeeper struct {

mu sync.RWMutex

locks map[string]*Lock // lockID -> Lock

}
  • Lock 可以看作是每一个 ddl 影响的库表,这些库表分布在不同的 MySQL/source 中。由于在悲观协调过程中,数据流(binlog 流)不是停滞的,所以在数据流中,会源源不断地出现新 DDL,即新 lock。
  • LockKeeper 就是管理这些 lock。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Lock represents the shard DDL lock in memory.

// This information does not need to be persistent, and can be re-constructed from the shard DDL info.

type Lock struct {

mu sync.RWMutex



ID string // lock's ID

Task string // lock's corresponding task name

Owner string // Owner's source ID (not DM-worker's name)

DDLs []string // DDL statements

remain int // remain count of sources needed to receive DDL info



// whether the DDL info received from the source.

// if all of them have been ready, then we call the lock `synced`.

ready map[string]bool



// whether the operations have done (exec/skip the shard DDL).

// if all of them have done, then we call the lock `resolved`.

done map[string]bool

}

2、Lock

  • ID 表现形式为 ${task}-${schema}.${table},从 LockKeeper 的结构可以看到同一任务同一表下只能有一个 lock,如果遇到了不同的 DDL,影响的又是同任务同表,则报错
  • DDLs 为什么是 []string?因为这里是经过 Split 的 DDLs,单条 DDL 可能会被 Split 成多条 DDL。
  • Remain 表示剩余的还未收到 DDL 的 source/worker(以下 worker 都表示 source/worker 对)数量。即如果 worker 把这个 Lock 相关的 DDLs(实际上是 info 的形式)发给 master,master 则把 remain --
  • Ready 和 remain 表达相同的意思,remain 即表示 ready 中为 true 的个数
  • Done 表示 operation 结束的 worker 的状态

注意:下面说的所有 worker 都 ready 即是 remain <= 0,代码中即是 synced = true。这里创造的概念过多,应该收束一下,减少代码的理解难度。

TODO:可以把代码中的 remain 改成 noReadies;synced 改成 allReady

3、Info

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Info represents the shard DDL information.

// This information should be persistent in etcd so can be retrieved after the DM-master leader restarted or changed.

// NOTE: `Task` and `Source` are redundant in the etcd key path for convenient.

type Info struct {

Task string `json:"task"` // data migration task name

Source string `json:"source"` // upstream source ID

Schema string `json:"schema"` // schema name of the DDL

Table string `json:"table"` // table name of the DDL

DDLs []string `json:"ddls"` // DDL statements

}

Info 并没有什么特别的地方,只是用作 worker 通知 master,某 lock ready 的消息。

4、Operation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Operation represents a shard DDL coordinate operation.

// This information should be persistent in etcd so can be retrieved after the DM-master leader restarted or changed.

// NOTE: `Task` and `Source` are redundant in the etcd key path for convenient.

type Operation struct {

ID string `json:"id"` // the corresponding DDL lock ID

Task string `json:"task"` // data migration task name

Source string `json:"source"` // upstream source ID

DDLs []string `json:"ddls"` // DDL statements

Exec bool `json:"exec"` // execute or skip the DDL statements

Done bool `json:"done"` // whether the `Exec` operation has done



// only used to report to the caller of the watcher, do not marsh it.

// if it's true, it means the Operation has been deleted in etcd.

IsDeleted bool `json:"-"`

}
  • Exec:是 master 向 worker 发送消息时所用到的字段。它告知该 worker 是否需要执行 DDL,只需要 owner 执行 DDL
  • Done:是 worker 向 master 发送消息时所用到的字段。它告知 master 该 worker done 了。done 并不代表该 worker 执行了 DDL,而是表示一种状态,因为只有 owner 可以执行 DDL,non-owner done 可以当作 non-worker 接受到了 DDL 已执行的信息的确认。

3、Start

以下可以直接在代码中搜索对应的名字,请务必与对应代码一起阅读!

  • s.pessimist.Start

  • p.run

    • watchInfoOperation

      • pessimism.WatchInfoPut:等待 worker 发送 info

      • p.handleInfoPut:

      • pessimism.WatchOperationPut:

        • 等待 worker 发送 operation done。
        • 这里的 operation 和 上面发送的 operation 其实是一样的,这里监听的没有指定 task 和 source,上面发送的 operation 是指定 task 和 source 的。所以上面发送的 operation 这里也会监听到,
      • p.handleOperationPut:

        • 收到 not done operation 这里并不进行操作。即上面发送 exec operation 的时候没有 done。

        • 当收到 operation is done 时,才进行一系列操作。即从 worker 发送过来的 done operation。

          • 首先 markDone
          • 如果是 owner done 了,还会对 non-owner 发送 exec operation,督促 non-owner done
          • 如果所有的 worker 都 done 了(即 resolved),则把 lock 释放
    • 错误处理

三、Worker

1、Pessimist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Pessimist used to coordinate the shard DDL migration in pessimism mode.

type Pessimist struct {

mu sync.RWMutex



logger log.Logger

cli *clientv3.Client

task string

source string



// the shard DDL info which is pending to handle.

pendingInfo *pessimism.Info

// the shard DDL lock operation which is pending to handle.

pendingOp *pessimism.Operation

}
  • NewPessimist:worker 中的 pessimist 不同于 master 中的 pessimist。它只保存了 pendingInfopendingOp,用于保存当前的 Info 和 Operation

2、ShardingGroupKeeper

  • NewShardingGroupKeeper:其中最重要的结构体就是 shardingGroupshardingGroup 和 master 中的 Lock 类似,也是用于管理 DDL 影响到的 table(在 Lock 中是 source/worker)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// ShardingGroupKeeper used to keep ShardingGroup.

type ShardingGroupKeeper struct {

sync.RWMutex

groups map[string]*ShardingGroup // target table ID -> ShardingGroup

cfg *config.SubTaskConfig



shardMetaSchema string

shardMetaTable string

shardMetaTableName string



db *conn.BaseDB

dbConn *dbconn.DBConn



tctx *tcontext.Context

}

3、ShardingGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// ShardingGroup represents a sharding DDL sync group.

type ShardingGroup struct {

sync.RWMutex

// remain count waiting for syncing

// == len(sources): DDL syncing not started or resolved

// == 0: all DDLs synced, will be reset to len(sources) after resolved combining with other dm-workers

// (0, len(sources)): waiting for syncing

// NOTE: we can make remain to be configurable if needed

remain int

sources map[string]bool // source table ID -> whether source table's DDL synced

IsSchemaOnly bool // whether is a schema (database) only DDL TODO: zxc add schema-level syncing support later



sourceID string // associate dm-worker source ID

meta *shardmeta.ShardingMeta // sharding sequence meta storage



firstLocation *binlog.Location // first DDL's binlog pos and gtid, used to restrain the global checkpoint when un-resolved

firstEndLocation *binlog.Location // first DDL's binlog End_log_pos and gtid, used to re-direct binlog streamer after synced

ddls []string // DDL which current in syncing



flavor string

enableGTID bool

}
  • Remain 和 Lock 中的 remain 意义完全一样,但是这里指 table 是否 ready(现代码中用的 synced)

whether source table’s DDL synced

TODO:应该统一概念,这里也用 ready 表示

  • sources:和 Lock 中的 ready 类似
  • firstLocation/firstEndLocation/ddls:这三个其实在 meta 中都有保存,所以感觉其实没啥用
  • meta:里面保存着所有的有用的信息,下面详细说

4、ShardingMeta

这些信息会被持久化到磁盘中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// ShardingMeta stores sharding ddl sequence

// including global sequence and each source's own sequence

// NOTE: sharding meta is not thread safe, it must be used in thread safe context.

type ShardingMeta struct {

activeIdx int // the first unsynced DDL index

global *ShardingSequence // merged sharding sequence of all source tables

sources map[string]*ShardingSequence // source table ID -> its sharding sequence

tableName string // table name (with schema) used in downstream meta db



enableGTID bool // whether enableGTID, used to compare location

}
  • activeIdx:表示当前活跃的 DDL 下标,即现在被哪个 DDL 卡住了
  • global:表示全局的 DDL 序列,即所有 table 中最长的
  • sources:表示各个 table 的 DDL 序列

5、ShardingSequence 和 DDLItem

1
2
3
4
5
6
7
// ShardingSequence records a list of DDLItem.

type ShardingSequence struct {

Items []*DDLItem `json:"items"`

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// DDLItem records ddl information used in sharding sequence organization.

type DDLItem struct {

FirstLocation binlog.Location `json:"-"` // first DDL's binlog Pos, not the End_log_pos of the event

DDLs []string `json:"ddls"` // DDLs, these ddls are in the same QueryEvent

Source string `json:"source"` // source table ID



// just used for json's marshal and unmarshal, because gtid.Set in FirstLocation is interface,

// can't be marshal and unmarshal

FirstPosition mysql.Position `json:"first-position"`

FirstGTIDSet string `json:"first-gtid-set"`

}

这里的 DDLItem 和 Lock 中的 []DDL 其实是一样的,可以看到一个 DDLItem 中也包含多个 DDL。但是这里的 DDLItem 中封装了更多的信息:

DDLs []string `json:”ddls”` // DDLs, these ddls are in the same QueryEvent

  • FirstLocation:DDL 开始的位点,主要用于识别该 DDL 之前是否来过了。

6、Init

  • s.sgk.Init():在下游创建库表 dm_meta.($task)_syncer_sharding_meta
  • s.initShardingGroups

TODO:可用 utils.FetchTargetDoTables() 替换 fromDB.FetchAllDoTables

  1. s.sgk.LoadShardMeta:把数据读出来,存到内存里
  2. s.sgk.AddGroup:把 ShardingGroup 恢复
  • Reset

7、Start

  • 启动 syncDDL 线程,等待 DDL job

四、总结

本小节主要介绍了在悲观协调中:

  • 会用到的各种数据结构
  • Master 运行了几个线程,等待 worker 中信息的来临
  • Worker 运行了 syncDDL 线程,等待着 DDL 到来

这一节中,罗列了特别多的概念,看不懂是正常的!在下一节,将从 worker 接受到一条 DDL 开始,直到这条 DDL 的 lock 解除的过程。在这个过程中,我们来学习这些结构体到底是怎么用的。

抱歉第二章就是悲观协调这个功能😂,本来想让系列文章的难度更加平滑一点的,但是人太懒了一拖再拖😂。为了避免再拖下去,先把写了的存活发出来吧!