WIP: 补全各模块的通知分组设置

This commit is contained in:
Akkia
2022-04-15 03:13:53 +08:00
parent 27cd794142
commit 322467673f
11 changed files with 103 additions and 73 deletions

View File

@@ -21,11 +21,13 @@ type NotificationHistory struct {
}
// 报警规则
var AlertsLock sync.RWMutex
var Alerts []*model.AlertRule
var alertsStore map[uint64]map[uint64][][]interface{} // [alert_id][server_id] -> 对应报警规则的检查结果
var alertsPrevState map[uint64]map[uint64]uint // [alert_id][server_id] -> 对应报警规则的上一次报警状态
var AlertsCycleTransferStatsStore map[uint64]*model.CycleTransferStats // [alert_id] -> 对应报警规则的周期流量统计
var (
AlertsLock sync.RWMutex
Alerts []*model.AlertRule
alertsStore map[uint64]map[uint64][][]interface{} // [alert_id][server_id] -> 对应报警规则的检查结果
alertsPrevState map[uint64]map[uint64]uint // [alert_id][server_id] -> 对应报警规则的上一次报警状态
AlertsCycleTransferStatsStore map[uint64]*model.CycleTransferStats // [alert_id] -> 对应报警规则的周期流量统计
)
// addCycleTransferStatsInfo 向AlertsCycleTransferStatsStore中添加周期流量报警统计信息
func addCycleTransferStatsInfo(alert *model.AlertRule) {
@@ -62,10 +64,15 @@ func AlertSentinelStart() {
if err := DB.Find(&Alerts).Error; err != nil {
panic(err)
}
for i := 0; i < len(Alerts); i++ {
alertsStore[Alerts[i].ID] = make(map[uint64][][]interface{})
alertsPrevState[Alerts[i].ID] = make(map[uint64]uint)
addCycleTransferStatsInfo(Alerts[i])
for _, alert := range Alerts {
// 旧版本可能不存在通知组 为其添加默认值
if alert.NotificationTag == "" {
alert.NotificationTag = "default"
DB.Save(alert)
}
alertsStore[alert.ID] = make(map[uint64][][]interface{})
alertsPrevState[alert.ID] = make(map[uint64]uint)
addCycleTransferStatsInfo(alert)
}
AlertsLock.Unlock()
@@ -143,11 +150,11 @@ func checkStatus() {
if !passed {
alertsPrevState[alert.ID][server.ID] = _RuleCheckFail
message := fmt.Sprintf("[主机故障] %s(%s) 规则:%s", server.Name, IPDesensitize(server.Host.IP), alert.Name)
go SendNotification(message, true)
go SendNotification(alert.NotificationTag, message, true)
} else {
if alertsPrevState[alert.ID][server.ID] == _RuleCheckFail {
message := fmt.Sprintf("[主机恢复] %s(%s) 规则:%s", server.Name, IPDesensitize(server.Host.IP), alert.Name)
go SendNotification(message, true)
go SendNotification(alert.NotificationTag, message, true)
}
alertsPrevState[alert.ID][server.ID] = _RuleCheckPass
}

View File

@@ -35,12 +35,12 @@ func LoadNotifications() {
if err := DB.Find(&notifications).Error; err != nil {
panic(err)
}
for _, n := range notifications {
for i := range notifications {
// 旧版本的Tag可能不存在 自动设置为默认值
if n.Tag == "" {
SetDefaultNotificationTagInDB(&n)
if notifications[i].Tag == "" {
SetDefaultNotificationTagInDB(&notifications[i])
}
AddNotificationToList(&n)
AddNotificationToList(&notifications[i])
}
}
@@ -70,23 +70,16 @@ func OnRefreshOrAddNotification(n *model.Notification) {
// AddNotificationToList 添加通知方式到map中
func AddNotificationToList(n *model.Notification) {
notificationsLock.Lock()
defer notificationsLock.Unlock()
// 当前 Tag 不存在,创建对应该 Tag 的 子 map 后再添加
if _, ok := NotificationList[n.Tag]; !ok {
NotificationList[n.Tag] = make(map[uint64]*model.Notification)
}
NotificationList[n.Tag][n.ID] = n
NotificationIDToTag[n.ID] = n.Tag
}
// UpdateNotificationInList 在 map 中更新通知方式
func UpdateNotificationInList(n *model.Notification) {
notificationsLock.Lock()
defer notificationsLock.Unlock()
NotificationList[n.Tag][n.ID] = n
}
@@ -137,10 +130,14 @@ func SendNotification(notificationTag string, desc string, mutable bool) {
// 向该通知方式组的所有通知方式发出通知
notificationsLock.RLock()
defer notificationsLock.RUnlock()
for _, n := range NotificationList[notificationTag] {
log.Println("尝试通知", n.Name)
}
for _, n := range NotificationList[notificationTag] {
if err := n.Send(desc); err != nil {
log.Println("NEZHA>> 发送通知失败:", err)
log.Println("NEZHA>> 向 ", n.Name, " 发送通知失败:", err)
} else {
log.Println("NEZHA>> 向 ", n.Name, " 发送通知成功:")
}
}
}

View File

@@ -149,18 +149,23 @@ func (ss *ServiceSentinel) loadMonitorHistory() {
var err error
ss.monitorsLock.Lock()
defer ss.monitorsLock.Unlock()
for i := 0; i < len(monitors); i++ {
task := *monitors[i]
for _, monitor := range monitors {
// 旧版本可能不存在通知组 为其设置默认组
if monitor.NotificationTag == "" {
monitor.NotificationTag = "default"
DB.Save(monitor)
}
task := *monitor
// 通过cron定时将服务监控任务传递给任务调度管道
monitors[i].CronJobID, err = Cron.AddFunc(task.CronSpec(), func() {
monitor.CronJobID, err = Cron.AddFunc(task.CronSpec(), func() {
ss.dispatchBus <- task
})
if err != nil {
panic(err)
}
ss.monitors[monitors[i].ID] = monitors[i]
ss.serviceCurrentStatusData[monitors[i].ID] = make([]model.MonitorHistory, _CurrentStatusSize)
ss.serviceStatusToday[monitors[i].ID] = &_TodayStatsOfMonitor{}
ss.monitors[monitor.ID] = monitor
ss.serviceCurrentStatusData[monitor.ID] = make([]model.MonitorHistory, _CurrentStatusSize)
ss.serviceStatusToday[monitor.ID] = &_TodayStatsOfMonitor{}
}
year, month, day := time.Now().Date()
@@ -356,7 +361,7 @@ func (ss *ServiceSentinel) worker() {
isNeedSendNotification := (ss.lastStatus[mh.MonitorID] != "" || stateStr == "故障") && ss.monitors[mh.MonitorID].Notify
ss.lastStatus[mh.MonitorID] = stateStr
if isNeedSendNotification {
go SendNotification(fmt.Sprintf("[服务%s] %s", stateStr, ss.monitors[mh.MonitorID].Name), true)
go SendNotification(ss.monitors[mh.MonitorID].NotificationTag, fmt.Sprintf("[服务%s] %s", stateStr, ss.monitors[mh.MonitorID].Name), true)
}
ss.monitorsLock.RUnlock()
}
@@ -400,7 +405,7 @@ func (ss *ServiceSentinel) worker() {
if errMsg != "" {
ss.monitorsLock.RLock()
if ss.monitors[mh.MonitorID].Notify {
go SendNotification(fmt.Sprintf("[SSL] %s %s", ss.monitors[mh.MonitorID].Name, errMsg), true)
go SendNotification(ss.monitors[mh.MonitorID].NotificationTag, fmt.Sprintf("[SSL] %s %s", ss.monitors[mh.MonitorID].Name, errMsg), true)
}
ss.monitorsLock.RUnlock()
}

View File

@@ -107,22 +107,22 @@ func CleanMonitorHistory() {
var specialServerIDs []uint64
var alerts []model.AlertRule
DB.Find(&alerts)
for i := 0; i < len(alerts); i++ {
for j := 0; j < len(alerts[i].Rules); j++ {
for _, alert := range alerts {
for _, rule := range alert.Rules {
// 是不是流量记录规则
if !alerts[i].Rules[j].IsTransferDurationRule() {
if !rule.IsTransferDurationRule() {
continue
}
dataCouldRemoveBefore := alerts[i].Rules[j].GetTransferDurationStart()
dataCouldRemoveBefore := rule.GetTransferDurationStart()
// 判断规则影响的机器范围
if alerts[i].Rules[j].Cover == model.RuleCoverAll {
if rule.Cover == model.RuleCoverAll {
// 更新全局可以清理的数据点
if allServerKeep.IsZero() || allServerKeep.After(dataCouldRemoveBefore) {
allServerKeep = dataCouldRemoveBefore
}
} else {
// 更新特定机器可以清理数据点
for id := range alerts[i].Rules[j].Ignore {
for id := range rule.Ignore {
if specialServerKeep[id].IsZero() || specialServerKeep[id].After(dataCouldRemoveBefore) {
specialServerKeep[id] = dataCouldRemoveBefore
specialServerIDs = append(specialServerIDs, id)