机器环境管理

对于机器的环境管理,已经痛苦很长时间了,每次机器创建之后,需要先部署基础环境,比如 Java、tomcat 等,然后再发布应用程序。对于这个问题,我之前一直想用 Docker 解决,但是事实上 Docker 也不能解决所有的问题,比如 Hadoop 并不适合 Docker,对于 Hadoop 这种适合部署在机器上的组件还需要手动维护基础环境。

作为一个有追求的不断提高效率的人,我现在终于决定要解决这个问题了,也就是机器的环境管理问题。

事实上,机器分为 物理机 和 虚拟机,对于大部分类 Web 程序我们使用虚拟机,对于 Hadoop、Mysql 等资源消耗型程序我们使用物理机,而且物理机和虚拟机基本没有「混布」,这是前提,如果混布,管理起来会麻烦很多(需要分目录来管理,不能共用)。

 

因为一个服务的环境是一样的,所以我们基于服务树来统一管理服务的环境,思路很简单:

1. 把 服务节点 和 Puppet class 做一个关联,即表示服务节点下的所有机器都使用此 Puppet class 来配置基础环境。

2. Puppet 有多个 class,class 之间的继承等关联关系通过 Puppet 配置文件来维护;

3. Puppet class 需要暴露出来一个 class 的列表来供做关联的时候做选择,这个列表可以放在 Puppet 配置中的一个文件里,文件对外提供 HTTP 下载(然后再封装成 API)。

那么,当机器来请求 Puppet 获取配置的时候,怎么返回呢?

参考 这篇文章,我们使用 Puppet 的 ENC 功能,写一个自定义脚本,传入机器名,返回包含 class 的 yaml 格式文本。脚本的流程:

1. 根据机器名获取机器属于哪个(些)服务节点(基于服务树 API);

2. 根据服务节点获取 Puppet class;

3. 拼成 yaml 格式文本并打印。

如果属于多个服务节点,而且服务节点的 Puppet class 不一样,此时可以使用 default class (只配置系统基础环境,不配置应用基础环境)。

 

但是,还有一个问题,如何在机器创建之后立即配置环境?

现状是:

1. 机器安装好之后会立即执行 Puppet;

2. Puppet 执行之后会自动安装资产系统 agent,agent 把机器上报到资产系统,表示此机器安装完成;

3. 然后调用 API 把机器关联到安装机器时指定的服务节点。

像上面讲的,Puppet 配置(应用)基础环境需要知道是哪个服务节点,而第 1 步的时候不知道是哪个服务节点。

这里抛出一个修改 ENC 脚本的方法:

1. ENC 调用服务树 API,获取机器绑定的服务节点;

2. 如果找不到节点,那么继续调用装机系统 API,获取服务节点。

这种方法只有「正在装」的机器才会调用装机系统 API,增加的调用不多,所以能接受。

 

如何自动化的生成 Nginx 配置文件

本文讨论如何自动化的生成 Nginx 配置文件,Nginx 配置文件指的是 server 的配置,这里有一个很重要的概念,就是:服务树,我们希望基于服务树来维护 server 配置需要的信息。

 

服务树的每一个节点就是一个服务,那么,我们对每个节点定义必要的信息,包括:

node_id – 节点 id,代表服务;

domain_uris – 代表服务绑定的域名和路径,支持多个,因为一个服务可能同时需要绑定外网域名和内网域名,而且有绑定多个路径的需求,它的格式如下:

[{‘domain_id’: domain_id, ‘uri’: uri }, …] ;

upstream node name – 这个服务的 upstream 名称;

servers – node_id 下绑定的机器列表(每台机器可以绑定不同的权重,以实现流量控制);

port – 这个服务监听的端口;

ip_hash – 是否对 upstream 启用 ip hash。

 

上面的 domain_id 表示域名信息,事实上域名信息包括:

domain – 域名;

ports – 端口,可以包含多个端口,并且支持是否开启 ssl;

access_log_path – 正常日志路径,默认为 ${domain}_access.log;

error_log_path – 错误日志路径,默认为 ${domain}_error.log;

ssl_cert_path – 证书文件路径,如果 ports 中没有开启 ssl,此处可以留空;

ssl_cert_key_path – 私钥文件路径,如果 ports 中没有开启 ssl,此处可以留空;

要注意的一点,如果 ssl 文件配置在 server 中,如果客户端不支持 SNI,ssl 连接可能会失败,这里我们不讨论多个域名证书不放在 server 中的配置问题。

生成配置文件基于模板,很简单,不过要注意的是,location $uri 有顺序要求,只要能保证按照 uri 的长度从高到低就没问题了。

 

自动生成配置文件的好处:

1. 在页面上点点就能增加、修改或删除服务的 Nginx 配置,比较方面,而且彻底解决了使用 git 管理配置带来的各种 rewrite 、if 等冗长的配置问题;

2. 可以基于服务做可用率的报警,根据服务可以查到 domain 和 path,然后过滤 Nginx 日志来计算可用率。而且当服务出问题的时候可以查看到服务的每台机器的可用率状况,然后做出自动剔除机器的动作。

swarm task 的更新流程

swarm 声明了 13 中 task 状态,如下:

TaskStateNew TaskState = 0
TaskStateAllocated TaskState = 64
TaskStatePending TaskState = 128
TaskStateAssigned TaskState = 192
TaskStateAccepted TaskState = 256
TaskStatePreparing TaskState = 320
TaskStateReady TaskState = 384
TaskStateStarting TaskState = 448
TaskStateRunning TaskState = 512
TaskStateCompleted TaskState = 576
TaskStateShutdown TaskState = 640
TaskStateFailed TaskState = 704
TaskStateRejected TaskState = 768

 

1. Orchestrator 判断如果 service 指定的 slot 数量大于正在运行的 slot 数量, 则创建 task,状态为 TaskStateNew;

2. TaskStateAllocated 为 task 收集网络资源之后把状态改成 TaskStateAllocated;

3. TaskStatePending 表示 Scheduler 收到请求但是还未 unassigned,在代码中我未见到有把状态改成 TaskStatePending;

4. Scheduler 收到 task 之后会把 task 调度到合适的 node 上,状态改为 TaskStateAssigned;

5. 进入 Agent 的处理流程:

1). Agent 收到 task,并且获取到 task 的 Controller 之后改成 TaskStateAccepted;

2). 如果状态为 TaskStateAccepted,改成 TaskStatePreparing;

3). 如果状态为 TaskStatePreparing,调用 Controller 的 Prepare 方法,把状态改成 TaskStateReady;

4). 如果状态为 TaskStateReady,改成 TaskStateStarting。

5). 如果状态为 TaskStateStarting,调用 Controller 的 Start 方法,把状态改成 TaskStateRunning;

6). 如果状态为 TaskStateRunning,则调用 Controller 的 Wait 方法,如果 task 执行完了,状态改为 TaskStateCompleted;

当 task 状态有变化,Agent 会通过到 Dispatcher 的 session 把状态汇报给 Dispatcher。

7). 当 task 的 DesiredState 为 TaskStateShutdown 而且状态小于 TaskStateCompleted 时,调用 Controller 的 Shutdown 方法,关闭 task。

task 的 DesiredState 在 task 创建之后是 TaskStateRunning,如果 manager 想关闭 task,把 DesiredState 设置成 TaskStateShutdown(DesiredState 只能被 manager 修改)。

 

docker swarmkit 源码阅读: Agent 部分

相关的数据结构如下。

e2939924-632e-4512-8526-ec1420ab0e05

85a0d8de-6e44-48ce-a701-c416f85e9bfa

e8abffa3-e4ec-42a1-aba8-3a435b74931a

30441360-5af9-4106-ac53-d40cea2a071c

worker 负责管理任务的执行、状态上报和保存,taskManagers 用来管理任务的执行、listeners 负责监听要任务上报的信息并上报给 Dispatcher,额外的,任务信息会报错在本地 DB 中。

83602e28-7f61-4709-8aa0-df5a4ec42968

reporter 用来报告任务的状态,包括本地 DB 和 Manger,reporter 由 worker 声明。

630d0620-0fe1-492f-a6d2-69c685bbd429

 

  1. 输入一个 Config 配置创建一个 agent,会检查 Config 有效性,Credentials、Executor、DB 不能为空;

  2. agent 开始 run,从 config.Executor.Describe 获取 NodeDescription,重写掉 config.Hostname;

  3. 首先基于 NodeDescription 创建到 manager (基于算法选一个)的 session, 会把 sessionID 和 stream 保存到 session 结构中,然后拿到第一个 SessionMessage,放入 session.messages 中等待处理。

然后,分别开启三个 goroutine:heartbeat、watch、listen,三者产生的 error 会放入 session.errs。

heartbeat 用于保持 session 不断,发送请求的格式是 HeartbeatRequest(包含 sessionID),返回的结果包含 period,等待 period 之后继续请求,一直如此下去。

watch 用于监听 task 信息,先使用 Dispatcher 的 Assignments 方法,如果失败则使用 Dispatcher 的 Tasks 方法,拿到的 task 信息写入 session.assignments;

listen 用于监听 session stream 的内容,如果有内容放入  session.messages。

  1. 初始化 worker,Worker 声明了四个方法:Init、AssignTasks、UpdateTasks、Listen。

初始化调用 Init 方法,遍历 DB(本地存储,task 会存在本地) 中的 task,如果 task 没有 assigned 给一个 node,则删除 task,否则从 DB 中获取 task 的状态覆盖掉当前状态,然后启动 task。Init 方法事实上是确保已经在本地 DB 中的 task 正确运行,对于新的 task 在后面看如何处理。

启动 task 的过程,调用 worker 的 taskManager 方法,如果 task 在 taskManagers,返回。

否则调用 worker 的 newTaskManager 创建,newTaskManager 先调用 exec.Resolve 方法获取 Controller 和 状态,其中状态会被改成 TaskStateAccepted,然后保存状态,包括 保存到 DB 存储和调用 listeners 中所有 statusReporterKey 中的 StatusReporter 的 UpdateTaskStatus 保存到 Dispatcher,最后调用另一个 newTaskManager 来创建 taskManager 结构并运行。

说说运行的处理流程:

1). 如果状态为 TaskStatePreparing,则调用 Controller 的 Prepare 方法,并设置状态为 TaskStateReady;

2). 如果状态为 TaskStateStarting,则调用 Controller 的 Start 方法,并设置状态为 TaskStateRunning;

3). 如果状态为 TaskStateRunning,则调用 Controller 的 Wait 方法,如果 task 执行完了,状态改为 TaskStateCompleted;

4). 如果状态为 TaskStateNew、TaskStateAllocated、TaskStateAssigned 则改为 TaskStateAccepted;

5). 如果状态为 TaskStateAccepted,则改成 TaskStatePreparing;

6). 如果状态为 TaskStateReady,则改成 TaskStateStarting。

上面所说,实际上是新建一个 task 的情况,状态是 TaskStateAccepted,套用状态变更流程,先会改成 TaskStatePreparing,然后改成 TaskStateReady,然后改成 TaskStateStarting,再改成 TaskStateRunning。

事实上,taskManager 结构在运行开始就会调用一次处理流程,以保证 TaskStateAccepted 状态的 task 立即被执行,而且 taskManager 还接收 task 的 update,如果和当前维护 task 的配置(CPU、MEMORY 等)不同,则进入处理流程(如果是 CPU、MEMORY 不同,docker 应该不会重启 task,但是其他的呢,比如 Network?)。

  1. 上面说保存任务状态会调用 (worker.)listeners 中所有 statusReporterKey 中的 StatusReporter 的 UpdateTaskStatus,这里的核心是 StatusReporter 这个接口。

1). agent 声明了 UpdateTaskStatus 函数,所以是 StatusReporter 这个接口的实例;

2). agent 的 run 会调用 newStatusReporter 创建 statusReporter 结构并运行,statusReporter 结构中的 reporter 就是 agent 的 UpdateTaskStatus;

3). agent 的 run 调用 worker.Listen 把 statusReporter 注册到 listeners 中(由于 statusReporter 声明了UpdateTaskStatus 函数,所以 statusReporter 是  StatusReporter 接口,所以才能加入到 listeners 中)。

4). statusReporter 的运行:不断从 statuses(map[string]*api.TaskStatus) 中获取 status 并调用 reporter.UpdateTaskStatus 完成上报;

5). statusReporter 并提供 UpdateTaskStatus 向 statuses 增加新的 status;

6). 是谁调用 UpdateTaskStatus 增加 status 呢?

答案是 worker 中的 taskManager,worker 对每一个 task 都一个 taskManager,taskManager 中有一个 reporter,每当有状态变化时,会执行此 reporter。

这个 reporter 函数是 worker 封装的,包括保存到本地 DB 和 调用 listeners 中所有 statusReporterKey 中的 StatusReporter 的 UpdateTaskStatus,而 listeners 中 statusReporterKey 中的 StatusReporter 的 UpdateTaskStatus 是什么?是 statusReporter 的 UpdateTaskStatus,执行此函数相当于向 statusReporter.statuses 增加 status。

实在是绕,简单的说:

1). agent 基于自己的 UpdateTaskStatus 包装出满足 StatusReporter 接口的 statusReporter,并把 statusReporter 注册到到 worker.listeners,当有新状态时 statusReporter 后台自动调用 agent 的 UpdateTaskStatus 上报(到Dispatcher);

2). worker 会为每一个 task 分配 taskManager,taskManager 会包含上报函数,当有状态变更时,会执行 statusReporter 的 UpdateTaskStatus 实现向 statusReporter 的状态列表中增加状态。

  1. 说说 agent 的  UpdateTaskStatus 函数,通过 session 的 sendTaskStatus 函数调用到 Dispatcher RPC 请求,保存到 store 中,Dispatcher 的 UpdateTaskStatus 实现此功能;

  2. 上面说了 agent 会监听 Dispatcher 来的信息,包括 session.assignments 和 session.messages,agent 会分别处理这两个信息。

1). 对于 session.assignments,是任务信息,分为全量信息和增量信息, 增量信息包括该更新的任务和该删除的任务。

对于更新的任务:

i). 先把任务保存到本地 DB 并标记为 assigned;

ii). 如果任务已经在 taskManagers,则更新,任务的配置不一样时才会真正更新,状态会被忽略,比如 CPU、MEMORY 变化,会更新;

iii). 如果任务不在 taskManagers,看本地 DB 里是否存在,如果存在,更新任务的状态(本地 DB 的状态更准),如果不在本地 DB,则保存状态,然后启动任务。

启动任务,会调用 worker.taskManager,创建新的 taskManager 并加到 taskManagers。

对于删除的任务:

i). 删除本地 DB 的 assigned 状态;

ii). 从 taskManagers 中删除;

iii). 关闭 taskManager。

全量信息先全部当做更新的任务处理,但是当前运行的任务如果不在全量列表里则会被删除。

2). 对于 session.messages,包括的功能有:

i). 更新 Manager 列表,保持最新(agent 来负责更新 Manager 列表);

ii). worker 的角色是否有变(NotifyRoleChange);

iii). 保持 NetworkBootstrapKeys 最新。

 

docker swarmkit 源码阅读: Dispatcher 部分

Dispatcher 负责 agent 的连接管理,包括注册、session 管理、node 任务状态更新、任务列表等工作,文档中这么描述:

The dispatcher directly handles all agent connections. This includes registration, session management, and notification of task assignment.

 

相关的数据结构如下:

5a58371a-6cee-4eb9-ad00-8af39eaf9d8e 18fdd7c6-f22a-4bad-b0c3-050dd99e495a 62f89adf-658e-4ea9-a83f-aee62c04e5ca

Dispatcher 只运行在 manager 中的 leader 上面。

1. Config 里面定义了 node 到 Dispatcher 的 Heartbeat 超时信息,HeartbeatPeriod 是固定时间,HeartbeatEpsilon 用于随机生成校正时间,defaultGracePeriodMultiplier 定义乘数,(固定时间+校正时间)*乘数 才是真正超时时间。

注意的是,当 Dispatcher 时会把所有 node 置为 NodeStatus_UNKNOWN 状态,此时的超时时间是正常超时时间的两倍。

2. nodeStore 保存每一个 node 的注册信息;

3. taskUpdates 保存 task 的更新信息;

4. nodeUpdates 保存 node 的更新信息;

5. processUpdatesTrigger 用来表示是否要处理 task 和 node 的更新信息,默认当更新的数量超过 10000(由 maxBatchItems 定义)时会触发处理更新操作,或者等待 10ms 后触发(由 maxBatchInterval 定义);

 

处理过程如下:

1. Dispatcher 启动之后先对每一个 node 标记为 NodeStatus_UNKNOWN 状态,并且把每一个 node 封装成 registeredNode 放入 nodeStore.nodes,registeredNode 会绑定一个 Heartbeat 结构(等待 agent 来注册)。

Heartbeat 结构实现了过期后自动把 node 状态改成 NodeStatus_DOWN,并把 node 从 nodeStore 中删掉,然后把 node 加到 nodeUpdates 中,如果 nodeUpdates 的长度大于等于 maxBatchItems,直接触发处理更新操作。

2. 查看并监控 store 中的 cluster 信息,保证 Dispatcher.config.HeartbeatPeriod 等于 Cluster.Spec.Dispatcher.HeartbeatPeriod, Dispatcher.networkBootstrapKeys 等于 Cluster.Cluster.NetworkBootstrapKeys;

3. 订阅 manager 信息,如有变化修改 Dispatcher.lastSeenManagers;

4. 说一下,处理更新操作的过程,从 taskUpdates 取出 task 的 status,和 store 中的对比,如果相同则不处理,如果如果前者的状态小于后者的状态,也不处理,否则更新 task 到 store;对于 node,从 nodeUpdates 取出 Status 和 Description,更新到 store。

那么,taskUpdates 和 nodeUpdates 中的更新是从哪里获取的呢?看下一点。

5. 事实上,agent 和 Dispatcher 是通过 RPC 交互,Dispatcher 声明了一组接口,每个接口都是一个 RPC Server。

eed963f3-09d8-434d-95b3-2a44b0674cb6

Session 用户 agent 注册,Heartbeat 用于保持 session 存活,UpdateTaskStatus 用于更新 task 状态,Tasks 返回 node 上的 task 列表,Assignments 返回 node 上的 task 全量列表和增量列表 。

其中 Session、Tasks、Assignments 是 stream,agent 起来后连一次 RPC Server,之后 RPC Server 有变化是会主动向 agent 推送;而 HeartBeat 和 UpdateTaskStatus 是单向的,只是 agent 端调用,Server 端不会主动推送。

1). agent 调用 Session 时,会检查 session 是否存在,如果不存在就注册(会限制注册频率),注册会生成 sessionID,并把 registeredNode
信息放入 nodeStore.nodes 中,而且会把 node 信息放入 nodeUpdates;如果 session 存在且匹配,仅把 node 信息放入 nodeUpdates。所以,每当 agent 调用 Session 时都会更新 node 资源信息(还有状态)。

Session 还通过 RPC 向 agent 推送什么呢?推送的是 Managers 和 NetworkBootstrapKeys,用来保持 agent 信息准确,而且一有变化就推送。

2). 再看下 Heartbeat,当 agent 调用 Heartbeat 时,超时时间会向后延迟,不断的 Heartbeat 可以使 node 一直存活下去。

3). 再看 UpdateTaskStatus,上面说 taskUpdates 的更新从哪里获取,就是从 UpdateTaskStatus。UpdateTaskStatus 收到一个 node 的任务状态更新请求,把它放入 taskUpdates。

4). Tasks 从 store 中收集一个 node 上的任务列表,发给 agent,任务必须大于等于 TaskStateAssigned。这个列表是全量的,如果 node 上某个任务不在列表里,会被 terminate。