docker swarmkit 源码阅读: Dispatcher 部分

Dispatcher 负责 agent 的连接管理,包括注册、session 管理、node 任务状态更新、任务列表等工作,文档中这么描述:

The dispatcher directly handles all agent connections. This includes registration, session management, and notification of task assignment.

 

相关的数据结构如下:

5a58371a-6cee-4eb9-ad00-8af39eaf9d8e 18fdd7c6-f22a-4bad-b0c3-050dd99e495a 62f89adf-658e-4ea9-a83f-aee62c04e5ca

Dispatcher 只运行在 manager 中的 leader 上面。

1. Config 里面定义了 node 到 Dispatcher 的 Heartbeat 超时信息,HeartbeatPeriod 是固定时间,HeartbeatEpsilon 用于随机生成校正时间,defaultGracePeriodMultiplier 定义乘数,(固定时间+校正时间)*乘数 才是真正超时时间。

注意的是,当 Dispatcher 时会把所有 node 置为 NodeStatus_UNKNOWN 状态,此时的超时时间是正常超时时间的两倍。

2. nodeStore 保存每一个 node 的注册信息;

3. taskUpdates 保存 task 的更新信息;

4. nodeUpdates 保存 node 的更新信息;

5. processUpdatesTrigger 用来表示是否要处理 task 和 node 的更新信息,默认当更新的数量超过 10000(由 maxBatchItems 定义)时会触发处理更新操作,或者等待 10ms 后触发(由 maxBatchInterval 定义);

 

处理过程如下:

1. Dispatcher 启动之后先对每一个 node 标记为 NodeStatus_UNKNOWN 状态,并且把每一个 node 封装成 registeredNode 放入 nodeStore.nodes,registeredNode 会绑定一个 Heartbeat 结构(等待 agent 来注册)。

Heartbeat 结构实现了过期后自动把 node 状态改成 NodeStatus_DOWN,并把 node 从 nodeStore 中删掉,然后把 node 加到 nodeUpdates 中,如果 nodeUpdates 的长度大于等于 maxBatchItems,直接触发处理更新操作。

2. 查看并监控 store 中的 cluster 信息,保证 Dispatcher.config.HeartbeatPeriod 等于 Cluster.Spec.Dispatcher.HeartbeatPeriod, Dispatcher.networkBootstrapKeys 等于 Cluster.Cluster.NetworkBootstrapKeys;

3. 订阅 manager 信息,如有变化修改 Dispatcher.lastSeenManagers;

4. 说一下,处理更新操作的过程,从 taskUpdates 取出 task 的 status,和 store 中的对比,如果相同则不处理,如果如果前者的状态小于后者的状态,也不处理,否则更新 task 到 store;对于 node,从 nodeUpdates 取出 Status 和 Description,更新到 store。

那么,taskUpdates 和 nodeUpdates 中的更新是从哪里获取的呢?看下一点。

5. 事实上,agent 和 Dispatcher 是通过 RPC 交互,Dispatcher 声明了一组接口,每个接口都是一个 RPC Server。

eed963f3-09d8-434d-95b3-2a44b0674cb6

Session 用户 agent 注册,Heartbeat 用于保持 session 存活,UpdateTaskStatus 用于更新 task 状态,Tasks 返回 node 上的 task 列表,Assignments 返回 node 上的 task 全量列表和增量列表 。

其中 Session、Tasks、Assignments 是 stream,agent 起来后连一次 RPC Server,之后 RPC Server 有变化是会主动向 agent 推送;而 HeartBeat 和 UpdateTaskStatus 是单向的,只是 agent 端调用,Server 端不会主动推送。

1). agent 调用 Session 时,会检查 session 是否存在,如果不存在就注册(会限制注册频率),注册会生成 sessionID,并把 registeredNode
信息放入 nodeStore.nodes 中,而且会把 node 信息放入 nodeUpdates;如果 session 存在且匹配,仅把 node 信息放入 nodeUpdates。所以,每当 agent 调用 Session 时都会更新 node 资源信息(还有状态)。

Session 还通过 RPC 向 agent 推送什么呢?推送的是 Managers 和 NetworkBootstrapKeys,用来保持 agent 信息准确,而且一有变化就推送。

2). 再看下 Heartbeat,当 agent 调用 Heartbeat 时,超时时间会向后延迟,不断的 Heartbeat 可以使 node 一直存活下去。

3). 再看 UpdateTaskStatus,上面说 taskUpdates 的更新从哪里获取,就是从 UpdateTaskStatus。UpdateTaskStatus 收到一个 node 的任务状态更新请求,把它放入 taskUpdates。

4). Tasks 从 store 中收集一个 node 上的任务列表,发给 agent,任务必须大于等于 TaskStateAssigned。这个列表是全量的,如果 node 上某个任务不在列表里,会被 terminate。

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注