mirror of
https://github.com/Buriburizaem0n/nezha_domains.git
synced 2026-02-05 21:20:06 +00:00
✨ dashboard: 服务监控请求时间间隔
This commit is contained in:
@@ -204,6 +204,7 @@ type monitorForm struct {
|
||||
Cover uint8
|
||||
Notify string
|
||||
SkipServersRaw string
|
||||
Duration uint64
|
||||
}
|
||||
|
||||
func (ma *memberAPI) addOrEditMonitor(c *gin.Context) {
|
||||
@@ -218,6 +219,7 @@ func (ma *memberAPI) addOrEditMonitor(c *gin.Context) {
|
||||
m.SkipServersRaw = mf.SkipServersRaw
|
||||
m.Cover = mf.Cover
|
||||
m.Notify = mf.Notify == "on"
|
||||
m.Duration = mf.Duration
|
||||
}
|
||||
if err == nil {
|
||||
if m.ID == 0 {
|
||||
@@ -226,14 +228,15 @@ func (ma *memberAPI) addOrEditMonitor(c *gin.Context) {
|
||||
err = dao.DB.Save(&m).Error
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
err = dao.ServiceSentinelShared.OnMonitorUpdate(m)
|
||||
}
|
||||
if err != nil {
|
||||
c.JSON(http.StatusOK, model.Response{
|
||||
Code: http.StatusBadRequest,
|
||||
Message: fmt.Sprintf("请求错误:%s", err),
|
||||
})
|
||||
return
|
||||
} else {
|
||||
dao.ServiceSentinelShared.OnMonitorUpdate()
|
||||
}
|
||||
c.JSON(http.StatusOK, model.Response{
|
||||
Code: http.StatusOK,
|
||||
|
||||
@@ -17,7 +17,11 @@ import (
|
||||
"github.com/naiba/nezha/service/dao"
|
||||
)
|
||||
|
||||
var serviceSentinelDispatchBus chan model.Monitor
|
||||
|
||||
func init() {
|
||||
serviceSentinelDispatchBus = make(chan model.Monitor)
|
||||
|
||||
shanghai, err := time.LoadLocation("Asia/Shanghai")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
@@ -55,7 +59,7 @@ func initSystem() {
|
||||
dao.DB.AutoMigrate(model.Server{}, model.User{},
|
||||
model.Notification{}, model.AlertRule{}, model.Monitor{},
|
||||
model.MonitorHistory{}, model.Cron{}, model.Transfer{})
|
||||
dao.NewServiceSentinel()
|
||||
dao.NewServiceSentinel(serviceSentinelDispatchBus)
|
||||
|
||||
loadServers() //加载服务器列表
|
||||
loadCrons() //加载计划任务
|
||||
@@ -65,6 +69,7 @@ func initSystem() {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// 流量记录打点
|
||||
_, err = dao.Cron.AddFunc("0 * * * *", recordTransferHourlyUsage)
|
||||
if err != nil {
|
||||
@@ -173,7 +178,7 @@ func loadCrons() {
|
||||
func main() {
|
||||
cleanMonitorHistory()
|
||||
go rpc.ServeRPC(dao.Conf.GRPCPort)
|
||||
go rpc.DispatchTask(time.Second * 30)
|
||||
go rpc.DispatchTask(serviceSentinelDispatchBus)
|
||||
go dao.AlertSentinelStart()
|
||||
srv := controller.ServeWeb(dao.Conf.HTTPPort)
|
||||
graceful.Graceful(func() error {
|
||||
|
||||
@@ -3,7 +3,6 @@ package rpc
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
@@ -25,41 +24,36 @@ func ServeRPC(port uint) {
|
||||
server.Serve(listen)
|
||||
}
|
||||
|
||||
func DispatchTask(duration time.Duration) {
|
||||
var index uint64 = 0
|
||||
for {
|
||||
var hasAliveAgent bool
|
||||
tasks := dao.ServiceSentinelShared.Monitors()
|
||||
func DispatchTask(serviceSentinelDispatchBus <-chan model.Monitor) {
|
||||
workedServerIndex := 0
|
||||
for task := range serviceSentinelDispatchBus {
|
||||
round := 0
|
||||
prevIndex := workedServerIndex
|
||||
dao.SortedServerLock.RLock()
|
||||
startedAt := time.Now()
|
||||
for i := 0; i < len(tasks); i++ {
|
||||
if index >= uint64(len(dao.SortedServerList)) {
|
||||
index = 0
|
||||
if !hasAliveAgent {
|
||||
break
|
||||
}
|
||||
hasAliveAgent = false
|
||||
}
|
||||
|
||||
// 1. 如果服务器不在线,跳过这个服务器
|
||||
if dao.SortedServerList[index].TaskStream == nil {
|
||||
i--
|
||||
index++
|
||||
// 如果已经轮了一整圈没有合适机器去请求,跳出循环
|
||||
for round == 0 && prevIndex != workedServerIndex {
|
||||
// 如果到了圈尾,再回到圈头,圈数加一,游标重置
|
||||
if workedServerIndex == len(dao.SortedServerList) {
|
||||
workedServerIndex = 0
|
||||
round++
|
||||
continue
|
||||
}
|
||||
// 2. 如果此任务不可使用此服务器请求,跳过这个服务器(有些 IPv6 only 开了 NAT64 的机器请求 IPv4 总会出问题)
|
||||
if (tasks[i].Cover == model.MonitorCoverAll && tasks[i].SkipServers[dao.SortedServerList[index].ID]) ||
|
||||
(tasks[i].Cover == model.MonitorCoverIgnoreAll && !tasks[i].SkipServers[dao.SortedServerList[index].ID]) {
|
||||
i--
|
||||
index++
|
||||
// 如果服务器不在线,跳过这个服务器
|
||||
if dao.SortedServerList[workedServerIndex].TaskStream == nil {
|
||||
workedServerIndex++
|
||||
continue
|
||||
}
|
||||
|
||||
hasAliveAgent = true
|
||||
dao.SortedServerList[index].TaskStream.Send(tasks[i].PB())
|
||||
index++
|
||||
// 如果此任务不可使用此服务器请求,跳过这个服务器(有些 IPv6 only 开了 NAT64 的机器请求 IPv4 总会出问题)
|
||||
if (task.Cover == model.MonitorCoverAll && task.SkipServers[dao.SortedServerList[workedServerIndex].ID]) ||
|
||||
(task.Cover == model.MonitorCoverIgnoreAll && !task.SkipServers[dao.SortedServerList[workedServerIndex].ID]) {
|
||||
workedServerIndex++
|
||||
continue
|
||||
}
|
||||
// 找到合适机器执行任务,跳出循环
|
||||
dao.SortedServerList[workedServerIndex].TaskStream.Send(task.PB())
|
||||
workedServerIndex++
|
||||
break
|
||||
}
|
||||
dao.SortedServerLock.RUnlock()
|
||||
time.Sleep(time.Until(startedAt.Add(duration)))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user