⚡️ pprof 性能调优 / 计划任务支持秒级

This commit is contained in:
naiba
2021-09-29 19:58:02 +08:00
parent 1f1e0b6db7
commit 47dfa4777b
22 changed files with 149 additions and 105 deletions

View File

@@ -30,25 +30,21 @@ import (
// AgentConfig holds the agent's runtime settings. main binds most of these
// fields to command-line flags before the agent starts.
type AgentConfig struct {
// SkipConnectionCount turns off connection-count monitoring (--skip-conn).
SkipConnectionCount bool
// SkipProcsCount turns off process-count monitoring (--skip-procs).
SkipProcsCount bool
// DisableAutoUpdate turns off automatic self-upgrade (--disable-auto-update).
DisableAutoUpdate bool
// Debug is a debugging switch.
// NOTE(review): its flag binding and consumers are not visible here — confirm.
Debug bool
// Server is the dashboard RPC endpoint (--server/-s, default "localhost:5555").
Server string
// ClientSecret is the secret the agent uses to connect (--password/-p).
ClientSecret string
}
// init applies process-wide defaults before main runs: flags that the command
// line parser does not recognise are whitelisted instead of being reported as
// parse errors, and the shared default HTTP client gets a five-second timeout.
func init() {
	flag.CommandLine.ParseErrorsWhitelist.UnknownFlags = true
	http.DefaultClient.Timeout = 5 * time.Second
}
var (
version string
agentConf AgentConfig
version string
client pb.NezhaServiceClient
inited bool
)
var (
client pb.NezhaServiceClient
inited bool
agentConf AgentConfig
updateCh = make(chan struct{}) // Agent 自动更新间隔
httpClient = &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
@@ -63,6 +59,11 @@ const (
networkTimeOut = time.Second * 5 // 普通网络超时
)
// init applies process-wide defaults before main runs: flags that the command
// line parser does not recognise are whitelisted instead of being reported as
// parse errors, and the shared default HTTP client gets a five-second timeout.
func init() {
	flag.CommandLine.ParseErrorsWhitelist.UnknownFlags = true
	http.DefaultClient.Timeout = 5 * time.Second
}
func main() {
// 来自于 GoReleaser 的版本号
monitor.Version = version
@@ -71,6 +72,7 @@ func main() {
flag.StringVarP(&agentConf.Server, "server", "s", "localhost:5555", "管理面板RPC端口")
flag.StringVarP(&agentConf.ClientSecret, "password", "p", "", "Agent连接Secret")
flag.BoolVar(&agentConf.SkipConnectionCount, "skip-conn", false, "不监控连接数")
flag.BoolVar(&agentConf.SkipProcsCount, "skip-procs", false, "不监控进程数")
flag.BoolVar(&agentConf.DisableAutoUpdate, "disable-auto-update", false, "禁用自动升级")
flag.Parse()
@@ -88,7 +90,6 @@ func run() {
}
go pty.DownloadDependency()
// 上报服务器信息
go reportState()
// 更新IP信息
@@ -168,8 +169,8 @@ func receiveTasks(tasks pb.NezhaService_RequestTaskClient) error {
}
go func() {
defer func() {
if recover() != nil {
println("task panic", task)
if err := recover(); err != nil {
println("task panic", task, err)
}
}()
doTask(task)
@@ -209,7 +210,7 @@ func reportState() {
if client != nil && inited {
monitor.TrackNetworkSpeed()
timeOutCtx, cancel := context.WithTimeout(context.Background(), networkTimeOut)
_, err = client.ReportSystemState(timeOutCtx, monitor.GetState(agentConf.SkipConnectionCount).PB())
_, err = client.ReportSystemState(timeOutCtx, monitor.GetState(agentConf.SkipConnectionCount, agentConf.SkipProcsCount).PB())
cancel()
if err != nil {
println("reportState error", err)
@@ -229,7 +230,7 @@ func doSelfUpdate() {
println("检查更新:", v)
latest, err := selfupdate.UpdateSelf(v, "naiba/nezha")
if err != nil {
println("自动更新失败:", err)
println("更新失败:", err)
return
}
if !latest.Version.Equals(v) {
@@ -282,7 +283,7 @@ func handleHttpGetTask(task *pb.Task, result *pb.TaskResult) {
}
if err == nil {
// 检查 SSL 证书信息
if len(resp.TLS.PeerCertificates) > 0 {
if resp.TLS != nil && len(resp.TLS.PeerCertificates) > 0 {
c := resp.TLS.PeerCertificates[0]
result.Data = c.Issuer.CommonName + "|" + c.NotAfter.In(time.Local).String()
}

View File

@@ -5,7 +5,6 @@ import (
"regexp"
"runtime"
"strings"
"sync/atomic"
"syscall"
"time"
@@ -15,20 +14,27 @@ import (
"github.com/shirou/gopsutil/v3/load"
"github.com/shirou/gopsutil/v3/mem"
"github.com/shirou/gopsutil/v3/net"
"github.com/shirou/gopsutil/v3/process"
"github.com/naiba/nezha/model"
)
// Version is the agent version string; it defaults to "debug".
// NOTE(review): the agent's main appears to overwrite it with the release
// version at startup — confirm.
var Version string = "debug"
// Network traffic counters written by TrackNetworkSpeed and read by GetState
// through sync/atomic. lastUpdate is the Unix time, in seconds, of the most
// recent sample.
var netInSpeed, netOutSpeed, netInTransfer, netOutTransfer, lastUpdate uint64
// expectDiskFsTypes lists filesystem type names.
// NOTE(review): presumably the types counted when summing disk usage; the
// consumer is not visible here — confirm.
var expectDiskFsTypes = []string{
"apfs", "ext4", "ext3", "ext2", "f2fs", "reiserfs", "jfs", "btrfs",
"fuseblk", "zfs", "simfs", "ntfs", "fat32", "exfat", "xfs", "fuse.rclone",
}
// excludeNetInterfaces lists network interface name fragments.
// NOTE(review): presumably interfaces matching these are skipped when
// accumulating traffic; the consumer is not visible here — confirm.
var excludeNetInterfaces = []string{
"lo", "tun", "docker", "veth", "br-", "vmbr", "vnet", "kube",
}
// getMacDiskNo matches device paths of the form /dev/disk<digit>s... and
// captures the single digit that follows "disk".
var getMacDiskNo = regexp.MustCompile(`\/dev\/disk(\d)s.*`)
// Package-level configuration and lookup tables.
var (
// Version is the agent version string; it defaults to "debug".
// NOTE(review): the agent's main appears to overwrite it with the release
// version at startup — confirm.
Version string = "debug"
// expectDiskFsTypes lists filesystem type names.
// NOTE(review): presumably the types counted when summing disk usage; the
// consumer is not visible here — confirm.
expectDiskFsTypes = []string{
"apfs", "ext4", "ext3", "ext2", "f2fs", "reiserfs", "jfs", "btrfs",
"fuseblk", "zfs", "simfs", "ntfs", "fat32", "exfat", "xfs", "fuse.rclone",
}
// excludeNetInterfaces lists network interface name fragments.
// NOTE(review): presumably interfaces matching these are skipped when
// accumulating traffic; the consumer is not visible here — confirm.
excludeNetInterfaces = []string{
"lo", "tun", "docker", "veth", "br-", "vmbr", "vnet", "kube",
}
// getMacDiskNo matches device paths of the form /dev/disk<digit>s... and
// captures the single digit that follows "disk".
getMacDiskNo = regexp.MustCompile(`\/dev\/disk(\d)s.*`)
)
// Cached sampling state shared by GetHost, GetState and TrackNetworkSpeed.
var (
// Traffic totals and per-second rates: TrackNetworkSpeed writes them and
// GetState reads them. lastUpdateNetStats is the Unix time, in seconds, of
// the most recent sample.
netInSpeed, netOutSpeed, netInTransfer, netOutTransfer, lastUpdateNetStats uint64
// cachedBootTime is set by GetHost; GetState derives Uptime from the time
// elapsed since it.
cachedBootTime time.Time
)
func GetHost() *model.Host {
hi, _ := host.Info()
@@ -58,6 +64,8 @@ func GetHost() *model.Host {
swapMemTotal = mv.SwapTotal
}
// hi.BootTime is a Unix timestamp in seconds, so convert it to a time.Time
// directly. Using it as a duration offset from now (time.Duration counts
// nanoseconds) would place the boot time only minutes in the past and make
// the uptime derived from it wrong.
cachedBootTime = time.Unix(int64(hi.BootTime), 0)
return &model.Host{
Platform: hi.OS,
PlatformVersion: hi.PlatformVersion,
@@ -74,8 +82,12 @@ func GetHost() *model.Host {
}
}
func GetState(skipConnectionCount bool) *model.HostState {
hi, _ := host.Info()
func GetState(skipConnectionCount bool, skipProcsCount bool) *model.HostState {
var procs []int32
if !skipProcsCount {
procs, _ = process.Pids()
}
mv, _ := mem.VirtualMemory()
var swapMemUsed uint64
@@ -92,11 +104,11 @@ func GetState(skipConnectionCount bool) *model.HostState {
if err == nil {
cpuPercent = cp[0]
}
_, diskUsed := getDiskTotalAndUsed()
loadStat, _ := load.Avg()
var tcpConnCount, udpConnCount uint64
if !skipConnectionCount {
conns, _ := net.Connections("all")
for i := 0; i < len(conns); i++ {
@@ -114,17 +126,17 @@ func GetState(skipConnectionCount bool) *model.HostState {
MemUsed: mv.Total - mv.Available,
SwapUsed: swapMemUsed,
DiskUsed: diskUsed,
NetInTransfer: atomic.LoadUint64(&netInTransfer),
NetOutTransfer: atomic.LoadUint64(&netOutTransfer),
NetInSpeed: atomic.LoadUint64(&netInSpeed),
NetOutSpeed: atomic.LoadUint64(&netOutSpeed),
Uptime: hi.Uptime,
NetInTransfer: netInTransfer,
NetOutTransfer: netOutTransfer,
NetInSpeed: netInSpeed,
NetOutSpeed: netOutSpeed,
Uptime: uint64(time.Since(cachedBootTime).Seconds()),
Load1: loadStat.Load1,
Load5: loadStat.Load5,
Load15: loadStat.Load15,
TcpConnCount: tcpConnCount,
UdpConnCount: udpConnCount,
ProcessCount: hi.Procs,
ProcessCount: uint64(len(procs)),
}
}
@@ -140,14 +152,14 @@ func TrackNetworkSpeed() {
innerNetOutTransfer += v.BytesSent
}
now := uint64(time.Now().Unix())
diff := now - atomic.LoadUint64(&lastUpdate)
diff := now - lastUpdateNetStats
if diff > 0 {
atomic.StoreUint64(&netInSpeed, (innerNetInTransfer-atomic.LoadUint64(&netInTransfer))/diff)
atomic.StoreUint64(&netOutSpeed, (innerNetOutTransfer-atomic.LoadUint64(&netOutTransfer))/diff)
netInSpeed = (innerNetInTransfer - netInTransfer) / diff
netOutSpeed = (innerNetOutTransfer - netOutTransfer) / diff
}
atomic.StoreUint64(&netInTransfer, innerNetInTransfer)
atomic.StoreUint64(&netOutTransfer, innerNetOutTransfer)
atomic.StoreUint64(&lastUpdate, now)
netInTransfer = innerNetInTransfer
netOutTransfer = innerNetOutTransfer
lastUpdateNetStats = now
}
}

View File

@@ -35,13 +35,21 @@ func UpdateIP() {
for {
ipv4 := fetchGeoIP(geoIPApiList, false)
ipv6 := fetchGeoIP(geoIPApiList, true)
cachedIP = fmt.Sprintf("ip(v4:%s,v6:[%s])", ipv4.IP, ipv6.IP)
if ipv4.IP == "" && ipv6.IP == "" {
time.Sleep(time.Minute)
continue
}
if ipv4.IP == "" || ipv6.IP == "" {
cachedIP = fmt.Sprintf("%s%s", ipv4.IP, ipv6.IP)
} else {
cachedIP = fmt.Sprintf("%s/%s", ipv4.IP, ipv6.IP)
}
if ipv4.CountryCode != "" {
cachedCountry = ipv4.CountryCode
} else if ipv6.CountryCode != "" {
cachedCountry = ipv6.CountryCode
}
time.Sleep(time.Minute * 10)
time.Sleep(time.Minute * 30)
}
}

View File

@@ -112,7 +112,7 @@ func ServeWeb(port uint) *http.Server {
},
})
r.Static("/static", "resource/static")
r.LoadHTMLGlob("resource/template/**/*")
r.LoadHTMLGlob("resource/template/**/*.html")
routers(r)
page404 := func(c *gin.Context) {

View File

@@ -267,19 +267,22 @@ func (ma *memberAPI) addOrEditCron(c *gin.Context) {
cr.Cover = cf.Cover
err = json.Unmarshal([]byte(cf.ServersRaw), &cr.Servers)
}
if err == nil {
_, err = cron.ParseStandard(cr.Scheduler)
}
tx := dao.DB.Begin()
if err == nil {
if cf.ID == 0 {
err = dao.DB.Create(&cr).Error
err = tx.Create(&cr).Error
} else {
err = dao.DB.Save(&cr).Error
err = tx.Save(&cr).Error
}
}
if err == nil {
cr.CronID, err = dao.Cron.AddFunc(cr.Scheduler, dao.CronTrigger(cr))
}
if err == nil {
err = tx.Commit().Error
} else {
tx.Rollback()
}
if err != nil {
c.JSON(http.StatusOK, model.Response{
Code: http.StatusBadRequest,

View File

@@ -25,7 +25,7 @@ func init() {
// 初始化 dao 包
dao.Conf = &model.Config{}
dao.Cron = cron.New(cron.WithLocation(shanghai))
dao.Cron = cron.New(cron.WithSeconds(), cron.WithLocation(shanghai))
dao.Crons = make(map[uint64]*model.Cron)
dao.ServerList = make(map[uint64]*model.Server)
dao.SecretToID = make(map[string]uint64)
@@ -60,13 +60,13 @@ func initSystem() {
loadCrons() //加载计划任务
// 清理 服务请求记录 和 流量记录 的旧数据
_, err := dao.Cron.AddFunc("30 3 * * *", cleanMonitorHistory)
_, err := dao.Cron.AddFunc("0 30 3 * * *", cleanMonitorHistory)
if err != nil {
panic(err)
}
// 流量记录打点
_, err = dao.Cron.AddFunc("0 * * * *", recordTransferHourlyUsage)
_, err = dao.Cron.AddFunc("0 0 * * * *", recordTransferHourlyUsage)
if err != nil {
panic(err)
}
@@ -162,10 +162,11 @@ func loadCrons() {
}
cr.CronID, err = dao.Cron.AddFunc(cr.Scheduler, dao.CronTrigger(cr))
if err != nil {
panic(err)
if err == nil {
dao.Crons[cr.ID] = &cr
} else {
log.Println("NEZHA>> 计划任务调度失败", cr, err)
}
dao.Crons[cr.ID] = &cr
}
dao.Cron.Start()
}

View File

@@ -1,7 +1,25 @@
package main
import "github.com/naiba/nezha/cmd/agent/pty"
import (
"log"
"github.com/robfig/cron/v3"
)
// main is a scratch program for checking second-granularity scheduling: it
// registers one job that fires every second and one that fires every minute,
// starts the scheduler and then blocks forever. A registration error aborts
// the program with a panic.
func main() {
	pty.DownloadDependency()

	// WithSeconds switches the parser to six-field specs whose first field is
	// the second.
	c := cron.New(cron.WithSeconds())

	if _, err := c.AddFunc("* * * * * *", func() {
		log.Println("bingo second")
	}); err != nil {
		panic(err)
	}

	// The six-field parser rejects a five-field spec, so the per-minute job
	// pins the seconds field to 0 instead of omitting it.
	if _, err := c.AddFunc("0 * * * * *", func() {
		log.Println("bingo minute")
	}); err != nil {
		panic(err)
	}

	// Start returns immediately; block so the process stays alive while the
	// scheduled jobs run.
	c.Start()
	select {}
}