💥 v2.0 必须更新面板,新增服务监控

This commit is contained in:
naiba
2021-01-16 00:45:49 +08:00
parent 0ce8017875
commit a41c792577
38 changed files with 1015 additions and 453 deletions

View File

@@ -2,13 +2,18 @@ package main
import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"strings"
"time"
"github.com/blang/semver"
"github.com/genkiroid/cert"
"github.com/go-ping/ping"
"github.com/p14yground/go-github-selfupdate/selfupdate"
"github.com/spf13/cobra"
"google.golang.org/grpc"
@@ -105,7 +110,6 @@ func run(cmd *cobra.Command, args []string) {
var err error
var conn *grpc.ClientConn
var hc pb.NezhaService_HeartbeatClient
retry := func() {
log.Println("Error to close connection ...")
@@ -125,43 +129,90 @@ func run(cmd *cobra.Command, args []string) {
}
client = pb.NewNezhaServiceClient(conn)
// 第一步注册
_, err = client.Register(ctx, monitor.GetHost().PB())
_, err = client.ReportSystemInfo(ctx, monitor.GetHost().PB())
if err != nil {
log.Printf("client.Register err: %v", err)
log.Printf("client.ReportSystemInfo err: %v", err)
retry()
continue
}
// 心跳接收控制命令
hc, err = client.Heartbeat(ctx, &pb.Beat{
Timestamp: fmt.Sprintf("%v", time.Now()),
})
// 执行 Task
tasks, err := client.RequestTask(ctx, monitor.GetHost().PB())
if err != nil {
log.Printf("client.Heartbeat err: %v", err)
log.Printf("client.RequestTask err: %v", err)
retry()
continue
}
err = receiveCommand(hc)
err = receiveTasks(tasks)
log.Printf("receiveCommand exit to main: %v", err)
retry()
}
}
func receiveCommand(hc pb.NezhaService_HeartbeatClient) error {
// receiveTasks consumes the RequestTask stream: for every task received it
// builds a pb.TaskResult carrying the task id, runs the check selected by
// task.GetType() (HTTP GET, ICMP ping or TCP ping) and sends the result back
// with client.ReportTask. It returns the error produced by tasks.Recv().
//
// NOTE(review): this span still contains statements that refer to `action`
// and `hc`, the names used by the receiveCommand signature; they look like
// leftovers of the previous implementation shown side by side with the new
// one — confirm against the real file before relying on it.
func receiveTasks(tasks pb.NezhaService_RequestTaskClient) error {
var err error
var action *pb.Command
defer log.Printf("receiveCommand exit %v %v => %v", time.Now(), action, err)
var task *pb.Task
// NOTE(review): the arguments of a deferred call are evaluated when the
// defer statement runs, so this logs the time of entry together with the
// still-nil task and err, not the values at exit.
defer log.Printf("receiveTasks exit %v %v => %v", time.Now(), task, err)
for {
action, err = hc.Recv()
if err == io.EOF {
return nil
}
// Any receive error ends the loop and is returned to the caller.
task, err = tasks.Recv()
if err != nil {
return err
}
switch action.GetType() {
var result pb.TaskResult
result.Id = task.GetId()
switch task.GetType() {
case model.MonitorTypeHTTPGET:
start := time.Now()
// `err` is redeclared with := here, so it shadows the function-level err
// for the rest of this case.
// NOTE(review): resp.Body is never closed in this block.
resp, err := http.Get(task.GetData())
if err == nil {
// Delay is the elapsed time in milliseconds (microseconds / 1000).
result.Delay = float32(time.Now().Sub(start).Microseconds()) / 1000.0
// Any status outside 200-299 is turned into an error.
if resp.StatusCode > 299 || resp.StatusCode < 200 {
err = errors.New("\n应用错误" + resp.Status)
}
}
var certs cert.Certs
if err == nil {
// Certificates are only looked up for targets starting with "https://".
if strings.HasPrefix(task.GetData(), "https://") {
certs, err = cert.NewCerts([]string{task.GetData()})
}
}
if err == nil {
// NOTE(review): for a non-https target certs is still empty at this
// point, so the check below reports such targets as failed.
if len(certs) == 0 {
err = errors.New("\n获取SSL证书错误未获取到证书")
}
}
if err == nil {
// On success Data carries the issuer of the first certificate.
result.Data = certs[0].Issuer
result.Successful = true
} else {
result.Data = err.Error()
}
case model.MonitorTypeICMPPing:
pinger, err := ping.NewPinger(task.GetData())
if err == nil {
pinger.Count = 10
err = pinger.Run() // Blocks until finished.
}
if err == nil {
stat := pinger.Statistics()
// Delay is the average round-trip time in milliseconds.
result.Delay = float32(stat.AvgRtt.Microseconds()) / 1000.0
result.Successful = true
} else {
result.Data = err.Error()
}
case model.MonitorTypeTCPPing:
start := time.Now()
// Dial with a 10 second timeout; the connection is closed right away and
// only the time taken to establish it is reported (in milliseconds).
conn, err := net.DialTimeout("tcp", task.GetData(), time.Second*10)
if err == nil {
conn.Close()
result.Delay = float32(time.Now().Sub(start).Microseconds()) / 1000.0
result.Successful = true
} else {
result.Data = err.Error()
}
default:
log.Printf("Unknown action: %v", action)
log.Printf("Unknown action: %v", task)
}
// The result is reported for every task, including unknown types; the
// return values of ReportTask are discarded.
client.ReportTask(ctx, &result)
}
}
@@ -172,14 +223,14 @@ func reportState() {
for {
if client != nil {
monitor.TrackNetworkSpeed()
_, err = client.ReportState(ctx, monitor.GetState(dao.ReportDelay).PB())
_, err = client.ReportSystemState(ctx, monitor.GetState(dao.ReportDelay).PB())
if err != nil {
log.Printf("reportState error %v", err)
time.Sleep(delayWhenError)
}
if lastReportHostInfo.Before(time.Now().Add(-10 * time.Minute)) {
lastReportHostInfo = time.Now()
client.Register(ctx, monitor.GetHost().PB())
client.ReportSystemInfo(ctx, monitor.GetHost().PB())
}
}
}

View File

@@ -20,9 +20,64 @@ func (cp *commonPage) serve() {
cr := cp.r.Group("")
cr.Use(mygin.Authorize(mygin.AuthorizeOption{}))
cr.GET("/", cp.home)
cr.GET("/service", cp.service)
cr.GET("/ws", cp.ws)
}
// ServiceItem is the per-monitor view model built by the service page
// handler. The three arrays hold one slot per day; the handler writes slot 29
// for records created today and lower indexes for older records.
type ServiceItem struct {
	// Monitor is the monitor this item describes.
	Monitor model.Monitor
	// TotalUp and TotalDown count successful and failed history records.
	TotalUp, TotalDown uint64
	// Delay holds the running average delay of the successful records per day.
	Delay *[30]float32
	// Up and Down count successful and failed records per day.
	Up, Down *[30]int
}
// service renders the public service-status page.
//
// It loads every monitor plus the monitor history created since midnight 29
// days ago, folds the history into one ServiceItem per monitor (total and
// per-day up/down counters and a per-day running average delay for successful
// checks) and renders the "service" template of the configured theme. When an
// authorized user is present in the context it is exposed to the template as
// "Admin".
func (cp *commonPage) service(c *gin.Context) {
	var ms []model.Monitor
	dao.DB.Find(&ms)

	year, month, day := time.Now().Date()
	today := time.Date(year, month, day, 0, 0, 0, 0, time.Local)
	var mhs []model.MonitorHistory
	dao.DB.Where("created_at >= ?", today.AddDate(0, 0, -29)).Find(&mhs)

	msm := make(map[uint64]*ServiceItem, len(ms))
	for i := range ms {
		msm[ms[i].ID] = &ServiceItem{
			Monitor: ms[i],
			// new returns zeroed arrays, same as spelling out thirty zeros.
			Delay: new([30]float32),
			Up:    new([30]int),
			Down:  new([30]int),
		}
	}

	// Merge the history records into their monitor's item.
	for i := range mhs {
		item, found := msm[mhs[i].MonitorID]
		if !found {
			// History rows can outlive their monitor (deleting a monitor does
			// not remove its history); skipping them avoids dereferencing a
			// nil map entry.
			continue
		}
		// Slot 29 is today; older records count back from slot 28.
		dayIndex := 29
		if mhs[i].CreatedAt.Before(today) {
			dayIndex = 28 - (int(today.Sub(mhs[i].CreatedAt).Hours()) / 24)
			if dayIndex < 0 {
				// A record exactly on the 29-day boundary (or shifted by a DST
				// change) would otherwise yield -1 and index out of range.
				dayIndex = 0
			}
		}
		if mhs[i].Successful {
			item.TotalUp++
			// Incremental mean over the successful checks of that day.
			ups := float32(item.Up[dayIndex])
			item.Delay[dayIndex] = (item.Delay[dayIndex]*ups + mhs[i].Delay) / (ups + 1)
			item.Up[dayIndex]++
		} else {
			item.TotalDown++
			item.Down[dayIndex]++
		}
	}

	u, ok := c.Get(model.CtxKeyAuthorizedUser)
	data := mygin.CommonEnvironment(c, gin.H{
		"Title":    "服务状态",
		"Services": msm,
	})
	if ok {
		data["Admin"] = u
	}
	c.HTML(http.StatusOK, "theme-"+dao.Conf.Site.Theme+"/service", data)
}
func (cp *commonPage) home(c *gin.Context) {
dao.ServerLock.RLock()
defer dao.ServerLock.RUnlock()

View File

@@ -13,7 +13,6 @@ import (
"github.com/naiba/nezha/service/dao"
)
// ServeWeb ..
func ServeWeb(port uint) {
gin.SetMode(gin.ReleaseMode)
if dao.Conf.Debug {
@@ -43,6 +42,35 @@ func ServeWeb(port uint) {
"ts": func(s string) string {
return strings.TrimSpace(s)
},
"divU64": func(a, b uint64) float32 {
if b == 0 {
if a > 0 {
return 100
}
return 0
}
return float32(a) / float32(b) * 100
},
"div": func(a, b int) float32 {
if b == 0 {
if a > 0 {
return 100
}
return 0
}
return float32(a) / float32(b) * 100
},
"addU64": func(a, b uint64) uint64 {
return a + b
},
"add": func(a, b int) int {
return a + b
},
"dayBefore": func(i int) string {
year, month, day := time.Now().Date()
today := time.Date(year, month, day, 0, 0, 0, 0, time.Local)
return today.AddDate(0, 0, i-29).Format("1月2号")
},
})
r.Static("/static", "resource/static")
r.LoadHTMLGlob("resource/template/**/*")

View File

@@ -33,6 +33,7 @@ func (ma *memberAPI) serve() {
mr.POST("/logout", ma.logout)
mr.POST("/server", ma.addOrEditServer)
mr.POST("/monitor", ma.addOrEditMonitor)
mr.POST("/notification", ma.addOrEditNotification)
mr.POST("/alert-rule", ma.addOrEditAlertRule)
mr.POST("/setting", ma.updateSetting)
@@ -64,6 +65,8 @@ func (ma *memberAPI) delete(c *gin.Context) {
if err == nil {
alertmanager.OnDeleteNotification(id)
}
case "monitor":
err = dao.DB.Delete(&model.Monitor{}, "id = ?", id).Error
case "alert-rule":
err = dao.DB.Delete(&model.AlertRule{}, "id = ?", id).Error
if err == nil {
@@ -125,7 +128,7 @@ func (ma *memberAPI) addOrEditServer(c *gin.Context) {
s.State = dao.ServerList[s.ID].State
} else {
s.Host = &model.Host{}
s.State = &model.State{}
s.State = &model.HostState{}
}
dao.ServerList[s.ID] = &s
dao.ReSortServer()
@@ -134,6 +137,42 @@ func (ma *memberAPI) addOrEditServer(c *gin.Context) {
})
}
// monitorForm is the request payload that addOrEditMonitor binds from JSON
// and copies field by field into a model.Monitor.
type monitorForm struct {
	// ID selects between creating (zero) and saving an existing monitor.
	ID uint64
	// Name and Target are copied verbatim onto the monitor.
	Name, Target string
	// Type is copied verbatim onto the monitor's Type field.
	Type uint8
}
// addOrEditMonitor creates or updates a monitor from a JSON monitorForm.
//
// A form with ID 0 is inserted with Create, any other ID is written with
// Save. The HTTP status is always 200; the outcome is carried in the Code
// field of the model.Response body (400 plus a message on bind or database
// failure, 200 otherwise).
func (ma *memberAPI) addOrEditMonitor(c *gin.Context) {
	var (
		form   monitorForm
		record model.Monitor
	)

	// reject writes the failure envelope shared by the bind and database paths.
	reject := func(cause error) {
		c.JSON(http.StatusOK, model.Response{
			Code:    http.StatusBadRequest,
			Message: fmt.Sprintf("请求错误:%s", cause),
		})
	}

	if bindErr := c.ShouldBindJSON(&form); bindErr != nil {
		reject(bindErr)
		return
	}

	record.Name = form.Name
	record.Target = form.Target
	record.Type = form.Type
	record.ID = form.ID

	var dbErr error
	switch record.ID {
	case 0:
		dbErr = dao.DB.Create(&record).Error
	default:
		dbErr = dao.DB.Save(&record).Error
	}
	if dbErr != nil {
		reject(dbErr)
		return
	}

	c.JSON(http.StatusOK, model.Response{
		Code: http.StatusOK,
	})
}
type notificationForm struct {
ID uint64
Name string

View File

@@ -23,6 +23,7 @@ func (mp *memberPage) serve() {
Redirect: "/login",
}))
mr.GET("/server", mp.server)
mr.GET("/monitor", mp.monitor)
mr.GET("/notification", mp.notification)
mr.GET("/setting", mp.setting)
}
@@ -36,6 +37,15 @@ func (mp *memberPage) server(c *gin.Context) {
}))
}
// monitor renders the "dashboard/monitor" template with every monitor row
// returned by the database, exposed to the template as "Monitors".
func (mp *memberPage) monitor(c *gin.Context) {
	var list []model.Monitor
	dao.DB.Find(&list)

	env := mygin.CommonEnvironment(c, gin.H{
		"Title":    "服务监控",
		"Monitors": list,
	})
	c.HTML(http.StatusOK, "dashboard/monitor", env)
}
func (mp *memberPage) notification(c *gin.Context) {
var nf []model.Notification
dao.DB.Find(&nf)

View File

@@ -34,14 +34,16 @@ func init() {
}
func initDB() {
dao.DB.AutoMigrate(model.Server{}, model.User{}, model.Notification{}, model.AlertRule{})
dao.DB.AutoMigrate(model.Server{}, model.User{},
model.Notification{}, model.AlertRule{}, model.Monitor{},
model.MonitorHistory{})
// load cache
var servers []model.Server
dao.DB.Find(&servers)
for _, s := range servers {
innerS := s
innerS.Host = &model.Host{}
innerS.State = &model.State{}
innerS.State = &model.HostState{}
dao.ServerList[innerS.ID] = &innerS
}
dao.ReSortServer()
@@ -50,5 +52,6 @@ func initDB() {
// main starts the dashboard's background services, each in its own
// goroutine, and then calls alertmanager.Start on the main goroutine.
func main() {
// Web UI on the HTTP port taken from the loaded configuration.
go controller.ServeWeb(dao.Conf.HTTPPort)
// RPC endpoint on the hard-coded port 5555.
go rpc.ServeRPC(5555)
// Task dispatcher; the duration is the pause it sleeps between rounds.
go rpc.DispatchTask(time.Minute * 10)
// NOTE(review): presumably Start blocks for the life of the process —
// if it returned, main would exit and take the goroutines above with it;
// confirm.
alertmanager.Start()
}

View File

@@ -2,15 +2,18 @@ package rpc
import (
"fmt"
"log"
"net"
"time"
"google.golang.org/grpc"
"github.com/naiba/nezha/model"
pb "github.com/naiba/nezha/proto"
"github.com/naiba/nezha/service/dao"
rpcService "github.com/naiba/nezha/service/rpc"
)
// ServeRPC ...
func ServeRPC(port uint) {
server := grpc.NewServer()
pb.RegisterNezhaServiceServer(server, &rpcService.NezhaHandler{
@@ -22,3 +25,34 @@ func ServeRPC(port uint) {
}
server.Serve(listen)
}
// DispatchTask hands every monitor out to the connected agents, round-robin,
// once per duration. It never returns and is meant to run in its own
// goroutine.
//
// Each round loads all monitors from the database and walks
// dao.SortedServerList under dao.ServerLock's read lock, sending one task to
// each server that has a TaskStream. The position in the server list is kept
// across rounds so successive rounds continue where the previous one stopped.
// A round ends early only after a full pass over the list found no server
// with a stream.
func DispatchTask(duration time.Duration) {
	var index uint64
	for {
		var tasks []model.Monitor
		dao.DB.Find(&tasks)

		// NOTE: the read lock is held while sending, as before.
		dao.ServerLock.RLock()
		total := uint64(len(dao.SortedServerList))
		// skipped counts consecutive servers without a stream. Giving up only
		// after a whole pass means a round that starts at (or wraps past) the
		// end of the list still reaches the live agents at its beginning.
		var skipped uint64
		for i := 0; total > 0 && i < len(tasks); {
			if index >= total {
				index = 0
			}
			current := index
			index++

			stream := dao.SortedServerList[current].TaskStream
			if stream == nil {
				skipped++
				if skipped >= total {
					// No agent is connected at all; retry next round.
					break
				}
				continue
			}
			skipped = 0

			log.Println("DispatchTask 确认派发 >>>>>", i, current)
			if err := stream.Send(tasks[i].PB()); err != nil {
				// The task is not retried in this round; it is sent again on
				// the next one.
				log.Printf("DispatchTask send task %d to server %d: %v", i, current, err)
			}
			log.Println("DispatchTask 确认派发 <<<<<", i, current)
			i++
		}
		dao.ServerLock.RUnlock()

		time.Sleep(duration)
	}
}

View File

@@ -1,13 +1,38 @@
package main
import (
"fmt"
"log"
"net"
"os/exec"
"time"
"github.com/genkiroid/cert"
"github.com/go-ping/ping"
"github.com/shirou/gopsutil/v3/disk"
)
func main() {
conn, err := net.DialTimeout("tcp", "example.com:80", time.Second*10)
if err != nil {
panic(err)
}
println(conn)
pinger, err := ping.NewPinger("example.com")
if err != nil {
panic(err)
}
pinger.Count = 3
err = pinger.Run() // Blocks until finished.
if err != nil {
panic(err)
}
fmt.Printf("%+v", pinger.Statistics())
certs, err := cert.NewCerts([]string{"example.com"})
if err != nil {
panic(err)
}
fmt.Println(certs)
dparts, _ := disk.Partitions(false)
for _, part := range dparts {
u, _ := disk.Usage(part.Mountpoint)
@@ -18,12 +43,12 @@ func main() {
}
func cmdExec() {
cmd := exec.Command("ping", "qiongbi.net", "-c2")
cmd := exec.Command("ping", "example.com", "-c2")
output, err := cmd.Output()
log.Println("output:", string(output))
log.Println("err:", err)
cmd = exec.Command("ping", "qiongbi", "-c2")
cmd = exec.Command("ping", "example", "-c2")
output, err = cmd.Output()
log.Println("output:", string(output))
log.Println("err:", err)