可选监控哪些分区及网卡

This commit is contained in:
naiba
2022-03-19 17:22:15 +08:00
parent 84866384f4
commit 4fa357755b
7 changed files with 243 additions and 95 deletions

View File

@@ -10,17 +10,23 @@ import (
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"time"
"github.com/AlecAivazis/survey/v2"
"github.com/blang/semver"
"github.com/go-ping/ping"
"github.com/gorilla/websocket"
"github.com/p14yground/go-github-selfupdate/selfupdate"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/host"
psnet "github.com/shirou/gopsutil/v3/net"
flag "github.com/spf13/pflag"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"github.com/naiba/nezha/cmd/agent/monitor"
"github.com/naiba/nezha/cmd/agent/processgroup"
@@ -31,7 +37,7 @@ import (
"github.com/naiba/nezha/service/rpc"
)
type AgentConfig struct {
type AgentCliParam struct {
SkipConnectionCount bool
SkipProcsCount bool
DisableAutoUpdate bool
@@ -52,9 +58,10 @@ var (
)
var (
agentConf AgentConfig
updateCh = make(chan struct{}) // Agent 自动更新间隔
httpClient = &http.Client{
agentCliParam AgentCliParam
agentConfig model.AgentConfig
updateCh = make(chan struct{}) // Agent 自动更新间隔
httpClient = &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
@@ -70,6 +77,12 @@ const (
func init() {
http.DefaultClient.Timeout = time.Second * 5
flag.CommandLine.ParseErrorsWhitelist.UnknownFlags = true
ex, err := os.Executable()
if err != nil {
panic(err)
}
agentConfig.Read(filepath.Dir(ex) + "/config.yml")
}
func main() {
@@ -95,24 +108,31 @@ func main() {
// 来自于 GoReleaser 的版本号
monitor.Version = version
flag.BoolVarP(&agentConf.Debug, "debug", "d", false, "开启调试信息")
flag.StringVarP(&agentConf.Server, "server", "s", "localhost:5555", "管理面板RPC端口")
flag.StringVarP(&agentConf.ClientSecret, "password", "p", "", "Agent连接Secret")
flag.IntVar(&agentConf.ReportDelay, "report-delay", 1, "系统状态上报间隔")
flag.BoolVar(&agentConf.SkipConnectionCount, "skip-conn", false, "不监控连接数")
flag.BoolVar(&agentConf.SkipProcsCount, "skip-procs", false, "不监控进程数")
flag.BoolVar(&agentConf.DisableCommandExecute, "disable-command-execute", false, "禁止在此机器上执行命令")
flag.BoolVar(&agentConf.DisableAutoUpdate, "disable-auto-update", false, "禁用自动升级")
flag.BoolVar(&agentConf.DisableForceUpdate, "disable-force-update", false, "禁用强制升级")
flag.BoolVar(&agentConf.TLS, "tls", false, "启用SSL/TLS加密")
var isEditAgentConfig bool
flag.BoolVarP(&agentCliParam.Debug, "debug", "d", false, "开启调试信息")
flag.BoolVarP(&isEditAgentConfig, "edit-agent-config", "", false, "修改要监控的网卡/分区白名单")
flag.StringVarP(&agentCliParam.Server, "server", "s", "localhost:5555", "管理面板RPC端口")
flag.StringVarP(&agentCliParam.ClientSecret, "password", "p", "", "Agent连接Secret")
flag.IntVar(&agentCliParam.ReportDelay, "report-delay", 1, "系统状态上报间隔")
flag.BoolVar(&agentCliParam.SkipConnectionCount, "skip-conn", false, "不监控连接数")
flag.BoolVar(&agentCliParam.SkipProcsCount, "skip-procs", false, "不监控进程数")
flag.BoolVar(&agentCliParam.DisableCommandExecute, "disable-command-execute", false, "禁止在此机器上执行命令")
flag.BoolVar(&agentCliParam.DisableAutoUpdate, "disable-auto-update", false, "禁用自动升级")
flag.BoolVar(&agentCliParam.DisableForceUpdate, "disable-force-update", false, "禁用强制升级")
flag.BoolVar(&agentCliParam.TLS, "tls", false, "启用SSL/TLS加密")
flag.Parse()
if agentConf.ClientSecret == "" {
if isEditAgentConfig {
editAgentConfig()
return
}
if agentCliParam.ClientSecret == "" {
flag.Usage()
return
}
if agentConf.ReportDelay < 1 || agentConf.ReportDelay > 4 {
if agentCliParam.ReportDelay < 1 || agentCliParam.ReportDelay > 4 {
println("report-delay 的区间为 1-4")
return
}
@@ -122,10 +142,10 @@ func main() {
func run() {
auth := rpc.AuthHandler{
ClientSecret: agentConf.ClientSecret,
ClientSecret: agentCliParam.ClientSecret,
}
if !agentConf.DisableCommandExecute {
if !agentCliParam.DisableCommandExecute {
go pty.DownloadDependency()
}
// 上报服务器信息
@@ -133,7 +153,7 @@ func run() {
// 更新IP信息
go monitor.UpdateIP()
if _, err := semver.Parse(version); err == nil && !agentConf.DisableAutoUpdate {
if _, err := semver.Parse(version); err == nil && !agentCliParam.DisableAutoUpdate {
go func() {
for range updateCh {
go func() {
@@ -164,12 +184,12 @@ func run() {
for {
timeOutCtx, cancel := context.WithTimeout(context.Background(), networkTimeOut)
var securityOption grpc.DialOption
if agentConf.TLS {
if agentCliParam.TLS {
securityOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12}))
} else {
securityOption = grpc.WithInsecure()
securityOption = grpc.WithTransportCredentials(insecure.NewCredentials())
}
conn, err = grpc.DialContext(timeOutCtx, agentConf.Server, securityOption, grpc.WithPerRPCCredentials(&auth))
conn, err = grpc.DialContext(timeOutCtx, agentCliParam.Server, securityOption, grpc.WithPerRPCCredentials(&auth))
if err != nil {
println("与面板建立连接失败:", err)
cancel()
@@ -180,7 +200,7 @@ func run() {
client = pb.NewNezhaServiceClient(conn)
// 第一步注册
timeOutCtx, cancel = context.WithTimeout(context.Background(), networkTimeOut)
_, err = client.ReportSystemInfo(timeOutCtx, monitor.GetHost().PB())
_, err = client.ReportSystemInfo(timeOutCtx, monitor.GetHost(&agentConfig).PB())
if err != nil {
println("上报系统信息失败:", err)
cancel()
@@ -190,7 +210,7 @@ func run() {
cancel()
inited = true
// 执行 Task
tasks, err := client.RequestTask(context.Background(), monitor.GetHost().PB())
tasks, err := client.RequestTask(context.Background(), monitor.GetHost(&agentConfig).PB())
if err != nil {
println("请求任务失败:", err)
retry()
@@ -254,9 +274,9 @@ func reportState() {
for {
// 为了更准确的记录时段流量inited 后再上传状态信息
if client != nil && inited {
monitor.TrackNetworkSpeed()
monitor.TrackNetworkSpeed(&agentConfig)
timeOutCtx, cancel := context.WithTimeout(context.Background(), networkTimeOut)
_, err = client.ReportSystemState(timeOutCtx, monitor.GetState(agentConf.SkipConnectionCount, agentConf.SkipProcsCount).PB())
_, err = client.ReportSystemState(timeOutCtx, monitor.GetState(&agentConfig, agentCliParam.SkipConnectionCount, agentCliParam.SkipProcsCount).PB())
cancel()
if err != nil {
println("reportState error", err)
@@ -264,10 +284,10 @@ func reportState() {
}
if lastReportHostInfo.Before(time.Now().Add(-10 * time.Minute)) {
lastReportHostInfo = time.Now()
client.ReportSystemInfo(context.Background(), monitor.GetHost().PB())
client.ReportSystemInfo(context.Background(), monitor.GetHost(&agentConfig).PB())
}
}
time.Sleep(time.Second * time.Duration(agentConf.ReportDelay))
time.Sleep(time.Second * time.Duration(agentCliParam.ReportDelay))
}
}
@@ -288,7 +308,7 @@ func doSelfUpdate(useLocalVersion bool) {
}
func handleUpgradeTask(task *pb.Task, result *pb.TaskResult) {
if agentConf.DisableForceUpdate {
if agentCliParam.DisableForceUpdate {
return
}
doSelfUpdate(false)
@@ -347,7 +367,7 @@ func handleHttpGetTask(task *pb.Task, result *pb.TaskResult) {
}
func handleCommandTask(task *pb.Task, result *pb.TaskResult) {
if agentConf.DisableCommandExecute {
if agentCliParam.DisableCommandExecute {
result.Data = "此 Agent 已禁止命令执行"
return
}
@@ -396,7 +416,7 @@ type WindowSize struct {
}
func handleTerminalTask(task *pb.Task) {
if agentConf.DisableCommandExecute {
if agentCliParam.DisableCommandExecute {
println("此 Agent 已禁止命令执行")
return
}
@@ -411,7 +431,7 @@ func handleTerminalTask(task *pb.Task) {
protocol += "s"
}
header := http.Header{}
header.Add("Secret", agentConf.ClientSecret)
header.Add("Secret", agentCliParam.ClientSecret)
conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("%s://%s/terminal/%s", protocol, terminal.Host, terminal.Session), header)
if err != nil {
println("Terminal 连接失败:", err)
@@ -480,8 +500,72 @@ func handleTerminalTask(task *pb.Task) {
}
}
func editAgentConfig() {
nc, err := psnet.IOCounters(true)
if err != nil {
panic(err)
}
var nicAllowlistOptions []string
for _, v := range nc {
nicAllowlistOptions = append(nicAllowlistOptions, v.Name)
}
var diskAllowlistOptions []string
diskList, err := disk.Partitions(false)
if err != nil {
panic(err)
}
for _, p := range diskList {
diskAllowlistOptions = append(diskAllowlistOptions, fmt.Sprintf("%s\t%s\t%s", p.Mountpoint, p.Fstype, p.Device))
}
var qs = []*survey.Question{
{
Name: "nic",
Prompt: &survey.MultiSelect{
Message: "选择要监控的网卡",
Options: nicAllowlistOptions,
},
},
{
Name: "disk",
Prompt: &survey.MultiSelect{
Message: "选择要监控的硬盘分区",
Options: diskAllowlistOptions,
},
},
}
answers := struct {
Nic []string
Disk []string
}{}
err = survey.Ask(qs, &answers, survey.WithValidator(survey.Required))
if err != nil {
fmt.Println("选择错误", err.Error())
return
}
agentConfig.HardDrivePartitionAllowlist = []string{}
for _, v := range answers.Disk {
agentConfig.HardDrivePartitionAllowlist = append(agentConfig.HardDrivePartitionAllowlist, strings.Split(v, "\t")[0])
}
agentConfig.NICAllowlist = make(map[string]bool)
for _, v := range answers.Nic {
agentConfig.NICAllowlist[v] = true
}
if err = agentConfig.Save(); err != nil {
panic(err)
}
fmt.Println("修改自定义配置成功,重启 Agnet 后生效")
}
func println(v ...interface{}) {
if agentConf.Debug {
if agentCliParam.Debug {
fmt.Printf("NEZHA@%s>> ", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println(v...)
}

View File

@@ -3,7 +3,6 @@ package monitor
import (
"fmt"
"os/exec"
"regexp"
"runtime"
"strconv"
"strings"
@@ -31,7 +30,6 @@ var (
excludeNetInterfaces = []string{
"lo", "tun", "docker", "veth", "br-", "vmbr", "vnet", "kube",
}
getMacDiskNo = regexp.MustCompile(`\/dev\/disk(\d)s.*`)
)
var (
@@ -39,7 +37,7 @@ var (
cachedBootTime time.Time
)
func GetHost() *model.Host {
func GetHost(agentConfig *model.AgentConfig) *model.Host {
hi, _ := host.Info()
var cpuType string
if hi.VirtualizationSystem != "" {
@@ -57,7 +55,7 @@ func GetHost() *model.Host {
cpus = append(cpus, fmt.Sprintf("%s %d %s Core", model, count, cpuType))
}
mv, _ := mem.VirtualMemory()
diskTotal, _ := getDiskTotalAndUsed()
diskTotal, _ := getDiskTotalAndUsed(agentConfig)
var swapMemTotal uint64
if runtime.GOOS == "windows" {
@@ -87,7 +85,7 @@ func GetHost() *model.Host {
}
}
func GetState(skipConnectionCount bool, skipProcsCount bool) *model.HostState {
func GetState(agentConfig *model.AgentConfig, skipConnectionCount bool, skipProcsCount bool) *model.HostState {
var procs []int32
if !skipProcsCount {
procs, _ = process.Pids()
@@ -110,7 +108,7 @@ func GetState(skipConnectionCount bool, skipProcsCount bool) *model.HostState {
cpuPercent = cp[0]
}
_, diskUsed := getDiskTotalAndUsed()
_, diskUsed := getDiskTotalAndUsed(agentConfig)
loadStat, _ := load.Avg()
var tcpConnCount, udpConnCount uint64
@@ -157,13 +155,19 @@ func GetState(skipConnectionCount bool, skipProcsCount bool) *model.HostState {
}
}
func TrackNetworkSpeed() {
func TrackNetworkSpeed(agentConfig *model.AgentConfig) {
var innerNetInTransfer, innerNetOutTransfer uint64
nc, err := net.IOCounters(true)
if err == nil {
for _, v := range nc {
if isListContainsStr(excludeNetInterfaces, v.Name) {
continue
if len(agentConfig.NICAllowlist) > 0 {
if !agentConfig.NICAllowlist[v.Name] {
continue
}
} else {
if isListContainsStr(excludeNetInterfaces, v.Name) {
continue
}
}
innerNetInTransfer += v.BytesRecv
innerNetOutTransfer += v.BytesSent
@@ -180,49 +184,49 @@ func TrackNetworkSpeed() {
}
}
func getDiskTotalAndUsed() (total uint64, used uint64) {
diskList, _ := disk.Partitions(false)
func getDiskTotalAndUsed(agentConfig *model.AgentConfig) (total uint64, used uint64) {
devices := make(map[string]string)
countedDiskForMac := make(map[string]struct{})
for _, d := range diskList {
fsType := strings.ToLower(d.Fstype)
// 不统计 K8s 的虚拟挂载点https://github.com/shirou/gopsutil/issues/1007
if devices[d.Device] == "" && isListContainsStr(expectDiskFsTypes, fsType) && !strings.Contains(d.Mountpoint, "/var/lib/kubelet") {
devices[d.Device] = d.Mountpoint
if len(agentConfig.HardDrivePartitionAllowlist) > 0 {
// 如果配置了白名单,使用白名单的列表
for i, v := range agentConfig.HardDrivePartitionAllowlist {
devices[strconv.Itoa(i)] = v
}
} else {
// 否则使用默认过滤规则
diskList, _ := disk.Partitions(false)
for _, d := range diskList {
fsType := strings.ToLower(d.Fstype)
// 不统计 K8s 的虚拟挂载点https://github.com/shirou/gopsutil/issues/1007
if devices[d.Device] == "" && isListContainsStr(expectDiskFsTypes, fsType) && !strings.Contains(d.Mountpoint, "/var/lib/kubelet") {
devices[d.Device] = d.Mountpoint
}
}
}
for device, mountPath := range devices {
diskUsageOf, _ := disk.Usage(mountPath)
// 这里是针对 Mac 机器的处理https://github.com/giampaolo/psutil/issues/1509
matches := getMacDiskNo.FindStringSubmatch(device)
if len(matches) == 2 {
if _, has := countedDiskForMac[matches[1]]; !has {
countedDiskForMac[matches[1]] = struct{}{}
total += diskUsageOf.Total
}
} else {
for _, mountPath := range devices {
diskUsageOf, err := disk.Usage(mountPath)
if err == nil {
total += diskUsageOf.Total
used += diskUsageOf.Used
}
used += diskUsageOf.Used
}
// Fallback 到这个方法,仅统计根路径,适用于OpenVZ之类的.
if runtime.GOOS == "linux" {
if total == 0 && used == 0 {
cmd := exec.Command("df")
out, err := cmd.CombinedOutput()
if err == nil {
s := strings.Split(string(out), "\n")
for _, c := range s {
info := strings.Fields(c)
if len(info) == 6 {
if info[5] == "/" {
total, _ = strconv.ParseUint(info[1], 0, 64)
used, _ = strconv.ParseUint(info[2], 0, 64)
// 默认获取的是1K块为单位的.
total = total * 1024
used = used * 1024
}
if runtime.GOOS == "linux" && total == 0 && used == 0 {
cmd := exec.Command("df")
out, err := cmd.CombinedOutput()
if err == nil {
s := strings.Split(string(out), "\n")
for _, c := range s {
info := strings.Fields(c)
if len(info) == 6 {
if info[5] == "/" {
total, _ = strconv.ParseUint(info[1], 0, 64)
used, _ = strconv.ParseUint(info[2], 0, 64)
// 默认获取的是1K块为单位的.
total = total * 1024
used = used * 1024
}
}
}