如何自动化的生成 Nginx 配置文件

本文讨论如何自动化的生成 Nginx 配置文件,Nginx 配置文件指的是 server 的配置,这里有一个很重要的概念,就是:服务树,我们希望基于服务树来维护 server 配置需要的信息。

 

服务树的每一个节点就是一个服务,那么,我们对每个节点定义必要的信息,包括:

node_id – 节点 id,代表服务;

domain_uris – 代表服务绑定的域名和路径,支持多个,因为一个服务可能同时需要绑定外网域名和内网域名,而且有绑定多个路径的需求,它的格式如下:

[{‘domain_id': domain_id, ‘uri': uri }, …] ;

upstream node name – 这个服务的 upstream 名称;

servers – node_id 下绑定的机器列表(每台机器可以绑定不同的权重,以实现流量控制);

port – 这个服务监听的端口;

ip_hash – 是否对 upstream 启用 ip hash。

 

上面的 domain_id 表示域名信息,事实上域名信息包括:

domain – 域名;

ports – 端口,可以包含多个端口,并且支持是否开启 ssl;

access_log_path – 正常日志路径,默认为 ${domain}_access.log;

error_log_path – 错误日志路径,默认为 ${domain}_error.log;

ssl_cert_path – 证书文件路径,如果 ports 中没有开启 ssl,此处可以留空;

ssl_cert_key_path – 私钥文件路径,如果 ports 中没有开启 ssl,此处可以留空;

要注意的一点,如果 ssl 文件配置在 server 中,如果客户端不支持 SNI,ssl 连接可能会失败,这里我们不讨论多个域名证书不放在 server 中的配置问题。

生成配置文件基于模板,很简单,不过要注意的是,location $uri 有顺序要求,只要能保证按照 uri 的长度从高到低就没问题了。

 

最后说一下这么设计的一点好处,就是我们可以很方面的计算每个服务的可用率和延迟(并展示),而且当一个服务可用率出现问题的时候我们可以计算是否有哪台 server 出现了问题(可以自动剔除有问题的 server),并且在发现此服务出现问题的时候可以查看到是否有其他服务出现了问题,一并报警,这样可以确认是此服务本身挂了还是它依赖的服务挂了。

 

swarm task 的更新流程

swarm 声明了 13 中 task 状态,如下:

TaskStateNew TaskState = 0
TaskStateAllocated TaskState = 64
TaskStatePending TaskState = 128
TaskStateAssigned TaskState = 192
TaskStateAccepted TaskState = 256
TaskStatePreparing TaskState = 320
TaskStateReady TaskState = 384
TaskStateStarting TaskState = 448
TaskStateRunning TaskState = 512
TaskStateCompleted TaskState = 576
TaskStateShutdown TaskState = 640
TaskStateFailed TaskState = 704
TaskStateRejected TaskState = 768

 

1. Orchestrator 判断如果 service 指定的 slot 数量大于正在运行的 slot 数量, 则创建 task,状态为 TaskStateNew;

2. TaskStateAllocated 为 task 收集网络资源之后把状态改成 TaskStateAllocated;

3. TaskStatePending 表示 Scheduler 收到请求但是还未 unassigned,在代码中我未见到有把状态改成 TaskStatePending;

4. Scheduler 收到 task 之后会把 task 调度到合适的 node 上,状态改为 TaskStateAssigned;

5. 进入 Agent 的处理流程:

1). Agent 收到 task,并且获取到 task 的 Controller 之后改成 TaskStateAccepted;

2). 如果状态为 TaskStateAccepted,改成 TaskStatePreparing;

3). 如果状态为 TaskStatePreparing,调用 Controller 的 Prepare 方法,把状态改成 TaskStateReady;

4). 如果状态为 TaskStateReady,改成 TaskStateStarting。

5). 如果状态为 TaskStateStarting,调用 Controller 的 Start 方法,把状态改成 TaskStateRunning;

6). 如果状态为 TaskStateRunning,则调用 Controller 的 Wait 方法,如果 task 执行完了,状态改为 TaskStateCompleted;

当 task 状态有变化,Agent 会通过到 Dispatcher 的 session 把状态汇报给 Dispatcher。

7). 当 task 的 DesiredState 为 TaskStateShutdown 而且状态小于 TaskStateCompleted 时,调用 Controller 的 Shutdown 方法,关闭 task。

task 的 DesiredState 在 task 创建之后是 TaskStateRunning,如果 manager 想关闭 task,把 DesiredState 设置成 TaskStateShutdown(DesiredState 只能被 manager 修改)。

 

docker swarmkit 源码阅读: Agent 部分

相关的数据结果如下。

e2939924-632e-4512-8526-ec1420ab0e05

85a0d8de-6e44-48ce-a701-c416f85e9bfa

e8abffa3-e4ec-42a1-aba8-3a435b74931a

30441360-5af9-4106-ac53-d40cea2a071c

worker 负责管理任务的执行、状态上报和保存,taskManagers 用来管理任务的执行、listeners 负责监听要任务上报的信息并上报给 Dispatcher,额外的,任务信息会报错在本地 DB 中。

83602e28-7f61-4709-8aa0-df5a4ec42968

reporter 用来报告任务的状态,包括本地 DB 和 Manger,reporter 由 worker 声明。

630d0620-0fe1-492f-a6d2-69c685bbd429

 

1. 输入一个 Config 配置创建一个 agent,会检查 Config 有效性,Credentials、Executor、DB 不能为空;

2. agent 开始 run,从 config.Executor.Describe 获取 NodeDescription,重写掉 config.Hostname;

3. 首先基于 NodeDescription 创建到 manager (基于算法选一个)的 session, 会把 sessionID 和 stream 保存到 session 结构中,然后拿到第一个 SessionMessage,放入 session.messages 中等待处理。

然后,分别开启三个 goroutine:heartbeat、watch、listen,三者产生的 error 会放入 session.errs。

heartbeat 用于保持 session 不断,发送请求的格式是 HeartbeatRequest(包含 sessionID),返回的结果包含 period,等待 period 之后继续请求,一直如此下去。

watch 用于监听 task 信息,先使用 Dispatcher 的 Assignments 方法,如果失败则使用 Dispatcher 的 Tasks 方法,拿到的 task 信息写入 session.assignments;

listen 用于监听 session stream 的内容,如果有内容放入  session.messages。

4. 初始化 worker,Worker 声明了四个方法:Init、AssignTasks、UpdateTasks、Listen。

初始化调用 Init 方法,遍历 DB(本地存储,task 会存在本地) 中的 task,如果 task 没有 assigned 给一个 node,则删除 task,否则从 DB 中获取 task 的状态覆盖掉当前状态,然后启动 task。Init 方法事实上是确保已经在本地 DB 中的 task 正确运行,对于新的 task 在后面看如何处理。

启动 task 的过程,调用 worker 的 taskManager 方法,如果 task 在 taskManagers,返回。

否则调用 worker 的 newTaskManager 创建,newTaskManager 先调用 exec.Resolve 方法获取 Controller 和 状态,其中状态会被改成 TaskStateAccepted,然后保存状态,包括 保存到 DB 存储和调用 listeners 中所有 statusReporterKey 中的 StatusReporter 的 UpdateTaskStatus 保存到 Dispatcher,最后调用另一个 newTaskManager 来创建 taskManager 结构并运行。

说说运行的处理流程:

1). 如果状态为 TaskStatePreparing,则调用 Controller 的 Prepare 方法,并设置状态为 TaskStateReady;

2). 如果状态为 TaskStateStarting,则调用 Controller 的 Start 方法,并设置状态为 TaskStateRunning;

3). 如果状态为 TaskStateRunning,则调用 Controller 的 Wait 方法,如果 task 执行完了,状态改为 TaskStateCompleted;

4). 如果状态为 TaskStateNew、TaskStateAllocated、TaskStateAssigned 则改为 TaskStateAccepted;

5). 如果状态为 TaskStateAccepted,则改成 TaskStatePreparing;

6). 如果状态为 TaskStateReady,则改成 TaskStateStarting。

上面所说,实际上是新建一个 task 的情况,状态是 TaskStateAccepted,套用状态变更流程,先会改成 TaskStatePreparing,然后改成 TaskStateReady,然后改成 TaskStateStarting,再改成 TaskStateRunning。

事实上,taskManager 结构在运行开始就会调用一次处理流程,以保证 TaskStateAccepted 状态的 task 立即被执行,而且 taskManager 还接收 task 的 update,如果和当前维护 task 的配置(CPU、MEMORY 等)不同,则进入处理流程(如果是 CPU、MEMORY 不同,docker 应该不会重启 task,但是其他的呢,比如 Network?)。

5. 上面说保存任务状态会调用 (worker.)listeners 中所有 statusReporterKey 中的 StatusReporter 的 UpdateTaskStatus,这里的核心是 StatusReporter 这个接口。

1). agent 声明了 UpdateTaskStatus 函数,所以是 StatusReporter 这个接口的实例;

2). agent 的 run 会调用 newStatusReporter 创建 statusReporter 结构并运行,statusReporter 结构中的 reporter 就是 agent 的 UpdateTaskStatus;

3). agent 的 run 调用 worker.Listen 把 statusReporter 注册到 listeners 中(由于 statusReporter 声明了UpdateTaskStatus 函数,所以 statusReporter 是  StatusReporter 接口,所以才能加入到 listeners 中)。

4). statusReporter 的运行:不断从 statuses(map[string]*api.TaskStatus) 中获取 status 并调用 reporter.UpdateTaskStatus 完成上报;

5). statusReporter 并提供 UpdateTaskStatus 向 statuses 增加新的 status;

6). 是谁调用 UpdateTaskStatus 增加 status 呢?

答案是 worker 中的 taskManager,worker 对每一个 task 都一个 taskManager,taskManager 中有一个 reporter,每当有状态变化时,会执行此 reporter。

这个 reporter 函数是 worker 封装的,包括保存到本地 DB 和 调用 listeners 中所有 statusReporterKey 中的 StatusReporter 的 UpdateTaskStatus,而 listeners 中 statusReporterKey 中的 StatusReporter 的 UpdateTaskStatus 是什么?是 statusReporter 的 UpdateTaskStatus,执行此函数相当于向 statusReporter.statuses 增加 status。

实在是绕,简单的说:

1). agent 基于自己的 UpdateTaskStatus 包装出满足 StatusReporter 接口的 statusReporter,并把 statusReporter 注册到到 worker.listeners,当有新状态时 statusReporter 后台自动调用 agent 的 UpdateTaskStatus 上报(到Dispatcher);

2). worker 会为每一个 task 分配 taskManager,taskManager 会包含上报函数,当有状态变更时,会执行 statusReporter 的 UpdateTaskStatus 实现向 statusReporter 的状态列表中增加状态。

6. 说说 agent 的  UpdateTaskStatus 函数,通过 session 的 sendTaskStatus 函数调用到 Dispatcher RPC 请求,保存到 store 中,Dispatcher 的 UpdateTaskStatus 实现此功能;

7. 上面说了 agent 会监听 Dispatcher 来的信息,包括 session.assignments 和 session.messages,agent 会分别处理这两个信息。

1). 对于 session.assignments,是任务信息,分为全量信息和增量信息, 增量信息包括该更新的任务和该删除的任务。

对于更新的任务:

i). 先把任务保存到本地 DB 并标记为 assigned;

ii). 如果任务已经在 taskManagers,则更新,任务的配置不一样时才会真正更新,状态会被忽略,比如 CPU、MEMORY 变化,会更新;

iii). 如果任务不在 taskManagers,看本地 DB 里是否存在,如果存在,更新任务的状态(本地 DB 的状态更准),如果不在本地 DB,则保存状态,然后启动任务。

启动任务,会调用 worker.taskManager,创建新的 taskManager 并加到 taskManagers。

对于删除的任务:

i). 删除本地 DB 的 assigned 状态;

ii). 从 taskManagers 中删除;

iii). 关闭 taskManager。

全量信息先全部当做更新的任务处理,但是当前运行的任务如果不在全量列表里则会被删除。

2). 对于 session.messages,包括的功能有:

i). 更新 Manager 列表,保持最新(agent 来负责更新 Manager 列表);

ii). worker 的角色是否有变(NotifyRoleChange);

iii). 保持 NetworkBootstrapKeys 最新。

 

docker swarmkit 源码阅读: Dispatcher 部分

Dispatcher 负责 agent 的连接管理,包括注册、session 管理、node 任务状态更新、任务列表等工作,文档中这么描述:

The dispatcher directly handles all agent connections. This includes registration, session management, and notification of task assignment.

 

相关的数据结构如下:

5a58371a-6cee-4eb9-ad00-8af39eaf9d8e 18fdd7c6-f22a-4bad-b0c3-050dd99e495a 62f89adf-658e-4ea9-a83f-aee62c04e5ca

Dispatcher 只运行在 manager 中的 leader 上面。

1. Config 里面定义了 node 到 Dispatcher 的 Heartbeat 超时信息,HeartbeatPeriod 是固定时间,HeartbeatEpsilon 用于随机生成校正时间,defaultGracePeriodMultiplier 定义乘数,(固定时间+校正时间)*乘数 才是真正超时时间。

注意的是,当 Dispatcher 时会把所有 node 置为 NodeStatus_UNKNOWN 状态,此时的超时时间是正常超时时间的两倍。

2. nodeStore 保存每一个 node 的注册信息;

3. taskUpdates 保存 task 的更新信息;

4. nodeUpdates 保存 node 的更新信息;

5. processUpdatesTrigger 用来表示是否要处理 task 和 node 的更新信息,默认当更新的数量超过 10000(由 maxBatchItems 定义)时会触发处理更新操作,或者等待 10ms 后触发(由 maxBatchInterval 定义);

 

处理过程如下:

1. Dispatcher 启动之后先对每一个 node 标记为 NodeStatus_UNKNOWN 状态,并且把每一个 node 封装成 registeredNode 放入 nodeStore.nodes,registeredNode 会绑定一个 Heartbeat 结构(等待 agent 来注册)。

Heartbeat 结构实现了过期后自动把 node 状态改成 NodeStatus_DOWN,并把 node 从 nodeStore 中删掉,然后把 node 加到 nodeUpdates 中,如果 nodeUpdates 的长度大于等于 maxBatchItems,直接触发处理更新操作。

2. 查看并监控 store 中的 cluster 信息,保证 Dispatcher.config.HeartbeatPeriod 等于 Cluster.Spec.Dispatcher.HeartbeatPeriod, Dispatcher.networkBootstrapKeys 等于 Cluster.Cluster.NetworkBootstrapKeys;

3. 订阅 manager 信息,如有变化修改 Dispatcher.lastSeenManagers;

4. 说一下,处理更新操作的过程,从 taskUpdates 取出 task 的 status,和 store 中的对比,如果相同则不处理,如果如果前者的状态小于后者的状态,也不处理,否则更新 task 到 store;对于 node,从 nodeUpdates 取出 Status 和 Description,更新到 store。

那么,taskUpdates 和 nodeUpdates 中的更新是从哪里获取的呢?看下一点。

5. 事实上,agent 和 Dispatcher 是通过 RPC 交互,Dispatcher 声明了一组接口,每个接口都是一个 RPC Server。

eed963f3-09d8-434d-95b3-2a44b0674cb6

Session 用户 agent 注册,Heartbeat 用于保持 session 存活,UpdateTaskStatus 用于更新 task 状态,Tasks 返回 node 上的 task 列表,Assignments 返回 node 上的 task 全量列表和增量列表 。

其中 Session、Tasks、Assignments 是 stream,agent 起来后连一次 RPC Server,之后 RPC Server 有变化是会主动向 agent 推送;而 HeartBeat 和 UpdateTaskStatus 是单向的,只是 agent 端调用,Server 端不会主动推送。

1). agent 调用 Session 时,会检查 session 是否存在,如果不存在就注册(会限制注册频率),注册会生成 sessionID,并把 registeredNode
信息放入 nodeStore.nodes 中,而且会把 node 信息放入 nodeUpdates;如果 session 存在且匹配,仅把 node 信息放入 nodeUpdates。所以,每当 agent 调用 Session 时都会更新 node 资源信息(还有状态)。

Session 还通过 RPC 向 agent 推送什么呢?推送的是 Managers 和 NetworkBootstrapKeys,用来保持 agent 信息准确,而且一有变化就推送。

2). 再看下 Heartbeat,当 agent 调用 Heartbeat 时,超时时间会向后延迟,不断的 Heartbeat 可以使 node 一直存活下去。

3). 再看 UpdateTaskStatus,上面说 taskUpdates 的更新从哪里获取,就是从 UpdateTaskStatus。UpdateTaskStatus 收到一个 node 的任务状态更新请求,把它放入 taskUpdates。

4). Tasks 从 store 中收集一个 node 上的任务列表,发给 agent,任务必须大于等于 TaskStateAssigned。这个列表是全量的,如果 node 上某个任务不在列表里,会被 terminate。

 

docker swarmkit 源码阅读: Scheduler 部分

Scheduler 负责把 task 调度到合适的 node 上,文档中这么描述:

The scheduler assigns tasks to available nodes.

 

先看个相关的数据结构。

5067655b-c018-4410-b09f-da94e6bbd25b 2224c4e7-655f-479d-83bc-3ab1f13a9f1f c1212eee-7637-4d0f-8ec7-fa8d3a348fb8

Scheduler 的 unassignedTasks 保存没有 attach node 的 task;

preassignedTasks 保存已经 attach node 但是需要确认资源的 task;

nodeSet 以 node id 保存 node 上的 task 信息,同时会保存 node 自身信息和 node 可用资源信息(包括 CPU 和 MEMORY);

allTasks 记录所有 task(必须是 ALLOCATED 状态,也就是必须先收集网络资源才会被调度);

pipeline 记录 Filter 信息,用于判断过滤 node。

 

处理过程:

1. 首先生成 nodeSet 数据,从 store 中获取所有 task,对每一个 task 处理逻辑如下:

1). 如果 task 状态小于 TaskStateAllocated(必须先收集网络资源),或者大于 TaskStateRunning,忽略此 task;

2). 把 task 记录到 allTasks;

3). 如果 task 没有 attach node,放入 unassignedTasks,继续下一个;

4). 如果 task 状态是 TaskStateAllocated,放入 preassignedTasks,继续下一个;

这四步之后剩下的 task 是有效的 task,这些 task 会被记录到以 node id 为 key 的临时数据结构中。然后对所有 node 遍历,对每一个 node 生成 NodeInfo,生成 NodeInfo 的时候会计算可用资源,node.Description.Resources 减去 node 上每个 task 使用的资源是可用的资源,然后把 NodeInfo 保存到 nodeSet 中(以 node id 为 key,value 为 NodeInfo)。

补充一点:对于 service 的 task 来说,除了使用资源(叫做 Reservations ),是 task 最低被分配的资源,还有一个 Limits,定义 task 使用多少资源之后被 kill。

2. 处理 preassignedTasks 中的 task。

这里的 task 需要确认资源,也就是通过 Filter 的检测,如果通过检测,task 状态被改成 TaskStateAssigned,allTasks 中的 task 被覆盖,task 也会被加入到 nodeSet。

对于检测通过的 task,因为修改了状态,需要保存到 strore,保存成功的 task 从 preassignedTasks 删掉,保存失败的需要回滚 allTasks 信息 和 nodeSet 信息。

这里再补充一下 Filter 的机制,默认有三个 Filter:ReadyFilter、ResourceFilter 和 ConstraintFilter,ReadyFilter 表示此 node 是否可用,ResourceFilter 表示是否满足资源条件(CPU 和 MEMORY),ConstraintFilter 用于自定义 Label,node 需匹配 Label 定义的条件才算可用。

每个 Filter 都声明两个接口:SetTask 和 Check,SetTask 用于判断是否需要过滤,输入是 task,ReadyFilter 返回 true;ResourceFilter 如果 task 需要资源(Reservations 不为空)则返回 true,会把 Reservations 赋给 ResourceFilter;ConstraintFilter 如果 task 有 Spec.Placement.Constraints,解析出来赋给 ConstraintFilter,然后返回 true。

Check 才真正用于判断是否满足过滤条件,如果 SetTask 返回 false,就不用执行 Check,Check 的输入是 node,也就是判断此 node 是否满足 SetTask 设定的条件, 满足返回 true。

3.执行 tick,也就是对 unassignedTasks 进行调度,以 ServiceID 和 Spec 把 task 分组,然后分组调度,调度成功把 task 报存到 store 并从 unassignedTasks 删除,失败重新放入 unassignedTasks。

先为每一个组分配 Filter(执行 SetTask 逻辑,组中的 task 的 Filter 相同),然后调用 Filter 的 Check 逻辑,判断所有 node 是否满足 Filter 条件,对于满足条件的 node,按优先级排序,优先级按照 node 上的 task 数量,数量越少优先级越高。

然后对组中的 task,从优先级排序结果进行 attach,如果第 i 个 node 中的 task 数量小于第 i+1 个 node 上的 task 数量,那么还是使用第 i 个 node(所以一个 node 可以绑定一个 service 的多个 task),由于组中的 task 数量可能比排序结果 node 数量多,所以当 node 轮训一遍之后,会继续从开始轮训。

而且,每 attach 一个 node,会检查下一次要 attch 的 node 是否满足 Filter,如果不满足,那么继续下一个 node,如果「循环一圈」所有 node 都不满足,则把剩下所有没 attach 的 task 重新放入 unassignedTasks。

总结一下调度的策略:尽量先保持 node 上的 task 数量均衡,尽量保持 task 数量均衡的同时尽量做到所有 task 都分配到资源。这个策略下,存在多个 task 分配到一个 node 上的可能性。

4. 监控集群的 Event,对不同的 Event 做出不同的反应,对某些 Event 需要修改 unassignedTasks 和 preassignedTasks,这两者的修改会由一个调度器来执行。事实上,当 EventCommit 来临时才会执行,但不是立即执行,最快要等 50 ms 执行,最慢要等 1s,这段的代码写的挺好的。