feat: add network monitor hitory (#316) · 三网ping

* feat: add network monitor hitory

* fix: revert proto change and add indexStore

* fix: update monitor delete unuse monitor history

* fix: delete unuse monitor type

---------

Co-authored-by: LvGJ <lvgj1998@gmail.com>
This commit is contained in:
Ko no dio
2024-02-12 14:16:04 +08:00
committed by GitHub
parent c9bcba6f28
commit e8b8e59bd7
22 changed files with 1043 additions and 23 deletions

View File

@@ -1,9 +1,11 @@
package singleton
import (
"sync"
"time"
"github.com/naiba/nezha/model"
"github.com/naiba/nezha/pkg/utils"
"sync"
)
var (
@@ -11,7 +13,10 @@ var (
UserIDToApiTokenList = make(map[uint64][]string)
ApiLock sync.RWMutex
ServerAPI = &ServerAPIService{}
ServerAPI = &ServerAPIService{}
MonitorAPI = &MonitorAPIService{}
once = &sync.Once{}
)
type ServerAPIService struct{}
@@ -51,6 +56,23 @@ type ServerInfoResponse struct {
Result []*CommonServerInfo `json:"result"`
}
type MonitorAPIService struct {
}
type MonitorInfoResponse struct {
CommonResponse
Result []*MonitorInfo `json:"result"`
}
type MonitorInfo struct {
MonitorID uint64 `json:"monitor_id"`
ServerID uint64 `json:"server_id"`
MonitorName string `json:"monitor_name"`
ServerName string `json:"server_name"`
CreatedAt []int64 `json:"created_at"`
AvgDelay []float32 `json:"avg_delay"`
}
func InitAPI() {
ApiTokenList = make(map[string]*model.ApiToken)
UserIDToApiTokenList = make(map[uint64][]string)
@@ -203,3 +225,45 @@ func (s *ServerAPIService) GetAllList() *ServerInfoResponse {
}
return res
}
func (m *MonitorAPIService) GetMonitorHistories(query map[string]any) *MonitorInfoResponse {
var (
resultMap = make(map[uint64]*MonitorInfo)
monitorHistories []*model.MonitorHistory
sortedMonitorIDs []uint64
)
res := &MonitorInfoResponse{
CommonResponse: CommonResponse{
Code: 0,
Message: "success",
},
}
if err := DB.Model(&model.MonitorHistory{}).Select("monitor_id, created_at, server_id, avg_delay").
Where(query).Where("created_at >= ?", time.Now().Add(-24*time.Hour)).Order("monitor_id, created_at").
Scan(&monitorHistories).Error; err != nil {
res.CommonResponse = CommonResponse{
Code: 500,
Message: err.Error(),
}
} else {
for _, history := range monitorHistories {
infos, ok := resultMap[history.MonitorID]
if !ok {
infos = &MonitorInfo{
MonitorID: history.MonitorID,
ServerID: history.ServerID,
MonitorName: ServiceSentinelShared.monitors[history.MonitorID].Name,
ServerName: ServerList[history.ServerID].Name,
}
resultMap[history.MonitorID] = infos
sortedMonitorIDs = append(sortedMonitorIDs, history.MonitorID)
}
infos.CreatedAt = append(infos.CreatedAt, history.CreatedAt.Truncate(time.Minute).Unix()*1000)
infos.AvgDelay = append(infos.AvgDelay, history.AvgDelay)
}
for _, monitorID := range sortedMonitorIDs {
res.Result = append(res.Result, resultMap[monitorID])
}
}
return res
}

View File

@@ -8,9 +8,10 @@ import (
"sync"
"time"
"github.com/nicksnyder/go-i18n/v2/i18n"
"github.com/naiba/nezha/model"
pb "github.com/naiba/nezha/proto"
"github.com/nicksnyder/go-i18n/v2/i18n"
)
const (
@@ -36,12 +37,13 @@ func NewServiceSentinel(serviceSentinelDispatchBus chan<- model.Monitor) {
ServiceSentinelShared = &ServiceSentinel{
serviceReportChannel: make(chan ReportData, 200),
serviceStatusToday: make(map[uint64]*_TodayStatsOfMonitor),
serviceCurrentStatusIndex: make(map[uint64]int),
serviceCurrentStatusIndex: make(map[uint64]*indexStore),
serviceCurrentStatusData: make(map[uint64][]*pb.TaskResult),
lastStatus: make(map[uint64]int),
serviceResponseDataStoreCurrentUp: make(map[uint64]uint64),
serviceResponseDataStoreCurrentDown: make(map[uint64]uint64),
serviceResponseDataStoreCurrentAvgDelay: make(map[uint64]float32),
serviceResponsePing: make(map[uint64]map[uint64]*pingStore),
monitors: make(map[uint64]*model.Monitor),
sslCertCache: make(map[uint64]string),
// 30天数据缓存
@@ -95,11 +97,12 @@ type ServiceSentinel struct {
serviceResponseDataStoreLock sync.RWMutex
serviceStatusToday map[uint64]*_TodayStatsOfMonitor // [monitor_id] -> _TodayStatsOfMonitor
serviceCurrentStatusIndex map[uint64]int // [monitor_id] -> 该监控ID对应的 serviceCurrentStatusData 的最新索引下标
serviceCurrentStatusIndex map[uint64]*indexStore // [monitor_id] -> 该监控ID对应的 serviceCurrentStatusData 的最新索引下标
serviceCurrentStatusData map[uint64][]*pb.TaskResult // [monitor_id] -> []model.MonitorHistory
serviceResponseDataStoreCurrentUp map[uint64]uint64 // [monitor_id] -> 当前服务在线计数
serviceResponseDataStoreCurrentDown map[uint64]uint64 // [monitor_id] -> 当前服务离线计数
serviceResponseDataStoreCurrentAvgDelay map[uint64]float32 // [monitor_id] -> 当前服务离线计数
serviceResponsePing map[uint64]map[uint64]*pingStore // [monitor_id] -> ClientID -> delay
lastStatus map[uint64]int
sslCertCache map[uint64]string
@@ -111,6 +114,16 @@ type ServiceSentinel struct {
monthlyStatus map[uint64]*model.ServiceItemResponse // [monitor_id] -> model.ServiceItemResponse
}
type indexStore struct {
index int
t time.Time
}
type pingStore struct {
count int
ping float32
}
func (ss *ServiceSentinel) refreshMonthlyServiceStatus() {
// 刷新数据防止无人访问
ss.LoadStats()
@@ -326,6 +339,34 @@ func (ss *ServiceSentinel) worker() {
continue
}
mh := r.Data
if mh.Type == model.TaskTypeTCPPing || mh.Type == model.TaskTypeICMPPing {
monitorTcpMap, ok := ss.serviceResponsePing[mh.GetId()]
if !ok {
monitorTcpMap = make(map[uint64]*pingStore)
ss.serviceResponsePing[mh.GetId()] = monitorTcpMap
}
ts, ok := monitorTcpMap[r.Reporter]
if !ok {
ts = &pingStore{}
}
ts.count++
ts.ping = (ts.ping*float32(ts.count-1) + mh.Delay) / float32(ts.count)
if ts.count == Conf.AvgPingCount {
if ts.ping > float32(Conf.MaxTCPPingValue) {
ts.ping = float32(Conf.MaxTCPPingValue)
}
ts.count = 0
if err := DB.Create(&model.MonitorHistory{
MonitorID: mh.GetId(),
AvgDelay: ts.ping,
Data: mh.Data,
ServerID: r.Reporter,
}).Error; err != nil {
log.Println("NEZHA>> 服务监控数据持久化失败:", err)
}
}
monitorTcpMap[r.Reporter] = ts
}
ss.serviceResponseDataStoreLock.Lock()
// 写入当天状态
if mh.Successful {
@@ -336,9 +377,20 @@ func (ss *ServiceSentinel) worker() {
} else {
ss.serviceStatusToday[mh.GetId()].Down++
}
currentTime := time.Now()
if ss.serviceCurrentStatusIndex[mh.GetId()] == nil {
ss.serviceCurrentStatusIndex[mh.GetId()] = &indexStore{
t: currentTime,
index: 0,
}
}
// 写入当前数据
ss.serviceCurrentStatusData[mh.GetId()][ss.serviceCurrentStatusIndex[mh.GetId()]] = mh
ss.serviceCurrentStatusIndex[mh.GetId()]++
if ss.serviceCurrentStatusIndex[mh.GetId()].t.Before(currentTime) {
ss.serviceCurrentStatusIndex[mh.GetId()].t = currentTime.Add(30 * time.Second)
ss.serviceCurrentStatusData[mh.GetId()][ss.serviceCurrentStatusIndex[mh.GetId()].index] = mh
ss.serviceCurrentStatusIndex[mh.GetId()].index++
}
// 更新当前状态
ss.serviceResponseDataStoreCurrentUp[mh.GetId()] = 0
@@ -365,8 +417,11 @@ func (ss *ServiceSentinel) worker() {
stateCode := GetStatusCode(upPercent)
// 数据持久化
if ss.serviceCurrentStatusIndex[mh.GetId()] == _CurrentStatusSize {
ss.serviceCurrentStatusIndex[mh.GetId()] = 0
if ss.serviceCurrentStatusIndex[mh.GetId()].index == _CurrentStatusSize {
ss.serviceCurrentStatusIndex[mh.GetId()] = &indexStore{
index: 0,
t: currentTime,
}
if err := DB.Create(&model.MonitorHistory{
MonitorID: mh.GetId(),
AvgDelay: ss.serviceResponseDataStoreCurrentAvgDelay[mh.GetId()],

View File

@@ -99,6 +99,10 @@ func RecordTransferHourlyUsage() {
func CleanMonitorHistory() {
// 清理已被删除的服务器的监控记录与流量记录
DB.Unscoped().Delete(&model.MonitorHistory{}, "created_at < ? OR monitor_id NOT IN (SELECT `id` FROM monitors)", time.Now().AddDate(0, 0, -30))
// 由于网络监控记录的数据较多,并且前端仅使用了 1 天的数据
// 考虑到 sqlite 数据量问题,仅保留一天数据,
// server_id = 0 的数据会用于/service页面的可用性展示
DB.Unscoped().Delete(&model.MonitorHistory{}, "(created_at < ? AND server_id != 0) OR monitor_id NOT IN (SELECT `id` FROM monitors)", time.Now().AddDate(0, 0, -1))
DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (SELECT `id` FROM servers)")
// 计算可清理流量记录的时长
var allServerKeep time.Time