open-falcon 源码解析(二)

Judge 部分

1. Transfer 会把 MetaData 格式的数据改成 JudgeItem 格式之后再发送 Judge,  JudgeItem 格式是:

type JudgeItem struct {
Endpoint string `json:”endpoint”`
Metric string `json:”metric”`
Value float64 `json:”value”`
Timestamp int64 `json:”timestamp”`
JudgeType string `json:”judgeType”`
Tags map[string]string `json:”tags”`
}

JudgeType 是 MetaData  的 CounterType 值。

2. Judge 收到数据之后,放在一个大 Map 里,初始化如下:

8D316166-2447-467B-A9D7-8366DBFF4475

JudgeItemMap 结构如下:
type JudgeItemMap struct {
sync.RWMutex
M map[string]*SafeLinkedList
}

M 的 key 是 Endpoint、Metric 和 Tags 的 md5,SafeLinkedList 中的元素则是 JudgeItem。

3. Judge 收到数据之后,以 md5 的前两位为 key 存在 HistoryBigMap 中,然后开始判断是否可以报警。在配置文件中有一个参数叫做 remain,用来定义一条报警数据(以 md5 区分)最多的条数,如果超过删除多余的旧数据。

4. 用于判断报警的 Strategy 结构如下:

type Strategy struct {
Id int `json:”id”`
Metric string `json:”metric”`
Tags map[string]string `json:”tags”`
Func string `json:”func”` // e.g. max(#3) all(#3)
Operator string `json:”operator”` // e.g. < !=
RightValue float64 `json:”rightValue”` // critical value
MaxStep int `json:”maxStep”`
Priority int `json:”priority”`
Note string `json:”note”`
Tpl *Template `json:”tpl”`
}
Func 类似 all(#3)、sum(#3)、avg(#10)、diff(#10),第一部分是说数据如何计算,数字是 limit 值,limit 是指用于判定的监控数据条数,少于则不判定;
MaxStep 表示最多报多少次警。

strategy 也保存在 map 里,以 Endpoint_Metric 为 key,判断是否报警的时候也要判断JudgeItem 和 Strategy 的 Tags 是否匹配,否则不处理。

判断报警主要根据 Func、Operator、RightValue,比如 Func 是 avg(#10),如果一个 key 的数据少于 10 就放弃,大于 10 则算出平均值,这个值叫做 leftValue,然后用 leftValue、Operator 和 rightValue 比较,如果成功则表示触发。

5. 然后会创建一个 event 来判断是否发送 event 到 alarm 的 redis,event 的格式是:

type Event struct {
Id string `json:”id”`
Strategy *Strategy `json:”strategy”`
Expression *Expression `json:”expression”`
Status string `json:”status”` // OK or PROBLEM
Endpoint string `json:”endpoint”`
LeftValue float64 `json:”leftValue”`
CurrentStep int `json:”currentStep”`
EventTime int64 `json:”eventTime”`
PushedTags map[string]string `json:”pushedTags”`
}

Id 用 s_strategy.Id_md5 标识。

如果此次 event 触发报警,而上一次 event 是 OK 的,则发送此次 event,CurrentStep 设置为1;如果报警次数已经大于等于 MaxStep,不发送 event;如果报警太频繁,间隔时间小于 MinInterval,不发送;否则发送,CurrentStep 加1。

如果此次 event 是 OK 的,而上次 event 是 PROBLEM,也发送。

发送 event 到 alarm redis,是使用 LPUSH 指令,默认 key 是event:p{Priority},Priority 是优先级( 取自 Event.Strategy.Priority 或者 Event.Expression.Priority)。

6. 除了上面的 Strategy 之外,还会同时执行 Expression 判断。

type Expression struct {
Id int `json:”id”`
Metric string `json:”metric”`
Tags map[string]string `json:”tags”`
Func string `json:”func”` // e.g. max(#3) all(#3)
Operator string `json:”operator”` // e.g. < !=
RightValue float64 `json:”rightValue”` // critical value
MaxStep int `json:”maxStep”`
Priority int `json:”priority”`
Note string `json:”note”`
ActionId int `json:”actionId”`
}

方式和 Strategy 基本类似,Strategy 倾向于基于机器和组的报警,Expression 则是自定义 tag 报警,更加灵活。

7. 会起一个 goroutine 不断从 Hbs 同步 Strategy 和 Expression。

8. 还会起一个 goroutine 清理 JudgeItem 数据,防止占用太多内存,默认清理 7 天前的数据。

 

alarm 部分

1. alarm 在配置文件中定义了 highQueues 和 lowQueues 两个 redis 队列,前者是高优先级,后者是低优先级。对于 highQueues 和 lowQueues 都会起一个 goroutine,读取这些队列中的 event,使用的指令如下:

BRPOP  highQueue1 highQueue2 highQueue3 … 0

如果前面的队列有数据则返回,如果没有则按照顺序检查下一个队列,0 表示无限的等待下去( 如果是正数表示等待超时时间 )。

一个 BRPOP 指令会读取一个 event,然后去消费。

2. 消费的过程,先取出 event 的 actionId,actionId 为 Event.Expression.ActionId 或者 Event.Strategy.Tpl.ActionId,然后根据 actionId 去 Portal 服务取出 action,action 的格式为:

type Action struct {
Id int `json:”id”`
Uic string `json:”uic”`
Url string `json:”url”`
Callback int `json:”callback”`
BeforeCallbackSms int `json:”before_callback_sms”`
BeforeCallbackMail int `json:”before_callback_mail”`
AfterCallbackSms int `json:”after_callback_sms”`
AfterCallbackMail int `json:”after_callback_mail”`
}

如果 action.Callback 为 1,则先去调用 callback,Action 定义了是否在 callback 之前或之后发送短信或邮件,有四个变量表示:BeforeCallbackSms、BeforeCallbackMail、AfterCallbackSms、AfterCallbackMail,Action.Url 就是要 callback 的地址。

如果在 callback 之前或者之后发送短信或邮件,根据 Action.Uic (teams) 去 Uic 服务请求手机和邮箱,然后写入( LPUSH )短信或者邮件的 redis 队列,短信和邮箱的格式分别是:
type Sms struct {
Tos string `json:”tos”`
Content string `json:”content”`
}

type Mail struct {
Tos string `json:”tos”`
Subject string `json:”subject”`
Content string `json:”content”`
}

3. 处理完 callback 的逻辑之后,才开始真正的消费,对于高优先级队列,直接写入短信和邮件发送队列(对于短信代码里写了当 Priority < 3 才发送),对于低优先级队列,做合并处理(对于短信当 Priority < 3 才合并)。

合并之前先把 event 转换成 SmsDto 和 MailDto 格式,然后分别扔进 UserSmsQueue 和 UserMailQueue 队列(还是通过 LPUSH )。

type SmsDto struct {
Priority int `json:”priority”`
Metric string `json:”metric”`
Content string `json:”content”`
Phone string `json:”phone”`
Status string `json:”status”`
}

type MailDto struct {
Priority int `json:”priority”`
Metric string `json:”metric”`
Subject string `json:”subject”`
Content string `json:”content”`
Email string `json:”email”`
Status string `json:”status”`
}

4. 然后起两个 goroutine,分别合并 Sms 和 Mail,合并间隔是 1 分钟。

UserSmsQueue 和 UserMailQueue 的合并规则类似,以 UserSmsQueue 举例:

1). 先 pop 出 UserSmsQueue 中的所有数据;

2). 以 Priority、Status、Phone、Metric 为 key,data 为 []*SmsDto。

如果 data 长度为 1,写入短信发送队列;如果 data > 1,则把数据发送到 Links 服务,以便提供一个可以查看的 url,然后把 Priority、Status、Metric、url、 data 长度等信息写入短信发送队列。

 

sender 部分

sender 非常简单,它从 alarm 写入的短信和邮件发送队列读取数据,发送短信和邮件。

配置文件中要指定短信和邮件的发送  api,sender 使用  post 方法来调用 api,短信 api  接受的参数需要是:tos 和 content,邮件 api 接受的参数需要是:tos、subject 和 content。

sender 使用 带缓存的 channel 来控制发送并发量。

 

Heartbeat Server 部分

也就是 hbs – 心跳服务器,它其实是个缓存服务器,从 Mysql 中同步数据放在内存中,供其他组件读取。

1. 从 Mysql 同步的数据分布放在下面的结构中:

GroupPlugins
GroupTemplates
HostGroupsMap
HostMap
TemplateCache
Strategies
HostTemplateIds
ExpressionCache
MonitoredHosts

2. Agent 会通过 rpc 上报机器信息给 hbs,hbs 会把机器信息存入 Mysql host 表中,同时打上时间戳放入 Agents 内存变量中。额外的,hbs 起一个 goroutine 每天清理一次数据,一天之内没上报的机器信息会被删除,考虑一下机器改名的情况,由于 Agents 以 hostname 为 key,新 hostname 会被上报,而老 hostname 的数据在一天之后被删除;

3. hbs 还给 Agent 提供 插件、信任 IP 列表 和 BuiltinMetrics( 包括 net.port.listen、proc.num、du.bs、url.check.health 四项);

4. hbs 还负责给 Judge 提供 Strategy 和 Expressions。

Strategy 的逻辑还挺复杂的,得说说:

1). 首先拿到 host 和 Template id 对应关系:hidTids;

2). 拿到 host  列表:hosts,包括 hostId 和 hostname;

3). 拿到 Template 信息:tpls;

4). 拿到 Strategy 信息:strategies;

5). 计算出 Template 对应的 Strategy:tpl2Strategies,一个 Template 可以被很多个 Strategy 使用;

6). 然后对 hidTids 中的每一个 hostId 获取它的 Strategy,hostId 对应一个 tplIds( []int{} ),也就是一台机器可以对应多个 Template;

7). 开始处理 tplIds,先获取 tplIds 中每一个 tplId 的 Parent 以及 Parent 的 Parent (包括 tplId 自己,递归处理),tplId 和 Parent 们会排序,从根 Parent 开始,到 tplId 结束,放在 tpl_buckets 中( [][]int{} );

8). 处理 tpl_buckets,有继承关系的放在同一个 bucket 中(就是有继承的话取最长的),比如

11123ou23213

会变成

23432423oj1

放入 uniq_tpl_buckets 中( [][]int{} )。

9). 对 uniq_tpl_buckets 中的每一个 bucket 做处理:bucket 中的 每个 tid 都是从父模板到子模板顺序,取得每个 tid 的 strategies,每个 Strategy 以 metric:{metric}/tags:{tags} 为 key 存储,然后根据 key 可以使子模板覆盖父模板,覆盖之后拿到 Strategy 列表,列表中的 Strategy 的 Strategy.Tpl 再设置成这个 bucket 的最新( 最后 ) tid 模板。

把每个 bucket 合起来就是一个 hostId 的 Strategy 了,所有 hostId 合起来就是所有的 Strategy 了。

 

Portal 部分

Portal 用来配置报警策略,它的建库 SQL 是整个 open-falcon 的核心,文件是:scripts/schema.sql, Heartbeat Server 从 Portal 数据库读取数据,如果不看 Portal 只看 Heartbeat Server 可能不好理解。

schema.sql 包括的表有:
host
grp
grp_host
tpl
strategy
expression
grp_tpl
plugin_dir
action
cluster
mockcfg

它们的关系:机器可以放入机器组,机器组和模板绑定,模板可以继承;策略和模板对应,一个模板可以对应多个策略,事实上基于模板来查询策略(子模块会覆盖父模板的策略);模板会绑定 action_id,action 中包含 用户手机号和邮箱、callback 等信息;额外的,机器组也会绑定插件目录;对于 Expression,直接和 action 关联。

 

 

总结一下,open-falcon 是一套挺复杂的监控系统,其中我觉得 Graph 和 Judge 最复杂。如果我用起来,可以优化的点:

1. Portal 中的 host 表可以不使用,机器信息通过 机器管理系统 获取;

2. 每个组件配置文件中的机器列表,最好通过 zookeeper 来动态获取;

3. 目前使用了 rrd 存储,是否可以尝试下 opentsdb 或者 influxdb。

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注