open-falcon 在豌豆荚的应用

现有监控系统的问题

目前我们使用的监控系统是豌豆荚自己开发的,有几个问题:

1. 报警功能不够稳定,程序会出现僵死的情况,而且 HTTP 报警偶尔报不出来。

2. 报警添加比较麻烦,没有模板和继承的概念。

3. 对于需要和机器关联的监控,是通过指定机器名的方式,如果机器改名或迁移,需要手动修改监控,非常麻烦,这种情况应该使用 主机组( hostGroup )。

4. 没有完整的报警组的功能,报警组信息只是简单的放在 redis 里,增加和删除只能通过调用 API 修改。

5. 现有报警系统没有解决离职员工还会收到短信的问题,这个问题可以在现有报警中优化掉,但是既然现有监控系统要废弃掉,那么就准备在新监控中解决这个问题。

总结一下,现有监控系统是一个缺乏好的顶层设计的系统,而且不是一个完善的系统。

 

为什么使用 open-falcon

open-falcon 是小米开源的监控系统,经历过大规模机器的考验,我相信在稳定性方面表现出色,而且在设计上也更为先进,我相信能够解决大部分的监控需求。

还有一点,open-falcon 在代码阅读和二次开发上会有一些便利。

 

改造了哪些东西

1. 最大的一点改进是基于 hostGroup,使用 hostGroup 好处多多,只要 hostGroup 和模板绑定,hostGroup 中的每台机器都会去使用模板,而只要维护好机器的增加和删除就好了,非常方便。

对于这一点,需要改的东西有:

1). open-falcon 自己维护了一个 host 表,hbs 会把 agent 上报的主机名和 IP 信息写入 host 表,由于我们有自己的资产系统,所以把这个功能删除。

2). 写一个同步任务从资产系统同步所有机器名到 host 表,host 表的 IP 字段可以不使用,这个功能可以在 task 中增加。

3). open-falcon 中 grp 表的 id 是自增的,由于我们有自己独立的服务树,我们决定使用 服务数节点的 id 作为 grp id,那么 id 就不能是自增的了,所以我们去掉了自增属性。

4). 由于 grp id 完全基于服务树节点 id,我们需要写一个同步任务把服务树倒数第二级节点的 id 同步到 grp 表中,grp name 我们定为节点的全路径(类似 /wandoujia/xxx/yyy/service_node)。

这里为什么是倒数第二级节点,因为我们的服务树的设计倒数第二级节点是 服务节点,最后一级是 机房节点(会绑定机器),比如:

154.pic

这个图,puppet 是倒数第二层,是服务节点,db 和 hlg 是倒数第一层机房节点。

同步倒数第二级节点后,我们可以对这些 grp 做一些通用监控(包括系统监控和 nodata 监控,而且可以对这些 grp 绑定加上 负责人的报警,后面有说)。

5). 上一个同步任务建立了 grp,那么 grp 对应的 hostname 也要定期更新,所以也要写一个任务把 grp 的 hostname 更新到 grp_host 表中(grp 的 hostname 列表基于 grp id 从资产系统中获取)。

6). 上面也提到了 grp 的 name 是基于节点的 full path,由于节点可以改名,所以要写一个同步任务更新 grp name。

7). 由于服务树节点可能被删除,所以要写一个同步任务来更新 grp_tpl、grp_host 和 grp  表。

2. 添加监控和服务树整合,使添加监控更加方便。

CAF1DB92-F724-41AC-A7D8-23B89A25D0AA

界面如上图,我们使用模板继承的概念,右面列表的模板被继承到左边时,会先创建一个新的模板 (i_nodeid_tplname 格式),这个模板的父模板是后边选中的模板,然后为新模板创建一个 action,新模板的 action uic 初始化为父模板的 action uic(初始的 uic 一般是运维部门 team,以保证运维人员可以收到报警)。

1). 为什么使用继承,因为好维护,只要修改父模板的策略,子模板就可以继承。

2). 为什么要为新模板创建一个 action,因为我们会把服务节点的负责人加到对应 grp 绑定的模板的 action.uic,新创建 action 不会污染父模板的 action.uic。

3. 报警联系人和服务树整合,使得增加报警联系人非常方便。

像上面所说的,模板继承时会把服务节点的负责人加到 grp 绑定的模板的 action.uic,那么只要维护好每个节点的负责人就好了。

两个点:

1). 需要写一个同步任务,如果节点负责人不在 action.uic 里,则添加。

2). 由于 uic 字段内容只能是用户组,那么我们写一个同步任务,对每一个公司员工创建一个名称为「用户名」的组。

4. Fe 模块的改造

1). 改造 Fe 的登陆验证,登陆方式使用公司的 SSO,去掉 ldap 验证,去掉注册功能。

2). 登陆 fe 后会设置 .DOMAIN.com cookie ( sig ),使得登陆 portal.DOMAIN.com 、dashboard.DOMAIN.com 和 alarm.DOMAIN.com 无需再验证。

这里, portal、dashboard 和 alarm 要基于 cookie 去 fe 验证(如果需要验证),fe 要实现验证的功能( fe 设置 cookie 后也会把 cookie 放入 redis)。

3). 用户从公司的员工管理系统同步,写入 user 表。

4). 由于用户的手机号可能会修改,也要写一个任务把最新手机号写入 user 表。

5). 需要判断员工是否离职,如果离职,从 rel_team_user、user 和 group 表中删除。

5. 自动添加系统监控和 nodata 监控

可以看到在第二张图中,已经继承的模板有 sa.sys.common 和 sa.agent.nodata,我们是写一个同步任务把所有倒数第二级节点都绑定这两个模板。

第一个模板是系统监控模板,监控内容包括 cpu、mem、disk、file num 等。

第二个是 nodata 模板,用于监控 agent 存活,模板的策略是当 agent.alive 为 -1 时报警。

agent 在存活的时候 agent.alive 不可能为 -1,open-falcon 提供了一个当 agent 死了没数据的情况下补偿数据的功能( Nodata 功能 ),我们对每台机器设置了当 agent 没数据补偿 agent.alive 为 -1,所以就可以根据 agent.alive 来添加 nodata 模板。

Nodata 功能可以根据 grp id 或者 hostname 来补偿数据,由于使用 grp id 程序内部还会做转换,所以直接使用 hostname 来补偿。所有 hostname 的补偿我们是写了一个同步任务,基于 host 表。

6. 关于集群监控和趋势(波动)监控

上面说的系统监控只是针对一台机器的,我们有对集群某个指标的监控和对集群某个指标趋势监控的需求。

对于集群某个指标的监控,比如平均 cpu.idle,需要在用到 aggregator 这个模块,aggregator 会根据定义的 分子、分母等信息去抓取集群的数据,计算之后再 push 到 transfer(我们同样是写了一个任务把服务树倒数第二级节点对应的 grp 加上 aggregator 配置)。然后再根据新定义的 metric 增加相应的策略。

C955CBC2-11D2-4A21-BA0B-82C1D0525E11

D097F47D-09F6-4A36-8C3F-E959669B0133

另外,对于趋势监控,只要把策略的 all(#3) 改成 diff(#3) 或者 pdiff(#3) 就可以了。

 

遇到的问题

1. 对于 url 的检测( url.check.health ),由于我们的机器访问不了外网,所以对于监控外网 url 的需求,我们采用通过 http/https 代理的方式,要改一下 agent 代码。

2. 由于我们使用 Cobar 作为 Mysql 中间件,由于 Cobar 不支持 prepare 功能,所以我们遇到了大量的 Prepare unsupported 的报错。

解决办法:对于要执行的 sql 先用 fmt.Sprintf 格式化,再整体执行一条 sql。

3. open-falcon 的 url.check.health 监控只能监控简单的 GET 请求,也不能设置 header 或者对 response content 做判断,而我们有这样的需求。和 open-faclon 作者 laiwei 聊了一下,他们基本不用 url 监控,主要用端口监控和「业务自动上报 metrics」,这是一个很好的思路,可是我们目前没搞业务自动上报。

考虑到修改 open-falcon 支持复杂的 url 监控比较麻烦,我们决定使用「插件」。

 

总结

open-falcon 是一个功能很全的系统,很好的 nodata 监控、我需要的集群监控和趋势监控、还有减少报警数量的报警合并功能,最主要的是它实现了 hostGroup 的报警方式,非常有用。

 

open-falcon 源码解析(二)

Judge 部分

1. Transfer 会把 MetaData 格式的数据改成 JudgeItem 格式之后再发送 Judge,  JudgeItem 格式是:

type JudgeItem struct {
Endpoint string `json:”endpoint”`
Metric string `json:”metric”`
Value float64 `json:”value”`
Timestamp int64 `json:”timestamp”`
JudgeType string `json:”judgeType”`
Tags map[string]string `json:”tags”`
}

JudgeType 是 MetaData  的 CounterType 值。

2. Judge 收到数据之后,放在一个大 Map 里,初始化如下:

8D316166-2447-467B-A9D7-8366DBFF4475

JudgeItemMap 结构如下:
type JudgeItemMap struct {
sync.RWMutex
M map[string]*SafeLinkedList
}

M 的 key 是 Endpoint、Metric 和 Tags 的 md5,SafeLinkedList 中的元素则是 JudgeItem。

3. Judge 收到数据之后,以 md5 的前两位为 key 存在 HistoryBigMap 中,然后开始判断是否可以报警。在配置文件中有一个参数叫做 remain,用来定义一条报警数据(以 md5 区分)最多的条数,如果超过删除多余的旧数据。

4. 用于判断报警的 Strategy 结构如下:

type Strategy struct {
Id int `json:”id”`
Metric string `json:”metric”`
Tags map[string]string `json:”tags”`
Func string `json:”func”` // e.g. max(#3) all(#3)
Operator string `json:”operator”` // e.g. < !=
RightValue float64 `json:”rightValue”` // critical value
MaxStep int `json:”maxStep”`
Priority int `json:”priority”`
Note string `json:”note”`
Tpl *Template `json:”tpl”`
}
Func 类似 all(#3)、sum(#3)、avg(#10)、diff(#10),第一部分是说数据如何计算,数字是 limit 值,limit 是指用于判定的监控数据条数,少于则不判定;
MaxStep 表示最多报多少次警。

strategy 也保存在 map 里,以 Endpoint_Metric 为 key,判断是否报警的时候也要判断JudgeItem 和 Strategy 的 Tags 是否匹配,否则不处理。

判断报警主要根据 Func、Operator、RightValue,比如 Func 是 avg(#10),如果一个 key 的数据少于 10 就放弃,大于 10 则算出平均值,这个值叫做 leftValue,然后用 leftValue、Operator 和 rightValue 比较,如果成功则表示触发。

5. 然后会创建一个 event 来判断是否发送 event 到 alarm 的 redis,event 的格式是:

type Event struct {
Id string `json:”id”`
Strategy *Strategy `json:”strategy”`
Expression *Expression `json:”expression”`
Status string `json:”status”` // OK or PROBLEM
Endpoint string `json:”endpoint”`
LeftValue float64 `json:”leftValue”`
CurrentStep int `json:”currentStep”`
EventTime int64 `json:”eventTime”`
PushedTags map[string]string `json:”pushedTags”`
}

Id 用 s_strategy.Id_md5 标识。

如果此次 event 触发报警,而上一次 event 是 OK 的,则发送此次 event,CurrentStep 设置为1;如果报警次数已经大于等于 MaxStep,不发送 event;如果报警太频繁,间隔时间小于 MinInterval,不发送;否则发送,CurrentStep 加1。

如果此次 event 是 OK 的,而上次 event 是 PROBLEM,也发送。

发送 event 到 alarm redis,是使用 LPUSH 指令,默认 key 是event:p{Priority},Priority 是优先级( 取自 Event.Strategy.Priority 或者 Event.Expression.Priority)。

6. 除了上面的 Strategy 之外,还会同时执行 Expression 判断。

type Expression struct {
Id int `json:”id”`
Metric string `json:”metric”`
Tags map[string]string `json:”tags”`
Func string `json:”func”` // e.g. max(#3) all(#3)
Operator string `json:”operator”` // e.g. < !=
RightValue float64 `json:”rightValue”` // critical value
MaxStep int `json:”maxStep”`
Priority int `json:”priority”`
Note string `json:”note”`
ActionId int `json:”actionId”`
}

方式和 Strategy 基本类似,Strategy 倾向于基于机器和组的报警,Expression 则是自定义 tag 报警,更加灵活。

7. 会起一个 goroutine 不断从 Hbs 同步 Strategy 和 Expression。

8. 还会起一个 goroutine 清理 JudgeItem 数据,防止占用太多内存,默认清理 7 天前的数据。

 

alarm 部分

1. alarm 在配置文件中定义了 highQueues 和 lowQueues 两个 redis 队列,前者是高优先级,后者是低优先级。对于 highQueues 和 lowQueues 都会起一个 goroutine,读取这些队列中的 event,使用的指令如下:

BRPOP  highQueue1 highQueue2 highQueue3 … 0

如果前面的队列有数据则返回,如果没有则按照顺序检查下一个队列,0 表示无限的等待下去( 如果是正数表示等待超时时间 )。

一个 BRPOP 指令会读取一个 event,然后去消费。

2. 消费的过程,先取出 event 的 actionId,actionId 为 Event.Expression.ActionId 或者 Event.Strategy.Tpl.ActionId,然后根据 actionId 去 Portal 服务取出 action,action 的格式为:

type Action struct {
Id int `json:”id”`
Uic string `json:”uic”`
Url string `json:”url”`
Callback int `json:”callback”`
BeforeCallbackSms int `json:”before_callback_sms”`
BeforeCallbackMail int `json:”before_callback_mail”`
AfterCallbackSms int `json:”after_callback_sms”`
AfterCallbackMail int `json:”after_callback_mail”`
}

如果 action.Callback 为 1,则先去调用 callback,Action 定义了是否在 callback 之前或之后发送短信或邮件,有四个变量表示:BeforeCallbackSms、BeforeCallbackMail、AfterCallbackSms、AfterCallbackMail,Action.Url 就是要 callback 的地址。

如果在 callback 之前或者之后发送短信或邮件,根据 Action.Uic (teams) 去 Uic 服务请求手机和邮箱,然后写入( LPUSH )短信或者邮件的 redis 队列,短信和邮箱的格式分别是:
type Sms struct {
Tos string `json:”tos”`
Content string `json:”content”`
}

type Mail struct {
Tos string `json:”tos”`
Subject string `json:”subject”`
Content string `json:”content”`
}

3. 处理完 callback 的逻辑之后,才开始真正的消费,对于高优先级队列,直接写入短信和邮件发送队列(对于短信代码里写了当 Priority < 3 才发送),对于低优先级队列,做合并处理(对于短信当 Priority < 3 才合并)。

合并之前先把 event 转换成 SmsDto 和 MailDto 格式,然后分别扔进 UserSmsQueue 和 UserMailQueue 队列(还是通过 LPUSH )。

type SmsDto struct {
Priority int `json:”priority”`
Metric string `json:”metric”`
Content string `json:”content”`
Phone string `json:”phone”`
Status string `json:”status”`
}

type MailDto struct {
Priority int `json:”priority”`
Metric string `json:”metric”`
Subject string `json:”subject”`
Content string `json:”content”`
Email string `json:”email”`
Status string `json:”status”`
}

4. 然后起两个 goroutine,分别合并 Sms 和 Mail,合并间隔是 1 分钟。

UserSmsQueue 和 UserMailQueue 的合并规则类似,以 UserSmsQueue 举例:

1). 先 pop 出 UserSmsQueue 中的所有数据;

2). 以 Priority、Status、Phone、Metric 为 key,data 为 []*SmsDto。

如果 data 长度为 1,写入短信发送队列;如果 data > 1,则把数据发送到 Links 服务,以便提供一个可以查看的 url,然后把 Priority、Status、Metric、url、 data 长度等信息写入短信发送队列。

 

sender 部分

sender 非常简单,它从 alarm 写入的短信和邮件发送队列读取数据,发送短信和邮件。

配置文件中要指定短信和邮件的发送  api,sender 使用  post 方法来调用 api,短信 api  接受的参数需要是:tos 和 content,邮件 api 接受的参数需要是:tos、subject 和 content。

sender 使用 带缓存的 channel 来控制发送并发量。

 

Heartbeat Server 部分

也就是 hbs – 心跳服务器,它其实是个缓存服务器,从 Mysql 中同步数据放在内存中,供其他组件读取。

1. 从 Mysql 同步的数据分布放在下面的结构中:

GroupPlugins
GroupTemplates
HostGroupsMap
HostMap
TemplateCache
Strategies
HostTemplateIds
ExpressionCache
MonitoredHosts

2. Agent 会通过 rpc 上报机器信息给 hbs,hbs 会把机器信息存入 Mysql host 表中,同时打上时间戳放入 Agents 内存变量中。额外的,hbs 起一个 goroutine 每天清理一次数据,一天之内没上报的机器信息会被删除,考虑一下机器改名的情况,由于 Agents 以 hostname 为 key,新 hostname 会被上报,而老 hostname 的数据在一天之后被删除;

3. hbs 还给 Agent 提供 插件、信任 IP 列表 和 BuiltinMetrics( 包括 net.port.listen、proc.num、du.bs、url.check.health 四项);

4. hbs 还负责给 Judge 提供 Strategy 和 Expressions。

Strategy 的逻辑还挺复杂的,得说说:

1). 首先拿到 host 和 Template id 对应关系:hidTids;

2). 拿到 host  列表:hosts,包括 hostId 和 hostname;

3). 拿到 Template 信息:tpls;

4). 拿到 Strategy 信息:strategies;

5). 计算出 Template 对应的 Strategy:tpl2Strategies,一个 Template 可以被很多个 Strategy 使用;

6). 然后对 hidTids 中的每一个 hostId 获取它的 Strategy,hostId 对应一个 tplIds( []int{} ),也就是一台机器可以对应多个 Template;

7). 开始处理 tplIds,先获取 tplIds 中每一个 tplId 的 Parent 以及 Parent 的 Parent (包括 tplId 自己,递归处理),tplId 和 Parent 们会排序,从根 Parent 开始,到 tplId 结束,放在 tpl_buckets 中( [][]int{} );

8). 处理 tpl_buckets,有继承关系的放在同一个 bucket 中(就是有继承的话取最长的),比如

11123ou23213

会变成

23432423oj1

放入 uniq_tpl_buckets 中( [][]int{} )。

9). 对 uniq_tpl_buckets 中的每一个 bucket 做处理:bucket 中的 每个 tid 都是从父模板到子模板顺序,取得每个 tid 的 strategies,每个 Strategy 以 metric:{metric}/tags:{tags} 为 key 存储,然后根据 key 可以使子模板覆盖父模板,覆盖之后拿到 Strategy 列表,列表中的 Strategy 的 Strategy.Tpl 再设置成这个 bucket 的最新( 最后 ) tid 模板。

把每个 bucket 合起来就是一个 hostId 的 Strategy 了,所有 hostId 合起来就是所有的 Strategy 了。

 

Portal 部分

Portal 用来配置报警策略,它的建库 SQL 是整个 open-falcon 的核心,文件是:scripts/schema.sql, Heartbeat Server 从 Portal 数据库读取数据,如果不看 Portal 只看 Heartbeat Server 可能不好理解。

schema.sql 包括的表有:
host
grp
grp_host
tpl
strategy
expression
grp_tpl
plugin_dir
action
cluster
mockcfg

它们的关系:机器可以放入机器组,机器组和模板绑定,模板可以继承;策略和模板对应,一个模板可以对应多个策略,事实上基于模板来查询策略(子模块会覆盖父模板的策略);模板会绑定 action_id,action 中包含 用户手机号和邮箱、callback 等信息;额外的,机器组也会绑定插件目录;对于 Expression,直接和 action 关联。

 

 

总结一下,open-falcon 是一套挺复杂的监控系统,其中我觉得 Graph 和 Judge 最复杂。如果我用起来,可以优化的点:

1. Portal 中的 host 表可以不使用,机器信息通过 机器管理系统 获取;

2. 每个组件配置文件中的机器列表,最好通过 zookeeper 来动态获取;

3. 目前使用了 rrd 存储,是否可以尝试下 opentsdb 或者 influxdb。

 

open-falcon 源码解析(一)

open-falcon 是小米开源的企业级监控系统,我最近在抽空读它的源码,是为了我能够在很短的时间内搭建出一套好用的监控系统,而且如果有不满足的需求可以很快修改。

下面是 open-falcon 的架构图,组件还挺多的,到目前为止我读完了 Agent、Transfer、Graph 和 Query 四个部分。

falcon-arch

 

Agent 部分

Agent 获取的数据格式为:
type MetricValue struct {
Endpoint  string      `json:”endpoint”`
Metric    string      `json:”metric”`
Value     interface{} `json:”value”`
Step      int64       `json:”step”`
Type      string      `json:”counterType”`
Tags      string      `json:”tags”`
Timestamp int64       `json:”timestamp”`
}

1. Type 是 rrd DsType(数据源类型),比如 GAUGE、COUNTER 和 DERIVE;
2. 当收集 网卡、硬盘使用率、硬盘 IO、进程、端口、目录大小、url 时,会使用 Tags,其他不会,因为这些信息需要额外知道比如 网卡设备、硬盘分区、硬盘设备、进程名称、端口号、目录名称、url 链接等信息;
3. Agent 会起一个 goroutine,获取要检测的 url 链接、端口号、进程名、目录大小等信息,这些信息是本机无法知道;
4. Step 值由配置文件中的 Transfer.Interval 决定;
5. 每个 Metric 收集和传输时间间隔由 Transfer.Interval 决定,收集到的信息会传递给 Transfer;
6. 支持「插件」,插件目录列表通过 RPC 获取,目录列表中的所有目录下的脚本都会当做插件执行,脚本名称要包括执行超时时间(以 _ 分割),而且脚本的输出要符合 MetricValue 的格式。

 

Transfer 部分

Transfer 收到 Agent 发来的数据后,会先做一下清理,不合法的数据会被忽略,比如 Type 不合法,Value 为空,Step <= 0 等。

然后 Transfer 把格式改成 MetaData:
type MetaData struct {
Metric string `json:”metric”`
Endpoint string `json:”endpoint”`
Timestamp int64 `json:”timestamp”`
Step int64 `json:”step”`
Value float64 `json:”value”`
CounterType string `json:”counterType”`
Tags map[string]string `json:”tags”`
}
1. CounterType 是 MetricValue 的 Type 值 。
2. Tags 会从 key1=value1,key2=value2 字符串变成类似 {key1:value1, key2:value2} 的 map。

然后 Transfer 把数据插入 Graph 和 Judge 内存队列,插入到哪台 Graph 机器 或者 Judge 机器 (称为 node )由一致性 hash 决定,队列以 node 为 key,根据 node 可以拿到队列。

然后对于 Graph 和 Judge 的每一个 node,都起一个 goroutine 来发送,根据 node 拿到 addr,对每一个 addr 已经初始化了 RPC 连接池,从连接池中选一个连接,然后 RPC 调用。

这里我有一个疑问:如果 Graph 或者 Judge 的一个 node 挂了,那么一致性 hash 会自动摘除吗?目前看起来不会。一个解决办法是所有 Graph 或者 Judge 机器都向 zookeeper 注册一个临时节点,而 Transfer 来监听变化,如果变化就更新一致性 hash。

另外,数据在插入 Graph 的队列之前会被改成下面的格式:
type GraphItem struct {
Endpoint  string            `json:”endpoint”`
Metric    string            `json:”metric”`
Tags      map[string]string `json:”tags”`
Value     float64           `json:”value”`
Timestamp int64             `json:”timestamp”`
DsType    string            `json:”dstype”`
Step      int               `json:”step”`
Heartbeat int               `json:”heartbeat”`
Min       string            `json:”min”`
Max       string            `json:”max”`
}

1. DsType、Step、Heartbeat、Min 和 Max 都是 rrd 的概念;
2. Step 不能小于 30s;
3. 如果 MetaData 的 CounterType 是 GAUGE,则 DsType 也是 GAUGE,如果 CounterType 是 COUNTER 或 DERIVE,DsType 都会被改成 DERIVE;
4. DsType 如果是 GAUGE,Min 和 Max 分别是 U、U,如果是 DERIVE,Min 和 Max 分别是 0、U。

rrd 的相关内容参考这里

 

Graph 部分

收到的 GraphItem 数据存储在 GraphItemMap 结构的 GraphItems 变量中,GraphItemMap 结构如下:
type GraphItemMap struct {
sync.RWMutex
A    []map[string]*SafeLinkedList
Size int
}

1. map[string]*SafeLinkedList 的 key 格式是 checksum_dsType_step(称为 ckey),其中 checksum 是 Endpoint, Metric 和 Tags 三者的 md5,*SafeLinkedList 存的则是 GraphItem;
2. A 中 有 Size 个 map[string]*SafeLinkedList,0 到 Size-1 这些数字由 hashKey(ckey) % Size 取得。

Graph 收到数据后,会做三件事:
1. 存入定义的 GraphItems 结构中,Graph 会事先启动一个叫 rrdtool 的 goroutine,不断从 GraphItems 中读取 GraphItem 存入 rrd 数据库。

rrd 文件路径格式是:基础目录/md5前两位/md5/dsType/step.rrd

为了让 rrd 性能满足需求,设置了各种合并策略,比如 1 分钟一个点存 12 小时,5 分钟一个点存 2 天(取平均、最大、最小) 等。

2. 更新索引。

索引由两个缓存变量保存:unIndexedItemCache 和 indexedItemCache,前者保存未建立索引的数据,后者保存已经建立索引的数据。
它们结构一样,它们都以 Endpoint、Metric 和 Tags 的 md5 为 key,value 结构如下:
type IndexCacheItem struct {
UUID string
Item *cmodel.GraphItem
}
UUID 是 endpoint/metric/tags/dstype/step 组成的字符串。

有一个专门的 goroutine 从 unIndexedItemCache 去数据,来循环建立(增量)索引。

有三种索引,分别是 endpoint_ts 索引、tag_endpoint 索引 和 endpoint_counter 索引,索引信息存在 Mysql 中。

额外的,Graph 提供 /proc http 接口来更新全量索引(数据从 indexedItemCache 中获取),默认建立两天内数据的索引。

3. 存入 HistoryCache,HistoryCache 供查看最近收到的 GraphItem,HistoryCache同样以 Endpoint、Metric 和 Tags 的 Checksum 为 key,每个 key 默认只保存三条 GraphItem。

 

Query 部分

查询组件,向 Graph 查询数据。

1. history 接口。

查询参数为:
type GraphHistoryParam struct {
Start int `json:”start”`
End int `json:”end”`
CF string `json:”cf”`
EndpointCounters []cmodel.GraphInfoParam `json:”endpoint_counters”`
}

GraphInfoParam 结构为:
type GraphInfoParam struct {
Endpoint string `json:”endpoint”`
Counter string `json:”counter”`
}

上两个结构合并成如下一个个结构,然后通过 RPC 向 Graph 查询:
type GraphQueryParam struct {
Start int64 `json:”start”`
End int64 `json:”end”`
ConsolFun string `json:”consolFuc”`
Endpoint string `json:”endpoint”`
Counter string `json:”counter”`
}

查询结果保存在下面的结构:
type GraphQueryResponse struct {
Endpoint string `json:”endpoint”`
Counter string `json:”counter”`
DsType string `json:”dstype”`
Step int `json:”step”`
Values []*RRDData `json:”Values”`
}

RRDData 如下:
type RRDData struct {
Timestamp int64 `json:”timestamp”`
Value JsonFloat `json:”value”`
}

查询过程:
1). 根据 Endpoint 和 Counter 查询出 dsType 和 step,其中 Counter 需要是 Metric/Tags 格式;
2). 计算出 rrd 文件路径,获取 rrd 中的数据;
3). 算出 ckey,获取缓存中的数据。
4). 聚合处理。

2. info 接口。

请求结构是:
type GraphInfoParam struct {
Endpoint string `json:”endpoint”`
Counter string `json:”counter”`
}

返回结构是:
type GraphInfoResp struct {
ConsolFun string `json:”consolFun”`
Step int `json:”step”`
Filename string `json:”filename”`
}

3. last 接口。

请求结构是:
type GraphLastParam struct {
Endpoint string `json:”endpoint”`
Counter string `json:”counter”`
}

返回结构是:
type GraphLastResp struct {
Endpoint string `json:”endpoint”`
Counter string `json:”counter”`
Value *RRDData `json:”value”`
}