open-falcon 源码解析(二)

Judge 部分

1. Transfer 会把 MetaData 格式的数据改成 JudgeItem 格式之后再发送 Judge,  JudgeItem 格式是:

type JudgeItem struct {
Endpoint string `json:”endpoint”`
Metric string `json:”metric”`
Value float64 `json:”value”`
Timestamp int64 `json:”timestamp”`
JudgeType string `json:”judgeType”`
Tags map[string]string `json:”tags”`
}

JudgeType 是 MetaData  的 CounterType 值。

2. Judge 收到数据之后,放在一个大 Map 里,初始化如下:

8D316166-2447-467B-A9D7-8366DBFF4475

JudgeItemMap 结构如下:
type JudgeItemMap struct {
sync.RWMutex
M map[string]*SafeLinkedList
}

M 的 key 是 Endpoint、Metric 和 Tags 的 md5,SafeLinkedList 中的元素则是 JudgeItem。

3. Judge 收到数据之后,以 md5 的前两位为 key 存在 HistoryBigMap 中,然后开始判断是否可以报警。在配置文件中有一个参数叫做 remain,用来定义一条报警数据(以 md5 区分)最多的条数,如果超过删除多余的旧数据。

4. 用于判断报警的 Strategy 结构如下:

type Strategy struct {
Id int `json:”id”`
Metric string `json:”metric”`
Tags map[string]string `json:”tags”`
Func string `json:”func”` // e.g. max(#3) all(#3)
Operator string `json:”operator”` // e.g. < !=
RightValue float64 `json:”rightValue”` // critical value
MaxStep int `json:”maxStep”`
Priority int `json:”priority”`
Note string `json:”note”`
Tpl *Template `json:”tpl”`
}
Func 类似 all(#3)、sum(#3)、avg(#10)、diff(#10),第一部分是说数据如何计算,数字是 limit 值,limit 是指用于判定的监控数据条数,少于则不判定;
MaxStep 表示最多报多少次警。

strategy 也保存在 map 里,以 Endpoint_Metric 为 key,判断是否报警的时候也要判断JudgeItem 和 Strategy 的 Tags 是否匹配,否则不处理。

判断报警主要根据 Func、Operator、RightValue,比如 Func 是 avg(#10),如果一个 key 的数据少于 10 就放弃,大于 10 则算出平均值,这个值叫做 leftValue,然后用 leftValue、Operator 和 rightValue 比较,如果成功则表示触发。

5. 然后会创建一个 event 来判断是否发送 event 到 alarm 的 redis,event 的格式是:

type Event struct {
Id string `json:”id”`
Strategy *Strategy `json:”strategy”`
Expression *Expression `json:”expression”`
Status string `json:”status”` // OK or PROBLEM
Endpoint string `json:”endpoint”`
LeftValue float64 `json:”leftValue”`
CurrentStep int `json:”currentStep”`
EventTime int64 `json:”eventTime”`
PushedTags map[string]string `json:”pushedTags”`
}

Id 用 s_strategy.Id_md5 标识。

如果此次 event 触发报警,而上一次 event 是 OK 的,则发送此次 event,CurrentStep 设置为1;如果报警次数已经大于等于 MaxStep,不发送 event;如果报警太频繁,间隔时间小于 MinInterval,不发送;否则发送,CurrentStep 加1。

如果此次 event 是 OK 的,而上次 event 是 PROBLEM,也发送。

发送 event 到 alarm redis,是使用 LPUSH 指令,默认 key 是event:p{Priority},Priority 是优先级( 取自 Event.Strategy.Priority 或者 Event.Expression.Priority)。

6. 除了上面的 Strategy 之外,还会同时执行 Expression 判断。

type Expression struct {
Id int `json:”id”`
Metric string `json:”metric”`
Tags map[string]string `json:”tags”`
Func string `json:”func”` // e.g. max(#3) all(#3)
Operator string `json:”operator”` // e.g. < !=
RightValue float64 `json:”rightValue”` // critical value
MaxStep int `json:”maxStep”`
Priority int `json:”priority”`
Note string `json:”note”`
ActionId int `json:”actionId”`
}

方式和 Strategy 基本类似,Strategy 倾向于基于机器和组的报警,Expression 则是自定义 tag 报警,更加灵活。

7. 会起一个 goroutine 不断从 Hbs 同步 Strategy 和 Expression。

8. 还会起一个 goroutine 清理 JudgeItem 数据,防止占用太多内存,默认清理 7 天前的数据。

 

alarm 部分

1. alarm 在配置文件中定义了 highQueues 和 lowQueues 两个 redis 队列,前者是高优先级,后者是低优先级。对于 highQueues 和 lowQueues 都会起一个 goroutine,读取这些队列中的 event,使用的指令如下:

BRPOP  highQueue1 highQueue2 highQueue3 … 0

如果前面的队列有数据则返回,如果没有则按照顺序检查下一个队列,0 表示无限的等待下去( 如果是正数表示等待超时时间 )。

一个 BRPOP 指令会读取一个 event,然后去消费。

2. 消费的过程,先取出 event 的 actionId,actionId 为 Event.Expression.ActionId 或者 Event.Strategy.Tpl.ActionId,然后根据 actionId 去 Portal 服务取出 action,action 的格式为:

type Action struct {
Id int `json:”id”`
Uic string `json:”uic”`
Url string `json:”url”`
Callback int `json:”callback”`
BeforeCallbackSms int `json:”before_callback_sms”`
BeforeCallbackMail int `json:”before_callback_mail”`
AfterCallbackSms int `json:”after_callback_sms”`
AfterCallbackMail int `json:”after_callback_mail”`
}

如果 action.Callback 为 1,则先去调用 callback,Action 定义了是否在 callback 之前或之后发送短信或邮件,有四个变量表示:BeforeCallbackSms、BeforeCallbackMail、AfterCallbackSms、AfterCallbackMail,Action.Url 就是要 callback 的地址。

如果在 callback 之前或者之后发送短信或邮件,根据 Action.Uic (teams) 去 Uic 服务请求手机和邮箱,然后写入( LPUSH )短信或者邮件的 redis 队列,短信和邮箱的格式分别是:
type Sms struct {
Tos string `json:”tos”`
Content string `json:”content”`
}

type Mail struct {
Tos string `json:”tos”`
Subject string `json:”subject”`
Content string `json:”content”`
}

3. 处理完 callback 的逻辑之后,才开始真正的消费,对于高优先级队列,直接写入短信和邮件发送队列(对于短信代码里写了当 Priority < 3 才发送),对于低优先级队列,做合并处理(对于短信当 Priority < 3 才合并)。

合并之前先把 event 转换成 SmsDto 和 MailDto 格式,然后分别扔进 UserSmsQueue 和 UserMailQueue 队列(还是通过 LPUSH )。

type SmsDto struct {
Priority int `json:”priority”`
Metric string `json:”metric”`
Content string `json:”content”`
Phone string `json:”phone”`
Status string `json:”status”`
}

type MailDto struct {
Priority int `json:”priority”`
Metric string `json:”metric”`
Subject string `json:”subject”`
Content string `json:”content”`
Email string `json:”email”`
Status string `json:”status”`
}

4. 然后起两个 goroutine,分别合并 Sms 和 Mail,合并间隔是 1 分钟。

UserSmsQueue 和 UserMailQueue 的合并规则类似,以 UserSmsQueue 举例:

1). 先 pop 出 UserSmsQueue 中的所有数据;

2). 以 Priority、Status、Phone、Metric 为 key,data 为 []*SmsDto。

如果 data 长度为 1,写入短信发送队列;如果 data > 1,则把数据发送到 Links 服务,以便提供一个可以查看的 url,然后把 Priority、Status、Metric、url、 data 长度等信息写入短信发送队列。

 

sender 部分

sender 非常简单,它从 alarm 写入的短信和邮件发送队列读取数据,发送短信和邮件。

配置文件中要指定短信和邮件的发送  api,sender 使用  post 方法来调用 api,短信 api  接受的参数需要是:tos 和 content,邮件 api 接受的参数需要是:tos、subject 和 content。

sender 使用 带缓存的 channel 来控制发送并发量。

 

Heartbeat Server 部分

也就是 hbs – 心跳服务器,它其实是个缓存服务器,从 Mysql 中同步数据放在内存中,供其他组件读取。

1. 从 Mysql 同步的数据分布放在下面的结构中:

GroupPlugins
GroupTemplates
HostGroupsMap
HostMap
TemplateCache
Strategies
HostTemplateIds
ExpressionCache
MonitoredHosts

2. Agent 会通过 rpc 上报机器信息给 hbs,hbs 会把机器信息存入 Mysql host 表中,同时打上时间戳放入 Agents 内存变量中。额外的,hbs 起一个 goroutine 每天清理一次数据,一天之内没上报的机器信息会被删除,考虑一下机器改名的情况,由于 Agents 以 hostname 为 key,新 hostname 会被上报,而老 hostname 的数据在一天之后被删除;

3. hbs 还给 Agent 提供 插件、信任 IP 列表 和 BuiltinMetrics( 包括 net.port.listen、proc.num、du.bs、url.check.health 四项);

4. hbs 还负责给 Judge 提供 Strategy 和 Expressions。

Strategy 的逻辑还挺复杂的,得说说:

1). 首先拿到 host 和 Template id 对应关系:hidTids;

2). 拿到 host  列表:hosts,包括 hostId 和 hostname;

3). 拿到 Template 信息:tpls;

4). 拿到 Strategy 信息:strategies;

5). 计算出 Template 对应的 Strategy:tpl2Strategies,一个 Template 可以被很多个 Strategy 使用;

6). 然后对 hidTids 中的每一个 hostId 获取它的 Strategy,hostId 对应一个 tplIds( []int{} ),也就是一台机器可以对应多个 Template;

7). 开始处理 tplIds,先获取 tplIds 中每一个 tplId 的 Parent 以及 Parent 的 Parent (包括 tplId 自己,递归处理),tplId 和 Parent 们会排序,从根 Parent 开始,到 tplId 结束,放在 tpl_buckets 中( [][]int{} );

8). 处理 tpl_buckets,有继承关系的放在同一个 bucket 中(就是有继承的话取最长的),比如

11123ou23213

会变成

23432423oj1

放入 uniq_tpl_buckets 中( [][]int{} )。

9). 对 uniq_tpl_buckets 中的每一个 bucket 做处理:bucket 中的 每个 tid 都是从父模板到子模板顺序,取得每个 tid 的 strategies,每个 Strategy 以 metric:{metric}/tags:{tags} 为 key 存储,然后根据 key 可以使子模板覆盖父模板,覆盖之后拿到 Strategy 列表,列表中的 Strategy 的 Strategy.Tpl 再设置成这个 bucket 的最新( 最后 ) tid 模板。

把每个 bucket 合起来就是一个 hostId 的 Strategy 了,所有 hostId 合起来就是所有的 Strategy 了。

 

Portal 部分

Portal 用来配置报警策略,它的建库 SQL 是整个 open-falcon 的核心,文件是:scripts/schema.sql, Heartbeat Server 从 Portal 数据库读取数据,如果不看 Portal 只看 Heartbeat Server 可能不好理解。

schema.sql 包括的表有:
host
grp
grp_host
tpl
strategy
expression
grp_tpl
plugin_dir
action
cluster
mockcfg

它们的关系:机器可以放入机器组,机器组和模板绑定,模板可以继承;策略和模板对应,一个模板可以对应多个策略,事实上基于模板来查询策略(子模块会覆盖父模板的策略);模板会绑定 action_id,action 中包含 用户手机号和邮箱、callback 等信息;额外的,机器组也会绑定插件目录;对于 Expression,直接和 action 关联。

 

 

总结一下,open-falcon 是一套挺复杂的监控系统,其中我觉得 Graph 和 Judge 最复杂。如果我用起来,可以优化的点:

1. Portal 中的 host 表可以不使用,机器信息通过 机器管理系统 获取;

2. 每个组件配置文件中的机器列表,最好通过 zookeeper 来动态获取;

3. 目前使用了 rrd 存储,是否可以尝试下 opentsdb 或者 influxdb。

 

open-falcon 源码解析(一)

open-falcon 是小米开源的企业级监控系统,我最近在抽空读它的源码,是为了我能够在很短的时间内搭建出一套好用的监控系统,而且如果有不满足的需求可以很快修改。

下面是 open-falcon 的架构图,组件还挺多的,到目前为止我读完了 Agent、Transfer、Graph 和 Query 四个部分。

falcon-arch

 

Agent 部分

Agent 获取的数据格式为:
type MetricValue struct {
Endpoint  string      `json:”endpoint”`
Metric    string      `json:”metric”`
Value     interface{} `json:”value”`
Step      int64       `json:”step”`
Type      string      `json:”counterType”`
Tags      string      `json:”tags”`
Timestamp int64       `json:”timestamp”`
}

1. Type 是 rrd DsType(数据源类型),比如 GAUGE、COUNTER 和 DERIVE;
2. 当收集 网卡、硬盘使用率、硬盘 IO、进程、端口、目录大小、url 时,会使用 Tags,其他不会,因为这些信息需要额外知道比如 网卡设备、硬盘分区、硬盘设备、进程名称、端口号、目录名称、url 链接等信息;
3. Agent 会起一个 goroutine,获取要检测的 url 链接、端口号、进程名、目录大小等信息,这些信息是本机无法知道;
4. Step 值由配置文件中的 Transfer.Interval 决定;
5. 每个 Metric 收集和传输时间间隔由 Transfer.Interval 决定,收集到的信息会传递给 Transfer;
6. 支持「插件」,插件目录列表通过 RPC 获取,目录列表中的所有目录下的脚本都会当做插件执行,脚本名称要包括执行超时时间(以 _ 分割),而且脚本的输出要符合 MetricValue 的格式。

 

Transfer 部分

Transfer 收到 Agent 发来的数据后,会先做一下清理,不合法的数据会被忽略,比如 Type 不合法,Value 为空,Step <= 0 等。

然后 Transfer 把格式改成 MetaData:
type MetaData struct {
Metric string `json:”metric”`
Endpoint string `json:”endpoint”`
Timestamp int64 `json:”timestamp”`
Step int64 `json:”step”`
Value float64 `json:”value”`
CounterType string `json:”counterType”`
Tags map[string]string `json:”tags”`
}
1. CounterType 是 MetricValue 的 Type 值 。
2. Tags 会从 key1=value1,key2=value2 字符串变成类似 {key1:value1, key2:value2} 的 map。

然后 Transfer 把数据插入 Graph 和 Judge 内存队列,插入到哪台 Graph 机器 或者 Judge 机器 (称为 node )由一致性 hash 决定,队列以 node 为 key,根据 node 可以拿到队列。

然后对于 Graph 和 Judge 的每一个 node,都起一个 goroutine 来发送,根据 node 拿到 addr,对每一个 addr 已经初始化了 RPC 连接池,从连接池中选一个连接,然后 RPC 调用。

这里我有一个疑问:如果 Graph 或者 Judge 的一个 node 挂了,那么一致性 hash 会自动摘除吗?目前看起来不会。一个解决办法是所有 Graph 或者 Judge 机器都向 zookeeper 注册一个临时节点,而 Transfer 来监听变化,如果变化就更新一致性 hash。

另外,数据在插入 Graph 的队列之前会被改成下面的格式:
type GraphItem struct {
Endpoint  string            `json:”endpoint”`
Metric    string            `json:”metric”`
Tags      map[string]string `json:”tags”`
Value     float64           `json:”value”`
Timestamp int64             `json:”timestamp”`
DsType    string            `json:”dstype”`
Step      int               `json:”step”`
Heartbeat int               `json:”heartbeat”`
Min       string            `json:”min”`
Max       string            `json:”max”`
}

1. DsType、Step、Heartbeat、Min 和 Max 都是 rrd 的概念;
2. Step 不能小于 30s;
3. 如果 MetaData 的 CounterType 是 GAUGE,则 DsType 也是 GAUGE,如果 CounterType 是 COUNTER 或 DERIVE,DsType 都会被改成 DERIVE;
4. DsType 如果是 GAUGE,Min 和 Max 分别是 U、U,如果是 DERIVE,Min 和 Max 分别是 0、U。

rrd 的相关内容参考这里

 

Graph 部分

收到的 GraphItem 数据存储在 GraphItemMap 结构的 GraphItems 变量中,GraphItemMap 结构如下:
type GraphItemMap struct {
sync.RWMutex
A    []map[string]*SafeLinkedList
Size int
}

1. map[string]*SafeLinkedList 的 key 格式是 checksum_dsType_step(称为 ckey),其中 checksum 是 Endpoint, Metric 和 Tags 三者的 md5,*SafeLinkedList 存的则是 GraphItem;
2. A 中 有 Size 个 map[string]*SafeLinkedList,0 到 Size-1 这些数字由 hashKey(ckey) % Size 取得。

Graph 收到数据后,会做三件事:
1. 存入定义的 GraphItems 结构中,Graph 会事先启动一个叫 rrdtool 的 goroutine,不断从 GraphItems 中读取 GraphItem 存入 rrd 数据库。

rrd 文件路径格式是:基础目录/md5前两位/md5/dsType/step.rrd

为了让 rrd 性能满足需求,设置了各种合并策略,比如 1 分钟一个点存 12 小时,5 分钟一个点存 2 天(取平均、最大、最小) 等。

2. 更新索引。

索引由两个缓存变量保存:unIndexedItemCache 和 indexedItemCache,前者保存未建立索引的数据,后者保存已经建立索引的数据。
它们结构一样,它们都以 Endpoint、Metric 和 Tags 的 md5 为 key,value 结构如下:
type IndexCacheItem struct {
UUID string
Item *cmodel.GraphItem
}
UUID 是 endpoint/metric/tags/dstype/step 组成的字符串。

有一个专门的 goroutine 从 unIndexedItemCache 去数据,来循环建立(增量)索引。

有三种索引,分别是 endpoint_ts 索引、tag_endpoint 索引 和 endpoint_counter 索引,索引信息存在 Mysql 中。

额外的,Graph 提供 /proc http 接口来更新全量索引(数据从 indexedItemCache 中获取),默认建立两天内数据的索引。

3. 存入 HistoryCache,HistoryCache 供查看最近收到的 GraphItem,HistoryCache同样以 Endpoint、Metric 和 Tags 的 Checksum 为 key,每个 key 默认只保存三条 GraphItem。

 

Query 部分

查询组件,向 Graph 查询数据。

1. history 接口。

查询参数为:
type GraphHistoryParam struct {
Start int `json:”start”`
End int `json:”end”`
CF string `json:”cf”`
EndpointCounters []cmodel.GraphInfoParam `json:”endpoint_counters”`
}

GraphInfoParam 结构为:
type GraphInfoParam struct {
Endpoint string `json:”endpoint”`
Counter string `json:”counter”`
}

上两个结构合并成如下一个个结构,然后通过 RPC 向 Graph 查询:
type GraphQueryParam struct {
Start int64 `json:”start”`
End int64 `json:”end”`
ConsolFun string `json:”consolFuc”`
Endpoint string `json:”endpoint”`
Counter string `json:”counter”`
}

查询结果保存在下面的结构:
type GraphQueryResponse struct {
Endpoint string `json:”endpoint”`
Counter string `json:”counter”`
DsType string `json:”dstype”`
Step int `json:”step”`
Values []*RRDData `json:”Values”`
}

RRDData 如下:
type RRDData struct {
Timestamp int64 `json:”timestamp”`
Value JsonFloat `json:”value”`
}

查询过程:
1). 根据 Endpoint 和 Counter 查询出 dsType 和 step,其中 Counter 需要是 Metric/Tags 格式;
2). 计算出 rrd 文件路径,获取 rrd 中的数据;
3). 算出 ckey,获取缓存中的数据。
4). 聚合处理。

2. info 接口。

请求结构是:
type GraphInfoParam struct {
Endpoint string `json:”endpoint”`
Counter string `json:”counter”`
}

返回结构是:
type GraphInfoResp struct {
ConsolFun string `json:”consolFun”`
Step int `json:”step”`
Filename string `json:”filename”`
}

3. last 接口。

请求结构是:
type GraphLastParam struct {
Endpoint string `json:”endpoint”`
Counter string `json:”counter”`
}

返回结构是:
type GraphLastResp struct {
Endpoint string `json:”endpoint”`
Counter string `json:”counter”`
Value *RRDData `json:”value”`
}