DM 数据旅程 01:第一次 start task
一、概述
本文以 start task 为目的,带着读者从 0 到 1 启动一个数据迁移任务,旨在让读者了解到最基础的 DM 逻辑。本文将直接参照集成测试 start_task 的过程,从以下几个方面展开:
- Start dm-master
- Start dm-worker
- 绑定 source 和 dm-worker
- Start task
注:为了专注于我们的目的(start task),本文不会对无关代码进行解读
大家可使用 start/stop 流程 辅助阅读
由于写这篇的文章的时间是 2021 年 12 月份,所以所有的链接都是原 DM repo 的😂
二、start dm-master
- ./dm-master(in run_dm_master) 启动二进制文件,即调用 main 函数,其中 master-server start
- go electionNotify:这个是为了等待
etcd election
成功,并在其成功后做⬇️
DM master 中内嵌了一个 etcd,用于存储各种元数据,并且借此保证 DM master 的高可用。后面非常多的数据存储都会用到 etcd。
startLeaderComponent,其中我们这次只需要关注 s.scheduler.Start 中的go observeWorkerEvent,主要分为两部分
go WatchWorkerEvent:该函数通过 etcd client 监听是否有 workerEvent 出现
handleWorkerEv:有 workerEvent 出现时,handle it
这个时候,dm-master 等待 workerEvent 到来
三、start dm-worker
./dm-worker(in run_dm_worker)启动二进制文件,即调用 main 函数,其中 worker-server start
JoinMaster:先告诉 master,我来了!
- worker 先在这 RegisterWorker,然后会触发 master 调用 RegisterWorker
- Master 会调用 AddWorker,然后 PutWorkerInfo,把相应的 key-value 写到 etcd 中
- 可以看到写到 etcd 用的是
clientv3.OpPut(key, value)
,也就是说 kv 要执行 put 操作 - 之前的 go WatchWorkerEvent 中就监听到有事件来了,并且判断其为
mvccpb.PUT
类型,event 处理之后会通过 outCh 传到 handleWorkerEv 中进行具体的上线处理 - 刚上线的时候,就会去各种找 source 去 bound,但是现在我们还没有 create source,所以也找不到 source,暂时可以不关注这里
Start task 还需要 bound source,那 worker 首先要做的就是 observeSourceBound,这里同 observeWorkerEvent 是类似的:
- go WatchSourceBound:通过 etcd client 监听是否有 sourceBound 出现
- handleSourceBound:上面监听到了之后,则 operateSourceBound
接下来,dm-worker 等待 source bound
四、operate-source create
DM 用的命令行工具是 cobra,有兴趣的读者可深入了解一下
命令行执行 operate-source create(in test_prepare),
operate-source
这个命令在 NewOperateSourceCmd 注册,具体实现在 operateSourceFunc读取到该命令后,开始解析第一个参数(即
create
)并转换,最后被打包送到 master,开始执行 master 的 OperateSource 函数该函数中,master 会从命令行中给出的配置文件路径
- 解析并调整 source config
- 把 source cfg 也存到 etcd 里,因为 worker 待会要用
- Try to bound it to a free worker:因为我们是第一次 start task,并且也没有开启 relay 功能(test 中是开启了,但本篇文章假设不开启),所以我们就只能 bound a free worker 了。
- 最终,通过 PutSourceBound,把 SourceBound 通过 etcd client 发送
发送之后,worker 就通过 go WatchSourceBound 监听到有 SourceBound 出现,然后进行 operateSourceBound
- 首先需要拿到 source cfg,因为上面的操作都是在 master 执行的,worker 这里并没有 source cfg
- Source cfg 也是通过 etcd 拿到的,正好上面存了
之后就可以开始 subtask 了吧!
- 但是并没有。。。我们还没开始 start task 呢!
- 所以 fetchSubTasksAndAdjust 并不能拿到 subtask。拿到是空的
那没办法了,继续等呗(又是同样的 watch/handle 机制)
五、start-task
命令行执行 start-task(in test_prepare),
start-task
命令的注册和实现参考operate-source
,最后执行 master 的 StartTask 函数直接开始就 generateSubTask(
req.Task
直接传递的就是解析好的task.yaml
字符串,原来在命令的实现中就帮我们解析好啦)。简单的说,就是经过一些 adjust 和 check, 帮助我们生成了 SubTask struct重点来了,AddSubTasks -> NewSubTaskStage,subTask 终于创建好了,stage=running;再 put 进 etcd,完美。可以看到我们分别把 SubTaskCfg 和 SubTaskStage 都 put 进 etcd 了。
那上面就 watch 到 stage 来了,对 SubTaskCfg 进行处理,如果我们是要进行 run 的操作,我们还得先把 cfg 拿出来,最后 startSubTask
startSubTask 中,会 NewSubTask,再 runSubTask。subTask 内部具体的执行组建是由 unit 负责的,所以它会
- initUnits
- st.run 其实也是由 currentUnit 来 Process
六、结语
在 unit Process 后,start-task 就结束啦!是不是还意犹未尽呢?到底有哪些 unit 呢?这些 unit 内部到底是怎么 Process 的呢?在后续的文章中会陆续和大家见面哦。
其实再复读一下全文,我们发现本篇文章并没有太多很难的东西,大部分篇幅都在描述一些「准备活动」,全程用 etcd watch——master 等待 worker 到来、worker 等待 source 到来、source-worker 等待 subtask 到来。等就完事了。
任何建议和反馈都欢迎告诉我。下期再见!